From 22c0d2e301588cc41b73416b06d4e7169ca0093c Mon Sep 17 00:00:00 2001 From: wood chen Date: Sun, 9 Mar 2025 13:21:35 +0800 Subject: [PATCH] =?UTF-8?q?feat(metrics):=20=E4=BC=98=E5=8C=96=E6=8C=87?= =?UTF-8?q?=E6=A0=87=E6=95=B0=E6=8D=AE=E5=A4=84=E7=90=86=E5=92=8C=E6=B8=85?= =?UTF-8?q?=E7=90=86=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在指标收集器中新增定期清理任务,自动清理无效的统计数据 - 修改 MetricsHandler 中的延迟分布处理,移除冗余日志输出 - 优化 MetricsStorage 的数据加载逻辑,限制加载的统计数据数量 - 新增延迟分布单独持久化存储,减少主指标文件的复杂性 - 改进数据加载和恢复的日志记录,提供更清晰的操作反馈 --- internal/handler/metrics.go | 22 +-- internal/metrics/collector.go | 92 +++++++++++- internal/metrics/persistence.go | 249 +++++++++++++++++--------------- 3 files changed, 221 insertions(+), 142 deletions(-) diff --git a/internal/handler/metrics.go b/internal/handler/metrics.go index 06ffd5c..084b53b 100644 --- a/internal/handler/metrics.go +++ b/internal/handler/metrics.go @@ -161,50 +161,33 @@ func (h *ProxyHandler) MetricsHandler(w http.ResponseWriter, r *http.Request) { metrics.LatencyStats.Max = utils.SafeString(latencyStats["max"], "0ms") // 处理分布数据 - log.Printf("[MetricsHandler] 处理延迟分布数据: stats[latency_stats]=%v", stats["latency_stats"]) - if stats["latency_stats"] != nil { latencyStatsMap, ok := stats["latency_stats"].(map[string]interface{}) if ok { - log.Printf("[MetricsHandler] latencyStatsMap=%v", latencyStatsMap) distribution, ok := latencyStatsMap["distribution"] if ok && distribution != nil { - log.Printf("[MetricsHandler] distribution=%v", distribution) - // 尝试直接使用 map[string]int64 类型 if distributionMap, ok := distribution.(map[string]int64); ok { - log.Printf("[MetricsHandler] 直接使用 map[string]int64: %v", distributionMap) metrics.LatencyStats.Distribution = distributionMap } else if distributionMap, ok := distribution.(map[string]interface{}); ok { // 如果不是 map[string]int64,尝试转换 map[string]interface{} - log.Printf("[MetricsHandler] 转换 map[string]interface{}: %v", distributionMap) 
metrics.LatencyStats.Distribution = make(map[string]int64) for k, v := range distributionMap { - log.Printf("[MetricsHandler] 处理延迟分布项: %s=%v (type=%T)", k, v, v) if intValue, ok := v.(float64); ok { metrics.LatencyStats.Distribution[k] = int64(intValue) - log.Printf("[MetricsHandler] 转换为int64: %s=%d", k, int64(intValue)) } else if intValue, ok := v.(int64); ok { metrics.LatencyStats.Distribution[k] = intValue - log.Printf("[MetricsHandler] 已经是int64: %s=%d", k, intValue) } } } else { - log.Printf("[MetricsHandler] distribution类型未知: %v (type=%T)", distribution, distribution) + log.Printf("[MetricsHandler] distribution类型未知: %T", distribution) } - } else { - log.Printf("[MetricsHandler] 没有distribution字段或为nil: ok=%v, distribution=%v", ok, distribution) } - } else { - log.Printf("[MetricsHandler] latency_stats不是map: %v (type=%T)", stats["latency_stats"], stats["latency_stats"]) } - } else { - log.Printf("[MetricsHandler] latency_stats为nil") } // 如果分布数据为空,初始化一个空的分布 if metrics.LatencyStats.Distribution == nil { - log.Printf("[MetricsHandler] 初始化空的延迟分布") metrics.LatencyStats.Distribution = make(map[string]int64) // 添加默认的延迟桶 metrics.LatencyStats.Distribution["lt10ms"] = 0 @@ -219,9 +202,6 @@ func (h *ProxyHandler) MetricsHandler(w http.ResponseWriter, r *http.Request) { metrics.ErrorStats.ServerErrors = serverErrors metrics.ErrorStats.Types = errorTypes - // 打印最终的延迟分布数据 - log.Printf("[MetricsHandler] 最终的延迟分布数据: %v", metrics.LatencyStats.Distribution) - w.Header().Set("Content-Type", "application/json") if err := json.NewEncoder(w).Encode(metrics); err != nil { log.Printf("Error encoding metrics: %v", err) diff --git a/internal/metrics/collector.go b/internal/metrics/collector.go index 2a40e48..e7947f6 100644 --- a/internal/metrics/collector.go +++ b/internal/metrics/collector.go @@ -70,6 +70,9 @@ func InitCollector(cfg *config.Config) error { // 启动数据一致性检查器 instance.startConsistencyChecker() + + // 启动定期清理任务 + instance.startCleanupTask() }) return nil } @@ -276,6 +279,7 @@ 
func (c *Collector) GetStats() map[string]interface{} { // 收集路径统计 var pathMetrics []*models.PathMetrics + pathCount := 0 c.pathStats.Range(func(key, value interface{}) bool { stats := value.(*models.PathMetrics) requestCount := stats.GetRequestCount() @@ -285,7 +289,10 @@ func (c *Collector) GetStats() map[string]interface{} { stats.AvgLatency = fmt.Sprintf("%.2fms", avgLatencyMs) pathMetrics = append(pathMetrics, stats) } - return true + + // 限制遍历的数量,避免过多数据导致内存占用过高 + pathCount++ + return pathCount < 100 // 最多遍历100个路径 }) // 按请求数降序排序,请求数相同时按路径字典序排序 @@ -311,6 +318,7 @@ func (c *Collector) GetStats() map[string]interface{} { // 收集引用来源统计 var refererMetrics []*models.PathMetrics + refererCount := 0 c.refererStats.Range(func(key, value interface{}) bool { stats := value.(*models.PathMetrics) requestCount := stats.GetRequestCount() @@ -320,7 +328,10 @@ func (c *Collector) GetStats() map[string]interface{} { stats.AvgLatency = fmt.Sprintf("%.2fms", avgLatencyMs) refererMetrics = append(refererMetrics, stats) } - return true + + // 限制遍历的数量,避免过多数据导致内存占用过高 + refererCount++ + return refererCount < 50 // 最多遍历50个引用来源 }) // 按请求数降序排序,请求数相同时按引用来源字典序排序 @@ -568,3 +579,80 @@ func simplifyReferer(referer string) string { return referer } + +// startCleanupTask starts a background goroutine that calls cleanupOldData on every tick of a one-hour ticker +func (c *Collector) startCleanupTask() { + go func() { + ticker := time.NewTicker(1 * time.Hour) // fires once per hour + defer ticker.Stop() + + for range ticker.C { + c.cleanupOldData() + } + }() +} + +// cleanupOldData deletes path/referer entries whose request count is zero and trims the bandwidth history down to its 10 newest parseable keys +func (c *Collector) cleanupOldData() { + log.Printf("[Metrics] 开始清理旧数据...") + + // Path stats: collect the paths with a zero request count, then delete them + var pathsToRemove []string + c.pathStats.Range(func(key, value interface{}) bool { + path := key.(string) + stats := value.(*models.PathMetrics) + if stats.GetRequestCount() == 0 { + pathsToRemove = append(pathsToRemove, path) + } + return true + }) + + for _, path := range pathsToRemove { + c.pathStats.Delete(path) + } + + // Referer stats: collect the referers with a zero request count, then delete them + var referersToRemove []string + 
c.refererStats.Range(func(key, value interface{}) bool { + referer := key.(string) + stats := value.(*models.PathMetrics) + if stats.GetRequestCount() == 0 { + referersToRemove = append(referersToRemove, referer) + } + return true + }) + + for _, referer := range referersToRemove { + c.refererStats.Delete(referer) + } + + // Bandwidth history: only trimmed once it holds more than 10 entries + c.bandwidthStats.Lock() + if len(c.bandwidthStats.history) > 10 { + // Gather every key that parses as "01-02 15:04"; unparseable keys are left untouched + var oldestKeys []string + parsedTimes := make(map[string]time.Time, len(c.bandwidthStats.history)) + + for k := range c.bandwidthStats.history { + t, err := time.Parse("01-02 15:04", k) + if err != nil { + continue + } + parsedTimes[k] = t + oldestKeys = append(oldestKeys, k) + } + + // Sort oldest-first; times are looked up by key because sort.Slice swaps only oldestKeys, so a parallel slice would go out of step + sort.Slice(oldestKeys, func(i, j int) bool { + return parsedTimes[oldestKeys[i]].Before(parsedTimes[oldestKeys[j]]) + }) + + // Delete the oldest records so that only the 10 newest parsed keys remain + for i := 0; i < len(oldestKeys)-10; i++ { + delete(c.bandwidthStats.history, oldestKeys[i]) + } + } + c.bandwidthStats.Unlock() + + log.Printf("[Metrics] 清理完成: 删除了 %d 个路径, %d 个引用来源", len(pathsToRemove), len(referersToRemove)) +} diff --git a/internal/metrics/persistence.go b/internal/metrics/persistence.go index 576f17e..62a0c8d 100644 --- a/internal/metrics/persistence.go +++ b/internal/metrics/persistence.go @@ -104,25 +104,29 @@ func (ms *MetricsStorage) SaveMetrics() error { // 获取当前指标数据 stats := ms.collector.GetStats() - // 保存基本指标 + // 保存基本指标 - 只保存必要的字段 basicMetrics := map[string]interface{}{ "uptime": stats["uptime"], "total_bytes": stats["total_bytes"], "avg_response_time": stats["avg_response_time"], "requests_per_second": stats["requests_per_second"], "bytes_per_second": stats["bytes_per_second"], - "latency_stats": stats["latency_stats"], - "bandwidth_history": stats["bandwidth_history"], - "current_bandwidth": stats["current_bandwidth"], "save_time": time.Now().Format(time.RFC3339), } + // 单独保存延迟统计,避免嵌套结构导致的内存占用 + if latencyStats, ok := stats["latency_stats"].(map[string]interface{}); ok { + basicMetrics["latency_min"] = latencyStats["min"] + 
basicMetrics["latency_max"] = latencyStats["max"] + } + if err := saveJSONToFile(ms.metricsFile, basicMetrics); err != nil { return fmt.Errorf("保存基本指标失败: %v", err) } - // 保存路径统计 - if err := saveJSONToFile(ms.pathStatsFile, stats["top_paths"]); err != nil { + // 保存路径统计 - 限制数量 + topPaths := stats["top_paths"] + if err := saveJSONToFile(ms.pathStatsFile, topPaths); err != nil { return fmt.Errorf("保存路径统计失败: %v", err) } @@ -131,11 +135,21 @@ func (ms *MetricsStorage) SaveMetrics() error { return fmt.Errorf("保存状态码统计失败: %v", err) } - // 保存引用来源统计 - if err := saveJSONToFile(ms.refererStatsFile, stats["top_referers"]); err != nil { + // 保存引用来源统计 - 限制数量 + topReferers := stats["top_referers"] + if err := saveJSONToFile(ms.refererStatsFile, topReferers); err != nil { return fmt.Errorf("保存引用来源统计失败: %v", err) } + // 单独保存延迟分布 + if latencyStats, ok := stats["latency_stats"].(map[string]interface{}); ok { + if distribution, ok := latencyStats["distribution"]; ok { + if err := saveJSONToFile(filepath.Join(ms.dataDir, "latency_distribution.json"), distribution); err != nil { + log.Printf("[MetricsStorage] 保存延迟分布失败: %v", err) + } + } + } + ms.mutex.Lock() ms.lastSaveTime = time.Now() ms.mutex.Unlock() @@ -150,7 +164,7 @@ func (ms *MetricsStorage) LoadMetrics() error { log.Printf("[MetricsStorage] 开始加载指标数据...") // 检查文件是否存在 - if !fileExists(ms.metricsFile) || !fileExists(ms.pathStatsFile) || !fileExists(ms.statusCodeFile) { + if !fileExists(ms.metricsFile) { return fmt.Errorf("指标数据文件不存在") } @@ -160,141 +174,138 @@ func (ms *MetricsStorage) LoadMetrics() error { return fmt.Errorf("加载基本指标失败: %v", err) } - // 加载路径统计 - var pathStats []map[string]interface{} - if err := loadJSONFromFile(ms.pathStatsFile, &pathStats); err != nil { - return fmt.Errorf("加载路径统计失败: %v", err) - } - - // 加载状态码统计 - var statusCodeStats map[string]interface{} - if err := loadJSONFromFile(ms.statusCodeFile, &statusCodeStats); err != nil { - return fmt.Errorf("加载状态码统计失败: %v", err) - } - - // 加载引用来源统计(如果文件存在) - var 
refererStats []map[string]interface{} - if fileExists(ms.refererStatsFile) { - if err := loadJSONFromFile(ms.refererStatsFile, &refererStats); err != nil { - log.Printf("[MetricsStorage] 加载引用来源统计失败: %v", err) - // 不中断加载过程 - } else { - log.Printf("[MetricsStorage] 成功加载引用来源统计: %d 条记录", len(refererStats)) - } - } - // 将加载的数据应用到收集器 // 1. 应用总字节数 if totalBytes, ok := basicMetrics["total_bytes"].(float64); ok { atomic.StoreInt64(&ms.collector.totalBytes, int64(totalBytes)) } - // 2. 应用路径统计 - for _, pathStat := range pathStats { - path, ok := pathStat["path"].(string) - if !ok { - continue - } - - requestCount, _ := pathStat["request_count"].(float64) - errorCount, _ := pathStat["error_count"].(float64) - bytesTransferred, _ := pathStat["bytes_transferred"].(float64) - - // 创建或更新路径统计 - var pathMetrics *models.PathMetrics - if existingMetrics, ok := ms.collector.pathStats.Load(path); ok { - pathMetrics = existingMetrics.(*models.PathMetrics) + // 2. 加载路径统计(如果文件存在) + if fileExists(ms.pathStatsFile) { + var pathStats []map[string]interface{} + if err := loadJSONFromFile(ms.pathStatsFile, &pathStats); err != nil { + log.Printf("[MetricsStorage] 加载路径统计失败: %v", err) } else { - pathMetrics = &models.PathMetrics{Path: path} - ms.collector.pathStats.Store(path, pathMetrics) - } - - // 设置统计值 - pathMetrics.RequestCount.Store(int64(requestCount)) - pathMetrics.ErrorCount.Store(int64(errorCount)) - pathMetrics.BytesTransferred.Store(int64(bytesTransferred)) - } - - // 3. 应用状态码统计 - for statusCode, count := range statusCodeStats { - countValue, ok := count.(float64) - if !ok { - continue - } - - // 创建或更新状态码统计 - if counter, ok := ms.collector.statusCodeStats.Load(statusCode); ok { - atomic.StoreInt64(counter.(*int64), int64(countValue)) - } else { - counter := new(int64) - *counter = int64(countValue) - ms.collector.statusCodeStats.Store(statusCode, counter) - } - } - - // 4. 
应用引用来源统计 - if len(refererStats) > 0 { - for _, refererStat := range refererStats { - referer, ok := refererStat["path"].(string) - if !ok { - continue + // 只加载前10个路径统计 + maxPaths := 10 + if len(pathStats) > maxPaths { + pathStats = pathStats[:maxPaths] } - requestCount, _ := refererStat["request_count"].(float64) - errorCount, _ := refererStat["error_count"].(float64) - bytesTransferred, _ := refererStat["bytes_transferred"].(float64) + for _, pathStat := range pathStats { + path, ok := pathStat["path"].(string) + if !ok { + continue + } - // 创建或更新引用来源统计 - var refererMetrics *models.PathMetrics - if existingMetrics, ok := ms.collector.refererStats.Load(referer); ok { - refererMetrics = existingMetrics.(*models.PathMetrics) - } else { - refererMetrics = &models.PathMetrics{Path: referer} - ms.collector.refererStats.Store(referer, refererMetrics) + requestCount, _ := pathStat["request_count"].(float64) + errorCount, _ := pathStat["error_count"].(float64) + bytesTransferred, _ := pathStat["bytes_transferred"].(float64) + + // 创建或更新路径统计 + var pathMetrics *models.PathMetrics + if existingMetrics, ok := ms.collector.pathStats.Load(path); ok { + pathMetrics = existingMetrics.(*models.PathMetrics) + } else { + pathMetrics = &models.PathMetrics{Path: path} + ms.collector.pathStats.Store(path, pathMetrics) + } + + // 设置统计值 + pathMetrics.RequestCount.Store(int64(requestCount)) + pathMetrics.ErrorCount.Store(int64(errorCount)) + pathMetrics.BytesTransferred.Store(int64(bytesTransferred)) } - - // 设置统计值 - refererMetrics.RequestCount.Store(int64(requestCount)) - refererMetrics.ErrorCount.Store(int64(errorCount)) - refererMetrics.BytesTransferred.Store(int64(bytesTransferred)) + log.Printf("[MetricsStorage] 加载了 %d 条路径统计", len(pathStats)) } - log.Printf("[MetricsStorage] 应用了 %d 条引用来源统计记录", len(refererStats)) } - // 4. 
应用延迟分布桶(如果有) - if latencyStats, ok := basicMetrics["latency_stats"].(map[string]interface{}); ok { - if distribution, ok := latencyStats["distribution"].(map[string]interface{}); ok { + // 3. 加载状态码统计(如果文件存在) + if fileExists(ms.statusCodeFile) { + var statusCodeStats map[string]interface{} + if err := loadJSONFromFile(ms.statusCodeFile, &statusCodeStats); err != nil { + log.Printf("[MetricsStorage] 加载状态码统计失败: %v", err) + } else { + for statusCode, count := range statusCodeStats { + countValue, ok := count.(float64) + if !ok { + continue + } + + // 创建或更新状态码统计 + if counter, ok := ms.collector.statusCodeStats.Load(statusCode); ok { + atomic.StoreInt64(counter.(*int64), int64(countValue)) + } else { + counter := new(int64) + *counter = int64(countValue) + ms.collector.statusCodeStats.Store(statusCode, counter) + } + } + log.Printf("[MetricsStorage] 加载了 %d 条状态码统计", len(statusCodeStats)) + } + } + + // 4. 加载引用来源统计(如果文件存在) + if fileExists(ms.refererStatsFile) { + var refererStats []map[string]interface{} + if err := loadJSONFromFile(ms.refererStatsFile, &refererStats); err != nil { + log.Printf("[MetricsStorage] 加载引用来源统计失败: %v", err) + } else { + // 只加载前10个引用来源统计 + maxReferers := 10 + if len(refererStats) > maxReferers { + refererStats = refererStats[:maxReferers] + } + + for _, refererStat := range refererStats { + referer, ok := refererStat["path"].(string) + if !ok { + continue + } + + requestCount, _ := refererStat["request_count"].(float64) + errorCount, _ := refererStat["error_count"].(float64) + bytesTransferred, _ := refererStat["bytes_transferred"].(float64) + + // 创建或更新引用来源统计 + var refererMetrics *models.PathMetrics + if existingMetrics, ok := ms.collector.refererStats.Load(referer); ok { + refererMetrics = existingMetrics.(*models.PathMetrics) + } else { + refererMetrics = &models.PathMetrics{Path: referer} + ms.collector.refererStats.Store(referer, refererMetrics) + } + + // 设置统计值 + refererMetrics.RequestCount.Store(int64(requestCount)) + 
refererMetrics.ErrorCount.Store(int64(errorCount)) + refererMetrics.BytesTransferred.Store(int64(bytesTransferred)) + } + log.Printf("[MetricsStorage] 加载了 %d 条引用来源统计", len(refererStats)) + } + } + + // 5. 加载延迟分布(如果文件存在) + latencyDistributionFile := filepath.Join(ms.dataDir, "latency_distribution.json") + if fileExists(latencyDistributionFile) { + var distribution map[string]interface{} + if err := loadJSONFromFile(latencyDistributionFile, &distribution); err != nil { + log.Printf("[MetricsStorage] 加载延迟分布失败: %v", err) + } else { for bucket, count := range distribution { countValue, ok := count.(float64) if !ok { continue } - if bucketCounter, ok := ms.collector.latencyBuckets.Load(bucket); ok { - atomic.StoreInt64(bucketCounter.(*int64), int64(countValue)) + if counter, ok := ms.collector.latencyBuckets.Load(bucket); ok { + atomic.StoreInt64(counter.(*int64), int64(countValue)) } } + log.Printf("[MetricsStorage] 加载了延迟分布数据") } } - // 5. 应用带宽历史(如果有) - if bandwidthHistory, ok := basicMetrics["bandwidth_history"].(map[string]interface{}); ok { - ms.collector.bandwidthStats.Lock() - ms.collector.bandwidthStats.history = make(map[string]int64) - for timeKey, bandwidth := range bandwidthHistory { - bandwidthValue, ok := bandwidth.(string) - if !ok { - continue - } - - // 解析带宽值(假设格式为 "X.XX MB/s") - var bytesValue float64 - fmt.Sscanf(bandwidthValue, "%f", &bytesValue) - ms.collector.bandwidthStats.history[timeKey] = int64(bytesValue) - } - ms.collector.bandwidthStats.Unlock() - } - ms.mutex.Lock() if saveTime, ok := basicMetrics["save_time"].(string); ok { if t, err := time.Parse(time.RFC3339, saveTime); err == nil {