feat(metrics): integrate comprehensive metrics tracking across handlers

- Enhanced metrics collection in ProxyHandler, MirrorProxyHandler, and FixedPathProxyMiddleware.
- Introduced a centralized metrics collector to streamline request tracking and statistics reporting.
- Updated MetricsHandler to utilize new metrics structure, including total bytes, requests per second, and error rates.
- Improved documentation in readme.md to reflect new features and usage instructions.
- Added support for metrics monitoring at `/metrics/ui` for better visibility into proxy performance.
This commit is contained in:
wood chen 2024-11-30 22:26:11 +08:00
parent b8d7659aa9
commit 040b01f4a4
7 changed files with 310 additions and 224 deletions

View File

@ -2,11 +2,10 @@ package handler
import ( import (
"encoding/json" "encoding/json"
"fmt" "log"
"net/http" "net/http"
"runtime" "proxy-go/internal/metrics"
"strings" "strings"
"sync/atomic"
"time" "time"
) )
@ -27,100 +26,45 @@ type Metrics struct {
RequestsPerSecond float64 `json:"requests_per_second"` RequestsPerSecond float64 `json:"requests_per_second"`
// 新增字段 // 新增字段
TotalBytes int64 `json:"total_bytes"` TotalBytes int64 `json:"total_bytes"`
BytesPerSecond float64 `json:"bytes_per_second"` BytesPerSecond float64 `json:"bytes_per_second"`
StatusCodeStats map[string]int64 `json:"status_code_stats"` StatusCodeStats map[string]int64 `json:"status_code_stats"`
LatencyPercentiles map[string]float64 `json:"latency_percentiles"` LatencyPercentiles map[string]float64 `json:"latency_percentiles"`
TopPaths []PathMetrics `json:"top_paths"` TopPaths []metrics.PathMetrics `json:"top_paths"`
RecentRequests []RequestLog `json:"recent_requests"` RecentRequests []metrics.RequestLog `json:"recent_requests"`
}
type PathMetrics struct {
Path string `json:"path"`
RequestCount int64 `json:"request_count"`
ErrorCount int64 `json:"error_count"`
AvgLatency string `json:"avg_latency"`
BytesTransferred int64 `json:"bytes_transferred"`
}
// 添加格式化字节的辅助函数
func formatBytes(bytes uint64) string {
const (
MB = 1024 * 1024
KB = 1024
)
switch {
case bytes >= MB:
return fmt.Sprintf("%.2f MB", float64(bytes)/MB)
case bytes >= KB:
return fmt.Sprintf("%.2f KB", float64(bytes)/KB)
default:
return fmt.Sprintf("%d Bytes", bytes)
}
} }
func (h *ProxyHandler) MetricsHandler(w http.ResponseWriter, r *http.Request) { func (h *ProxyHandler) MetricsHandler(w http.ResponseWriter, r *http.Request) {
var m runtime.MemStats
runtime.ReadMemStats(&m)
totalRequests := atomic.LoadInt64(&h.metrics.totalRequests)
totalErrors := atomic.LoadInt64(&h.metrics.totalErrors)
uptime := time.Since(h.startTime) uptime := time.Since(h.startTime)
collector := metrics.GetCollector()
stats := collector.GetStats()
// 获取状态码统计 if stats == nil {
statusStats := make(map[string]int64) http.Error(w, "Failed to get metrics", http.StatusInternalServerError)
for i, v := range h.metrics.statusStats { return
statusStats[fmt.Sprintf("%dxx", i+1)] = v.Load()
} }
// 获取Top 10路径统计
var pathMetrics []PathMetrics
h.metrics.pathStats.Range(func(key, value interface{}) bool {
stats := value.(*PathStats)
if stats.requests.Load() == 0 {
return true
}
pathMetrics = append(pathMetrics, PathMetrics{
Path: key.(string),
RequestCount: stats.requests.Load(),
ErrorCount: stats.errors.Load(),
AvgLatency: formatDuration(time.Duration(stats.latencySum.Load()) / time.Duration(stats.requests.Load())),
BytesTransferred: stats.bytes.Load(),
})
return len(pathMetrics) < 10
})
metrics := Metrics{ metrics := Metrics{
Uptime: uptime.String(), Uptime: uptime.String(),
ActiveRequests: atomic.LoadInt64(&h.metrics.activeRequests), ActiveRequests: stats["active_requests"].(int64),
TotalRequests: totalRequests, TotalRequests: stats["total_requests"].(int64),
TotalErrors: totalErrors, TotalErrors: stats["total_errors"].(int64),
ErrorRate: getErrorRate(totalErrors, totalRequests), ErrorRate: float64(stats["total_errors"].(int64)) / float64(stats["total_requests"].(int64)),
NumGoroutine: runtime.NumGoroutine(), NumGoroutine: stats["num_goroutine"].(int),
MemoryUsage: formatBytes(m.Alloc), MemoryUsage: stats["memory_usage"].(string),
AverageResponseTime: metrics.FormatDuration(time.Duration(stats["avg_latency"].(int64))),
TotalBytes: h.metrics.totalBytes.Load(), TotalBytes: stats["total_bytes"].(int64),
BytesPerSecond: float64(h.metrics.totalBytes.Load()) / max(uptime.Seconds(), 1), BytesPerSecond: float64(stats["total_bytes"].(int64)) / metrics.Max(uptime.Seconds(), 1),
RequestsPerSecond: float64(totalRequests) / max(uptime.Seconds(), 1), RequestsPerSecond: float64(stats["total_requests"].(int64)) / metrics.Max(uptime.Seconds(), 1),
StatusCodeStats: statusStats, StatusCodeStats: stats["status_code_stats"].(map[string]int64),
TopPaths: pathMetrics, TopPaths: stats["top_paths"].([]metrics.PathMetrics),
RecentRequests: getRecentRequests(h), RecentRequests: stats["recent_requests"].([]metrics.RequestLog),
} }
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(metrics) if err := json.NewEncoder(w).Encode(metrics); err != nil {
} log.Printf("Error encoding metrics: %v", err)
// 添加格式化时间的辅助函数
func formatDuration(d time.Duration) string {
if d < time.Millisecond {
return fmt.Sprintf("%.2f μs", float64(d.Microseconds()))
} }
if d < time.Second {
return fmt.Sprintf("%.2f ms", float64(d.Milliseconds()))
}
return fmt.Sprintf("%.2f s", d.Seconds())
} }
// 修改模板,添加登录页面 // 修改模板,添加登录页面
@ -599,34 +543,3 @@ func (h *ProxyHandler) MetricsAuthHandler(w http.ResponseWriter, r *http.Request
"token": token, "token": token,
}) })
} }
// 添加辅助函数
func getErrorRate(errors, total int64) float64 {
if total == 0 {
return 0
}
return float64(errors) / float64(total)
}
func getRecentRequests(h *ProxyHandler) []RequestLog {
var recentReqs []RequestLog
h.recentRequests.RLock()
defer h.recentRequests.RUnlock()
cursor := h.recentRequests.cursor.Load()
for i := 0; i < 10; i++ {
idx := (cursor - int64(i) + 1000) % 1000
if h.recentRequests.items[idx] != nil {
recentReqs = append(recentReqs, *h.recentRequests.items[idx])
}
}
return recentReqs
}
// 添加辅助函数
func max(a, b float64) float64 {
if a > b {
return a
}
return b
}

View File

@ -6,6 +6,7 @@ import (
"log" "log"
"net/http" "net/http"
"net/url" "net/url"
"proxy-go/internal/metrics"
"proxy-go/internal/utils" "proxy-go/internal/utils"
"strings" "strings"
"time" "time"
@ -32,6 +33,9 @@ func NewMirrorProxyHandler() *MirrorProxyHandler {
func (h *MirrorProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *MirrorProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
startTime := time.Now() startTime := time.Now()
collector := metrics.GetCollector()
collector.BeginRequest()
defer collector.EndRequest()
// 设置 CORS 头 // 设置 CORS 头
w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Origin", "*")
@ -132,4 +136,7 @@ func (h *MirrorProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r.Method, resp.StatusCode, time.Since(startTime), r.Method, resp.StatusCode, time.Since(startTime),
utils.GetClientIP(r), utils.FormatBytes(bytesCopied), utils.GetClientIP(r), utils.FormatBytes(bytesCopied),
utils.GetRequestSource(r), actualURL) utils.GetRequestSource(r), actualURL)
// 记录统计信息
collector.RecordRequest(r.URL.Path, resp.StatusCode, time.Since(startTime), bytesCopied, utils.GetClientIP(r))
} }

View File

@ -8,10 +8,10 @@ import (
"net/url" "net/url"
"path" "path"
"proxy-go/internal/config" "proxy-go/internal/config"
"proxy-go/internal/metrics"
"proxy-go/internal/utils" "proxy-go/internal/utils"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"golang.org/x/time/rate" "golang.org/x/time/rate"
@ -34,38 +34,6 @@ type ProxyHandler struct {
startTime time.Time startTime time.Time
config *config.Config config *config.Config
auth *authManager auth *authManager
metrics struct {
activeRequests int64
totalRequests int64
totalErrors int64
totalBytes atomic.Int64 // 总传输字节数
pathStats sync.Map // 路径统计 map[string]*PathStats
statusStats [6]atomic.Int64 // HTTP状态码统计(1xx-5xx)
latencyBuckets [10]atomic.Int64 // 延迟分布(0-100ms, 100-200ms...)
}
recentRequests struct {
sync.RWMutex
items [1000]*RequestLog // 固定大小的环形缓冲区
cursor atomic.Int64 // 当前位置
}
}
// 单个请求的统计信息
type RequestLog struct {
Time time.Time
Path string
Status int
Latency time.Duration
BytesSent int64
ClientIP string
}
// 路径统计
type PathStats struct {
requests atomic.Int64
errors atomic.Int64
bytes atomic.Int64
latencySum atomic.Int64
} }
// 修改参数类型 // 修改参数类型
@ -90,9 +58,9 @@ func NewProxyHandler(cfg *config.Config) *ProxyHandler {
} }
func (h *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
atomic.AddInt64(&h.metrics.activeRequests, 1) collector := metrics.GetCollector()
atomic.AddInt64(&h.metrics.totalRequests, 1) collector.BeginRequest()
defer atomic.AddInt64(&h.metrics.activeRequests, -1) defer collector.EndRequest()
if !h.limiter.Allow() { if !h.limiter.Allow() {
http.Error(w, "Too Many Requests", http.StatusTooManyRequests) http.Error(w, "Too Many Requests", http.StatusTooManyRequests)
@ -141,7 +109,7 @@ func (h *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return return
} }
// 确定标基础URL // 确定<EFBFBD><EFBFBD><EFBFBD>标基础URL
targetBase := pathConfig.DefaultTarget targetBase := pathConfig.DefaultTarget
// 检查文件扩展名 // 检查文件扩展名
@ -260,7 +228,7 @@ func (h *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return return
} }
written, _ := w.Write(body) written, _ := w.Write(body)
h.recordStats(r.URL.Path, resp.StatusCode, time.Since(start), int64(written), r) collector.RecordRequest(r.URL.Path, resp.StatusCode, time.Since(start), int64(written), utils.GetClientIP(r))
} else { } else {
// 大响应使用流式传输 // 大响应使用流式传输
var bytesCopied int64 var bytesCopied int64
@ -306,58 +274,8 @@ func (h *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
targetURL, // 目标URL targetURL, // 目标URL
) )
h.recordStats(r.URL.Path, resp.StatusCode, time.Since(start), bytesCopied, r) collector.RecordRequest(r.URL.Path, resp.StatusCode, time.Since(start), bytesCopied, utils.GetClientIP(r))
} }
if err != nil {
atomic.AddInt64(&h.metrics.totalErrors, 1)
}
}
func (h *ProxyHandler) recordStats(path string, status int, latency time.Duration, bytes int64, r *http.Request) {
// 更新总字节数
h.metrics.totalBytes.Add(bytes)
// 更新状态码统计
if status >= 100 && status < 600 {
h.metrics.statusStats[status/100-1].Add(1)
}
// 更新延迟分布
bucket := int(latency.Milliseconds() / 100)
if bucket < 10 {
h.metrics.latencyBuckets[bucket].Add(1)
}
// 更新路径统计
if stats, ok := h.metrics.pathStats.Load(path); ok {
pathStats := stats.(*PathStats)
pathStats.requests.Add(1)
pathStats.bytes.Add(bytes)
pathStats.latencySum.Add(int64(latency))
} else {
// 首次遇到该路径
newStats := &PathStats{}
newStats.requests.Add(1)
newStats.bytes.Add(bytes)
newStats.latencySum.Add(int64(latency))
h.metrics.pathStats.Store(path, newStats)
}
// 记录最近的请求
log := &RequestLog{
Time: time.Now(),
Path: path,
Status: status,
Latency: latency,
BytesSent: bytes,
ClientIP: utils.GetClientIP(r),
}
cursor := h.recentRequests.cursor.Add(1) % 1000
h.recentRequests.Lock()
h.recentRequests.items[cursor] = log
h.recentRequests.Unlock()
} }
func copyHeader(dst, src http.Header) { func copyHeader(dst, src http.Header) {

View File

@ -0,0 +1,223 @@
package metrics
import (
"fmt"
"runtime"
"sort"
"sync"
"sync/atomic"
"time"
)
type Collector struct {
startTime time.Time
activeRequests int64
totalRequests int64
totalErrors int64
totalBytes atomic.Int64
latencySum atomic.Int64
pathStats sync.Map
statusStats [6]atomic.Int64
latencyBuckets [10]atomic.Int64
recentRequests struct {
sync.RWMutex
items [1000]*RequestLog
cursor atomic.Int64
}
}
var globalCollector = &Collector{
startTime: time.Now(),
pathStats: sync.Map{},
statusStats: [6]atomic.Int64{},
latencyBuckets: [10]atomic.Int64{},
}
func GetCollector() *Collector {
return globalCollector
}
func (c *Collector) BeginRequest() {
atomic.AddInt64(&c.activeRequests, 1)
}
func (c *Collector) EndRequest() {
atomic.AddInt64(&c.activeRequests, -1)
}
func (c *Collector) RecordRequest(path string, status int, latency time.Duration, bytes int64, clientIP string) {
// 更新总请求数
atomic.AddInt64(&c.totalRequests, 1)
// 更新总字节数
c.totalBytes.Add(bytes)
// 更新状态码统计
if status >= 100 && status < 600 {
c.statusStats[status/100-1].Add(1)
}
// 更新错误数
if status >= 400 {
atomic.AddInt64(&c.totalErrors, 1)
}
// 更新延迟分布
bucket := int(latency.Milliseconds() / 100)
if bucket < 10 {
c.latencyBuckets[bucket].Add(1)
}
// 更新路径统计
if stats, ok := c.pathStats.Load(path); ok {
pathStats := stats.(*PathStats)
pathStats.requests.Add(1)
if status >= 400 {
pathStats.errors.Add(1)
}
pathStats.bytes.Add(bytes)
pathStats.latencySum.Add(int64(latency))
} else {
newStats := &PathStats{}
newStats.requests.Add(1)
if status >= 400 {
newStats.errors.Add(1)
}
newStats.bytes.Add(bytes)
newStats.latencySum.Add(int64(latency))
c.pathStats.Store(path, newStats)
}
// 记录最近的请求
log := &RequestLog{
Time: time.Now(),
Path: path,
Status: status,
Latency: latency,
BytesSent: bytes,
ClientIP: clientIP,
}
c.recentRequests.Lock()
cursor := c.recentRequests.cursor.Add(1) % 1000
c.recentRequests.items[cursor] = log
c.recentRequests.Unlock()
c.latencySum.Add(int64(latency))
}
func (c *Collector) GetStats() map[string]interface{} {
var m runtime.MemStats
runtime.ReadMemStats(&m)
uptime := time.Since(c.startTime)
totalRequests := atomic.LoadInt64(&c.totalRequests)
totalErrors := atomic.LoadInt64(&c.totalErrors)
// 获取状态码统计
statusStats := make(map[string]int64)
for i, v := range c.statusStats {
statusStats[fmt.Sprintf("%dxx", i+1)] = v.Load()
}
// 获取Top 10路径统计
var pathMetrics []PathMetrics
var allPaths []PathMetrics
c.pathStats.Range(func(key, value interface{}) bool {
stats := value.(*PathStats)
if stats.requests.Load() == 0 {
return true
}
allPaths = append(allPaths, PathMetrics{
Path: key.(string),
RequestCount: stats.requests.Load(),
ErrorCount: stats.errors.Load(),
AvgLatency: FormatDuration(time.Duration(stats.latencySum.Load() / stats.requests.Load())),
BytesTransferred: stats.bytes.Load(),
})
return true
})
// 按请求数排序
sort.Slice(allPaths, func(i, j int) bool {
return allPaths[i].RequestCount > allPaths[j].RequestCount
})
// 取前10个
if len(allPaths) > 10 {
pathMetrics = allPaths[:10]
} else {
pathMetrics = allPaths
}
return map[string]interface{}{
"uptime": uptime.String(),
"active_requests": atomic.LoadInt64(&c.activeRequests),
"total_requests": totalRequests,
"total_errors": totalErrors,
"error_rate": float64(totalErrors) / float64(totalRequests),
"num_goroutine": runtime.NumGoroutine(),
"memory_usage": FormatBytes(m.Alloc),
"total_bytes": c.totalBytes.Load(),
"bytes_per_second": float64(c.totalBytes.Load()) / Max(uptime.Seconds(), 1),
"avg_latency": func() int64 {
if totalRequests > 0 {
return int64(c.latencySum.Load() / totalRequests)
}
return 0
}(),
"status_code_stats": statusStats,
"top_paths": pathMetrics,
"recent_requests": c.getRecentRequests(),
}
}
func (c *Collector) getRecentRequests() []RequestLog {
var recentReqs []RequestLog
c.recentRequests.RLock()
defer c.recentRequests.RUnlock()
cursor := c.recentRequests.cursor.Load()
for i := 0; i < 10; i++ {
idx := (cursor - int64(i) + 1000) % 1000
if c.recentRequests.items[idx] != nil {
recentReqs = append(recentReqs, *c.recentRequests.items[idx])
}
}
return recentReqs
}
// 辅助函数
func FormatDuration(d time.Duration) string {
if d < time.Millisecond {
return fmt.Sprintf("%.2f μs", float64(d.Microseconds()))
}
if d < time.Second {
return fmt.Sprintf("%.2f ms", float64(d.Milliseconds()))
}
return fmt.Sprintf("%.2f s", d.Seconds())
}
func FormatBytes(bytes uint64) string {
const (
MB = 1024 * 1024
KB = 1024
)
switch {
case bytes >= MB:
return fmt.Sprintf("%.2f MB", float64(bytes)/MB)
case bytes >= KB:
return fmt.Sprintf("%.2f KB", float64(bytes)/KB)
default:
return fmt.Sprintf("%d Bytes", bytes)
}
}
func Max(a, b float64) float64 {
if a > b {
return a
}
return b
}

33
internal/metrics/types.go Normal file
View File

@ -0,0 +1,33 @@
package metrics
import (
"sync/atomic"
"time"
)
// RequestLog 记录单个请求的信息
type RequestLog struct {
Time time.Time
Path string
Status int
Latency time.Duration
BytesSent int64
ClientIP string
}
// PathStats 记录路径统计信息
type PathStats struct {
requests atomic.Int64
errors atomic.Int64
bytes atomic.Int64
latencySum atomic.Int64
}
// PathMetrics 用于API返回的路径统计信息
type PathMetrics struct {
Path string `json:"path"`
RequestCount int64 `json:"request_count"`
ErrorCount int64 `json:"error_count"`
AvgLatency string `json:"avg_latency"`
BytesTransferred int64 `json:"bytes_transferred"`
}

View File

@ -6,6 +6,7 @@ import (
"log" "log"
"net/http" "net/http"
"proxy-go/internal/config" "proxy-go/internal/config"
"proxy-go/internal/metrics"
"proxy-go/internal/utils" "proxy-go/internal/utils"
"strings" "strings"
"syscall" "syscall"
@ -21,7 +22,11 @@ type FixedPathConfig struct {
func FixedPathProxyMiddleware(configs []config.FixedPathConfig) func(http.Handler) http.Handler { func FixedPathProxyMiddleware(configs []config.FixedPathConfig) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler { return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
startTime := time.Now() // 添加时间记录 startTime := time.Now()
collector := metrics.GetCollector()
collector.BeginRequest()
defer collector.EndRequest()
// 检查是否匹配任何固定路径 // 检查是否匹配任何固定路径
for _, cfg := range configs { for _, cfg := range configs {
if strings.HasPrefix(r.URL.Path, cfg.Path) { if strings.HasPrefix(r.URL.Path, cfg.Path) {
@ -77,17 +82,8 @@ func FixedPathProxyMiddleware(configs []config.FixedPathConfig) func(http.Handle
log.Printf("[%s] Error copying response: %v", utils.GetClientIP(r), err) log.Printf("[%s] Error copying response: %v", utils.GetClientIP(r), err)
} }
// 记录成功的请求 // 记录统计信息
log.Printf("| %-6s | %3d | %12s | %15s | %10s | %-30s | %-50s -> %s", collector.RecordRequest(r.URL.Path, resp.StatusCode, time.Since(startTime), bytesCopied, utils.GetClientIP(r))
r.Method, // HTTP方法左对齐占6位
resp.StatusCode, // 状态码占3位
time.Since(startTime), // 处理时间占12位
utils.GetClientIP(r), // IP地址占15位
utils.FormatBytes(bytesCopied), // 传输大小占10位
utils.GetRequestSource(r), // 请求来源
r.URL.Path, // 请求路径左对齐占50位
targetURL, // 目标URL
)
return return
} }

View File

@ -1,6 +1,6 @@
# Proxy-Go # Proxy-Go
A simple reverse proxy server written in Go. A 'simple' reverse proxy server written in Go.
使用方法: https://q58.org/t/topic/165?u=wood 使用方法: https://q58.org/t/topic/165?u=wood
@ -11,12 +11,8 @@ A simple reverse proxy server written in Go.
3. 回源Host修改 3. 回源Host修改
4. 大文件使用流式传输, 小文件直接提供 4. 大文件使用流式传输, 小文件直接提供
5. 可以按照文件后缀名代理不同站点, 方便图片处理等 5. 可以按照文件后缀名代理不同站点, 方便图片处理等
6. 适配Cloudflare Images的图片自适应功能, 支持`format=auto` 6. 适配Cloudflare Images的图片自适应功能, 透传`Accept`头, 支持`format=auto`
7. 支持metrics监控, 在`/metrics/ui`查看, 具体可以看帖子里写的用法
## TIPS
写的比较潦草, 希望有能力的同学帮忙完善优化一下