wood chen 8770d79bb8 feat(docker-compose, config, handler): enhance deployment configuration, add metrics support, and improve proxy handling
- Updated docker-compose.yml to include resource limits and health checks for the service.
- Modified go.mod and go.sum to include the new dependency on golang.org/x/time.
- Enhanced main.go to add new metrics routes for monitoring.
- Updated config.json to include metrics configuration with password and token expiry.
- Refactored internal/config to implement a ConfigManager for dynamic configuration loading.
- Improved internal/handler to utilize a shared HTTP client and added metrics tracking for requests.
2024-11-30 21:11:05 +08:00

370 lines
9.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package handler
import (
"fmt"
"io"
"log"
"net/http"
"net/url"
"path"
"proxy-go/internal/config"
"proxy-go/internal/utils"
"strings"
"sync"
"sync/atomic"
"time"
"golang.org/x/time/rate"
)
// defaultBufferSize is the length, in bytes, of every buffer handed out by
// bufferPool (32 KiB).
const defaultBufferSize = 32 << 10

// bufferPool recycles fixed-size byte slices so that streaming copies do not
// allocate a fresh buffer per request. Get returns a []byte of length
// defaultBufferSize; callers are expected to Put it back when done.
var bufferPool = sync.Pool{
	New: func() any {
		buf := make([]byte, defaultBufferSize)
		return buf
	},
}
// ProxyHandler is the reverse-proxy HTTP handler. It holds the routing table,
// the shared upstream HTTP client, a global rate limiter and the in-memory
// metrics that ServeHTTP and recordStats update.
type ProxyHandler struct {
// pathMap maps a URL path prefix to its upstream configuration; it is
// populated from cfg.MAP in NewProxyHandler and matched against the request
// path in ServeHTTP.
pathMap map[string]config.PathConfig
// client is the single HTTP client used for every upstream request.
client *http.Client
// limiter is consulted once per request; requests it rejects get a 429.
limiter *rate.Limiter
// startTime is the time the handler was constructed.
startTime time.Time
// config is the configuration the handler was built from.
config *config.Config
// auth is created by newAuthManager(); it is not used in this part of the
// file. NOTE(review): presumably backs the metrics endpoints - confirm.
auth *authManager
// metrics groups the process-wide counters.
metrics struct {
// The three plain int64 counters below are updated with atomic.AddInt64
// in ServeHTTP and must only be accessed through sync/atomic functions.
activeRequests int64
totalRequests int64
totalErrors int64
totalBytes atomic.Int64 // total number of response bytes reported to recordStats
pathStats sync.Map // per-path statistics: map[string]*PathStats
statusStats [6]atomic.Int64 // response counts by status class; index = status/100-1 (1xx-5xx use 0-4)
latencyBuckets [10]atomic.Int64 // latency histogram in 100ms steps (0-100ms, 100-200ms, ...); latencies of 1s or more are not counted
}
// recentRequests keeps the most recent request records.
recentRequests struct {
// The embedded lock is held while a slot of items is written.
sync.RWMutex
items [1000]*RequestLog // fixed-size ring buffer
cursor atomic.Int64 // monotonically increasing write position; slot = cursor % 1000
}
}
// RequestLog holds the statistics of a single proxied request. recordStats
// creates one per call and stores it in the recent-requests ring buffer.
type RequestLog struct {
// Time is when the record was created (time.Now() inside recordStats).
Time time.Time
// Path is the request path passed to recordStats.
Path string
// Status is the HTTP status code that was relayed to the client.
Status int
// Latency is the elapsed handling time measured by the caller.
Latency time.Duration
// BytesSent is the number of response body bytes reported by the caller.
BytesSent int64
// ClientIP is the value of utils.GetClientIP for the request.
ClientIP string
}
// PathStats aggregates counters for one request path. All fields are atomic
// so a single *PathStats can be shared between concurrent requests.
//
//   - requests:   number of requests recorded for the path
//   - errors:     number of requests counted as failed
//   - bytes:      total response bytes recorded for the path
//   - latencySum: sum of request latencies, stored as time.Duration nanoseconds
type PathStats struct {
	requests, errors, bytes, latencySum atomic.Int64
}
// NewProxyHandler builds a ProxyHandler from cfg.
//
// The handler routes by cfg.MAP, talks to upstreams through one shared HTTP
// client (30s overall timeout, pooled idle connections) and limits incoming
// traffic to 5000 requests per second with a burst of 10000. cfg must not be
// nil because cfg.MAP is read here.
func NewProxyHandler(cfg *config.Config) *ProxyHandler {
	// Connection pooling for upstream requests.
	pooled := &http.Transport{
		MaxIdleConns:        100,              // idle connections kept across all hosts
		MaxIdleConnsPerHost: 10,               // idle connections kept per host
		IdleConnTimeout:     90 * time.Second, // how long an idle connection is kept
	}

	h := new(ProxyHandler)
	h.pathMap = cfg.MAP
	h.client = &http.Client{
		Transport: pooled,
		Timeout:   30 * time.Second,
	}
	h.limiter = rate.NewLimiter(rate.Limit(5000), 10000)
	h.startTime = time.Now()
	h.config = cfg
	h.auth = newAuthManager()
	return h
}
// ServeHTTP implements http.Handler: it forwards the request to the upstream
// configured for the request path and relays the upstream response.
//
// Steps:
//  1. Count the request and apply the global rate limiter (429 when exceeded).
//  2. Answer "/" with a static welcome text.
//  3. Select the longest configured prefix that matches the path on a segment
//     boundary; respond 404 when none matches.
//  4. Build the upstream URL from the target base (optionally chosen by file
//     extension) plus the re-escaped remainder of the path.
//  5. Forward method, body and headers, adding the X-Forwarded-* / X-Real-IP
//     headers.
//  6. Relay status, headers (minus Content-Security-Policy) and body, then
//     write the access log and record metrics.
//
// Every failure the proxy itself produces (bad target URL, upstream
// unreachable, body copy error) increments metrics.totalErrors.
func (h *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	atomic.AddInt64(&h.metrics.activeRequests, 1)
	atomic.AddInt64(&h.metrics.totalRequests, 1)
	defer atomic.AddInt64(&h.metrics.activeRequests, -1)

	if !h.limiter.Allow() {
		http.Error(w, "Too Many Requests", http.StatusTooManyRequests)
		return
	}

	start := time.Now()
	clientIP := utils.GetClientIP(r)

	// fail reports an error that happened before any response header was
	// written: it counts it, answers the client and logs the reason.
	fail := func(status int, msg, reason string) {
		atomic.AddInt64(&h.metrics.totalErrors, 1)
		http.Error(w, msg, status)
		log.Printf("[%s] %s %s -> %d (%s) [%v]",
			clientIP, r.Method, r.URL.Path, status, reason, time.Since(start))
	}

	// Root path: static greeting.
	if r.URL.Path == "/" {
		w.WriteHeader(http.StatusOK)
		fmt.Fprint(w, "Welcome to CZL proxy.")
		log.Printf("[%s] %s %s -> %d (root path) [%v]",
			clientIP, r.Method, r.URL.Path, http.StatusOK, time.Since(start))
		return
	}

	// Find the longest matching prefix. Map iteration order is random, so
	// taking the first hit would make overlapping prefixes nondeterministic.
	var (
		matchedPrefix string
		pathConfig    config.PathConfig
	)
	for prefix, cfg := range h.pathMap {
		if prefix == "" || len(prefix) <= len(matchedPrefix) || !strings.HasPrefix(r.URL.Path, prefix) {
			continue
		}
		// The match must end on a path-segment boundary. Otherwise a request
		// such as "/img.evil.com/x" would match prefix "/img" and the
		// remainder would be glued onto the target host.
		rest := r.URL.Path[len(prefix):]
		if rest != "" && rest[0] != '/' && !strings.HasSuffix(prefix, "/") {
			continue
		}
		matchedPrefix, pathConfig = prefix, cfg
	}
	if matchedPrefix == "" {
		http.NotFound(w, r)
		log.Printf("[%s] %s %s -> 404 (not found) [%v]",
			clientIP, r.Method, r.URL.Path, time.Since(start))
		return
	}

	// r.URL.Path is already percent-decoded by net/http. Decoding it a second
	// time would turn '+' into a space and reject literal '%' characters.
	decodedPath := strings.TrimPrefix(r.URL.Path, matchedPrefix)

	// Choose the target base, optionally by file extension.
	targetBase := pathConfig.DefaultTarget
	if pathConfig.ExtensionMap != nil {
		if ext := strings.ToLower(path.Ext(decodedPath)); ext != "" {
			targetBase = pathConfig.GetTargetForExt(ext[1:]) // drop the leading dot
		}
	}

	// Re-escape every segment while keeping the '/' separators.
	parts := strings.Split(decodedPath, "/")
	for i, part := range parts {
		parts[i] = url.PathEscape(part)
	}
	// NOTE(review): the query string (r.URL.RawQuery) is not forwarded; this
	// matches the previous behaviour - confirm it is intended.
	targetURL := targetBase + strings.Join(parts, "/")

	// Parse the target URL to obtain the upstream host.
	parsedURL, err := url.Parse(targetURL)
	if err != nil {
		fail(http.StatusInternalServerError, "Error parsing target URL",
			fmt.Sprintf("error parsing URL: %v", err))
		return
	}

	// Bind the upstream request to the client's context so that a client
	// disconnect cancels the upstream call.
	proxyReq, err := http.NewRequestWithContext(r.Context(), r.Method, targetURL, r.Body)
	if err != nil {
		fail(http.StatusInternalServerError, "Error creating proxy request",
			fmt.Sprintf("error creating request: %v", err))
		return
	}
	// Without this the body length is unknown to the transport and every
	// request body would be re-sent with chunked encoding.
	proxyReq.ContentLength = r.ContentLength

	copyHeader(proxyReq.Header, r.Header)

	// Image requests: narrow Accept to the best modern format the client
	// supports and ask Cloudflare to pick the format automatically.
	if utils.IsImageRequest(r.URL.Path) {
		accept := r.Header.Get("Accept")
		switch {
		case strings.Contains(accept, "image/avif"):
			proxyReq.Header.Set("Accept", "image/avif")
		case strings.Contains(accept, "image/webp"):
			proxyReq.Header.Set("Accept", "image/webp")
		}
		proxyReq.Header.Set("CF-Image-Format", "auto")
	}

	// Forwarding headers. The Host header of a client request is taken from
	// proxyReq.Host; a "Host" entry in Header is ignored by net/http.
	proxyReq.Host = parsedURL.Host
	proxyReq.Header.Set("X-Real-IP", clientIP)
	proxyReq.Header.Set("X-Forwarded-Host", r.Host)
	// r.URL.Scheme is empty for server-side requests, so derive the scheme
	// from the connection; on a plain connection keep a value supplied by a
	// TLS-terminating proxy in front of us.
	switch {
	case r.TLS != nil:
		proxyReq.Header.Set("X-Forwarded-Proto", "https")
	case proxyReq.Header.Get("X-Forwarded-Proto") == "":
		proxyReq.Header.Set("X-Forwarded-Proto", "http")
	}
	// Append the client to the X-Forwarded-For chain, keeping every prior hop
	// even when they arrived as separate header lines.
	if clientIP != "" {
		if prior := strings.Join(proxyReq.Header.Values("X-Forwarded-For"), ", "); prior != "" {
			proxyReq.Header.Set("X-Forwarded-For", prior+", "+clientIP)
		} else {
			proxyReq.Header.Set("X-Forwarded-For", clientIP)
		}
	}

	// Send the upstream request.
	resp, err := h.client.Do(proxyReq)
	if err != nil {
		fail(http.StatusBadGateway, "Error forwarding request",
			fmt.Sprintf("proxy error: %v", err))
		return
	}
	defer resp.Body.Close()

	// Small responses of known size (< 1 MiB) are read fully before any
	// header is written, so an upstream read failure can still be reported
	// with a proper error status.
	var body []byte
	buffered := resp.ContentLength > 0 && resp.ContentLength < 1<<20
	if buffered {
		body, err = io.ReadAll(resp.Body)
		if err != nil {
			fail(http.StatusBadGateway, "Error reading response",
				fmt.Sprintf("error reading response: %v", err))
			return
		}
	}

	copyHeader(w.Header(), resp.Header)
	// Drop the upstream's strict CSP.
	w.Header().Del("Content-Security-Policy")
	w.WriteHeader(resp.StatusCode)

	var (
		bytesSent int64
		copyErr   error
	)
	flusher, canFlush := w.(http.Flusher)
	switch {
	case buffered:
		// Write the buffered body in one go.
		n, werr := w.Write(body)
		bytesSent, copyErr = int64(n), werr
	case canFlush:
		// Stream large or unknown-length bodies, flushing after each chunk.
		buf := bufferPool.Get().([]byte)
		defer bufferPool.Put(buf)
		for copyErr == nil {
			n, rerr := resp.Body.Read(buf)
			if n > 0 {
				wn, werr := w.Write(buf[:n])
				bytesSent += int64(wn)
				if werr != nil {
					copyErr = werr
					break
				}
				flusher.Flush()
			}
			if rerr == io.EOF {
				break
			}
			copyErr = rerr
		}
	default:
		// No Flusher available: plain copy.
		bytesSent, copyErr = io.Copy(w, resp.Body)
	}
	if copyErr != nil {
		// The status line is already sent, so the error can only be counted
		// and logged.
		atomic.AddInt64(&h.metrics.totalErrors, 1)
		log.Printf("Error copying response: %v", copyErr)
	}

	// Access log and metrics.
	latency := time.Since(start)
	log.Printf("| %-6s | %3d | %12s | %15s | %10s | %-30s | %-50s -> %s",
		r.Method,                     // HTTP method, left-aligned, width 6
		resp.StatusCode,              // status code, width 3
		latency,                      // handling time, width 12
		clientIP,                     // client IP, width 15
		utils.FormatBytes(bytesSent), // transferred size, width 10
		utils.GetRequestSource(r),    // request source
		r.URL.Path,                   // request path, left-aligned, width 50
		targetURL,                    // upstream URL
	)
	h.recordStats(r.URL.Path, resp.StatusCode, latency, bytesSent, r)
}
// recordStats folds one finished request into the in-memory metrics.
//
// path is the request path used as the per-path statistics key, status the
// HTTP status relayed to the client, latency the total handling time, bytes
// the number of body bytes sent and r the original request (used only to
// derive the client IP). It is safe to call from concurrent requests.
func (h *ProxyHandler) recordStats(path string, status int, latency time.Duration, bytes int64, r *http.Request) {
	// Total bytes transferred.
	h.metrics.totalBytes.Add(bytes)

	// Status class counters: 1xx -> index 0 ... 5xx -> index 4.
	if status >= 100 && status < 600 {
		h.metrics.statusStats[status/100-1].Add(1)
	}

	// Latency histogram in 100ms steps; latencies beyond the last bucket are
	// not counted.
	if bucket := latency.Milliseconds() / 100; bucket >= 0 && bucket < int64(len(h.metrics.latencyBuckets)) {
		h.metrics.latencyBuckets[bucket].Add(1)
	}

	// Per-path statistics. Load first so the common case does not allocate;
	// LoadOrStore makes sure that two requests seeing a path for the first
	// time end up sharing one PathStats instead of overwriting each other.
	entry, ok := h.metrics.pathStats.Load(path)
	if !ok {
		entry, _ = h.metrics.pathStats.LoadOrStore(path, &PathStats{})
	}
	stats := entry.(*PathStats)
	stats.requests.Add(1)
	if status >= http.StatusBadRequest {
		// 4xx and 5xx responses are counted as errors for the path.
		stats.errors.Add(1)
	}
	stats.bytes.Add(bytes)
	stats.latencySum.Add(int64(latency))

	// Append to the ring buffer of recent requests.
	record := &RequestLog{
		Time:      time.Now(),
		Path:      path,
		Status:    status,
		Latency:   latency,
		BytesSent: bytes,
		ClientIP:  utils.GetClientIP(r),
	}
	// Unsigned arithmetic keeps the slot index in range even if the cursor
	// ever wraps around.
	slot := uint64(h.recentRequests.cursor.Add(1)) % uint64(len(h.recentRequests.items))
	h.recentRequests.Lock()
	h.recentRequests.items[slot] = record
	h.recentRequests.Unlock()
}
// copyHeader appends every value of every header in src to dst, storing each
// under its canonical header name. Values already present in dst are kept;
// headers with an empty value list are skipped and create no key in dst.
func copyHeader(dst, src http.Header) {
	for name, values := range src {
		if len(values) == 0 {
			continue
		}
		canonical := http.CanonicalHeaderKey(name)
		dst[canonical] = append(dst[canonical], values...)
	}
}