From 040b01f4a46315f2b079f585702b16d058cacae9 Mon Sep 17 00:00:00 2001 From: wood chen Date: Sat, 30 Nov 2024 22:26:11 +0800 Subject: [PATCH] feat(metrics): integrate comprehensive metrics tracking across handlers - Enhanced metrics collection in ProxyHandler, MirrorProxyHandler, and FixedPathProxyMiddleware. - Introduced a centralized metrics collector to streamline request tracking and statistics reporting. - Updated MetricsHandler to utilize new metrics structure, including total bytes, requests per second, and error rates. - Improved documentation in readme.md to reflect new features and usage instructions. - Added support for metrics monitoring at `/metrics/ui` for better visibility into proxy performance. --- internal/handler/metrics.go | 145 +++------------ internal/handler/mirror_proxy.go | 7 + internal/handler/proxy.go | 96 +--------- internal/metrics/collector.go | 223 ++++++++++++++++++++++++ internal/metrics/types.go | 33 ++++ internal/middleware/fixed_path_proxy.go | 20 +-- readme.md | 10 +- 7 files changed, 310 insertions(+), 224 deletions(-) create mode 100644 internal/metrics/collector.go create mode 100644 internal/metrics/types.go diff --git a/internal/handler/metrics.go b/internal/handler/metrics.go index caf8d86..a5f3f81 100644 --- a/internal/handler/metrics.go +++ b/internal/handler/metrics.go @@ -2,11 +2,10 @@ package handler import ( "encoding/json" - "fmt" + "log" "net/http" - "runtime" + "proxy-go/internal/metrics" "strings" - "sync/atomic" "time" ) @@ -27,100 +26,45 @@ type Metrics struct { RequestsPerSecond float64 `json:"requests_per_second"` // 新增字段 - TotalBytes int64 `json:"total_bytes"` - BytesPerSecond float64 `json:"bytes_per_second"` - StatusCodeStats map[string]int64 `json:"status_code_stats"` - LatencyPercentiles map[string]float64 `json:"latency_percentiles"` - TopPaths []PathMetrics `json:"top_paths"` - RecentRequests []RequestLog `json:"recent_requests"` -} - -type PathMetrics struct { - Path string `json:"path"` - RequestCount int64 `json:"request_count"` - ErrorCount int64 `json:"error_count"` - AvgLatency string `json:"avg_latency"` - BytesTransferred int64 `json:"bytes_transferred"` -} - -// 添加格式化字节的辅助函数 -func formatBytes(bytes uint64) string { - const ( - MB = 1024 * 1024 - KB = 1024 - ) - - switch { - case bytes >= MB: - return fmt.Sprintf("%.2f MB", float64(bytes)/MB) - case bytes >= KB: - return fmt.Sprintf("%.2f KB", float64(bytes)/KB) - default: - return fmt.Sprintf("%d Bytes", bytes) - } + TotalBytes int64 `json:"total_bytes"` + BytesPerSecond float64 `json:"bytes_per_second"` + StatusCodeStats map[string]int64 `json:"status_code_stats"` + LatencyPercentiles map[string]float64 `json:"latency_percentiles"` + TopPaths []metrics.PathMetrics `json:"top_paths"` + RecentRequests []metrics.RequestLog `json:"recent_requests"` } func (h *ProxyHandler) MetricsHandler(w http.ResponseWriter, r *http.Request) { - var m runtime.MemStats - runtime.ReadMemStats(&m) - - totalRequests := atomic.LoadInt64(&h.metrics.totalRequests) - totalErrors := atomic.LoadInt64(&h.metrics.totalErrors) uptime := time.Since(h.startTime) + collector := metrics.GetCollector() + stats := collector.GetStats() - // 获取状态码统计 - statusStats := make(map[string]int64) - for i, v := range h.metrics.statusStats { - statusStats[fmt.Sprintf("%dxx", i+1)] = v.Load() + if stats == nil { + http.Error(w, "Failed to get metrics", http.StatusInternalServerError) + return } - // 获取Top 10路径统计 - var pathMetrics []PathMetrics - h.metrics.pathStats.Range(func(key, value interface{}) bool { - stats := value.(*PathStats) - if stats.requests.Load() == 0 { - return true - } - pathMetrics = append(pathMetrics, PathMetrics{ - Path: key.(string), - RequestCount: stats.requests.Load(), - ErrorCount: stats.errors.Load(), - AvgLatency: formatDuration(time.Duration(stats.latencySum.Load()) / time.Duration(stats.requests.Load())), - BytesTransferred: stats.bytes.Load(), - }) - return len(pathMetrics) < 10 - }) - metrics := Metrics{ - Uptime: uptime.String(), - ActiveRequests: atomic.LoadInt64(&h.metrics.activeRequests), - TotalRequests: totalRequests, - TotalErrors: totalErrors, - ErrorRate: getErrorRate(totalErrors, totalRequests), - NumGoroutine: runtime.NumGoroutine(), - MemoryUsage: formatBytes(m.Alloc), - - TotalBytes: h.metrics.totalBytes.Load(), - BytesPerSecond: float64(h.metrics.totalBytes.Load()) / max(uptime.Seconds(), 1), - RequestsPerSecond: float64(totalRequests) / max(uptime.Seconds(), 1), - StatusCodeStats: statusStats, - TopPaths: pathMetrics, - RecentRequests: getRecentRequests(h), + Uptime: uptime.String(), + ActiveRequests: stats["active_requests"].(int64), + TotalRequests: stats["total_requests"].(int64), + TotalErrors: stats["total_errors"].(int64), + ErrorRate: float64(stats["total_errors"].(int64)) / float64(stats["total_requests"].(int64)), + NumGoroutine: stats["num_goroutine"].(int), + MemoryUsage: stats["memory_usage"].(string), + AverageResponseTime: metrics.FormatDuration(time.Duration(stats["avg_latency"].(int64))), + TotalBytes: stats["total_bytes"].(int64), + BytesPerSecond: float64(stats["total_bytes"].(int64)) / metrics.Max(uptime.Seconds(), 1), + RequestsPerSecond: float64(stats["total_requests"].(int64)) / metrics.Max(uptime.Seconds(), 1), + StatusCodeStats: stats["status_code_stats"].(map[string]int64), + TopPaths: stats["top_paths"].([]metrics.PathMetrics), + RecentRequests: stats["recent_requests"].([]metrics.RequestLog), } w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(metrics) -} - -// 添加格式化时间的辅助函数 -func formatDuration(d time.Duration) string { - if d < time.Millisecond { - return fmt.Sprintf("%.2f μs", float64(d.Microseconds())) + if err := json.NewEncoder(w).Encode(metrics); err != nil { + log.Printf("Error encoding metrics: %v", err) } - if d < time.Second { - return fmt.Sprintf("%.2f ms", float64(d.Milliseconds())) - } - return fmt.Sprintf("%.2f s", d.Seconds()) } // 修改模板,添加登录页面 @@ -599,34 +543,3 @@ func (h *ProxyHandler) MetricsAuthHandler(w http.ResponseWriter, r *http.Request "token": token, }) } - -// 添加辅助函数 -func getErrorRate(errors, total int64) float64 { - if total == 0 { - return 0 - } - return float64(errors) / float64(total) -} - -func getRecentRequests(h *ProxyHandler) []RequestLog { - var recentReqs []RequestLog - h.recentRequests.RLock() - defer h.recentRequests.RUnlock() - - cursor := h.recentRequests.cursor.Load() - for i := 0; i < 10; i++ { - idx := (cursor - int64(i) + 1000) % 1000 - if h.recentRequests.items[idx] != nil { - recentReqs = append(recentReqs, *h.recentRequests.items[idx]) - } - } - return recentReqs -} - -// 添加辅助函数 -func max(a, b float64) float64 { - if a > b { - return a - } - return b -} diff --git a/internal/handler/mirror_proxy.go b/internal/handler/mirror_proxy.go index b790831..2fdb9ba 100644 --- a/internal/handler/mirror_proxy.go +++ b/internal/handler/mirror_proxy.go @@ -6,6 +6,7 @@ import ( "log" "net/http" "net/url" + "proxy-go/internal/metrics" "proxy-go/internal/utils" "strings" "time" @@ -32,6 +33,9 @@ func NewMirrorProxyHandler() *MirrorProxyHandler { func (h *MirrorProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { startTime := time.Now() + collector := metrics.GetCollector() + collector.BeginRequest() + defer collector.EndRequest() // 设置 CORS 头 w.Header().Set("Access-Control-Allow-Origin", "*") @@ -132,4 +136,7 @@ func (h *MirrorProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { r.Method, resp.StatusCode, time.Since(startTime), utils.GetClientIP(r), utils.FormatBytes(bytesCopied), utils.GetRequestSource(r), actualURL) + + // 记录统计信息 + collector.RecordRequest(r.URL.Path, resp.StatusCode, time.Since(startTime), bytesCopied, utils.GetClientIP(r)) } diff --git a/internal/handler/proxy.go b/internal/handler/proxy.go index b1c3557..9f91d6b 100644 --- a/internal/handler/proxy.go +++ b/internal/handler/proxy.go @@ -8,10 +8,10 @@ import ( "net/url" "path" "proxy-go/internal/config" + "proxy-go/internal/metrics" "proxy-go/internal/utils" "strings" "sync" - "sync/atomic" "time" "golang.org/x/time/rate" @@ -34,38 +34,6 @@ type ProxyHandler struct { startTime time.Time config *config.Config auth *authManager - metrics struct { - activeRequests int64 - totalRequests int64 - totalErrors int64 - totalBytes atomic.Int64 // 总传输字节数 - pathStats sync.Map // 路径统计 map[string]*PathStats - statusStats [6]atomic.Int64 // HTTP状态码统计(1xx-5xx) - latencyBuckets [10]atomic.Int64 // 延迟分布(0-100ms, 100-200ms...) - } - recentRequests struct { - sync.RWMutex - items [1000]*RequestLog // 固定大小的环形缓冲区 - cursor atomic.Int64 // 当前位置 - } -} - -// 单个请求的统计信息 -type RequestLog struct { - Time time.Time - Path string - Status int - Latency time.Duration - BytesSent int64 - ClientIP string -} - -// 路径统计 -type PathStats struct { - requests atomic.Int64 - errors atomic.Int64 - bytes atomic.Int64 - latencySum atomic.Int64 } // 修改参数类型 @@ -90,9 +58,9 @@ func NewProxyHandler(cfg *config.Config) *ProxyHandler { } func (h *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - atomic.AddInt64(&h.metrics.activeRequests, 1) - atomic.AddInt64(&h.metrics.totalRequests, 1) - defer atomic.AddInt64(&h.metrics.activeRequests, -1) + collector := metrics.GetCollector() + collector.BeginRequest() + defer collector.EndRequest() if !h.limiter.Allow() { http.Error(w, "Too Many Requests", http.StatusTooManyRequests) @@ -141,7 +109,7 @@ func (h *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - // 确定目标基础URL + // 确定���标基础URL targetBase := pathConfig.DefaultTarget // 检查文件扩展名 @@ -260,7 +228,7 @@ func (h *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } written, _ := w.Write(body) - h.recordStats(r.URL.Path, resp.StatusCode, time.Since(start), int64(written), r) + collector.RecordRequest(r.URL.Path, resp.StatusCode, time.Since(start), int64(written), utils.GetClientIP(r)) } else { // 大响应使用流式传输 var bytesCopied int64 @@ -306,58 +274,8 @@ func (h *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { targetURL, // 目标URL ) - h.recordStats(r.URL.Path, resp.StatusCode, time.Since(start), bytesCopied, r) + collector.RecordRequest(r.URL.Path, resp.StatusCode, time.Since(start), bytesCopied, utils.GetClientIP(r)) } - - if err != nil { - atomic.AddInt64(&h.metrics.totalErrors, 1) - } -} - -func (h *ProxyHandler) recordStats(path string, status int, latency time.Duration, bytes int64, r *http.Request) { - // 更新总字节数 - h.metrics.totalBytes.Add(bytes) - - // 更新状态码统计 - if status >= 100 && status < 600 { - h.metrics.statusStats[status/100-1].Add(1) - } - - // 更新延迟分布 - bucket := int(latency.Milliseconds() / 100) - if bucket < 10 { - h.metrics.latencyBuckets[bucket].Add(1) - } - - // 更新路径统计 - if stats, ok := h.metrics.pathStats.Load(path); ok { - pathStats := stats.(*PathStats) - pathStats.requests.Add(1) - pathStats.bytes.Add(bytes) - pathStats.latencySum.Add(int64(latency)) - } else { - // 首次遇到该路径 - newStats := &PathStats{} - newStats.requests.Add(1) - newStats.bytes.Add(bytes) - newStats.latencySum.Add(int64(latency)) - h.metrics.pathStats.Store(path, newStats) - } - - // 记录最近的请求 - log := &RequestLog{ - Time: time.Now(), - Path: path, - Status: status, - Latency: latency, - BytesSent: bytes, - ClientIP: utils.GetClientIP(r), - } - - cursor := h.recentRequests.cursor.Add(1) % 1000 - h.recentRequests.Lock() - h.recentRequests.items[cursor] = log - h.recentRequests.Unlock() } func copyHeader(dst, src http.Header) { diff --git a/internal/metrics/collector.go b/internal/metrics/collector.go new file mode 100644 index 0000000..03fca58 --- /dev/null +++ b/internal/metrics/collector.go @@ -0,0 +1,223 @@ +package metrics + +import ( + "fmt" + "runtime" + "sort" + "sync" + "sync/atomic" + "time" +) + +type Collector struct { + startTime time.Time + activeRequests int64 + totalRequests int64 + totalErrors int64 + totalBytes atomic.Int64 + latencySum atomic.Int64 + pathStats sync.Map + statusStats [6]atomic.Int64 + latencyBuckets [10]atomic.Int64 + recentRequests struct { + sync.RWMutex + items [1000]*RequestLog + cursor atomic.Int64 + } +} + +var globalCollector = &Collector{ + startTime: time.Now(), + pathStats: sync.Map{}, + statusStats: [6]atomic.Int64{}, + latencyBuckets: [10]atomic.Int64{}, +} + +func GetCollector() *Collector { + return globalCollector +} + +func (c *Collector) BeginRequest() { + atomic.AddInt64(&c.activeRequests, 1) +} + +func (c *Collector) EndRequest() { + atomic.AddInt64(&c.activeRequests, -1) +} + +func (c *Collector) RecordRequest(path string, status int, latency time.Duration, bytes int64, clientIP string) { + // 更新总请求数 + atomic.AddInt64(&c.totalRequests, 1) + + // 更新总字节数 + c.totalBytes.Add(bytes) + + // 更新状态码统计 + if status >= 100 && status < 600 { + c.statusStats[status/100-1].Add(1) + } + + // 更新错误数 + if status >= 400 { + atomic.AddInt64(&c.totalErrors, 1) + } + + // 更新延迟分布 + bucket := int(latency.Milliseconds() / 100) + if bucket < 10 { + c.latencyBuckets[bucket].Add(1) + } + + // 更新路径统计 + if stats, ok := c.pathStats.Load(path); ok { + pathStats := stats.(*PathStats) + pathStats.requests.Add(1) + if status >= 400 { + pathStats.errors.Add(1) + } + pathStats.bytes.Add(bytes) + pathStats.latencySum.Add(int64(latency)) + } else { + newStats := &PathStats{} + newStats.requests.Add(1) + if status >= 400 { + newStats.errors.Add(1) + } + newStats.bytes.Add(bytes) + newStats.latencySum.Add(int64(latency)) + c.pathStats.Store(path, newStats) + } + + // 记录最近的请求 + log := &RequestLog{ + Time: time.Now(), + Path: path, + Status: status, + Latency: latency, + BytesSent: bytes, + ClientIP: clientIP, + } + + c.recentRequests.Lock() + cursor := c.recentRequests.cursor.Add(1) % 1000 + c.recentRequests.items[cursor] = log + c.recentRequests.Unlock() + + c.latencySum.Add(int64(latency)) +} + +func (c *Collector) GetStats() map[string]interface{} { + var m runtime.MemStats + runtime.ReadMemStats(&m) + + uptime := time.Since(c.startTime) + totalRequests := atomic.LoadInt64(&c.totalRequests) + totalErrors := atomic.LoadInt64(&c.totalErrors) + + // 获取状态码统计 + statusStats := make(map[string]int64) + for i, v := range c.statusStats { + statusStats[fmt.Sprintf("%dxx", i+1)] = v.Load() + } + + // 获取Top 10路径统计 + var pathMetrics []PathMetrics + var allPaths []PathMetrics + + c.pathStats.Range(func(key, value interface{}) bool { + stats := value.(*PathStats) + if stats.requests.Load() == 0 { + return true + } + allPaths = append(allPaths, PathMetrics{ + Path: key.(string), + RequestCount: stats.requests.Load(), + ErrorCount: stats.errors.Load(), + AvgLatency: FormatDuration(time.Duration(stats.latencySum.Load() / stats.requests.Load())), + BytesTransferred: stats.bytes.Load(), + }) + return true + }) + + // 按请求数排序 + sort.Slice(allPaths, func(i, j int) bool { + return allPaths[i].RequestCount > allPaths[j].RequestCount + }) + + // 取前10个 + if len(allPaths) > 10 { + pathMetrics = allPaths[:10] + } else { + pathMetrics = allPaths + } + + return map[string]interface{}{ + "uptime": uptime.String(), + "active_requests": atomic.LoadInt64(&c.activeRequests), + "total_requests": totalRequests, + "total_errors": totalErrors, + "error_rate": float64(totalErrors) / float64(totalRequests), + "num_goroutine": runtime.NumGoroutine(), + "memory_usage": FormatBytes(m.Alloc), + "total_bytes": c.totalBytes.Load(), + "bytes_per_second": float64(c.totalBytes.Load()) / Max(uptime.Seconds(), 1), + "avg_latency": func() int64 { + if totalRequests > 0 { + return int64(c.latencySum.Load() / totalRequests) + } + return 0 + }(), + "status_code_stats": statusStats, + "top_paths": pathMetrics, + "recent_requests": c.getRecentRequests(), + } +} + +func (c *Collector) getRecentRequests() []RequestLog { + var recentReqs []RequestLog + c.recentRequests.RLock() + defer c.recentRequests.RUnlock() + + cursor := c.recentRequests.cursor.Load() + for i := 0; i < 10; i++ { + idx := (cursor - int64(i) + 1000) % 1000 + if c.recentRequests.items[idx] != nil { + recentReqs = append(recentReqs, *c.recentRequests.items[idx]) + } + } + return recentReqs +} + +// 辅助函数 +func FormatDuration(d time.Duration) string { + if d < time.Millisecond { + return fmt.Sprintf("%.2f μs", float64(d.Microseconds())) + } + if d < time.Second { + return fmt.Sprintf("%.2f ms", float64(d.Milliseconds())) + } + return fmt.Sprintf("%.2f s", d.Seconds()) +} + +func FormatBytes(bytes uint64) string { + const ( + MB = 1024 * 1024 + KB = 1024 + ) + + switch { + case bytes >= MB: + return fmt.Sprintf("%.2f MB", float64(bytes)/MB) + case bytes >= KB: + return fmt.Sprintf("%.2f KB", float64(bytes)/KB) + default: + return fmt.Sprintf("%d Bytes", bytes) + } +} + +func Max(a, b float64) float64 { + if a > b { + return a + } + return b +} diff --git a/internal/metrics/types.go b/internal/metrics/types.go new file mode 100644 index 0000000..9c4b9e4 --- /dev/null +++ b/internal/metrics/types.go @@ -0,0 +1,33 @@ +package metrics + +import ( + "sync/atomic" + "time" +) + +// RequestLog 记录单个请求的信息 +type RequestLog struct { + Time time.Time + Path string + Status int + Latency time.Duration + BytesSent int64 + ClientIP string +} + +// PathStats 记录路径统计信息 +type PathStats struct { + requests atomic.Int64 + errors atomic.Int64 + bytes atomic.Int64 + latencySum atomic.Int64 +} + +// PathMetrics 用于API返回的路径统计信息 +type PathMetrics struct { + Path string `json:"path"` + RequestCount int64 `json:"request_count"` + ErrorCount int64 `json:"error_count"` + AvgLatency string `json:"avg_latency"` + BytesTransferred int64 `json:"bytes_transferred"` +} diff --git a/internal/middleware/fixed_path_proxy.go b/internal/middleware/fixed_path_proxy.go index fdc427f..0b6365a 100644 --- a/internal/middleware/fixed_path_proxy.go +++ b/internal/middleware/fixed_path_proxy.go @@ -6,6 +6,7 @@ import ( "log" "net/http" "proxy-go/internal/config" + "proxy-go/internal/metrics" "proxy-go/internal/utils" "strings" "syscall" @@ -21,7 +22,11 @@ type FixedPathConfig struct { func FixedPathProxyMiddleware(configs []config.FixedPathConfig) func(http.Handler) http.Handler { return func(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - startTime := time.Now() // 添加时间记录 + startTime := time.Now() + collector := metrics.GetCollector() + collector.BeginRequest() + defer collector.EndRequest() + // 检查是否匹配任何固定路径 for _, cfg := range configs { if strings.HasPrefix(r.URL.Path, cfg.Path) { @@ -77,17 +82,8 @@ func FixedPathProxyMiddleware(configs []config.FixedPathConfig) func(http.Handle log.Printf("[%s] Error copying response: %v", utils.GetClientIP(r), err) } - // 记录成功的请求 - log.Printf("| %-6s | %3d | %12s | %15s | %10s | %-30s | %-50s -> %s", - r.Method, // HTTP方法,左对齐占6位 - resp.StatusCode, // 状态码,占3位 - time.Since(startTime), // 处理时间,占12位 - utils.GetClientIP(r), // IP地址,占15位 - utils.FormatBytes(bytesCopied), // 传输大小,占10位 - utils.GetRequestSource(r), // 请求来源 - r.URL.Path, // 请求路径,左对齐占50位 - targetURL, // 目标URL - ) + // 记录统计信息 + collector.RecordRequest(r.URL.Path, resp.StatusCode, time.Since(startTime), bytesCopied, utils.GetClientIP(r)) return } diff --git a/readme.md b/readme.md index 5c5a1b4..07a68f5 100644 --- a/readme.md +++ b/readme.md @@ -1,6 +1,6 @@ # Proxy-Go -A simple reverse proxy server written in Go. +A 'simple' reverse proxy server written in Go. 使用方法: https://q58.org/t/topic/165?u=wood @@ -11,12 +11,8 @@ A simple reverse proxy server written in Go. 3. 回源Host修改 4. 大文件使用流式传输, 小文件直接提供 5. 可以按照文件后缀名代理不同站点, 方便图片处理等 -6. 适配Cloudflare Images的图片自适应功能, 支持`format=auto` - - -## TIPS - -写的比较潦草, 希望有能力的同学帮忙完善优化一下 +6. 适配Cloudflare Images的图片自适应功能, 透传`Accept`头, 支持`format=auto` +7. 支持metrics监控, 在`/metrics/ui`查看, 具体可以看帖子里写的用法