From 015aa6bc151cbe270a38fc3981940546229dbc28 Mon Sep 17 00:00:00 2001 From: wood chen Date: Sun, 9 Mar 2025 05:41:22 +0800 Subject: [PATCH] =?UTF-8?q?feat(metrics):=20=E5=A2=9E=E5=8A=A0=E6=8C=87?= =?UTF-8?q?=E6=A0=87=E4=BF=9D=E5=AD=98=E5=92=8C=E7=AE=A1=E7=90=86=E5=8A=9F?= =?UTF-8?q?=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在配置中新增指标保存间隔和最大文件数量设置 - 实现定期自动保存指标数据到文件 - 添加指标文件自动清理机制 - 优化指标收集和存储逻辑 - 在Web界面添加指标设置选项卡 --- internal/config/config.go | 2 + internal/config/types.go | 6 +- internal/metrics/collector.go | 328 ++++++++++++++++++++++++++---- web/app/dashboard/config/page.tsx | 73 ++++++- 4 files changed, 358 insertions(+), 51 deletions(-) diff --git a/internal/config/config.go b/internal/config/config.go index f515b38..cedf180 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -76,6 +76,8 @@ func (c *configImpl) Update(newConfig *Config) { // 更新配置 c.MAP = newConfig.MAP c.Compression = newConfig.Compression + c.MetricsSaveInterval = newConfig.MetricsSaveInterval + c.MetricsMaxFiles = newConfig.MetricsMaxFiles // 触发回调 for _, callback := range c.onConfigUpdate { diff --git a/internal/config/types.go b/internal/config/types.go index 901142e..fb74809 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -6,8 +6,10 @@ import ( ) type Config struct { - MAP map[string]PathConfig `json:"MAP"` // 改为使用PathConfig - Compression CompressionConfig `json:"Compression"` + MAP map[string]PathConfig `json:"MAP"` // 改为使用PathConfig + Compression CompressionConfig `json:"Compression"` + MetricsSaveInterval int `json:"MetricsSaveInterval"` // 指标保存间隔(分钟) + MetricsMaxFiles int `json:"MetricsMaxFiles"` // 保留的最大统计文件数量 } type PathConfig struct { diff --git a/internal/metrics/collector.go b/internal/metrics/collector.go index c099946..dfe6b4b 100644 --- a/internal/metrics/collector.go +++ b/internal/metrics/collector.go @@ -1,17 +1,23 @@ package metrics import ( + "encoding/json" "fmt" "log" "math" "net/http" + "os" + "os/signal" + "path/filepath" "proxy-go/internal/config" "proxy-go/internal/models" "proxy-go/internal/utils" "runtime" "sort" + "strings" "sync" "sync/atomic" + "syscall" "time" ) @@ -36,6 +42,7 @@ type Collector struct { recentRequests *models.RequestQueue pathStatsMutex sync.RWMutex config *config.Config + lastSaveTime time.Time // 最后一次保存指标的时间 } var ( @@ -54,7 +61,7 @@ func InitCollector(cfg *config.Config) error { } // 初始化带宽统计 - instance.bandwidthStats.window = time.Minute + instance.bandwidthStats.window = 5 * time.Minute instance.bandwidthStats.lastUpdate = time.Now() instance.bandwidthStats.history = make(map[string]int64) @@ -65,8 +72,16 @@ func InitCollector(cfg *config.Config) error { instance.latencyBuckets.Store("200-1000ms", new(int64)) instance.latencyBuckets.Store(">1s", new(int64)) + // 加载历史统计数据 + if err := instance.LoadRecentStats(); err != nil { + log.Printf("[Metrics] Warning: Failed to load stats: %v", err) + } + // 启动数据一致性检查器 instance.startConsistencyChecker() + + // 启动定时保存任务 + instance.startMetricsSaver() }) return nil } @@ -150,23 +165,26 @@ func (c *Collector) RecordRequest(path string, status int, latency time.Duration // 更新路径统计 c.pathStatsMutex.Lock() if value, ok := c.pathStats.Load(path); ok { - stat := value.(*models.PathMetrics) - stat.AddRequest() + stat := value.(models.PathStats) + stat.Requests.Store(stat.Requests.Load() + 1) + stat.Bytes.Store(stat.Bytes.Load() + bytes) + stat.LatencySum.Store(stat.LatencySum.Load() + int64(latency)) if status >= 400 { - 
stat.AddError() + stat.Errors.Store(stat.Errors.Load() + 1) } - stat.AddLatency(int64(latency)) - stat.AddBytes(bytes) } else { - newStat := &models.PathMetrics{ - Path: path, + newStat := models.PathStats{ + Requests: atomic.Int64{}, + Bytes: atomic.Int64{}, + LatencySum: atomic.Int64{}, + Errors: atomic.Int64{}, } - newStat.RequestCount.Store(1) + newStat.Requests.Store(1) + newStat.Bytes.Store(bytes) + newStat.LatencySum.Store(int64(latency)) if status >= 400 { - newStat.ErrorCount.Store(1) + newStat.Errors.Store(1) } - newStat.TotalLatency.Store(int64(latency)) - newStat.BytesTransferred.Store(bytes) c.pathStats.Store(path, newStat) } c.pathStatsMutex.Unlock() @@ -241,38 +259,70 @@ func (c *Collector) GetStats() map[string]interface{} { }) // 收集路径统计 - var pathMetrics []*models.PathMetrics + pathStatsMap := make(map[string]interface{}) c.pathStats.Range(func(key, value interface{}) bool { - stats := value.(*models.PathMetrics) - requestCount := stats.GetRequestCount() + path := key.(string) + stats := value.(models.PathStats) + requestCount := stats.Requests.Load() if requestCount > 0 { - totalLatency := stats.GetTotalLatency() - avgLatencyMs := float64(totalLatency) / float64(requestCount) / float64(time.Millisecond) - stats.AvgLatency = fmt.Sprintf("%.2fms", avgLatencyMs) - pathMetrics = append(pathMetrics, stats) + latencySum := stats.LatencySum.Load() + avgLatencyMs := float64(latencySum) / float64(requestCount) / float64(time.Millisecond) + + pathStatsMap[path] = map[string]interface{}{ + "requests": requestCount, + "errors": stats.Errors.Load(), + "bytes": stats.Bytes.Load(), + "latency_sum": latencySum, + "avg_latency": fmt.Sprintf("%.2fms", avgLatencyMs), + } } return true }) - // 按请求数降序排序,请求数相同时按路径字典序排序 - sort.Slice(pathMetrics, func(i, j int) bool { - countI := pathMetrics[i].GetRequestCount() - countJ := pathMetrics[j].GetRequestCount() - if countI != countJ { - return countI > countJ - } - return pathMetrics[i].Path < pathMetrics[j].Path - }) - - // 只保留前10个 - if len(pathMetrics) > 10 { - pathMetrics = pathMetrics[:10] + // 按请求数降序排序路径 + type pathStat struct { + Path string + Requests int64 + Errors int64 + Bytes int64 + LatencySum int64 + AvgLatency string } - // 转换为值切片 - pathMetricsValues := make([]models.PathMetricsJSON, len(pathMetrics)) - for i, metric := range pathMetrics { - pathMetricsValues[i] = metric.ToJSON() + var pathStatsList []pathStat + for path, statData := range pathStatsMap { + if stat, ok := statData.(map[string]interface{}); ok { + pathStatsList = append(pathStatsList, pathStat{ + Path: path, + Requests: stat["requests"].(int64), + Errors: stat["errors"].(int64), + Bytes: stat["bytes"].(int64), + LatencySum: stat["latency_sum"].(int64), + AvgLatency: stat["avg_latency"].(string), + }) + } + } + + // 按请求数降序排序,请求数相同时按路径字典序排序 + sort.Slice(pathStatsList, func(i, j int) bool { + if pathStatsList[i].Requests != pathStatsList[j].Requests { + return pathStatsList[i].Requests > pathStatsList[j].Requests + } + return pathStatsList[i].Path < pathStatsList[j].Path + }) + + // 转换为有序的map + orderedPathStats := make([]map[string]interface{}, len(pathStatsList)) + for i, ps := range pathStatsList { + orderedPathStats[i] = map[string]interface{}{ + "path": ps.Path, + "requests": ps.Requests, + "errors": ps.Errors, + "bytes": ps.Bytes, + "latency_sum": ps.LatencySum, + "avg_latency": ps.AvgLatency, + "error_rate": fmt.Sprintf("%.2f%%", float64(ps.Errors)*100/float64(ps.Requests)), + } } // 收集延迟分布 @@ -309,7 +359,7 @@ func (c *Collector) GetStats() map[string]interface{} { 
"requests_per_second": requestsPerSecond, "bytes_per_second": float64(atomic.LoadInt64(&c.totalBytes)) / totalRuntime.Seconds(), "status_code_stats": statusCodeStats, - "top_paths": pathMetricsValues, + "top_paths": orderedPathStats, "recent_requests": recentRequests, "latency_stats": map[string]interface{}{ "min": fmt.Sprintf("%.2fms", float64(minLatency)/float64(time.Millisecond)), @@ -322,15 +372,171 @@ func (c *Collector) GetStats() map[string]interface{} { } func (c *Collector) SaveMetrics(stats map[string]interface{}) error { - lastSaveTime = time.Now() + // 确保data目录存在 + if err := os.MkdirAll("data/metrics", 0755); err != nil { + return fmt.Errorf("failed to create metrics directory: %v", err) + } + + // 将统计数据保存到文件 + data, err := json.MarshalIndent(stats, "", " ") + if err != nil { + return fmt.Errorf("failed to marshal metrics data: %v", err) + } + + // 写入文件 + filename := fmt.Sprintf("data/metrics/stats_%s.json", time.Now().Format("20060102_150405")) + if err := os.WriteFile(filename, data, 0644); err != nil { + return fmt.Errorf("failed to write metrics file: %v", err) + } + + // 同时保存一个最新的副本 + if err := os.WriteFile("data/metrics/latest_stats.json", data, 0644); err != nil { + return fmt.Errorf("failed to write latest metrics file: %v", err) + } + + // 清理旧文件 + if err := c.cleanupOldMetricsFiles(); err != nil { + log.Printf("[Metrics] Warning: Failed to cleanup old metrics files: %v", err) + } + + c.lastSaveTime = time.Now() + log.Printf("[Metrics] Saved metrics to %s", filename) return nil } -// LoadRecentStats 简化为只进行数据验证 +// cleanupOldMetricsFiles 清理旧的统计数据文件,只保留最近的N个文件 +func (c *Collector) cleanupOldMetricsFiles() error { + // 默认保留最近的10个文件 + maxFiles := 10 + + // 如果配置中有指定,则使用配置的值 + if c.config != nil && c.config.MetricsMaxFiles > 0 { + maxFiles = c.config.MetricsMaxFiles + } + + // 读取metrics目录中的所有文件 + files, err := os.ReadDir("data/metrics") + if err != nil { + return fmt.Errorf("failed to read metrics directory: %v", err) + } + + // 过滤出统计数据文件(排除latest_stats.json) + var statsFiles []os.DirEntry + for _, file := range files { + if !file.IsDir() && file.Name() != "latest_stats.json" && + strings.HasPrefix(file.Name(), "stats_") && strings.HasSuffix(file.Name(), ".json") { + statsFiles = append(statsFiles, file) + } + } + + // 如果文件数量未超过限制,则不需要清理 + if len(statsFiles) <= maxFiles { + return nil + } + + // 获取文件信息并创建带时间戳的文件列表 + type fileInfo struct { + entry os.DirEntry + modTime time.Time + fullPath string + } + + var filesWithInfo []fileInfo + for _, file := range statsFiles { + info, err := file.Info() + if err != nil { + log.Printf("[Metrics] Warning: Failed to get file info for %s: %v", file.Name(), err) + continue + } + filesWithInfo = append(filesWithInfo, fileInfo{ + entry: file, + modTime: info.ModTime(), + fullPath: filepath.Join("data/metrics", file.Name()), + }) + } + + // 按修改时间排序文件(从新到旧) + sort.Slice(filesWithInfo, func(i, j int) bool { + return filesWithInfo[i].modTime.After(filesWithInfo[j].modTime) + }) + + // 删除多余的旧文件 + for i := maxFiles; i < len(filesWithInfo); i++ { + if err := os.Remove(filesWithInfo[i].fullPath); err != nil { + return fmt.Errorf("failed to remove old metrics file %s: %v", filesWithInfo[i].fullPath, err) + } + log.Printf("[Metrics] Removed old metrics file: %s", filesWithInfo[i].fullPath) + } + + return nil +} + +// LoadRecentStats 从文件加载最近的统计数据 func (c *Collector) LoadRecentStats() error { start := time.Now() log.Printf("[Metrics] Loading stats...") + // 尝试从最新的统计文件加载数据 + data, err := os.ReadFile("data/metrics/latest_stats.json") + if err != nil { + if 
os.IsNotExist(err) { + log.Printf("[Metrics] No previous stats found, starting fresh") + return nil + } + return fmt.Errorf("failed to read metrics file: %v", err) + } + + // 解析JSON数据 + var stats map[string]interface{} + if err := json.Unmarshal(data, &stats); err != nil { + return fmt.Errorf("failed to unmarshal metrics data: %v", err) + } + + // 恢复统计数据 + if totalBytes, ok := stats["total_bytes"].(float64); ok { + atomic.StoreInt64(&c.totalBytes, int64(totalBytes)) + } + + // 恢复路径统计 + if pathStats, ok := stats["path_stats"].(map[string]interface{}); ok { + for path, stat := range pathStats { + if statMap, ok := stat.(map[string]interface{}); ok { + pathStat := &models.PathStats{} + + if count, ok := statMap["requests"].(float64); ok { + pathStat.Requests.Store(int64(count)) + } + + if bytes, ok := statMap["bytes"].(float64); ok { + pathStat.Bytes.Store(int64(bytes)) + } + + if errors, ok := statMap["errors"].(float64); ok { + pathStat.Errors.Store(int64(errors)) + } + + if latency, ok := statMap["latency_sum"].(float64); ok { + pathStat.LatencySum.Store(int64(latency)) + } + + c.pathStats.Store(path, pathStat) + } + } + } + + // 恢复状态码统计 + if statusStats, ok := stats["status_codes"].(map[string]interface{}); ok { + for code, count := range statusStats { + if countVal, ok := count.(float64); ok { + codeInt := 0 + if _, err := fmt.Sscanf(code, "%d", &codeInt); err == nil { + var counter int64 = int64(countVal) + c.statusCodeStats.Store(codeInt, &counter) + } + } + } + } + if err := c.validateLoadedData(); err != nil { return fmt.Errorf("data validation failed: %v", err) } @@ -361,9 +567,9 @@ func (c *Collector) validateLoadedData() error { // 验证路径统计 var totalPathRequests int64 c.pathStats.Range(func(_, value interface{}) bool { - stats := value.(*models.PathMetrics) - requestCount := stats.GetRequestCount() - errorCount := stats.GetErrorCount() + stats := value.(models.PathStats) + requestCount := stats.Requests.Load() + errorCount := stats.Errors.Load() if requestCount < 0 || errorCount < 0 { return false } @@ -383,10 +589,8 @@ func (c *Collector) validateLoadedData() error { } // GetLastSaveTime 实现 interfaces.MetricsCollector 接口 -var lastSaveTime time.Time - func (c *Collector) GetLastSaveTime() time.Time { - return lastSaveTime + return c.lastSaveTime } // CheckDataConsistency 实现 interfaces.MetricsCollector 接口 @@ -470,3 +674,37 @@ func (c *Collector) getBandwidthHistory() map[string]string { } return history } + +// startMetricsSaver 启动定时保存统计数据的任务 +func (c *Collector) startMetricsSaver() { + // 定义保存间隔,可以根据需要调整 + saveInterval := 15 * time.Minute + + // 如果配置中有指定,则使用配置的间隔 + if c.config != nil && c.config.MetricsSaveInterval > 0 { + saveInterval = time.Duration(c.config.MetricsSaveInterval) * time.Minute + } + + ticker := time.NewTicker(saveInterval) + go func() { + for range ticker.C { + stats := c.GetStats() + if err := c.SaveMetrics(stats); err != nil { + log.Printf("[Metrics] Failed to save metrics: %v", err) + } + } + }() + + // 注册信号处理,在程序退出前保存一次 + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-sigChan + log.Println("[Metrics] Received shutdown signal, saving metrics...") + stats := c.GetStats() + if err := c.SaveMetrics(stats); err != nil { + log.Printf("[Metrics] Failed to save metrics on shutdown: %v", err) + } + os.Exit(0) + }() +} diff --git a/web/app/dashboard/config/page.tsx b/web/app/dashboard/config/page.tsx index 663c12b..b29abff 100644 --- a/web/app/dashboard/config/page.tsx +++ b/web/app/dashboard/config/page.tsx 
@@ -2,7 +2,7 @@
 
 import { useEffect, useState, useCallback, useRef } from "react"
 import { Button } from "@/components/ui/button"
-import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card"
+import { Card, CardContent, CardHeader, CardTitle, CardDescription } from "@/components/ui/card"
 import { useToast } from "@/components/ui/use-toast"
 import { useRouter } from "next/navigation"
 import { Tabs, TabsContent, TabsList, TabsTrigger } from "@/components/ui/tabs"
@@ -55,6 +55,8 @@ interface Config {
     Gzip: CompressionConfig
     Brotli: CompressionConfig
   }
+  MetricsSaveInterval?: number // 指标保存间隔(分钟)
+  MetricsMaxFiles?: number // 保留的最大统计文件数量
 }
 
 export default function ConfigPage() {
@@ -323,6 +325,13 @@ export default function ConfigPage() {
     setConfig(newConfig)
   }
 
+  const updateMetricsSettings = (field: 'MetricsSaveInterval' | 'MetricsMaxFiles', value: number) => {
+    if (!config) return
+    const newConfig = { ...config }
+    newConfig[field] = value
+    setConfig(newConfig)
+  }
+
   const handleExtensionMapEdit = (path: string, ext?: string, target?: string) => {
     setEditingPath(path)
     if (ext && target) {
@@ -425,9 +434,6 @@ export default function ConfigPage() {
     })
   }
 
-
-
-
   const openAddPathDialog = () => {
     setEditingPathData(null)
     setNewPathData({
@@ -601,6 +607,7 @@
         <TabsList>
           <TabsTrigger value="paths">路径映射</TabsTrigger>
           <TabsTrigger value="compression">压缩设置</TabsTrigger>
+          <TabsTrigger value="metrics">指标设置</TabsTrigger>
         </TabsList>
 
@@ -913,6 +920,64 @@
             </CardContent>
           </Card>
         </TabsContent>
+
+        <TabsContent value="metrics">
+          <Card>
+            <CardHeader>
+              <CardTitle>指标设置</CardTitle>
+              <CardDescription>
+                配置指标收集和保存的相关设置
+              </CardDescription>
+            </CardHeader>
+            <CardContent>
+              <div className="space-y-6">
+                <div className="space-y-2">
+                  <Label>保存间隔</Label>
+                  <div className="flex items-center gap-2">
+                    <Input
+                      type="number"
+                      value={config?.MetricsSaveInterval || 15}
+                      onChange={(e) => updateMetricsSettings('MetricsSaveInterval', parseInt(e.target.value) || 15)}
+                      className="w-24"
+                    />
+                    <span className="text-sm text-muted-foreground">分钟(默认:15分钟)</span>
+                  </div>
+                  <p className="text-sm text-muted-foreground">
+                    系统将按此间隔定期保存统计数据到文件
+                  </p>
+                </div>
+
+                <div className="space-y-2">
+                  <Label>最大文件数量</Label>
+                  <div className="flex items-center gap-2">
+                    <Input
+                      type="number"
+                      value={config?.MetricsMaxFiles || 10}
+                      onChange={(e) => updateMetricsSettings('MetricsMaxFiles', parseInt(e.target.value) || 10)}
+                      className="w-24"
+                    />
+                    <span className="text-sm text-muted-foreground">个文件(默认:10个)</span>
+                  </div>
+                  <p className="text-sm text-muted-foreground">
+                    超过此数量的旧统计文件将被自动清理
+                  </p>
+                </div>
+              </div>
+            </CardContent>
+          </Card>
+        </TabsContent>
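
Note (not part of the diff): a minimal Go sketch of how the two new config fields are consumed. The JSON field names come from internal/config/types.go above, and the 15-minute and 10-file fallbacks mirror the defaults in startMetricsSaver and cleanupOldMetricsFiles. The standalone metricsConfig struct and the inline JSON fragment are illustrative assumptions, not project code.

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// metricsConfig mirrors only the two fields added by this patch; the real
// config.Config in internal/config has more fields.
type metricsConfig struct {
	MetricsSaveInterval int `json:"MetricsSaveInterval"` // minutes
	MetricsMaxFiles     int `json:"MetricsMaxFiles"`
}

func main() {
	// Hypothetical config fragment; field names match internal/config/types.go.
	raw := []byte(`{"MetricsSaveInterval": 30, "MetricsMaxFiles": 5}`)

	var cfg metricsConfig
	if err := json.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}

	// Same defaulting rules as startMetricsSaver and cleanupOldMetricsFiles:
	// 15 minutes and 10 files when a value is missing or non-positive.
	saveInterval := 15 * time.Minute
	if cfg.MetricsSaveInterval > 0 {
		saveInterval = time.Duration(cfg.MetricsSaveInterval) * time.Minute
	}
	maxFiles := 10
	if cfg.MetricsMaxFiles > 0 {
		maxFiles = cfg.MetricsMaxFiles
	}

	fmt.Println(saveInterval, maxFiles) // 30m0s 5
}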
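
Note (not part of the diff): a sketch of reading back the snapshot that SaveMetrics rewrites on every save. It assumes the program runs from the proxy's working directory, so data/metrics/latest_stats.json is reachable, and it decodes only the "top_paths" key; the field names match the map built in GetStats, while the pathEntry struct itself is invented for illustration.

package main

import (
	"encoding/json"
	"fmt"
	"os"
)

// pathEntry matches the objects emitted under "top_paths" by GetStats.
type pathEntry struct {
	Path       string `json:"path"`
	Requests   int64  `json:"requests"`
	Errors     int64  `json:"errors"`
	Bytes      int64  `json:"bytes"`
	AvgLatency string `json:"avg_latency"`
	ErrorRate  string `json:"error_rate"`
}

func main() {
	// latest_stats.json is overwritten on every save by SaveMetrics.
	data, err := os.ReadFile("data/metrics/latest_stats.json")
	if err != nil {
		fmt.Println("no snapshot yet:", err)
		return
	}

	var snapshot struct {
		TopPaths []pathEntry `json:"top_paths"`
	}
	if err := json.Unmarshal(data, &snapshot); err != nil {
		panic(err)
	}

	for _, p := range snapshot.TopPaths {
		fmt.Printf("%-30s %6d requests  %s avg  %s errors\n",
			p.Path, p.Requests, p.AvgLatency, p.ErrorRate)
	}
}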
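
Note (not part of the diff): the retention rule in cleanupOldMetricsFiles restated in isolation. Timestamped stats_*.json files are ordered by modification time, the newest MetricsMaxFiles are kept, and latest_stats.json is never considered. filesToPrune below is a hypothetical helper for illustration, not an API from this repository.

package main

import (
	"fmt"
	"sort"
	"time"
)

// snapshotFile is a minimal stand-in for the fileInfo records built by
// cleanupOldMetricsFiles (file name plus modification time).
type snapshotFile struct {
	Name    string
	ModTime time.Time
}

// filesToPrune mirrors the ordering used in the patch: sort newest-first by
// ModTime, keep the first maxFiles entries, prune the remainder.
func filesToPrune(files []snapshotFile, maxFiles int) []snapshotFile {
	if len(files) <= maxFiles {
		return nil
	}
	sort.Slice(files, func(i, j int) bool {
		return files[i].ModTime.After(files[j].ModTime)
	})
	return files[maxFiles:]
}

func main() {
	now := time.Now()
	files := []snapshotFile{
		{Name: "stats_20250309_010000.json", ModTime: now.Add(-3 * time.Hour)},
		{Name: "stats_20250309_020000.json", ModTime: now.Add(-2 * time.Hour)},
		{Name: "stats_20250309_030000.json", ModTime: now.Add(-1 * time.Hour)},
	}
	// With MetricsMaxFiles = 2, only the oldest snapshot is pruned.
	for _, f := range filesToPrune(files, 2) {
		fmt.Println("would delete:", f.Name)
	}
}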