feat(metrics): 优化指标数据处理和清理机制

- 在指标收集器中新增定期清理任务,自动清理无效的统计数据
- 修改 MetricsHandler 中的延迟分布处理,移除冗余日志输出
- 优化 MetricsStorage 的数据加载逻辑,限制加载的统计数据数量
- 新增延迟分布单独持久化存储,减少主指标文件的复杂性
- 改进数据加载和恢复的日志记录,提供更清晰的操作反馈
This commit is contained in:
wood chen 2025-03-09 13:21:35 +08:00
parent 006fa9a172
commit 22c0d2e301
3 changed files with 221 additions and 142 deletions

View File

@ -161,50 +161,33 @@ func (h *ProxyHandler) MetricsHandler(w http.ResponseWriter, r *http.Request) {
metrics.LatencyStats.Max = utils.SafeString(latencyStats["max"], "0ms")
// 处理分布数据
log.Printf("[MetricsHandler] 处理延迟分布数据: stats[latency_stats]=%v", stats["latency_stats"])
if stats["latency_stats"] != nil {
latencyStatsMap, ok := stats["latency_stats"].(map[string]interface{})
if ok {
log.Printf("[MetricsHandler] latencyStatsMap=%v", latencyStatsMap)
distribution, ok := latencyStatsMap["distribution"]
if ok && distribution != nil {
log.Printf("[MetricsHandler] distribution=%v", distribution)
// 尝试直接使用 map[string]int64 类型
if distributionMap, ok := distribution.(map[string]int64); ok {
log.Printf("[MetricsHandler] 直接使用 map[string]int64: %v", distributionMap)
metrics.LatencyStats.Distribution = distributionMap
} else if distributionMap, ok := distribution.(map[string]interface{}); ok {
// 如果不是 map[string]int64尝试转换 map[string]interface{}
log.Printf("[MetricsHandler] 转换 map[string]interface{}: %v", distributionMap)
metrics.LatencyStats.Distribution = make(map[string]int64)
for k, v := range distributionMap {
log.Printf("[MetricsHandler] 处理延迟分布项: %s=%v (type=%T)", k, v, v)
if intValue, ok := v.(float64); ok {
metrics.LatencyStats.Distribution[k] = int64(intValue)
log.Printf("[MetricsHandler] 转换为int64: %s=%d", k, int64(intValue))
} else if intValue, ok := v.(int64); ok {
metrics.LatencyStats.Distribution[k] = intValue
log.Printf("[MetricsHandler] 已经是int64: %s=%d", k, intValue)
}
}
} else {
log.Printf("[MetricsHandler] distribution类型未知: %v (type=%T)", distribution, distribution)
log.Printf("[MetricsHandler] distribution类型未知: %T", distribution)
}
} else {
log.Printf("[MetricsHandler] 没有distribution字段或为nil: ok=%v, distribution=%v", ok, distribution)
}
} else {
log.Printf("[MetricsHandler] latency_stats不是map: %v (type=%T)", stats["latency_stats"], stats["latency_stats"])
}
} else {
log.Printf("[MetricsHandler] latency_stats为nil")
}
// 如果分布数据为空,初始化一个空的分布
if metrics.LatencyStats.Distribution == nil {
log.Printf("[MetricsHandler] 初始化空的延迟分布")
metrics.LatencyStats.Distribution = make(map[string]int64)
// 添加默认的延迟桶
metrics.LatencyStats.Distribution["lt10ms"] = 0
@ -219,9 +202,6 @@ func (h *ProxyHandler) MetricsHandler(w http.ResponseWriter, r *http.Request) {
metrics.ErrorStats.ServerErrors = serverErrors
metrics.ErrorStats.Types = errorTypes
// 打印最终的延迟分布数据
log.Printf("[MetricsHandler] 最终的延迟分布数据: %v", metrics.LatencyStats.Distribution)
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(metrics); err != nil {
log.Printf("Error encoding metrics: %v", err)

View File

@ -70,6 +70,9 @@ func InitCollector(cfg *config.Config) error {
// 启动数据一致性检查器
instance.startConsistencyChecker()
// 启动定期清理任务
instance.startCleanupTask()
})
return nil
}
@ -276,6 +279,7 @@ func (c *Collector) GetStats() map[string]interface{} {
// 收集路径统计
var pathMetrics []*models.PathMetrics
pathCount := 0
c.pathStats.Range(func(key, value interface{}) bool {
stats := value.(*models.PathMetrics)
requestCount := stats.GetRequestCount()
@ -285,7 +289,10 @@ func (c *Collector) GetStats() map[string]interface{} {
stats.AvgLatency = fmt.Sprintf("%.2fms", avgLatencyMs)
pathMetrics = append(pathMetrics, stats)
}
return true
// 限制遍历的数量,避免过多数据导致内存占用过高
pathCount++
return pathCount < 100 // 最多遍历100个路径
})
// 按请求数降序排序,请求数相同时按路径字典序排序
@ -311,6 +318,7 @@ func (c *Collector) GetStats() map[string]interface{} {
// 收集引用来源统计
var refererMetrics []*models.PathMetrics
refererCount := 0
c.refererStats.Range(func(key, value interface{}) bool {
stats := value.(*models.PathMetrics)
requestCount := stats.GetRequestCount()
@ -320,7 +328,10 @@ func (c *Collector) GetStats() map[string]interface{} {
stats.AvgLatency = fmt.Sprintf("%.2fms", avgLatencyMs)
refererMetrics = append(refererMetrics, stats)
}
return true
// 限制遍历的数量,避免过多数据导致内存占用过高
refererCount++
return refererCount < 50 // 最多遍历50个引用来源
})
// 按请求数降序排序,请求数相同时按引用来源字典序排序
@ -568,3 +579,80 @@ func simplifyReferer(referer string) string {
return referer
}
// startCleanupTask launches a background goroutine that periodically runs
// cleanupOldData for the lifetime of the process.
// NOTE(review): the goroutine has no stop signal (no ctx / done channel) —
// acceptable for a process-wide singleton collector, but worth revisiting if
// collectors ever get created and discarded dynamically.
func (c *Collector) startCleanupTask() {
	go func() {
		// Hourly cadence: frequent enough to bound stale entries,
		// cheap enough to be negligible.
		t := time.NewTicker(time.Hour)
		defer t.Stop()
		for {
			<-t.C
			c.cleanupOldData()
		}
	}()
}
// cleanupOldData removes stale statistics so a long-running process does not
// accumulate unbounded per-path / per-referer entries or bandwidth history.
// Invoked periodically by the goroutine started in startCleanupTask.
func (c *Collector) cleanupOldData() {
	log.Printf("[Metrics] 开始清理旧数据...")

	// Sweep path stats: collect keys with zero requests first, then delete.
	// (Collecting before deleting keeps the Range pass simple and avoids
	// reasoning about mutation-during-iteration semantics of sync.Map.)
	var pathsToRemove []string
	c.pathStats.Range(func(key, value interface{}) bool {
		path := key.(string)
		stats := value.(*models.PathMetrics)
		if stats.GetRequestCount() == 0 {
			pathsToRemove = append(pathsToRemove, path)
		}
		return true
	})
	for _, path := range pathsToRemove {
		c.pathStats.Delete(path)
	}

	// Same sweep for referer stats.
	var referersToRemove []string
	c.refererStats.Range(func(key, value interface{}) bool {
		referer := key.(string)
		stats := value.(*models.PathMetrics)
		if stats.GetRequestCount() == 0 {
			referersToRemove = append(referersToRemove, referer)
		}
		return true
	})
	for _, referer := range referersToRemove {
		c.refererStats.Delete(referer)
	}

	// Trim bandwidth history down to the 10 most recent parseable entries.
	//
	// BUG FIX: the previous code sorted oldestKeys with a comparator that read
	// the parallel slice oldestTimes, but sort.Slice only swaps the slice it
	// is given — the two slices fell out of sync during sorting, so the
	// entries deleted as "oldest" were effectively arbitrary. Sorting a single
	// slice of (key, time) pairs keeps each key paired with its timestamp.
	c.bandwidthStats.Lock()
	if len(c.bandwidthStats.history) > 10 {
		type entry struct {
			key string
			ts  time.Time
		}
		entries := make([]entry, 0, len(c.bandwidthStats.history))
		for k := range c.bandwidthStats.history {
			// History keys use the "01-02 15:04" layout; keys that fail to
			// parse are skipped (and therefore never deleted) rather than
			// risking removal of live data.
			t, err := time.Parse("01-02 15:04", k)
			if err != nil {
				continue
			}
			entries = append(entries, entry{key: k, ts: t})
		}
		sort.Slice(entries, func(i, j int) bool {
			return entries[i].ts.Before(entries[j].ts)
		})
		// Delete everything except the newest 10 parseable entries.
		for i := 0; i < len(entries)-10; i++ {
			delete(c.bandwidthStats.history, entries[i].key)
		}
	}
	c.bandwidthStats.Unlock()

	log.Printf("[Metrics] 清理完成: 删除了 %d 个路径, %d 个引用来源", len(pathsToRemove), len(referersToRemove))
}

View File

@ -104,25 +104,29 @@ func (ms *MetricsStorage) SaveMetrics() error {
// 获取当前指标数据
stats := ms.collector.GetStats()
// 保存基本指标
// 保存基本指标 - 只保存必要的字段
basicMetrics := map[string]interface{}{
"uptime": stats["uptime"],
"total_bytes": stats["total_bytes"],
"avg_response_time": stats["avg_response_time"],
"requests_per_second": stats["requests_per_second"],
"bytes_per_second": stats["bytes_per_second"],
"latency_stats": stats["latency_stats"],
"bandwidth_history": stats["bandwidth_history"],
"current_bandwidth": stats["current_bandwidth"],
"save_time": time.Now().Format(time.RFC3339),
}
// 单独保存延迟统计,避免嵌套结构导致的内存占用
if latencyStats, ok := stats["latency_stats"].(map[string]interface{}); ok {
basicMetrics["latency_min"] = latencyStats["min"]
basicMetrics["latency_max"] = latencyStats["max"]
}
if err := saveJSONToFile(ms.metricsFile, basicMetrics); err != nil {
return fmt.Errorf("保存基本指标失败: %v", err)
}
// 保存路径统计
if err := saveJSONToFile(ms.pathStatsFile, stats["top_paths"]); err != nil {
// 保存路径统计 - 限制数量
topPaths := stats["top_paths"]
if err := saveJSONToFile(ms.pathStatsFile, topPaths); err != nil {
return fmt.Errorf("保存路径统计失败: %v", err)
}
@ -131,11 +135,21 @@ func (ms *MetricsStorage) SaveMetrics() error {
return fmt.Errorf("保存状态码统计失败: %v", err)
}
// 保存引用来源统计
if err := saveJSONToFile(ms.refererStatsFile, stats["top_referers"]); err != nil {
// 保存引用来源统计 - 限制数量
topReferers := stats["top_referers"]
if err := saveJSONToFile(ms.refererStatsFile, topReferers); err != nil {
return fmt.Errorf("保存引用来源统计失败: %v", err)
}
// 单独保存延迟分布
if latencyStats, ok := stats["latency_stats"].(map[string]interface{}); ok {
if distribution, ok := latencyStats["distribution"]; ok {
if err := saveJSONToFile(filepath.Join(ms.dataDir, "latency_distribution.json"), distribution); err != nil {
log.Printf("[MetricsStorage] 保存延迟分布失败: %v", err)
}
}
}
ms.mutex.Lock()
ms.lastSaveTime = time.Now()
ms.mutex.Unlock()
@ -150,7 +164,7 @@ func (ms *MetricsStorage) LoadMetrics() error {
log.Printf("[MetricsStorage] 开始加载指标数据...")
// 检查文件是否存在
if !fileExists(ms.metricsFile) || !fileExists(ms.pathStatsFile) || !fileExists(ms.statusCodeFile) {
if !fileExists(ms.metricsFile) {
return fmt.Errorf("指标数据文件不存在")
}
@ -160,141 +174,138 @@ func (ms *MetricsStorage) LoadMetrics() error {
return fmt.Errorf("加载基本指标失败: %v", err)
}
// 加载路径统计
var pathStats []map[string]interface{}
if err := loadJSONFromFile(ms.pathStatsFile, &pathStats); err != nil {
return fmt.Errorf("加载路径统计失败: %v", err)
}
// 加载状态码统计
var statusCodeStats map[string]interface{}
if err := loadJSONFromFile(ms.statusCodeFile, &statusCodeStats); err != nil {
return fmt.Errorf("加载状态码统计失败: %v", err)
}
// 加载引用来源统计(如果文件存在)
var refererStats []map[string]interface{}
if fileExists(ms.refererStatsFile) {
if err := loadJSONFromFile(ms.refererStatsFile, &refererStats); err != nil {
log.Printf("[MetricsStorage] 加载引用来源统计失败: %v", err)
// 不中断加载过程
} else {
log.Printf("[MetricsStorage] 成功加载引用来源统计: %d 条记录", len(refererStats))
}
}
// 将加载的数据应用到收集器
// 1. 应用总字节数
if totalBytes, ok := basicMetrics["total_bytes"].(float64); ok {
atomic.StoreInt64(&ms.collector.totalBytes, int64(totalBytes))
}
// 2. 应用路径统计
for _, pathStat := range pathStats {
path, ok := pathStat["path"].(string)
if !ok {
continue
}
requestCount, _ := pathStat["request_count"].(float64)
errorCount, _ := pathStat["error_count"].(float64)
bytesTransferred, _ := pathStat["bytes_transferred"].(float64)
// 创建或更新路径统计
var pathMetrics *models.PathMetrics
if existingMetrics, ok := ms.collector.pathStats.Load(path); ok {
pathMetrics = existingMetrics.(*models.PathMetrics)
// 2. 加载路径统计(如果文件存在)
if fileExists(ms.pathStatsFile) {
var pathStats []map[string]interface{}
if err := loadJSONFromFile(ms.pathStatsFile, &pathStats); err != nil {
log.Printf("[MetricsStorage] 加载路径统计失败: %v", err)
} else {
pathMetrics = &models.PathMetrics{Path: path}
ms.collector.pathStats.Store(path, pathMetrics)
}
// 设置统计值
pathMetrics.RequestCount.Store(int64(requestCount))
pathMetrics.ErrorCount.Store(int64(errorCount))
pathMetrics.BytesTransferred.Store(int64(bytesTransferred))
}
// 3. 应用状态码统计
for statusCode, count := range statusCodeStats {
countValue, ok := count.(float64)
if !ok {
continue
}
// 创建或更新状态码统计
if counter, ok := ms.collector.statusCodeStats.Load(statusCode); ok {
atomic.StoreInt64(counter.(*int64), int64(countValue))
} else {
counter := new(int64)
*counter = int64(countValue)
ms.collector.statusCodeStats.Store(statusCode, counter)
}
}
// 4. 应用引用来源统计
if len(refererStats) > 0 {
for _, refererStat := range refererStats {
referer, ok := refererStat["path"].(string)
if !ok {
continue
// 只加载前10个路径统计
maxPaths := 10
if len(pathStats) > maxPaths {
pathStats = pathStats[:maxPaths]
}
requestCount, _ := refererStat["request_count"].(float64)
errorCount, _ := refererStat["error_count"].(float64)
bytesTransferred, _ := refererStat["bytes_transferred"].(float64)
for _, pathStat := range pathStats {
path, ok := pathStat["path"].(string)
if !ok {
continue
}
// 创建或更新引用来源统计
var refererMetrics *models.PathMetrics
if existingMetrics, ok := ms.collector.refererStats.Load(referer); ok {
refererMetrics = existingMetrics.(*models.PathMetrics)
} else {
refererMetrics = &models.PathMetrics{Path: referer}
ms.collector.refererStats.Store(referer, refererMetrics)
requestCount, _ := pathStat["request_count"].(float64)
errorCount, _ := pathStat["error_count"].(float64)
bytesTransferred, _ := pathStat["bytes_transferred"].(float64)
// 创建或更新路径统计
var pathMetrics *models.PathMetrics
if existingMetrics, ok := ms.collector.pathStats.Load(path); ok {
pathMetrics = existingMetrics.(*models.PathMetrics)
} else {
pathMetrics = &models.PathMetrics{Path: path}
ms.collector.pathStats.Store(path, pathMetrics)
}
// 设置统计值
pathMetrics.RequestCount.Store(int64(requestCount))
pathMetrics.ErrorCount.Store(int64(errorCount))
pathMetrics.BytesTransferred.Store(int64(bytesTransferred))
}
// 设置统计值
refererMetrics.RequestCount.Store(int64(requestCount))
refererMetrics.ErrorCount.Store(int64(errorCount))
refererMetrics.BytesTransferred.Store(int64(bytesTransferred))
log.Printf("[MetricsStorage] 加载了 %d 条路径统计", len(pathStats))
}
log.Printf("[MetricsStorage] 应用了 %d 条引用来源统计记录", len(refererStats))
}
// 4. 应用延迟分布桶(如果有)
if latencyStats, ok := basicMetrics["latency_stats"].(map[string]interface{}); ok {
if distribution, ok := latencyStats["distribution"].(map[string]interface{}); ok {
// 3. 加载状态码统计(如果文件存在)
if fileExists(ms.statusCodeFile) {
var statusCodeStats map[string]interface{}
if err := loadJSONFromFile(ms.statusCodeFile, &statusCodeStats); err != nil {
log.Printf("[MetricsStorage] 加载状态码统计失败: %v", err)
} else {
for statusCode, count := range statusCodeStats {
countValue, ok := count.(float64)
if !ok {
continue
}
// 创建或更新状态码统计
if counter, ok := ms.collector.statusCodeStats.Load(statusCode); ok {
atomic.StoreInt64(counter.(*int64), int64(countValue))
} else {
counter := new(int64)
*counter = int64(countValue)
ms.collector.statusCodeStats.Store(statusCode, counter)
}
}
log.Printf("[MetricsStorage] 加载了 %d 条状态码统计", len(statusCodeStats))
}
}
// 4. 加载引用来源统计(如果文件存在)
if fileExists(ms.refererStatsFile) {
var refererStats []map[string]interface{}
if err := loadJSONFromFile(ms.refererStatsFile, &refererStats); err != nil {
log.Printf("[MetricsStorage] 加载引用来源统计失败: %v", err)
} else {
// 只加载前10个引用来源统计
maxReferers := 10
if len(refererStats) > maxReferers {
refererStats = refererStats[:maxReferers]
}
for _, refererStat := range refererStats {
referer, ok := refererStat["path"].(string)
if !ok {
continue
}
requestCount, _ := refererStat["request_count"].(float64)
errorCount, _ := refererStat["error_count"].(float64)
bytesTransferred, _ := refererStat["bytes_transferred"].(float64)
// 创建或更新引用来源统计
var refererMetrics *models.PathMetrics
if existingMetrics, ok := ms.collector.refererStats.Load(referer); ok {
refererMetrics = existingMetrics.(*models.PathMetrics)
} else {
refererMetrics = &models.PathMetrics{Path: referer}
ms.collector.refererStats.Store(referer, refererMetrics)
}
// 设置统计值
refererMetrics.RequestCount.Store(int64(requestCount))
refererMetrics.ErrorCount.Store(int64(errorCount))
refererMetrics.BytesTransferred.Store(int64(bytesTransferred))
}
log.Printf("[MetricsStorage] 加载了 %d 条引用来源统计", len(refererStats))
}
}
// 5. 加载延迟分布(如果文件存在)
latencyDistributionFile := filepath.Join(ms.dataDir, "latency_distribution.json")
if fileExists(latencyDistributionFile) {
var distribution map[string]interface{}
if err := loadJSONFromFile(latencyDistributionFile, &distribution); err != nil {
log.Printf("[MetricsStorage] 加载延迟分布失败: %v", err)
} else {
for bucket, count := range distribution {
countValue, ok := count.(float64)
if !ok {
continue
}
if bucketCounter, ok := ms.collector.latencyBuckets.Load(bucket); ok {
atomic.StoreInt64(bucketCounter.(*int64), int64(countValue))
if counter, ok := ms.collector.latencyBuckets.Load(bucket); ok {
atomic.StoreInt64(counter.(*int64), int64(countValue))
}
}
log.Printf("[MetricsStorage] 加载了延迟分布数据")
}
}
// 5. 应用带宽历史(如果有)
if bandwidthHistory, ok := basicMetrics["bandwidth_history"].(map[string]interface{}); ok {
ms.collector.bandwidthStats.Lock()
ms.collector.bandwidthStats.history = make(map[string]int64)
for timeKey, bandwidth := range bandwidthHistory {
bandwidthValue, ok := bandwidth.(string)
if !ok {
continue
}
// 解析带宽值(假设格式为 "X.XX MB/s"
var bytesValue float64
fmt.Sscanf(bandwidthValue, "%f", &bytesValue)
ms.collector.bandwidthStats.history[timeKey] = int64(bytesValue)
}
ms.collector.bandwidthStats.Unlock()
}
ms.mutex.Lock()
if saveTime, ok := basicMetrics["save_time"].(string); ok {
if t, err := time.Parse(time.RFC3339, saveTime); err == nil {