mirror of
https://github.com/woodchen-ink/proxy-go.git
synced 2025-07-18 08:31:55 +08:00
- Optimize latency bucket counter initialization using pointer creation
- Add type checking when loading latency distribution metrics
- Simplify counter initialization and access in the metrics collector
- Enhance type safety for atomic counter operations
458 lines
13 KiB
Go
458 lines
13 KiB
Go
package metrics
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
"math"
|
|
"net/http"
|
|
"proxy-go/internal/config"
|
|
"proxy-go/internal/models"
|
|
"proxy-go/internal/utils"
|
|
"runtime"
|
|
"sort"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// Collector 指标收集器
|
|
type Collector struct {
|
|
startTime time.Time
|
|
activeRequests int64
|
|
totalRequests int64
|
|
totalErrors int64
|
|
totalBytes int64
|
|
latencySum int64
|
|
maxLatency int64 // 最大响应时间
|
|
minLatency int64 // 最小响应时间
|
|
clientErrors int64 // 4xx错误
|
|
serverErrors int64 // 5xx错误
|
|
pathStats sync.Map
|
|
statusCodeStats sync.Map
|
|
latencyBuckets sync.Map // 响应时间分布
|
|
bandwidthStats sync.Map // 带宽统计
|
|
errorTypes sync.Map // 错误类型统计
|
|
recentRequests []models.RequestLog
|
|
recentRequestsMutex sync.RWMutex
|
|
pathStatsMutex sync.RWMutex
|
|
config *config.Config
|
|
lastMinute time.Time // 用于计算每分钟带宽
|
|
minuteBytes int64 // 当前分钟的字节数
|
|
|
|
// 新增:时间段统计
|
|
lastStatsTime time.Time
|
|
intervalRequests int64
|
|
intervalErrors int64
|
|
intervalBytes int64
|
|
intervalLatencySum int64
|
|
intervalStatusCodes sync.Map
|
|
}
|
|
|
|
var (
|
|
instance *Collector
|
|
once sync.Once
|
|
)
|
|
|
|
// InitCollector 初始化收集器
|
|
func InitCollector(cfg *config.Config) error {
|
|
once.Do(func() {
|
|
instance = &Collector{
|
|
startTime: time.Now(),
|
|
lastMinute: time.Now(),
|
|
lastStatsTime: time.Now(),
|
|
recentRequests: make([]models.RequestLog, 0, 1000),
|
|
config: cfg,
|
|
minLatency: math.MaxInt64, // 初始化为最大值
|
|
}
|
|
|
|
// 初始化延迟分布桶
|
|
instance.latencyBuckets.Store("<10ms", new(int64))
|
|
instance.latencyBuckets.Store("10-50ms", new(int64))
|
|
instance.latencyBuckets.Store("50-200ms", new(int64))
|
|
instance.latencyBuckets.Store("200-1000ms", new(int64))
|
|
instance.latencyBuckets.Store(">1s", new(int64))
|
|
})
|
|
return nil
|
|
}
|
|
|
|
// GetCollector 获取收集器实例
|
|
func GetCollector() *Collector {
|
|
return instance
|
|
}
|
|
|
|
// BeginRequest 开始请求
|
|
func (c *Collector) BeginRequest() {
|
|
atomic.AddInt64(&c.activeRequests, 1)
|
|
}
|
|
|
|
// EndRequest 结束请求
|
|
func (c *Collector) EndRequest() {
|
|
atomic.AddInt64(&c.activeRequests, -1)
|
|
}
|
|
|
|
// RecordRequest 记录请求
|
|
func (c *Collector) RecordRequest(path string, status int, latency time.Duration, bytes int64, clientIP string, r *http.Request) {
|
|
// 更新总体统计
|
|
atomic.AddInt64(&c.totalRequests, 1)
|
|
atomic.AddInt64(&c.totalBytes, bytes)
|
|
atomic.AddInt64(&c.latencySum, int64(latency))
|
|
|
|
// 更新时间段统计
|
|
atomic.AddInt64(&c.intervalRequests, 1)
|
|
atomic.AddInt64(&c.intervalBytes, bytes)
|
|
atomic.AddInt64(&c.intervalLatencySum, int64(latency))
|
|
if status >= 400 {
|
|
atomic.AddInt64(&c.intervalErrors, 1)
|
|
}
|
|
|
|
// 更新带宽统计
|
|
atomic.AddInt64(&c.minuteBytes, bytes)
|
|
now := time.Now()
|
|
if now.Sub(c.lastMinute) >= time.Minute {
|
|
currentMinute := now.Format("15:04")
|
|
c.bandwidthStats.Store(currentMinute, atomic.SwapInt64(&c.minuteBytes, 0))
|
|
c.lastMinute = now
|
|
}
|
|
|
|
// 更新时间段状态码统计
|
|
statusKey := fmt.Sprintf("%d", status)
|
|
if counter, ok := c.intervalStatusCodes.Load(statusKey); ok {
|
|
atomic.AddInt64(counter.(*int64), 1)
|
|
} else {
|
|
var count int64 = 1
|
|
c.intervalStatusCodes.Store(statusKey, &count)
|
|
}
|
|
|
|
// 更新最小和最大响应时间
|
|
latencyNanos := int64(latency)
|
|
for {
|
|
oldMin := atomic.LoadInt64(&c.minLatency)
|
|
if oldMin <= latencyNanos {
|
|
break
|
|
}
|
|
if atomic.CompareAndSwapInt64(&c.minLatency, oldMin, latencyNanos) {
|
|
break
|
|
}
|
|
}
|
|
for {
|
|
oldMax := atomic.LoadInt64(&c.maxLatency)
|
|
if oldMax >= latencyNanos {
|
|
break
|
|
}
|
|
if atomic.CompareAndSwapInt64(&c.maxLatency, oldMax, latencyNanos) {
|
|
break
|
|
}
|
|
}
|
|
|
|
// 更新延迟分布
|
|
latencyMs := latency.Milliseconds()
|
|
var bucketKey string
|
|
switch {
|
|
case latencyMs < 10:
|
|
bucketKey = "<10ms"
|
|
case latencyMs < 50:
|
|
bucketKey = "10-50ms"
|
|
case latencyMs < 200:
|
|
bucketKey = "50-200ms"
|
|
case latencyMs < 1000:
|
|
bucketKey = "200-1000ms"
|
|
default:
|
|
bucketKey = ">1s"
|
|
}
|
|
if counter, ok := c.latencyBuckets.Load(bucketKey); ok {
|
|
atomic.AddInt64(counter.(*int64), 1)
|
|
} else {
|
|
counter := new(int64)
|
|
*counter = 1
|
|
c.latencyBuckets.Store(bucketKey, counter)
|
|
}
|
|
|
|
// 更新错误统计
|
|
if status >= 400 {
|
|
atomic.AddInt64(&c.totalErrors, 1)
|
|
if status >= 500 {
|
|
atomic.AddInt64(&c.serverErrors, 1)
|
|
} else {
|
|
atomic.AddInt64(&c.clientErrors, 1)
|
|
}
|
|
errKey := fmt.Sprintf("%d %s", status, http.StatusText(status))
|
|
if counter, ok := c.errorTypes.Load(errKey); ok {
|
|
atomic.AddInt64(counter.(*int64), 1)
|
|
} else {
|
|
var count int64 = 1
|
|
c.errorTypes.Store(errKey, &count)
|
|
}
|
|
}
|
|
|
|
// 更新状态码统计
|
|
statusKey = fmt.Sprintf("%d", status)
|
|
if counter, ok := c.statusCodeStats.Load(statusKey); ok {
|
|
atomic.AddInt64(counter.(*int64), 1)
|
|
} else {
|
|
var count int64 = 1
|
|
c.statusCodeStats.Store(statusKey, &count)
|
|
}
|
|
|
|
// 更新路径统计
|
|
c.pathStatsMutex.Lock()
|
|
if value, ok := c.pathStats.Load(path); ok {
|
|
stat := value.(*models.PathMetrics)
|
|
atomic.AddInt64(&stat.RequestCount, 1)
|
|
if status >= 400 {
|
|
atomic.AddInt64(&stat.ErrorCount, 1)
|
|
}
|
|
atomic.AddInt64(&stat.TotalLatency, int64(latency))
|
|
atomic.AddInt64(&stat.BytesTransferred, bytes)
|
|
} else {
|
|
c.pathStats.Store(path, &models.PathMetrics{
|
|
Path: path,
|
|
RequestCount: 1,
|
|
ErrorCount: map[bool]int64{true: 1, false: 0}[status >= 400],
|
|
TotalLatency: int64(latency),
|
|
BytesTransferred: bytes,
|
|
})
|
|
}
|
|
c.pathStatsMutex.Unlock()
|
|
|
|
// 更新最近请求记录
|
|
c.recentRequestsMutex.Lock()
|
|
c.recentRequests = append([]models.RequestLog{{
|
|
Time: time.Now(),
|
|
Path: path,
|
|
Status: status,
|
|
Latency: int64(latency),
|
|
BytesSent: bytes,
|
|
ClientIP: clientIP,
|
|
}}, c.recentRequests...)
|
|
if len(c.recentRequests) > 100 { // 只保留最近100条记录
|
|
c.recentRequests = c.recentRequests[:100]
|
|
}
|
|
c.recentRequestsMutex.Unlock()
|
|
}
|
|
|
|
// FormatUptime renders a duration as days/hours/minutes/seconds, leaving out
// the leading units that are zero (for example "1时1分1秒" or "59秒").
//
// Fractions of a second are truncated. A negative duration is treated as
// zero; otherwise the per-unit truncation would produce a misleading,
// partly negative result.
func FormatUptime(d time.Duration) string {
	if d < 0 {
		d = 0
	}

	totalHours := int(d.Hours())
	days, hours := totalHours/24, totalHours%24
	minutes := int(d.Minutes()) % 60
	seconds := int(d.Seconds()) % 60

	switch {
	case days > 0:
		return fmt.Sprintf("%d天%d时%d分%d秒", days, hours, minutes, seconds)
	case hours > 0:
		return fmt.Sprintf("%d时%d分%d秒", hours, minutes, seconds)
	case minutes > 0:
		return fmt.Sprintf("%d分%d秒", minutes, seconds)
	default:
		return fmt.Sprintf("%d秒", seconds)
	}
}
|
|
|
|
// GetStats 获取统计数据
|
|
func (c *Collector) GetStats() map[string]interface{} {
|
|
var mem runtime.MemStats
|
|
runtime.ReadMemStats(&mem)
|
|
|
|
now := time.Now()
|
|
interval := now.Sub(c.lastStatsTime)
|
|
c.lastStatsTime = now
|
|
|
|
// 获取并重置时间段统计
|
|
intervalRequests := atomic.SwapInt64(&c.intervalRequests, 0)
|
|
intervalBytes := atomic.SwapInt64(&c.intervalBytes, 0)
|
|
intervalLatencySum := atomic.SwapInt64(&c.intervalLatencySum, 0)
|
|
intervalErrors := atomic.SwapInt64(&c.intervalErrors, 0)
|
|
|
|
// 计算时间段平均延迟
|
|
avgLatency := float64(0)
|
|
if intervalRequests > 0 {
|
|
avgLatency = float64(intervalLatencySum) / float64(intervalRequests)
|
|
}
|
|
|
|
// 收集并重置时间段状态码统计
|
|
intervalStatusStats := make(map[string]int64)
|
|
c.intervalStatusCodes.Range(func(key, value interface{}) bool {
|
|
intervalStatusStats[key.(string)] = atomic.SwapInt64(value.(*int64), 0)
|
|
return true
|
|
})
|
|
|
|
// 收集状态码统计
|
|
statusCodeStats := make(map[string]int64)
|
|
c.statusCodeStats.Range(func(key, value interface{}) bool {
|
|
statusCodeStats[key.(string)] = atomic.LoadInt64(value.(*int64))
|
|
return true
|
|
})
|
|
|
|
// 收集路径统计
|
|
var pathMetrics []models.PathMetrics
|
|
c.pathStats.Range(func(key, value interface{}) bool {
|
|
stats := value.(*models.PathMetrics)
|
|
if stats.RequestCount > 0 {
|
|
avgLatencyMs := float64(stats.TotalLatency) / float64(stats.RequestCount) / float64(time.Millisecond)
|
|
stats.AvgLatency = fmt.Sprintf("%.2fms", avgLatencyMs)
|
|
}
|
|
pathMetrics = append(pathMetrics, *stats)
|
|
return true
|
|
})
|
|
|
|
// 按请求数降序排序
|
|
sort.Slice(pathMetrics, func(i, j int) bool {
|
|
return pathMetrics[i].RequestCount > pathMetrics[j].RequestCount
|
|
})
|
|
|
|
// 只保留前10个
|
|
if len(pathMetrics) > 10 {
|
|
pathMetrics = pathMetrics[:10]
|
|
}
|
|
|
|
// 收集延迟分布
|
|
latencyDistribution := make(map[string]int64)
|
|
c.latencyBuckets.Range(func(key, value interface{}) bool {
|
|
if counter, ok := value.(*int64); ok {
|
|
latencyDistribution[key.(string)] = atomic.LoadInt64(counter)
|
|
} else {
|
|
latencyDistribution[key.(string)] = value.(int64)
|
|
}
|
|
return true
|
|
})
|
|
|
|
// 收集错误类型统计
|
|
errorTypeStats := make(map[string]int64)
|
|
c.errorTypes.Range(func(key, value interface{}) bool {
|
|
errorTypeStats[key.(string)] = atomic.LoadInt64(value.(*int64))
|
|
return true
|
|
})
|
|
|
|
// 收集最近5分钟的带宽统计
|
|
bandwidthHistory := make(map[string]string)
|
|
var times []string
|
|
c.bandwidthStats.Range(func(key, value interface{}) bool {
|
|
times = append(times, key.(string))
|
|
return true
|
|
})
|
|
sort.Strings(times)
|
|
if len(times) > 5 {
|
|
times = times[len(times)-5:]
|
|
}
|
|
for _, t := range times {
|
|
if bytes, ok := c.bandwidthStats.Load(t); ok {
|
|
bandwidthHistory[t] = utils.FormatBytes(atomic.LoadInt64(bytes.(*int64))) + "/min"
|
|
}
|
|
}
|
|
|
|
// 获取最小和最大响应时间
|
|
minLatency := atomic.LoadInt64(&c.minLatency)
|
|
maxLatency := atomic.LoadInt64(&c.maxLatency)
|
|
if minLatency == math.MaxInt64 {
|
|
minLatency = 0
|
|
}
|
|
|
|
// 获取最近请求记录(加锁)
|
|
c.recentRequestsMutex.RLock()
|
|
recentRequests := make([]models.RequestLog, len(c.recentRequests))
|
|
copy(recentRequests, c.recentRequests)
|
|
c.recentRequestsMutex.RUnlock()
|
|
|
|
return map[string]interface{}{
|
|
"uptime": FormatUptime(time.Since(c.startTime)),
|
|
"active_requests": atomic.LoadInt64(&c.activeRequests),
|
|
"total_requests": atomic.LoadInt64(&c.totalRequests),
|
|
"total_errors": atomic.LoadInt64(&c.totalErrors),
|
|
"interval_errors": intervalErrors,
|
|
"total_bytes": atomic.LoadInt64(&c.totalBytes),
|
|
"num_goroutine": runtime.NumGoroutine(),
|
|
"memory_usage": utils.FormatBytes(int64(mem.Alloc)),
|
|
"avg_response_time": fmt.Sprintf("%.2fms", avgLatency/float64(time.Millisecond)),
|
|
"requests_per_second": float64(intervalRequests) / interval.Seconds(),
|
|
"bytes_per_second": float64(intervalBytes) / interval.Seconds(),
|
|
"status_code_stats": intervalStatusStats,
|
|
"top_paths": pathMetrics,
|
|
"recent_requests": recentRequests,
|
|
"latency_stats": map[string]interface{}{
|
|
"min": fmt.Sprintf("%.2fms", float64(minLatency)/float64(time.Millisecond)),
|
|
"max": fmt.Sprintf("%.2fms", float64(maxLatency)/float64(time.Millisecond)),
|
|
"distribution": latencyDistribution,
|
|
},
|
|
"error_stats": map[string]interface{}{
|
|
"client_errors": atomic.LoadInt64(&c.clientErrors),
|
|
"server_errors": atomic.LoadInt64(&c.serverErrors),
|
|
"types": errorTypeStats,
|
|
},
|
|
"bandwidth_history": bandwidthHistory,
|
|
"current_bandwidth": utils.FormatBytes(atomic.LoadInt64(&c.minuteBytes)) + "/min",
|
|
}
|
|
}
|
|
|
|
func (c *Collector) SaveMetrics(stats map[string]interface{}) error {
|
|
lastSaveTime = time.Now()
|
|
return nil
|
|
}
|
|
|
|
// LoadRecentStats 简化为只进行数据验证
|
|
func (c *Collector) LoadRecentStats() error {
|
|
start := time.Now()
|
|
log.Printf("[Metrics] Loading stats...")
|
|
|
|
if err := c.validateLoadedData(); err != nil {
|
|
return fmt.Errorf("data validation failed: %v", err)
|
|
}
|
|
|
|
log.Printf("[Metrics] Loaded stats in %v", time.Since(start))
|
|
return nil
|
|
}
|
|
|
|
// validateLoadedData 验证当前数据的有效性
|
|
func (c *Collector) validateLoadedData() error {
|
|
// 验证基础指标
|
|
if c.totalRequests < 0 ||
|
|
c.totalErrors < 0 ||
|
|
c.totalBytes < 0 {
|
|
return fmt.Errorf("invalid stats values")
|
|
}
|
|
|
|
// 验证错误数不能大于总请求数
|
|
if c.totalErrors > c.totalRequests {
|
|
return fmt.Errorf("total errors exceeds total requests")
|
|
}
|
|
|
|
// 验证状态码统计
|
|
c.statusCodeStats.Range(func(key, value interface{}) bool {
|
|
return value.(int64) >= 0
|
|
})
|
|
|
|
// 验证路径统计
|
|
var totalPathRequests int64
|
|
c.pathStats.Range(func(_, value interface{}) bool {
|
|
stats := value.(*models.PathMetrics)
|
|
if stats.RequestCount < 0 || stats.ErrorCount < 0 {
|
|
return false
|
|
}
|
|
totalPathRequests += stats.RequestCount
|
|
return true
|
|
})
|
|
|
|
// 验证总数一致性
|
|
if totalPathRequests > c.totalRequests {
|
|
return fmt.Errorf("path stats total exceeds total requests")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetLastSaveTime 实现 interfaces.MetricsCollector 接口
|
|
var lastSaveTime time.Time
|
|
|
|
func (c *Collector) GetLastSaveTime() time.Time {
|
|
return lastSaveTime
|
|
}
|
|
|
|
// CheckDataConsistency 实现 interfaces.MetricsCollector 接口
|
|
func (c *Collector) CheckDataConsistency() error {
|
|
// 简单的数据验证
|
|
if c.totalErrors > c.totalRequests {
|
|
return fmt.Errorf("total errors exceeds total requests")
|
|
}
|
|
return nil
|
|
}
|