From c9cb32b0da6e89dc65184d427249e0ed6f42c4de Mon Sep 17 00:00:00 2001 From: wood chen Date: Tue, 22 Oct 2024 17:26:43 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=9D=E8=AF=95=E4=BC=98=E5=8C=96=E5=86=85?= =?UTF-8?q?=E5=AD=98=E5=8D=A0=E7=94=A8=20feat(encoder):=20optimize=20prefe?= =?UTF-8?q?tch=20images=20with=20worker=20pool=20and=20progress=20bar=20re?= =?UTF-8?q?factor(handler):=20improve=20downloadFile=20function=20with=20e?= =?UTF-8?q?rror=20handling=20refactor(helper):=20use=20streaming=20JSON=20?= =?UTF-8?q?encoder=20for=20metadata=20chore(webp-server):=20update=20serve?= =?UTF-8?q?r=20config=20with=20write=20buffer=20size?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- encoder/prefetch.go | 60 ++++++++++++++++++++++----------------------- handler/remote.go | 49 +++++++++++++----------------------- handler/router.go | 32 ++++++++++++------------ helper/metadata.go | 19 +++++++++++--- webp-server.go | 7 +++--- 5 files changed, 81 insertions(+), 86 deletions(-) diff --git a/encoder/prefetch.go b/encoder/prefetch.go index 91e22d1..d739245 100644 --- a/encoder/prefetch.go +++ b/encoder/prefetch.go @@ -5,6 +5,7 @@ import ( "os" "path" "path/filepath" + "sync" "time" "webp_server_go/config" "webp_server_go/helper" @@ -14,54 +15,51 @@ import ( ) func PrefetchImages() { - // maximum ongoing prefetch is depending on your core of CPU - var sTime = time.Now() + sTime := time.Now() log.Infof("Prefetching using %d cores", config.Jobs) - var finishChan = make(chan int, config.Jobs) - for range config.Jobs { - finishChan <- 1 - } - //prefetch, recursive through the dir + // 使用固定大小的工作池来限制并发 + workerPool := make(chan struct{}, config.Jobs) + var wg sync.WaitGroup + all := helper.FileCount(config.Config.ImgPath) - var bar = progressbar.Default(all, "Prefetching...") + bar := progressbar.Default(all, "Prefetching...") + err := filepath.Walk(config.Config.ImgPath, func(picAbsPath string, info os.FileInfo, err error) error { - if err != nil { - return err - } - if info.IsDir() { + if err != nil || info.IsDir() || !helper.CheckAllowedType(picAbsPath) { return nil } - if !helper.CheckAllowedType(picAbsPath) { - return nil - } - // RawImagePath string, ImgFilename string, reqURI string - metadata := helper.ReadMetadata(picAbsPath, "", config.LocalHostAlias) - avifAbsPath, webpAbsPath, jxlAbsPath := helper.GenOptimizedAbsPath(metadata, config.LocalHostAlias) - // Using avifAbsPath here is the same as using webpAbsPath/jxlAbsPath - _ = os.MkdirAll(path.Dir(avifAbsPath), 0755) + wg.Add(1) + go func() { + defer wg.Done() + workerPool <- struct{}{} // 获取工作槽 + defer func() { <-workerPool }() // 释放工作槽 - log.Infof("Prefetching %s", picAbsPath) + metadata := helper.ReadMetadata(picAbsPath, "", config.LocalHostAlias) + avifAbsPath, webpAbsPath, jxlAbsPath := helper.GenOptimizedAbsPath(metadata, config.LocalHostAlias) - // Allow all supported formats - supported := map[string]bool{ - "raw": true, - "webp": true, - "avif": true, - "jxl": true, - } + _ = os.MkdirAll(path.Dir(avifAbsPath), 0755) + + log.Infof("Prefetching %s", picAbsPath) + + supported := map[string]bool{ + "raw": true, "webp": true, "avif": true, "jxl": true, + } + + ConvertFilter(picAbsPath, jxlAbsPath, avifAbsPath, webpAbsPath, config.ExtraParams{Width: 0, Height: 0}, supported, nil) + _ = bar.Add(1) + }() - go ConvertFilter(picAbsPath, jxlAbsPath, avifAbsPath, webpAbsPath, config.ExtraParams{Width: 0, Height: 0}, supported, finishChan) - _ = bar.Add(<-finishChan) return nil }) + wg.Wait() // 等待所有工作完成 + if err != nil { log.Errorln(err) } elapsed := time.Since(sTime) _, _ = fmt.Fprintf(os.Stdout, "Prefetch complete in %s\n\n", elapsed) - } diff --git a/handler/remote.go b/handler/remote.go index 2c77225..b160267 100644 --- a/handler/remote.go +++ b/handler/remote.go @@ -1,19 +1,18 @@ package handler import ( - "bytes" + "fmt" + "io" "net/http" "os" "path" "path/filepath" "strconv" - "strings" "time" "webp_server_go/config" "webp_server_go/helper" "github.com/gofiber/fiber/v2" - "github.com/h2non/filetype" "github.com/patrickmn/go-cache" log "github.com/sirupsen/logrus" ) @@ -33,49 +32,35 @@ func cleanProxyCache(cacheImagePath string) { } } -func downloadFile(filepath string, url string) { +func downloadFile(filepath string, url string) error { resp, err := http.Get(url) if err != nil { log.Errorln("下载文件时连接到远程错误!") - return + return err } defer resp.Body.Close() if resp.StatusCode != fiber.StatusOK { log.Errorf("获取远程图像时远程返回 %s", resp.Status) - return - } - - // Copy bytes here - bodyBytes := new(bytes.Buffer) - _, err = bodyBytes.ReadFrom(resp.Body) - if err != nil { - return - } - - // Check if remote content-type is image using check by filetype instead of content-type returned by origin - kind, _ := filetype.Match(bodyBytes.Bytes()) - mime := kind.MIME.Value - if !strings.Contains(mime, "image") { - log.Errorf("远程文件 %s 不是图像,远程内容的 MIME 类型为 %s", url, mime) - return + return fmt.Errorf("unexpected status: %s", resp.Status) } + // 创建目标文件 _ = os.MkdirAll(path.Dir(filepath), 0755) - - // Create Cache here as a lock, so we can prevent incomplete file from being read - // Key: filepath, Value: true - config.WriteLock.Set(filepath, true, -1) - - err = os.WriteFile(filepath, bodyBytes.Bytes(), 0600) + out, err := os.Create(filepath) if err != nil { - // not likely to happen - return + return err + } + defer out.Close() + + // 使用小缓冲区流式写入文件 + buf := make([]byte, 32*1024) + _, err = io.CopyBuffer(out, resp.Body, buf) + if err != nil { + return err } - // Delete lock here - config.WriteLock.Delete(filepath) - + return nil } func fetchRemoteImg(url string, subdir string) config.MetaFile { diff --git a/handler/router.go b/handler/router.go index d546748..4a3a795 100644 --- a/handler/router.go +++ b/handler/router.go @@ -110,23 +110,23 @@ func Convert(c *fiber.Ctx) error { } // 新增:检查是否为WebP格式 - if strings.ToLower(path.Ext(filename)) == ".webp" { - log.Infof("原始图像已经是WebP格式: %s", reqURI) - var webpImagePath string - if proxyMode { - // 对于代理模式,确保文件已经被下载 - metadata = fetchRemoteImg(realRemoteAddr, targetHostName) - webpImagePath = path.Join(config.Config.RemoteRawPath, targetHostName, metadata.Id) - } else { - webpImagePath = path.Join(config.Config.ImgPath, reqURI) - } + // if strings.ToLower(path.Ext(filename)) == ".webp" { + // log.Infof("原始图像已经是WebP格式: %s", reqURI) + // var webpImagePath string + // if proxyMode { + // // 对于代理模式,确保文件已经被下载 + // metadata = fetchRemoteImg(realRemoteAddr, targetHostName) + // webpImagePath = path.Join(config.Config.RemoteRawPath, targetHostName, metadata.Id) + // } else { + // webpImagePath = path.Join(config.Config.ImgPath, reqURI) + // } - // 检查文件是否存在 - if helper.FileExists(webpImagePath) { - // 直接返回原WebP图片 - return c.SendFile(webpImagePath) - } - } + // // 检查文件是否存在 + // if helper.FileExists(webpImagePath) { + // // 直接返回原WebP图片 + // return c.SendFile(webpImagePath) + // } + // } if !helper.CheckAllowedType(filename) { msg := "不允许的文件扩展名 " + filename diff --git a/helper/metadata.go b/helper/metadata.go index 5608ef1..8267e37 100644 --- a/helper/metadata.go +++ b/helper/metadata.go @@ -51,9 +51,9 @@ func ReadMetadata(p, etag string, subdir string) config.MetaFile { func WriteMetadata(p, etag string, subdir string) config.MetaFile { _ = os.MkdirAll(path.Join(config.Config.MetadataPath, subdir), 0755) - var id, filepath, sant = getId(p) + id, filepath, sant := getId(p) - var data = config.MetaFile{ + data := config.MetaFile{ Id: id, } @@ -65,8 +65,19 @@ func WriteMetadata(p, etag string, subdir string) config.MetaFile { data.Checksum = HashFile(filepath) } - buf, _ := json.Marshal(data) - _ = os.WriteFile(path.Join(config.Config.MetadataPath, subdir, data.Id+".json"), buf, 0644) + // 使用流式 JSON 编码器 + file, err := os.Create(path.Join(config.Config.MetadataPath, subdir, data.Id+".json")) + if err != nil { + log.Errorf("无法创建元数据文件: %v", err) + return data + } + defer file.Close() + + encoder := json.NewEncoder(file) + if err := encoder.Encode(data); err != nil { + log.Errorf("无法编码元数据: %v", err) + } + return data } diff --git a/webp-server.go b/webp-server.go index 392f475..9d4b7c7 100644 --- a/webp-server.go +++ b/webp-server.go @@ -23,9 +23,10 @@ var app = fiber.New(fiber.Config{ AppName: "WebP Server Go", DisableStartupMessage: true, ProxyHeader: "X-Real-IP", - ReadBufferSize: config.Config.ReadBufferSize, // per-connection buffer size for requests' reading. This also limits the maximum header size. Increase this buffer if your clients send multi-KB RequestURIs and/or multi-KB headers (for example, BIG cookies). - Concurrency: config.Config.Concurrency, // Maximum number of concurrent connections. - DisableKeepalive: config.Config.DisableKeepalive, // Disable keep-alive connections, the server will close incoming connections after sending the first response to the client + ReadBufferSize: config.Config.ReadBufferSize, // 用于请求读取的每个连接缓冲区大小。这也限制了最大标头大小。如果您的客户端发送多 KB RequestURI 和/或多 KB 标头(例如,BIG cookies),请增加此缓冲区。 + WriteBufferSize: 1024 * 4, + Concurrency: config.Config.Concurrency, // 最大并发连接数。 + DisableKeepalive: config.Config.DisableKeepalive, // 禁用保持活动连接,服务器将在向客户端发送第一个响应后关闭传入连接 }) func setupLogger() {