diff --git a/internal/metrics/collector.go b/internal/metrics/collector.go index b5c2cf1..4f6f953 100644 --- a/internal/metrics/collector.go +++ b/internal/metrics/collector.go @@ -1,17 +1,12 @@ package metrics import ( - "encoding/json" "fmt" "log" "net/http" - "os" - "path" - "proxy-go/internal/cache" "proxy-go/internal/config" - "proxy-go/internal/constants" "proxy-go/internal/models" - "proxy-go/internal/monitor" + "proxy-go/internal/utils" "runtime" "sort" "sync" @@ -19,219 +14,125 @@ import ( "time" ) +// Collector 指标收集器 type Collector struct { - startTime time.Time - activeRequests int64 - totalRequests int64 - totalErrors int64 - totalBytes atomic.Int64 - latencySum atomic.Int64 - pathStats sync.Map - refererStats sync.Map - statusStats [6]atomic.Int64 - latencyBuckets [10]atomic.Int64 - recentRequests struct { - sync.RWMutex - items [1000]*models.RequestLog - cursor atomic.Int64 - } - cache *cache.Cache - monitor *monitor.Monitor - statsPool sync.Pool + startTime time.Time + activeRequests int64 + totalRequests int64 + totalErrors int64 + totalBytes int64 + latencySum int64 + pathStats sync.Map + statusCodeStats sync.Map + recentRequests *models.RequestQueue + config *config.Config } -var globalCollector *Collector - -func InitCollector(config *config.Config) error { - globalCollector = &Collector{ - startTime: time.Now(), - pathStats: sync.Map{}, - statusStats: [6]atomic.Int64{}, - latencyBuckets: [10]atomic.Int64{}, - } - - // 初始化 cache - globalCollector.cache = cache.NewCache(constants.CacheTTL) - - // 初始化监控器 - globalCollector.monitor = monitor.NewMonitor(globalCollector) - - // 初始化对象池 - globalCollector.statsPool = sync.Pool{ - New: func() interface{} { - return make(map[string]interface{}, 20) - }, - } - - // 设置最后保存时间 - lastSaveTime = time.Now() +var ( + instance *Collector + once sync.Once +) +// InitCollector 初始化收集器 +func InitCollector(cfg *config.Config) error { + once.Do(func() { + instance = &Collector{ + startTime: time.Now(), + recentRequests: models.NewRequestQueue(1000), + config: cfg, + } + }) return nil } +// GetCollector 获取收集器实例 func GetCollector() *Collector { - return globalCollector + return instance } +// BeginRequest 开始请求 func (c *Collector) BeginRequest() { atomic.AddInt64(&c.activeRequests, 1) } +// EndRequest 结束请求 func (c *Collector) EndRequest() { atomic.AddInt64(&c.activeRequests, -1) } +// RecordRequest 记录请求 func (c *Collector) RecordRequest(path string, status int, latency time.Duration, bytes int64, clientIP string, r *http.Request) { - // 更新总请求数 atomic.AddInt64(&c.totalRequests, 1) + atomic.AddInt64(&c.totalBytes, bytes) + atomic.AddInt64(&c.latencySum, int64(latency)) - // 更新总字节数 - c.totalBytes.Add(bytes) - - // 更新状态码统计 - if status >= 100 && status < 600 { - c.statusStats[status/100-1].Add(1) - } - - // 更新错误数 if status >= 400 { atomic.AddInt64(&c.totalErrors, 1) } - // 更新延迟分布 - bucket := int(latency.Milliseconds() / 100) - if bucket < 10 { - c.latencyBuckets[bucket].Add(1) + // 更新状态码统计 + statusKey := fmt.Sprintf("%d", status) + if value, ok := c.statusCodeStats.Load(statusKey); ok { + atomic.AddInt64(value.(*int64), 1) + } else { + var count int64 = 1 + c.statusCodeStats.Store(statusKey, &count) } // 更新路径统计 - if stats, ok := c.pathStats.Load(path); ok { - pathStats := stats.(*models.PathStats) - pathStats.Requests.Add(1) + if pathStats, ok := c.pathStats.Load(path); ok { + stats := pathStats.(models.PathMetrics) + atomic.AddInt64(&stats.RequestCount, 1) if status >= 400 { - pathStats.Errors.Add(1) + atomic.AddInt64(&stats.ErrorCount, 1) } - pathStats.Bytes.Add(bytes) - pathStats.LatencySum.Add(int64(latency)) + atomic.AddInt64(&stats.TotalLatency, int64(latency)) + atomic.AddInt64(&stats.BytesTransferred, bytes) } else { - newStats := &models.PathStats{} - newStats.Requests.Add(1) - if status >= 400 { - newStats.Errors.Add(1) + stats := models.PathMetrics{ + Path: path, + RequestCount: 1, + ErrorCount: int64(map[bool]int{true: 1, false: 0}[status >= 400]), + TotalLatency: int64(latency), + BytesTransferred: bytes, } - newStats.Bytes.Add(bytes) - newStats.LatencySum.Add(int64(latency)) - c.pathStats.Store(path, newStats) + c.pathStats.Store(path, stats) } - // 更新引用来源统计 - if referer := r.Header.Get("Referer"); referer != "" { - if stats, ok := c.refererStats.Load(referer); ok { - stats.(*models.PathStats).Requests.Add(1) - } else { - newStats := &models.PathStats{} - newStats.Requests.Add(1) - c.refererStats.Store(referer, newStats) - } - } - - // 记录最近的请求 - log := &models.RequestLog{ + // 记录最近请求 + c.recentRequests.Push(models.RequestLog{ Time: time.Now(), Path: path, Status: status, - Latency: latency, + Latency: int64(latency), BytesSent: bytes, ClientIP: clientIP, - } - - c.recentRequests.Lock() - cursor := c.recentRequests.cursor.Add(1) % 1000 - c.recentRequests.items[cursor] = log - c.recentRequests.Unlock() - - c.latencySum.Add(int64(latency)) - - // 更新错误统计 - if status >= 400 { - c.monitor.RecordError() - } - c.monitor.RecordRequest() - - // 检查延迟 - c.monitor.CheckLatency(latency, bytes) + }) } +// GetStats 获取统计数据 func (c *Collector) GetStats() map[string]interface{} { - // 先查缓存 - if stats, ok := c.cache.Get("stats"); ok { - if statsMap, ok := stats.(map[string]interface{}); ok { - return statsMap - } + var mem runtime.MemStats + runtime.ReadMemStats(&mem) + + // 计算平均延迟 + avgLatency := float64(0) + if c.totalRequests > 0 { + avgLatency = float64(c.latencySum) / float64(c.totalRequests) } - stats := c.statsPool.Get().(map[string]interface{}) - defer c.statsPool.Put(stats) + // 收集状态码统计 + statusCodeStats := make(map[string]int64) + c.statusCodeStats.Range(func(key, value interface{}) bool { + statusCodeStats[key.(string)] = atomic.LoadInt64(value.(*int64)) + return true + }) - var m runtime.MemStats - runtime.ReadMemStats(&m) - - uptime := time.Since(c.startTime) - currentRequests := atomic.LoadInt64(&c.totalRequests) - currentErrors := atomic.LoadInt64(&c.totalErrors) - currentBytes := c.totalBytes.Load() - - // 计算错误率 - var errorRate float64 - if currentRequests > 0 { - errorRate = float64(currentErrors) / float64(currentRequests) - } - - // 基础指标 - stats["uptime"] = uptime.String() - stats["active_requests"] = atomic.LoadInt64(&c.activeRequests) - stats["total_requests"] = currentRequests - stats["total_errors"] = currentErrors - stats["error_rate"] = errorRate - stats["total_bytes"] = currentBytes - stats["bytes_per_second"] = float64(currentBytes) / Max(uptime.Seconds(), 1) - stats["requests_per_second"] = float64(currentRequests) / Max(uptime.Seconds(), 1) - - // 系统指标 - stats["num_goroutine"] = runtime.NumGoroutine() - stats["memory_usage"] = FormatBytes(m.Alloc) - - // 延迟指标 - latencySum := c.latencySum.Load() - if currentRequests > 0 { - stats["avg_response_time"] = FormatDuration(time.Duration(latencySum / currentRequests)) - } else { - stats["avg_response_time"] = FormatDuration(0) - } - - // 状态码统计 - statusStats := make(map[string]int64) - for i := range c.statusStats { - statusStats[fmt.Sprintf("%dxx", i+1)] = c.statusStats[i].Load() - } - stats["status_code_stats"] = statusStats - - // 延迟百分位数 - stats["latency_percentiles"] = make([]float64, 0) - - // 路径统计 + // 收集路径统计 var pathMetrics []models.PathMetrics c.pathStats.Range(func(key, value interface{}) bool { - stats := value.(*models.PathStats) - if stats.Requests.Load() > 0 { - pathMetrics = append(pathMetrics, models.PathMetrics{ - Path: key.(string), - RequestCount: stats.Requests.Load(), - ErrorCount: stats.Errors.Load(), - AvgLatency: formatAvgLatency(stats.LatencySum.Load(), stats.Requests.Load()), - BytesTransferred: stats.Bytes.Load(), - }) - } + stats := value.(models.PathMetrics) + pathMetrics = append(pathMetrics, stats) return true }) @@ -240,95 +141,32 @@ func (c *Collector) GetStats() map[string]interface{} { return pathMetrics[i].RequestCount > pathMetrics[j].RequestCount }) + // 只保留前10个 if len(pathMetrics) > 10 { - stats["top_paths"] = pathMetrics[:10] - } else { - stats["top_paths"] = pathMetrics + pathMetrics = pathMetrics[:10] } - // 最近请求 - stats["recent_requests"] = c.getRecentRequests() - - // 引用来源统计 - var refererMetrics []models.PathMetrics - c.refererStats.Range(func(key, value interface{}) bool { - stats := value.(*models.PathStats) - if stats.Requests.Load() > 0 { - refererMetrics = append(refererMetrics, models.PathMetrics{ - Path: key.(string), - RequestCount: stats.Requests.Load(), - }) - } - return true - }) - - // 按请求数排序 - sort.Slice(refererMetrics, func(i, j int) bool { - return refererMetrics[i].RequestCount > refererMetrics[j].RequestCount - }) - - if len(refererMetrics) > 10 { - stats["top_referers"] = refererMetrics[:10] - } else { - stats["top_referers"] = refererMetrics - } - - // 检查告警 - c.monitor.CheckMetrics(stats) - - // 写入缓存 - c.cache.Set("stats", stats) - - return stats -} - -func (c *Collector) getRecentRequests() []models.RequestLog { - var recentReqs []models.RequestLog - c.recentRequests.RLock() - defer c.recentRequests.RUnlock() - - cursor := c.recentRequests.cursor.Load() - for i := 0; i < 10; i++ { - idx := (cursor - int64(i) + 1000) % 1000 - if c.recentRequests.items[idx] != nil { - recentReqs = append(recentReqs, *c.recentRequests.items[idx]) + // 计算每个路径的平均延迟 + for i := range pathMetrics { + if pathMetrics[i].RequestCount > 0 { + avgLatencyMs := float64(pathMetrics[i].TotalLatency) / float64(pathMetrics[i].RequestCount) / float64(time.Millisecond) + pathMetrics[i].AvgLatency = fmt.Sprintf("%.2fms", avgLatencyMs) } } - return recentReqs -} -// 辅助函数 -func FormatDuration(d time.Duration) string { - if d < time.Millisecond { - return fmt.Sprintf("%.2f μs", float64(d.Microseconds())) + return map[string]interface{}{ + "uptime": time.Since(c.startTime).String(), + "active_requests": atomic.LoadInt64(&c.activeRequests), + "total_requests": atomic.LoadInt64(&c.totalRequests), + "total_errors": atomic.LoadInt64(&c.totalErrors), + "total_bytes": atomic.LoadInt64(&c.totalBytes), + "num_goroutine": runtime.NumGoroutine(), + "memory_usage": utils.FormatBytes(int64(mem.Alloc)), + "avg_response_time": fmt.Sprintf("%.2fms", avgLatency/float64(time.Millisecond)), + "status_code_stats": statusCodeStats, + "top_paths": pathMetrics, + "recent_requests": c.recentRequests.GetAll(), } - if d < time.Second { - return fmt.Sprintf("%.2f ms", float64(d.Milliseconds())) - } - return fmt.Sprintf("%.2f s", d.Seconds()) -} - -func FormatBytes(bytes uint64) string { - const ( - MB = 1024 * 1024 - KB = 1024 - ) - - switch { - case bytes >= MB: - return fmt.Sprintf("%.2f MB", float64(bytes)/MB) - case bytes >= KB: - return fmt.Sprintf("%.2f KB", float64(bytes)/KB) - default: - return fmt.Sprintf("%d Bytes", bytes) - } -} - -func Max(a, b float64) float64 { - if a > b { - return a - } - return b } func (c *Collector) SaveMetrics(stats map[string]interface{}) error { @@ -354,7 +192,7 @@ func (c *Collector) validateLoadedData() error { // 验证基础指标 if atomic.LoadInt64(&c.totalRequests) < 0 || atomic.LoadInt64(&c.totalErrors) < 0 || - c.totalBytes.Load() < 0 { + atomic.LoadInt64(&c.totalBytes) < 0 { return fmt.Errorf("invalid stats values") } @@ -364,20 +202,18 @@ func (c *Collector) validateLoadedData() error { } // 验证状态码统计 - for i := range c.statusStats { - if c.statusStats[i].Load() < 0 { - return fmt.Errorf("invalid status code count at index %d", i) - } - } + c.statusCodeStats.Range(func(key, value interface{}) bool { + return atomic.LoadInt64(value.(*int64)) >= 0 + }) // 验证路径统计 var totalPathRequests int64 c.pathStats.Range(func(_, value interface{}) bool { - stats := value.(*models.PathStats) - if stats.Requests.Load() < 0 || stats.Errors.Load() < 0 { + stats := value.(models.PathMetrics) + if stats.RequestCount < 0 || stats.ErrorCount < 0 { return false } - totalPathRequests += stats.Requests.Load() + totalPathRequests += stats.RequestCount return true }) @@ -389,69 +225,6 @@ func (c *Collector) validateLoadedData() error { return nil } -func formatAvgLatency(latencySum, requests int64) string { - if requests <= 0 || latencySum <= 0 { - return "0 ms" - } - return FormatDuration(time.Duration(latencySum / requests)) -} - -// SaveBackup 保存数据备份 -func (c *Collector) SaveBackup() error { - stats := c.GetStats() - backupFile := fmt.Sprintf("backup_%s.json", time.Now().Format("20060102_150405")) - - data, err := json.MarshalIndent(stats, "", " ") - if err != nil { - return err - } - - return os.WriteFile(path.Join("data/backup", backupFile), data, 0644) -} - -// LoadBackup 加载备份数据 -func (c *Collector) LoadBackup(backupFile string) error { - data, err := os.ReadFile(path.Join("data/backup", backupFile)) - if err != nil { - return err - } - - var stats map[string]interface{} - if err := json.Unmarshal(data, &stats); err != nil { - return err - } - - return c.RestoreFromBackup(stats) -} - -// RestoreFromBackup 从备份恢复数据 -func (c *Collector) RestoreFromBackup(stats map[string]interface{}) error { - // 恢复基础指标 - if totalReqs, ok := stats["total_requests"].(int64); ok { - atomic.StoreInt64(&c.totalRequests, totalReqs) - } - if totalErrs, ok := stats["total_errors"].(int64); ok { - atomic.StoreInt64(&c.totalErrors, totalErrs) - } - if totalBytes, ok := stats["total_bytes"].(int64); ok { - c.totalBytes.Store(totalBytes) - } - - // 恢复状态码统计 - if statusStats, ok := stats["status_code_stats"].(map[string]int64); ok { - for group, count := range statusStats { - if len(group) > 0 { - idx := (int(group[0]) - '0') - 1 - if idx >= 0 && idx < len(c.statusStats) { - c.statusStats[idx].Store(count) - } - } - } - } - - return nil -} - // GetLastSaveTime 实现 interfaces.MetricsCollector 接口 var lastSaveTime time.Time diff --git a/internal/models/metrics.go b/internal/models/metrics.go index 7a0a3eb..c58dffe 100644 --- a/internal/models/metrics.go +++ b/internal/models/metrics.go @@ -2,18 +2,8 @@ package models import ( "sync/atomic" - "time" ) -type RequestLog struct { - Time time.Time - Path string - Status int - Latency time.Duration - BytesSent int64 - ClientIP string -} - type PathStats struct { Requests atomic.Int64 Errors atomic.Int64 @@ -29,11 +19,3 @@ type HistoricalMetrics struct { ErrorRate float64 `json:"error_rate"` AvgLatency float64 `json:"avg_latency"` } - -type PathMetrics struct { - Path string `json:"path"` - RequestCount int64 `json:"request_count"` - ErrorCount int64 `json:"error_count"` - AvgLatency string `json:"avg_latency"` - BytesTransferred int64 `json:"bytes_transferred"` -} diff --git a/internal/models/request.go b/internal/models/request.go new file mode 100644 index 0000000..ccf95c2 --- /dev/null +++ b/internal/models/request.go @@ -0,0 +1,64 @@ +package models + +import ( + "sync" + "time" +) + +// RequestLog 请求日志 +type RequestLog struct { + Time time.Time `json:"time"` + Path string `json:"path"` + Status int `json:"status"` + Latency int64 `json:"latency"` + BytesSent int64 `json:"bytes_sent"` + ClientIP string `json:"client_ip"` +} + +// PathMetrics 路径指标 +type PathMetrics struct { + Path string `json:"path"` + RequestCount int64 `json:"request_count"` + ErrorCount int64 `json:"error_count"` + TotalLatency int64 `json:"-"` + AvgLatency string `json:"avg_latency"` + BytesTransferred int64 `json:"bytes_transferred"` +} + +// RequestQueue 请求队列 +type RequestQueue struct { + sync.RWMutex + items []RequestLog + size int + cursor int +} + +// NewRequestQueue 创建新的请求队列 +func NewRequestQueue(size int) *RequestQueue { + return &RequestQueue{ + items: make([]RequestLog, size), + size: size, + } +} + +// Push 添加请求日志 +func (q *RequestQueue) Push(log RequestLog) { + q.Lock() + defer q.Unlock() + q.items[q.cursor] = log + q.cursor = (q.cursor + 1) % q.size +} + +// GetAll 获取所有请求日志 +func (q *RequestQueue) GetAll() []RequestLog { + q.RLock() + defer q.RUnlock() + result := make([]RequestLog, 0, q.size) + for i := 0; i < q.size; i++ { + idx := (q.cursor - i - 1 + q.size) % q.size + if !q.items[idx].Time.IsZero() { + result = append(result, q.items[idx]) + } + } + return result +}