refactor(metrics): Simplify metrics collection and improve performance

- Restructure Collector to use more efficient atomic operations
- Remove complex caching and monitoring dependencies
- Simplify path and status code tracking with sync.Map
- Optimize request logging with a dedicated request queue
- Remove unnecessary utility functions and pools
- Streamline stats generation and reduce complexity
This commit is contained in:
wood chen 2025-02-15 11:51:01 +08:00
parent 33d6a51416
commit f758db3531
3 changed files with 163 additions and 344 deletions

View File

@ -1,17 +1,12 @@
package metrics package metrics
import ( import (
"encoding/json"
"fmt" "fmt"
"log" "log"
"net/http" "net/http"
"os"
"path"
"proxy-go/internal/cache"
"proxy-go/internal/config" "proxy-go/internal/config"
"proxy-go/internal/constants"
"proxy-go/internal/models" "proxy-go/internal/models"
"proxy-go/internal/monitor" "proxy-go/internal/utils"
"runtime" "runtime"
"sort" "sort"
"sync" "sync"
@ -19,219 +14,125 @@ import (
"time" "time"
) )
// Collector 指标收集器
type Collector struct { type Collector struct {
startTime time.Time startTime time.Time
activeRequests int64 activeRequests int64
totalRequests int64 totalRequests int64
totalErrors int64 totalErrors int64
totalBytes atomic.Int64 totalBytes int64
latencySum atomic.Int64 latencySum int64
pathStats sync.Map pathStats sync.Map
refererStats sync.Map statusCodeStats sync.Map
statusStats [6]atomic.Int64 recentRequests *models.RequestQueue
latencyBuckets [10]atomic.Int64 config *config.Config
recentRequests struct {
sync.RWMutex
items [1000]*models.RequestLog
cursor atomic.Int64
}
cache *cache.Cache
monitor *monitor.Monitor
statsPool sync.Pool
} }
var globalCollector *Collector var (
instance *Collector
once sync.Once
)
func InitCollector(config *config.Config) error { // InitCollector 初始化收集器
globalCollector = &Collector{ func InitCollector(cfg *config.Config) error {
once.Do(func() {
instance = &Collector{
startTime: time.Now(), startTime: time.Now(),
pathStats: sync.Map{}, recentRequests: models.NewRequestQueue(1000),
statusStats: [6]atomic.Int64{}, config: cfg,
latencyBuckets: [10]atomic.Int64{},
} }
})
// 初始化 cache
globalCollector.cache = cache.NewCache(constants.CacheTTL)
// 初始化监控器
globalCollector.monitor = monitor.NewMonitor(globalCollector)
// 初始化对象池
globalCollector.statsPool = sync.Pool{
New: func() interface{} {
return make(map[string]interface{}, 20)
},
}
// 设置最后保存时间
lastSaveTime = time.Now()
return nil return nil
} }
// GetCollector 获取收集器实例
func GetCollector() *Collector { func GetCollector() *Collector {
return globalCollector return instance
} }
// BeginRequest 开始请求
func (c *Collector) BeginRequest() { func (c *Collector) BeginRequest() {
atomic.AddInt64(&c.activeRequests, 1) atomic.AddInt64(&c.activeRequests, 1)
} }
// EndRequest 结束请求
func (c *Collector) EndRequest() { func (c *Collector) EndRequest() {
atomic.AddInt64(&c.activeRequests, -1) atomic.AddInt64(&c.activeRequests, -1)
} }
// RecordRequest 记录请求
func (c *Collector) RecordRequest(path string, status int, latency time.Duration, bytes int64, clientIP string, r *http.Request) { func (c *Collector) RecordRequest(path string, status int, latency time.Duration, bytes int64, clientIP string, r *http.Request) {
// 更新总请求数
atomic.AddInt64(&c.totalRequests, 1) atomic.AddInt64(&c.totalRequests, 1)
atomic.AddInt64(&c.totalBytes, bytes)
atomic.AddInt64(&c.latencySum, int64(latency))
// 更新总字节数
c.totalBytes.Add(bytes)
// 更新状态码统计
if status >= 100 && status < 600 {
c.statusStats[status/100-1].Add(1)
}
// 更新错误数
if status >= 400 { if status >= 400 {
atomic.AddInt64(&c.totalErrors, 1) atomic.AddInt64(&c.totalErrors, 1)
} }
// 更新延迟分布 // 更新状态码统计
bucket := int(latency.Milliseconds() / 100) statusKey := fmt.Sprintf("%d", status)
if bucket < 10 { if value, ok := c.statusCodeStats.Load(statusKey); ok {
c.latencyBuckets[bucket].Add(1) atomic.AddInt64(value.(*int64), 1)
} else {
var count int64 = 1
c.statusCodeStats.Store(statusKey, &count)
} }
// 更新路径统计 // 更新路径统计
if stats, ok := c.pathStats.Load(path); ok { if pathStats, ok := c.pathStats.Load(path); ok {
pathStats := stats.(*models.PathStats) stats := pathStats.(models.PathMetrics)
pathStats.Requests.Add(1) atomic.AddInt64(&stats.RequestCount, 1)
if status >= 400 { if status >= 400 {
pathStats.Errors.Add(1) atomic.AddInt64(&stats.ErrorCount, 1)
} }
pathStats.Bytes.Add(bytes) atomic.AddInt64(&stats.TotalLatency, int64(latency))
pathStats.LatencySum.Add(int64(latency)) atomic.AddInt64(&stats.BytesTransferred, bytes)
} else { } else {
newStats := &models.PathStats{} stats := models.PathMetrics{
newStats.Requests.Add(1) Path: path,
if status >= 400 { RequestCount: 1,
newStats.Errors.Add(1) ErrorCount: int64(map[bool]int{true: 1, false: 0}[status >= 400]),
TotalLatency: int64(latency),
BytesTransferred: bytes,
} }
newStats.Bytes.Add(bytes) c.pathStats.Store(path, stats)
newStats.LatencySum.Add(int64(latency))
c.pathStats.Store(path, newStats)
} }
// 更新引用来源统计 // 记录最近请求
if referer := r.Header.Get("Referer"); referer != "" { c.recentRequests.Push(models.RequestLog{
if stats, ok := c.refererStats.Load(referer); ok {
stats.(*models.PathStats).Requests.Add(1)
} else {
newStats := &models.PathStats{}
newStats.Requests.Add(1)
c.refererStats.Store(referer, newStats)
}
}
// 记录最近的请求
log := &models.RequestLog{
Time: time.Now(), Time: time.Now(),
Path: path, Path: path,
Status: status, Status: status,
Latency: latency, Latency: int64(latency),
BytesSent: bytes, BytesSent: bytes,
ClientIP: clientIP, ClientIP: clientIP,
}
c.recentRequests.Lock()
cursor := c.recentRequests.cursor.Add(1) % 1000
c.recentRequests.items[cursor] = log
c.recentRequests.Unlock()
c.latencySum.Add(int64(latency))
// 更新错误统计
if status >= 400 {
c.monitor.RecordError()
}
c.monitor.RecordRequest()
// 检查延迟
c.monitor.CheckLatency(latency, bytes)
}
func (c *Collector) GetStats() map[string]interface{} {
// 先查缓存
if stats, ok := c.cache.Get("stats"); ok {
if statsMap, ok := stats.(map[string]interface{}); ok {
return statsMap
}
}
stats := c.statsPool.Get().(map[string]interface{})
defer c.statsPool.Put(stats)
var m runtime.MemStats
runtime.ReadMemStats(&m)
uptime := time.Since(c.startTime)
currentRequests := atomic.LoadInt64(&c.totalRequests)
currentErrors := atomic.LoadInt64(&c.totalErrors)
currentBytes := c.totalBytes.Load()
// 计算错误率
var errorRate float64
if currentRequests > 0 {
errorRate = float64(currentErrors) / float64(currentRequests)
}
// 基础指标
stats["uptime"] = uptime.String()
stats["active_requests"] = atomic.LoadInt64(&c.activeRequests)
stats["total_requests"] = currentRequests
stats["total_errors"] = currentErrors
stats["error_rate"] = errorRate
stats["total_bytes"] = currentBytes
stats["bytes_per_second"] = float64(currentBytes) / Max(uptime.Seconds(), 1)
stats["requests_per_second"] = float64(currentRequests) / Max(uptime.Seconds(), 1)
// 系统指标
stats["num_goroutine"] = runtime.NumGoroutine()
stats["memory_usage"] = FormatBytes(m.Alloc)
// 延迟指标
latencySum := c.latencySum.Load()
if currentRequests > 0 {
stats["avg_response_time"] = FormatDuration(time.Duration(latencySum / currentRequests))
} else {
stats["avg_response_time"] = FormatDuration(0)
}
// 状态码统计
statusStats := make(map[string]int64)
for i := range c.statusStats {
statusStats[fmt.Sprintf("%dxx", i+1)] = c.statusStats[i].Load()
}
stats["status_code_stats"] = statusStats
// 延迟百分位数
stats["latency_percentiles"] = make([]float64, 0)
// 路径统计
var pathMetrics []models.PathMetrics
c.pathStats.Range(func(key, value interface{}) bool {
stats := value.(*models.PathStats)
if stats.Requests.Load() > 0 {
pathMetrics = append(pathMetrics, models.PathMetrics{
Path: key.(string),
RequestCount: stats.Requests.Load(),
ErrorCount: stats.Errors.Load(),
AvgLatency: formatAvgLatency(stats.LatencySum.Load(), stats.Requests.Load()),
BytesTransferred: stats.Bytes.Load(),
}) })
} }
// GetStats 获取统计数据
func (c *Collector) GetStats() map[string]interface{} {
var mem runtime.MemStats
runtime.ReadMemStats(&mem)
// 计算平均延迟
avgLatency := float64(0)
if c.totalRequests > 0 {
avgLatency = float64(c.latencySum) / float64(c.totalRequests)
}
// 收集状态码统计
statusCodeStats := make(map[string]int64)
c.statusCodeStats.Range(func(key, value interface{}) bool {
statusCodeStats[key.(string)] = atomic.LoadInt64(value.(*int64))
return true
})
// 收集路径统计
var pathMetrics []models.PathMetrics
c.pathStats.Range(func(key, value interface{}) bool {
stats := value.(models.PathMetrics)
pathMetrics = append(pathMetrics, stats)
return true return true
}) })
@ -240,95 +141,32 @@ func (c *Collector) GetStats() map[string]interface{} {
return pathMetrics[i].RequestCount > pathMetrics[j].RequestCount return pathMetrics[i].RequestCount > pathMetrics[j].RequestCount
}) })
// 只保留前10个
if len(pathMetrics) > 10 { if len(pathMetrics) > 10 {
stats["top_paths"] = pathMetrics[:10] pathMetrics = pathMetrics[:10]
} else {
stats["top_paths"] = pathMetrics
} }
// 最近请求 // 计算每个路径的平均延迟
stats["recent_requests"] = c.getRecentRequests() for i := range pathMetrics {
if pathMetrics[i].RequestCount > 0 {
// 引用来源统计 avgLatencyMs := float64(pathMetrics[i].TotalLatency) / float64(pathMetrics[i].RequestCount) / float64(time.Millisecond)
var refererMetrics []models.PathMetrics pathMetrics[i].AvgLatency = fmt.Sprintf("%.2fms", avgLatencyMs)
c.refererStats.Range(func(key, value interface{}) bool {
stats := value.(*models.PathStats)
if stats.Requests.Load() > 0 {
refererMetrics = append(refererMetrics, models.PathMetrics{
Path: key.(string),
RequestCount: stats.Requests.Load(),
})
}
return true
})
// 按请求数排序
sort.Slice(refererMetrics, func(i, j int) bool {
return refererMetrics[i].RequestCount > refererMetrics[j].RequestCount
})
if len(refererMetrics) > 10 {
stats["top_referers"] = refererMetrics[:10]
} else {
stats["top_referers"] = refererMetrics
}
// 检查告警
c.monitor.CheckMetrics(stats)
// 写入缓存
c.cache.Set("stats", stats)
return stats
}
func (c *Collector) getRecentRequests() []models.RequestLog {
var recentReqs []models.RequestLog
c.recentRequests.RLock()
defer c.recentRequests.RUnlock()
cursor := c.recentRequests.cursor.Load()
for i := 0; i < 10; i++ {
idx := (cursor - int64(i) + 1000) % 1000
if c.recentRequests.items[idx] != nil {
recentReqs = append(recentReqs, *c.recentRequests.items[idx])
}
}
return recentReqs
}
// 辅助函数
func FormatDuration(d time.Duration) string {
if d < time.Millisecond {
return fmt.Sprintf("%.2f μs", float64(d.Microseconds()))
}
if d < time.Second {
return fmt.Sprintf("%.2f ms", float64(d.Milliseconds()))
}
return fmt.Sprintf("%.2f s", d.Seconds())
}
func FormatBytes(bytes uint64) string {
const (
MB = 1024 * 1024
KB = 1024
)
switch {
case bytes >= MB:
return fmt.Sprintf("%.2f MB", float64(bytes)/MB)
case bytes >= KB:
return fmt.Sprintf("%.2f KB", float64(bytes)/KB)
default:
return fmt.Sprintf("%d Bytes", bytes)
} }
} }
func Max(a, b float64) float64 { return map[string]interface{}{
if a > b { "uptime": time.Since(c.startTime).String(),
return a "active_requests": atomic.LoadInt64(&c.activeRequests),
"total_requests": atomic.LoadInt64(&c.totalRequests),
"total_errors": atomic.LoadInt64(&c.totalErrors),
"total_bytes": atomic.LoadInt64(&c.totalBytes),
"num_goroutine": runtime.NumGoroutine(),
"memory_usage": utils.FormatBytes(int64(mem.Alloc)),
"avg_response_time": fmt.Sprintf("%.2fms", avgLatency/float64(time.Millisecond)),
"status_code_stats": statusCodeStats,
"top_paths": pathMetrics,
"recent_requests": c.recentRequests.GetAll(),
} }
return b
} }
func (c *Collector) SaveMetrics(stats map[string]interface{}) error { func (c *Collector) SaveMetrics(stats map[string]interface{}) error {
@ -354,7 +192,7 @@ func (c *Collector) validateLoadedData() error {
// 验证基础指标 // 验证基础指标
if atomic.LoadInt64(&c.totalRequests) < 0 || if atomic.LoadInt64(&c.totalRequests) < 0 ||
atomic.LoadInt64(&c.totalErrors) < 0 || atomic.LoadInt64(&c.totalErrors) < 0 ||
c.totalBytes.Load() < 0 { atomic.LoadInt64(&c.totalBytes) < 0 {
return fmt.Errorf("invalid stats values") return fmt.Errorf("invalid stats values")
} }
@ -364,20 +202,18 @@ func (c *Collector) validateLoadedData() error {
} }
// 验证状态码统计 // 验证状态码统计
for i := range c.statusStats { c.statusCodeStats.Range(func(key, value interface{}) bool {
if c.statusStats[i].Load() < 0 { return atomic.LoadInt64(value.(*int64)) >= 0
return fmt.Errorf("invalid status code count at index %d", i) })
}
}
// 验证路径统计 // 验证路径统计
var totalPathRequests int64 var totalPathRequests int64
c.pathStats.Range(func(_, value interface{}) bool { c.pathStats.Range(func(_, value interface{}) bool {
stats := value.(*models.PathStats) stats := value.(models.PathMetrics)
if stats.Requests.Load() < 0 || stats.Errors.Load() < 0 { if stats.RequestCount < 0 || stats.ErrorCount < 0 {
return false return false
} }
totalPathRequests += stats.Requests.Load() totalPathRequests += stats.RequestCount
return true return true
}) })
@ -389,69 +225,6 @@ func (c *Collector) validateLoadedData() error {
return nil return nil
} }
func formatAvgLatency(latencySum, requests int64) string {
if requests <= 0 || latencySum <= 0 {
return "0 ms"
}
return FormatDuration(time.Duration(latencySum / requests))
}
// SaveBackup 保存数据备份
func (c *Collector) SaveBackup() error {
stats := c.GetStats()
backupFile := fmt.Sprintf("backup_%s.json", time.Now().Format("20060102_150405"))
data, err := json.MarshalIndent(stats, "", " ")
if err != nil {
return err
}
return os.WriteFile(path.Join("data/backup", backupFile), data, 0644)
}
// LoadBackup 加载备份数据
func (c *Collector) LoadBackup(backupFile string) error {
data, err := os.ReadFile(path.Join("data/backup", backupFile))
if err != nil {
return err
}
var stats map[string]interface{}
if err := json.Unmarshal(data, &stats); err != nil {
return err
}
return c.RestoreFromBackup(stats)
}
// RestoreFromBackup 从备份恢复数据
func (c *Collector) RestoreFromBackup(stats map[string]interface{}) error {
// 恢复基础指标
if totalReqs, ok := stats["total_requests"].(int64); ok {
atomic.StoreInt64(&c.totalRequests, totalReqs)
}
if totalErrs, ok := stats["total_errors"].(int64); ok {
atomic.StoreInt64(&c.totalErrors, totalErrs)
}
if totalBytes, ok := stats["total_bytes"].(int64); ok {
c.totalBytes.Store(totalBytes)
}
// 恢复状态码统计
if statusStats, ok := stats["status_code_stats"].(map[string]int64); ok {
for group, count := range statusStats {
if len(group) > 0 {
idx := (int(group[0]) - '0') - 1
if idx >= 0 && idx < len(c.statusStats) {
c.statusStats[idx].Store(count)
}
}
}
}
return nil
}
// GetLastSaveTime 实现 interfaces.MetricsCollector 接口 // GetLastSaveTime 实现 interfaces.MetricsCollector 接口
var lastSaveTime time.Time var lastSaveTime time.Time

View File

@ -2,18 +2,8 @@ package models
import ( import (
"sync/atomic" "sync/atomic"
"time"
) )
type RequestLog struct {
Time time.Time
Path string
Status int
Latency time.Duration
BytesSent int64
ClientIP string
}
type PathStats struct { type PathStats struct {
Requests atomic.Int64 Requests atomic.Int64
Errors atomic.Int64 Errors atomic.Int64
@ -29,11 +19,3 @@ type HistoricalMetrics struct {
ErrorRate float64 `json:"error_rate"` ErrorRate float64 `json:"error_rate"`
AvgLatency float64 `json:"avg_latency"` AvgLatency float64 `json:"avg_latency"`
} }
type PathMetrics struct {
Path string `json:"path"`
RequestCount int64 `json:"request_count"`
ErrorCount int64 `json:"error_count"`
AvgLatency string `json:"avg_latency"`
BytesTransferred int64 `json:"bytes_transferred"`
}

View File

@ -0,0 +1,64 @@
package models
import (
"sync"
"time"
)
// RequestLog is a single entry in the recent-requests ring buffer,
// recording the outcome of one proxied HTTP request.
type RequestLog struct {
	Time      time.Time `json:"time"`       // completion time of the request
	Path      string    `json:"path"`       // request path
	Status    int       `json:"status"`     // HTTP status code returned
	Latency   int64     `json:"latency"`    // latency in nanoseconds (time.Duration stored as int64)
	BytesSent int64     `json:"bytes_sent"` // response bytes sent to the client
	ClientIP  string    `json:"client_ip"`  // remote client address
}

// PathMetrics aggregates per-path request statistics collected by the
// metrics Collector.
type PathMetrics struct {
	Path             string `json:"path"`
	RequestCount     int64  `json:"request_count"`
	ErrorCount       int64  `json:"error_count"` // responses with status >= 400
	TotalLatency     int64  `json:"-"`           // cumulative latency in nanoseconds; source for AvgLatency
	AvgLatency       string `json:"avg_latency"` // human-readable average, filled in by the collector
	BytesTransferred int64  `json:"bytes_transferred"`
}

// RequestQueue is a fixed-capacity, thread-safe ring buffer of RequestLogs.
// Once full, each Push overwrites the oldest entry.
type RequestQueue struct {
	sync.RWMutex
	items  []RequestLog
	size   int // ring capacity
	cursor int // index the next Push writes to
	count  int // entries pushed so far, capped at size
}

// NewRequestQueue creates a ring buffer holding up to size entries.
// A non-positive size is clamped to 1 so that Push can never divide by zero.
func NewRequestQueue(size int) *RequestQueue {
	if size < 1 {
		size = 1
	}
	return &RequestQueue{
		items: make([]RequestLog, size),
		size:  size,
	}
}

// Push appends a request log, overwriting the oldest entry when the
// ring is full.
func (q *RequestQueue) Push(log RequestLog) {
	q.Lock()
	defer q.Unlock()
	q.items[q.cursor] = log
	q.cursor = (q.cursor + 1) % q.size
	if q.count < q.size {
		q.count++
	}
}

// GetAll returns the stored logs ordered newest first. Using the pushed
// count (rather than a zero-Time sentinel) means entries are never
// silently dropped and the result slice is sized exactly.
func (q *RequestQueue) GetAll() []RequestLog {
	q.RLock()
	defer q.RUnlock()
	result := make([]RequestLog, 0, q.count)
	for i := 0; i < q.count; i++ {
		idx := (q.cursor - i - 1 + q.size) % q.size
		result = append(result, q.items[idx])
	}
	return result
}