refactor(handler): streamline file streaming and improve concurrency handling

This commit is contained in:
wood chen 2024-10-23 02:05:47 +08:00
parent 85e0534523
commit 0a36dd5284
2 changed files with 85 additions and 100 deletions

View File

@ -3,7 +3,6 @@ package handler
import (
"fmt"
"io"
"mime"
"net/http"
"os"
"path"
@ -126,37 +125,3 @@ func pingURL(url string) (string, int64, time.Time) {
return etag, size, lastModified
}
func streamFile(c *fiber.Ctx, filePath string) error {
file, err := os.Open(filePath)
if err != nil {
log.Errorf("无法打开文件: %s, 错误: %v", filePath, err)
return c.Status(fiber.StatusInternalServerError).SendString("无法打开文件")
}
defer file.Close()
stat, err := file.Stat()
if err != nil {
log.Errorf("无法获取文件信息: %s, 错误: %v", filePath, err)
return c.Status(fiber.StatusInternalServerError).SendString("无法获取文件信息")
}
contentType := mime.TypeByExtension(path.Ext(filePath))
if contentType == "" {
contentType = "application/octet-stream"
}
c.Set(fiber.HeaderContentType, contentType)
c.Set(fiber.HeaderContentLength, strconv.FormatInt(stat.Size(), 10))
log.Infof("开始流式传输文件: %s, 大小: %d bytes", filePath, stat.Size())
err = c.SendStream(file)
if err != nil {
log.Errorf("文件流式传输失败: %s, 错误: %v", filePath, err)
return err
}
log.Infof("文件流式传输完成: %s", filePath)
return nil
}

View File

@ -7,6 +7,7 @@ import (
"path"
"strconv"
"strings"
"sync"
"webp_server_go/config"
"webp_server_go/encoder"
"webp_server_go/helper"
@ -62,7 +63,7 @@ func Convert(c *fiber.Ctx) error {
}
// 构建 EXHAUST_PATH 中的文件路径
exhaustFilename := path.Join(config.Config.ExhaustPath, reqURI)
exhaustFilename := path.Join(config.Config.ExhaustPath, strings.TrimPrefix(reqURI, matchedPrefix))
if extraParams.Width > 0 || extraParams.Height > 0 || extraParams.MaxWidth > 0 || extraParams.MaxHeight > 0 {
ext := path.Ext(exhaustFilename)
extraParamsStr := fmt.Sprintf("_w%d_h%d_mw%d_mh%d", extraParams.Width, extraParams.Height, extraParams.MaxWidth, extraParams.MaxHeight)
@ -72,81 +73,100 @@ func Convert(c *fiber.Ctx) error {
// 检查文件是否已经在 EXHAUST_PATH 中
if helper.FileExists(exhaustFilename) {
log.Infof("文件已存在于 EXHAUST_PATH直接提供服务: %s", exhaustFilename)
return streamFile(c, exhaustFilename)
return c.SendFile(exhaustFilename)
}
// 文件不在 EXHAUST_PATH 中,需要处理
isLocalPath := strings.HasPrefix(matchedTarget, "./") || strings.HasPrefix(matchedTarget, "/")
var rawImageAbs string
var isNewDownload bool
// 使用 sync.Once 确保并发安全
var once sync.Once
var processErr error
processImage := func() {
once.Do(func() {
// 文件不在 EXHAUST_PATH 中,需要处理
isLocalPath := strings.HasPrefix(matchedTarget, "./") || strings.HasPrefix(matchedTarget, "/")
var rawImageAbs string
var isNewDownload bool
if isLocalPath {
// 处理本地路径
localPath := strings.TrimPrefix(reqURI, matchedPrefix)
rawImageAbs = path.Join(matchedTarget, localPath)
if isLocalPath {
// 处理本地路径
localPath := strings.TrimPrefix(reqURI, matchedPrefix)
rawImageAbs = path.Join(matchedTarget, localPath)
// 检查本地文件是否存在
if !helper.FileExists(rawImageAbs) {
log.Errorf("本地文件不存在: %s", rawImageAbs)
return c.SendStatus(fiber.StatusNotFound)
}
isNewDownload = false // 本地文件不需要清理
} else {
// 处理远程URL
targetUrl, err := url.Parse(matchedTarget)
if err != nil {
log.Errorf("解析目标 URL 失败: %v", err)
return c.SendStatus(fiber.StatusInternalServerError)
}
remoteAddr := targetUrl.Scheme + "://" + targetUrl.Host + strings.Replace(reqURI, matchedPrefix, targetUrl.Path, 1)
// 检查本地文件是否存在
if !helper.FileExists(rawImageAbs) {
processErr = fmt.Errorf("本地文件不存在: %s", rawImageAbs)
return
}
isNewDownload = false // 本地文件不需要清理
} else {
// 处理远程URL
targetUrl, err := url.Parse(matchedTarget)
if err != nil {
processErr = fmt.Errorf("解析目标 URL 失败: %v", err)
return
}
remoteAddr := targetUrl.Scheme + "://" + targetUrl.Host + strings.TrimPrefix(reqURI, matchedPrefix)
rawImageAbs, isNewDownload, err = fetchRemoteImg(remoteAddr, targetUrl.Host)
if err != nil {
log.Errorf("获取远程图像失败: %v", err)
return c.SendStatus(fiber.StatusInternalServerError)
}
}
rawImageAbs, isNewDownload, err = fetchRemoteImg(remoteAddr, targetUrl.Host)
if err != nil {
processErr = fmt.Errorf("获取远程图像失败: %v", err)
return
}
}
// 检查是否为允许的图片文件
if !helper.IsAllowedImageFile(filename) {
log.Infof("不允许的文件类型或非图片文件: %s", reqURI)
return streamFile(c, rawImageAbs)
// 检查是否为允许的图片文件
if !helper.IsAllowedImageFile(filename) {
log.Infof("不允许的文件类型或非图片文件: %s", reqURI)
// 直接复制文件到 EXHAUST_PATH
if err := helper.CopyFile(rawImageAbs, exhaustFilename); err != nil {
processErr = fmt.Errorf("复制不允许处理的文件失败: %v", err)
}
return
}
// 处理图片
isSmall, err := helper.IsFileSizeSmall(rawImageAbs, 100*1024) // 100KB
if err != nil {
processErr = fmt.Errorf("检查文件大小时出错: %v", err)
return
}
// 确保目标目录存在
if err := os.MkdirAll(path.Dir(exhaustFilename), 0755); err != nil {
processErr = fmt.Errorf("创建目标目录失败: %v", err)
return
}
if isSmall {
if err := helper.CopyFile(rawImageAbs, exhaustFilename); err != nil {
processErr = fmt.Errorf("复制小文件到 EXHAUST_PATH 失败: %v", err)
return
}
} else {
if err := encoder.ProcessAndSaveImage(rawImageAbs, exhaustFilename, extraParams); err != nil {
processErr = fmt.Errorf("处理图片失败: %v", err)
return
}
}
// 如果是新下载的远程文件,安排清理任务
if !isLocalPath && isNewDownload {
go schedule.ScheduleCleanup(rawImageAbs)
}
})
}
// 处理图片
// 检查文件大小
isSmall, err := helper.IsFileSizeSmall(rawImageAbs, 100*1024) // 100KB
if err != nil {
log.Errorf("检查文件大小时出错: %v", err)
return c.SendStatus(fiber.StatusInternalServerError)
processImage()
if processErr != nil {
log.Error(processErr)
return c.Status(fiber.StatusInternalServerError).SendString(processErr.Error())
}
if isSmall {
log.Infof("文件 %s 小于100KB直接复制到 EXHAUST_PATH", rawImageAbs)
// 确保目标目录存在
if err := os.MkdirAll(path.Dir(exhaustFilename), 0755); err != nil {
log.Errorf("创建目标目录失败: %v", err)
return c.SendStatus(fiber.StatusInternalServerError)
}
if err := helper.CopyFile(rawImageAbs, exhaustFilename); err != nil {
log.Errorf("复制小文件到 EXHAUST_PATH 失败: %v", err)
return c.SendStatus(fiber.StatusInternalServerError)
}
} else {
// 处理图片
err := encoder.ProcessAndSaveImage(rawImageAbs, exhaustFilename, extraParams)
if err != nil {
log.Errorf("处理图片失败: %v", err)
return c.SendStatus(fiber.StatusInternalServerError)
}
}
// 如果是新下载的远程文件,安排清理任务
if !isLocalPath && isNewDownload {
go schedule.ScheduleCleanup(rawImageAbs)
// 再次检查文件是否存在(以防并发情况下的竞态条件)
if !helper.FileExists(exhaustFilename) {
return c.Status(fiber.StatusInternalServerError).SendString("处理后的文件未找到")
}
return streamFile(c, exhaustFilename)
// 发送文件
return c.SendFile(exhaustFilename)
}