From e67a3880f5877dc0f25353f25a20ad019aa231f1 Mon Sep 17 00:00:00 2001 From: wood chen Date: Sun, 9 Mar 2025 10:48:43 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=A0=E9=99=A4=E7=BB=9F=E8=AE=A1=E4=BF=9D?= =?UTF-8?q?=E5=AD=98=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/config/config.go | 2 - internal/config/types.go | 20 +- internal/metrics/collector.go | 433 ++++-------------------------- web/app/dashboard/config/page.tsx | 83 +----- 4 files changed, 59 insertions(+), 479 deletions(-) diff --git a/internal/config/config.go b/internal/config/config.go index cedf180..f515b38 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -76,8 +76,6 @@ func (c *configImpl) Update(newConfig *Config) { // 更新配置 c.MAP = newConfig.MAP c.Compression = newConfig.Compression - c.MetricsSaveInterval = newConfig.MetricsSaveInterval - c.MetricsMaxFiles = newConfig.MetricsMaxFiles // 触发回调 for _, callback := range c.onConfigUpdate { diff --git a/internal/config/types.go b/internal/config/types.go index 16e4cda..901142e 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -6,10 +6,8 @@ import ( ) type Config struct { - MAP map[string]PathConfig `json:"MAP"` // 改为使用PathConfig - Compression CompressionConfig `json:"Compression"` - MetricsSaveInterval int `json:"MetricsSaveInterval"` // 指标保存间隔(分钟) - MetricsMaxFiles int `json:"MetricsMaxFiles"` // 保留的最大统计文件数量 + MAP map[string]PathConfig `json:"MAP"` // 改为使用PathConfig + Compression CompressionConfig `json:"Compression"` } type PathConfig struct { @@ -35,10 +33,8 @@ type CompressorConfig struct { func (c *Config) UnmarshalJSON(data []byte) error { // 创建一个临时结构来解析原始JSON type TempConfig struct { - MAP map[string]json.RawMessage `json:"MAP"` - Compression CompressionConfig `json:"Compression"` - MetricsSaveInterval int `json:"MetricsSaveInterval"` - MetricsMaxFiles int `json:"MetricsMaxFiles"` + MAP map[string]json.RawMessage `json:"MAP"` + Compression CompressionConfig `json:"Compression"` } var temp TempConfig @@ -49,11 +45,6 @@ func (c *Config) UnmarshalJSON(data []byte) error { // 初始化 MAP c.MAP = make(map[string]PathConfig) - // 复制其他字段 - c.Compression = temp.Compression - c.MetricsSaveInterval = temp.MetricsSaveInterval - c.MetricsMaxFiles = temp.MetricsMaxFiles - // 处理每个路径配置 for key, raw := range temp.MAP { // 尝试作为字符串解析 @@ -76,6 +67,9 @@ func (c *Config) UnmarshalJSON(data []byte) error { c.MAP[key] = pathConfig } + // 复制其他字段 + c.Compression = temp.Compression + return nil } diff --git a/internal/metrics/collector.go b/internal/metrics/collector.go index 55a950b..c099946 100644 --- a/internal/metrics/collector.go +++ b/internal/metrics/collector.go @@ -1,23 +1,17 @@ package metrics import ( - "encoding/json" "fmt" "log" "math" "net/http" - "os" - "os/signal" - "path/filepath" "proxy-go/internal/config" "proxy-go/internal/models" "proxy-go/internal/utils" "runtime" "sort" - "strings" "sync" "sync/atomic" - "syscall" "time" ) @@ -42,7 +36,6 @@ type Collector struct { recentRequests *models.RequestQueue pathStatsMutex sync.RWMutex config *config.Config - lastSaveTime time.Time // 最后一次保存指标的时间 } var ( @@ -61,7 +54,7 @@ func InitCollector(cfg *config.Config) error { } // 初始化带宽统计 - instance.bandwidthStats.window = 5 * time.Minute + instance.bandwidthStats.window = time.Minute instance.bandwidthStats.lastUpdate = time.Now() instance.bandwidthStats.history = make(map[string]int64) @@ -72,20 +65,8 @@ func InitCollector(cfg *config.Config) error { 
instance.latencyBuckets.Store("200-1000ms", new(int64)) instance.latencyBuckets.Store(">1s", new(int64)) - log.Printf("[Metrics] Initializing metrics collector...") - - // 加载历史统计数据 - if err := instance.LoadRecentStats(); err != nil { - log.Printf("[Metrics] Warning: Failed to load stats: %v", err) - } - // 启动数据一致性检查器 instance.startConsistencyChecker() - - // 启动定时保存任务 - instance.startMetricsSaver() - - log.Printf("[Metrics] Metrics collector initialized") }) return nil } @@ -169,28 +150,23 @@ func (c *Collector) RecordRequest(path string, status int, latency time.Duration // 更新路径统计 c.pathStatsMutex.Lock() if value, ok := c.pathStats.Load(path); ok { - stat, ok := value.(*models.PathStats) - if ok { - stat.Requests.Add(1) - stat.Bytes.Add(bytes) - stat.LatencySum.Add(int64(latency)) - if status >= 400 { - stat.Errors.Add(1) - } - } - } else { - newStat := &models.PathStats{ - Requests: atomic.Int64{}, - Bytes: atomic.Int64{}, - LatencySum: atomic.Int64{}, - Errors: atomic.Int64{}, - } - newStat.Requests.Store(1) - newStat.Bytes.Store(bytes) - newStat.LatencySum.Store(int64(latency)) + stat := value.(*models.PathMetrics) + stat.AddRequest() if status >= 400 { - newStat.Errors.Store(1) + stat.AddError() } + stat.AddLatency(int64(latency)) + stat.AddBytes(bytes) + } else { + newStat := &models.PathMetrics{ + Path: path, + } + newStat.RequestCount.Store(1) + if status >= 400 { + newStat.ErrorCount.Store(1) + } + newStat.TotalLatency.Store(int64(latency)) + newStat.BytesTransferred.Store(bytes) c.pathStats.Store(path, newStat) } c.pathStatsMutex.Unlock() @@ -265,85 +241,38 @@ func (c *Collector) GetStats() map[string]interface{} { }) // 收集路径统计 - pathStatsMap := make(map[string]interface{}) + var pathMetrics []*models.PathMetrics c.pathStats.Range(func(key, value interface{}) bool { - path := key.(string) - stats, ok := value.(*models.PathStats) - if !ok { - return true - } - requestCount := stats.Requests.Load() + stats := value.(*models.PathMetrics) + requestCount := stats.GetRequestCount() if requestCount > 0 { - latencySum := stats.LatencySum.Load() - avgLatencyMs := float64(latencySum) / float64(requestCount) / float64(time.Millisecond) - - pathStatsMap[path] = map[string]interface{}{ - "requests": requestCount, - "errors": stats.Errors.Load(), - "bytes": stats.Bytes.Load(), - "latency_sum": latencySum, - "avg_latency": fmt.Sprintf("%.2fms", avgLatencyMs), - } + totalLatency := stats.GetTotalLatency() + avgLatencyMs := float64(totalLatency) / float64(requestCount) / float64(time.Millisecond) + stats.AvgLatency = fmt.Sprintf("%.2fms", avgLatencyMs) + pathMetrics = append(pathMetrics, stats) } return true }) - // 按请求数降序排序路径 - type pathStat struct { - Path string - Requests int64 - Errors int64 - Bytes int64 - LatencySum int64 - AvgLatency string - } - - // 限制路径统计的数量,只保留前N个 - maxPaths := 20 // 只保留前20个路径 - var pathStatsList []pathStat - - // 先将pathStatsMap转换为pathStatsList - for path, statData := range pathStatsMap { - if stat, ok := statData.(map[string]interface{}); ok { - pathStatsList = append(pathStatsList, pathStat{ - Path: path, - Requests: stat["requests"].(int64), - Errors: stat["errors"].(int64), - Bytes: stat["bytes"].(int64), - LatencySum: stat["latency_sum"].(int64), - AvgLatency: stat["avg_latency"].(string), - }) - } - } - - // 释放pathStatsMap内存 - pathStatsMap = nil - // 按请求数降序排序,请求数相同时按路径字典序排序 - sort.Slice(pathStatsList, func(i, j int) bool { - if pathStatsList[i].Requests != pathStatsList[j].Requests { - return pathStatsList[i].Requests > pathStatsList[j].Requests + 
sort.Slice(pathMetrics, func(i, j int) bool { + countI := pathMetrics[i].GetRequestCount() + countJ := pathMetrics[j].GetRequestCount() + if countI != countJ { + return countI > countJ } - return pathStatsList[i].Path < pathStatsList[j].Path + return pathMetrics[i].Path < pathMetrics[j].Path }) - // 只保留前maxPaths个 - if len(pathStatsList) > maxPaths { - pathStatsList = pathStatsList[:maxPaths] + // 只保留前10个 + if len(pathMetrics) > 10 { + pathMetrics = pathMetrics[:10] } - // 转换为有序的map - orderedPathStats := make([]map[string]interface{}, len(pathStatsList)) - for i, ps := range pathStatsList { - orderedPathStats[i] = map[string]interface{}{ - "path": ps.Path, - "requests": ps.Requests, - "errors": ps.Errors, - "bytes": ps.Bytes, - "latency_sum": ps.LatencySum, - "avg_latency": ps.AvgLatency, - "error_rate": fmt.Sprintf("%.2f%%", float64(ps.Errors)*100/float64(ps.Requests)), - } + // 转换为值切片 + pathMetricsValues := make([]models.PathMetricsJSON, len(pathMetrics)) + for i, metric := range pathMetrics { + pathMetricsValues[i] = metric.ToJSON() } // 收集延迟分布 @@ -380,8 +309,7 @@ func (c *Collector) GetStats() map[string]interface{} { "requests_per_second": requestsPerSecond, "bytes_per_second": float64(atomic.LoadInt64(&c.totalBytes)) / totalRuntime.Seconds(), "status_code_stats": statusCodeStats, - "top_paths": orderedPathStats, - "path_stats": pathStatsMap, + "top_paths": pathMetricsValues, "recent_requests": recentRequests, "latency_stats": map[string]interface{}{ "min": fmt.Sprintf("%.2fms", float64(minLatency)/float64(time.Millisecond)), @@ -394,229 +322,15 @@ func (c *Collector) GetStats() map[string]interface{} { } func (c *Collector) SaveMetrics(stats map[string]interface{}) error { - // 确保data目录存在 - if err := os.MkdirAll("data/metrics", 0755); err != nil { - return fmt.Errorf("failed to create metrics directory: %v", err) - } - - // 将统计数据保存到文件 - data, err := json.Marshal(stats) // 使用Marshal而不是MarshalIndent来减少内存使用 - if err != nil { - return fmt.Errorf("failed to marshal metrics data: %v", err) - } - - // 写入文件 - filename := fmt.Sprintf("data/metrics/stats_%s.json", time.Now().Format("20060102_150405")) - if err := os.WriteFile(filename, data, 0644); err != nil { - return fmt.Errorf("failed to write metrics file: %v", err) - } - - // 同时保存一个最新的副本 - if err := os.WriteFile("data/metrics/latest_stats.json", data, 0644); err != nil { - return fmt.Errorf("failed to write latest metrics file: %v", err) - } - - // 清理旧文件 - if err := c.cleanupOldMetricsFiles(); err != nil { - log.Printf("[Metrics] Warning: Failed to cleanup old metrics files: %v", err) - } - - // 释放内存 - data = nil - runtime.GC() - - c.lastSaveTime = time.Now() - log.Printf("[Metrics] Saved metrics to %s", filename) + lastSaveTime = time.Now() return nil } -// cleanupOldMetricsFiles 清理旧的统计数据文件,只保留最近的N个文件 -func (c *Collector) cleanupOldMetricsFiles() error { - // 默认保留最近的10个文件 - maxFiles := 10 - - // 如果配置中有指定,则使用配置的值 - if c.config != nil && c.config.MetricsMaxFiles > 0 { - maxFiles = c.config.MetricsMaxFiles - } - - // 读取metrics目录中的所有文件 - files, err := os.ReadDir("data/metrics") - if err != nil { - return fmt.Errorf("failed to read metrics directory: %v", err) - } - - // 过滤出统计数据文件(排除latest_stats.json) - var statsFiles []os.DirEntry - for _, file := range files { - if !file.IsDir() && file.Name() != "latest_stats.json" && - strings.HasPrefix(file.Name(), "stats_") && strings.HasSuffix(file.Name(), ".json") { - statsFiles = append(statsFiles, file) - } - } - - // 如果文件数量未超过限制,则不需要清理 - if len(statsFiles) <= maxFiles { - return nil - } - - // 
获取文件信息并创建带时间戳的文件列表 - type fileInfo struct { - entry os.DirEntry - modTime time.Time - fullPath string - } - - var filesWithInfo []fileInfo - for _, file := range statsFiles { - info, err := file.Info() - if err != nil { - log.Printf("[Metrics] Warning: Failed to get file info for %s: %v", file.Name(), err) - continue - } - filesWithInfo = append(filesWithInfo, fileInfo{ - entry: file, - modTime: info.ModTime(), - fullPath: filepath.Join("data/metrics", file.Name()), - }) - } - - // 释放statsFiles内存 - statsFiles = nil - - // 按修改时间排序文件(从新到旧) - sort.Slice(filesWithInfo, func(i, j int) bool { - return filesWithInfo[i].modTime.After(filesWithInfo[j].modTime) - }) - - // 删除多余的旧文件 - for i := maxFiles; i < len(filesWithInfo); i++ { - if err := os.Remove(filesWithInfo[i].fullPath); err != nil { - return fmt.Errorf("failed to remove old metrics file %s: %v", filesWithInfo[i].fullPath, err) - } - log.Printf("[Metrics] Removed old metrics file: %s", filesWithInfo[i].fullPath) - } - - // 释放filesWithInfo内存 - filesWithInfo = nil - runtime.GC() - - return nil -} - -// LoadRecentStats 从文件加载最近的统计数据 +// LoadRecentStats 简化为只进行数据验证 func (c *Collector) LoadRecentStats() error { start := time.Now() log.Printf("[Metrics] Loading stats...") - // 尝试从最新的统计文件加载数据 - data, err := os.ReadFile("data/metrics/latest_stats.json") - if err != nil { - if os.IsNotExist(err) { - log.Printf("[Metrics] No previous stats found, starting fresh") - return nil - } - return fmt.Errorf("failed to read metrics file: %v", err) - } - - log.Printf("[Metrics] Found latest_stats.json, size: %d bytes", len(data)) - - // 解析JSON数据 - var stats map[string]interface{} - if err := json.Unmarshal(data, &stats); err != nil { - return fmt.Errorf("failed to unmarshal metrics data: %v", err) - } - - log.Printf("[Metrics] Successfully parsed JSON data") - - // 恢复统计数据 - if totalBytes, ok := stats["total_bytes"].(float64); ok { - atomic.StoreInt64(&c.totalBytes, int64(totalBytes)) - log.Printf("[Metrics] Restored total_bytes: %v", totalBytes) - } else { - log.Printf("[Metrics] No total_bytes found in stats") - } - - // 恢复路径统计 - pathStatsRestored := 0 - if pathStats, ok := stats["path_stats"].(map[string]interface{}); ok { - log.Printf("[Metrics] Found path_stats with %d entries", len(pathStats)) - for path, stat := range pathStats { - if statMap, ok := stat.(map[string]interface{}); ok { - pathStat := &models.PathStats{} - - if count, ok := statMap["requests"].(float64); ok { - pathStat.Requests.Store(int64(count)) - } - - if bytes, ok := statMap["bytes"].(float64); ok { - pathStat.Bytes.Store(int64(bytes)) - } - - if errors, ok := statMap["errors"].(float64); ok { - pathStat.Errors.Store(int64(errors)) - } - - if latency, ok := statMap["latency_sum"].(float64); ok { - pathStat.LatencySum.Store(int64(latency)) - } - - c.pathStats.Store(path, pathStat) - pathStatsRestored++ - } - } - log.Printf("[Metrics] Restored %d path stats", pathStatsRestored) - } else { - log.Printf("[Metrics] No path_stats found in stats") - - // 尝试从top_paths恢复数据 - if topPaths, ok := stats["top_paths"].([]interface{}); ok { - log.Printf("[Metrics] Found top_paths with %d entries", len(topPaths)) - for _, pathData := range topPaths { - if pathMap, ok := pathData.(map[string]interface{}); ok { - path, ok1 := pathMap["path"].(string) - requests, ok2 := pathMap["requests"].(float64) - errors, ok3 := pathMap["errors"].(float64) - bytes, ok4 := pathMap["bytes"].(float64) - latencySum, ok5 := pathMap["latency_sum"].(float64) - - if ok1 && ok2 && ok3 && ok4 && ok5 { - pathStat := 
&models.PathStats{} - pathStat.Requests.Store(int64(requests)) - pathStat.Errors.Store(int64(errors)) - pathStat.Bytes.Store(int64(bytes)) - pathStat.LatencySum.Store(int64(latencySum)) - - c.pathStats.Store(path, pathStat) - pathStatsRestored++ - } - } - } - log.Printf("[Metrics] Restored %d path stats from top_paths", pathStatsRestored) - } else { - log.Printf("[Metrics] No top_paths found in stats") - } - } - - // 恢复状态码统计 - statusCodesRestored := 0 - if statusStats, ok := stats["status_code_stats"].(map[string]interface{}); ok { - log.Printf("[Metrics] Found status_code_stats with %d entries", len(statusStats)) - for code, count := range statusStats { - if countVal, ok := count.(float64); ok { - codeInt := 0 - if _, err := fmt.Sscanf(code, "%d", &codeInt); err == nil { - var counter int64 = int64(countVal) - c.statusCodeStats.Store(codeInt, &counter) - statusCodesRestored++ - } - } - } - log.Printf("[Metrics] Restored %d status codes", statusCodesRestored) - } else { - log.Printf("[Metrics] No status_code_stats found in stats") - } - if err := c.validateLoadedData(); err != nil { return fmt.Errorf("data validation failed: %v", err) } @@ -647,43 +361,32 @@ func (c *Collector) validateLoadedData() error { // 验证路径统计 var totalPathRequests int64 c.pathStats.Range(func(_, value interface{}) bool { - stats, ok := value.(*models.PathStats) - if !ok { - log.Printf("[Metrics] Warning: Invalid path stats type: %T", value) - return true - } - requestCount := stats.Requests.Load() - errorCount := stats.Errors.Load() + stats := value.(*models.PathMetrics) + requestCount := stats.GetRequestCount() + errorCount := stats.GetErrorCount() if requestCount < 0 || errorCount < 0 { - log.Printf("[Metrics] Warning: Invalid path stats values: requests=%d, errors=%d", requestCount, errorCount) return false } if errorCount > requestCount { - log.Printf("[Metrics] Warning: Error count (%d) exceeds request count (%d)", errorCount, requestCount) return false } totalPathRequests += requestCount return true }) - // 如果没有请求,则跳过验证 - if statusCodeTotal == 0 && totalPathRequests == 0 { - log.Printf("[Metrics] No requests to validate") - return nil - } - - // 允许一定的误差 - if math.Abs(float64(totalPathRequests-statusCodeTotal)) > float64(statusCodeTotal)*0.1 { - log.Printf("[Metrics] Warning: Path stats total (%d) does not match status code total (%d)", totalPathRequests, statusCodeTotal) - // 不返回错误,只记录警告 + if totalPathRequests != statusCodeTotal { + return fmt.Errorf("path stats total (%d) does not match status code total (%d)", + totalPathRequests, statusCodeTotal) } return nil } // GetLastSaveTime 实现 interfaces.MetricsCollector 接口 +var lastSaveTime time.Time + func (c *Collector) GetLastSaveTime() time.Time { - return c.lastSaveTime + return lastSaveTime } // CheckDataConsistency 实现 interfaces.MetricsCollector 接口 @@ -767,43 +470,3 @@ func (c *Collector) getBandwidthHistory() map[string]string { } return history } - -// startMetricsSaver 启动定时保存统计数据的任务 -func (c *Collector) startMetricsSaver() { - // 定义保存间隔,可以根据需要调整 - saveInterval := 15 * time.Minute - - // 如果配置中有指定,则使用配置的间隔 - if c.config != nil && c.config.MetricsSaveInterval > 0 { - saveInterval = time.Duration(c.config.MetricsSaveInterval) * time.Minute - } - - ticker := time.NewTicker(saveInterval) - go func() { - for range ticker.C { - func() { - // 使用匿名函数来确保每次迭代后都能释放内存 - stats := c.GetStats() - if err := c.SaveMetrics(stats); err != nil { - log.Printf("[Metrics] Failed to save metrics: %v", err) - } - // 释放内存 - stats = nil - runtime.GC() - }() - } - }() - - // 
注册信号处理,在程序退出前保存一次 - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - go func() { - <-sigChan - log.Println("[Metrics] Received shutdown signal, saving metrics...") - stats := c.GetStats() - if err := c.SaveMetrics(stats); err != nil { - log.Printf("[Metrics] Failed to save metrics on shutdown: %v", err) - } - os.Exit(0) - }() -} diff --git a/web/app/dashboard/config/page.tsx b/web/app/dashboard/config/page.tsx index 58205b3..663c12b 100644 --- a/web/app/dashboard/config/page.tsx +++ b/web/app/dashboard/config/page.tsx @@ -2,7 +2,7 @@ import { useEffect, useState, useCallback, useRef } from "react" import { Button } from "@/components/ui/button" -import { Card, CardContent, CardHeader, CardTitle, CardDescription } from "@/components/ui/card" +import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card" import { useToast } from "@/components/ui/use-toast" import { useRouter } from "next/navigation" import { Tabs, TabsContent, TabsList, TabsTrigger } from "@/components/ui/tabs" @@ -55,8 +55,6 @@ interface Config { Gzip: CompressionConfig Brotli: CompressionConfig } - MetricsSaveInterval?: number // 指标保存间隔(分钟) - MetricsMaxFiles?: number // 保留的最大统计文件数量 } export default function ConfigPage() { @@ -124,16 +122,6 @@ export default function ConfigPage() { } const data = await response.json() - - // 设置默认值 - if (data.MetricsSaveInterval === undefined || data.MetricsSaveInterval === 0) { - data.MetricsSaveInterval = 15; // 默认15分钟 - } - - if (data.MetricsMaxFiles === undefined || data.MetricsMaxFiles === 0) { - data.MetricsMaxFiles = 10; // 默认10个文件 - } - setConfig(data) } catch (error) { const message = error instanceof Error ? error.message : "获取配置失败" @@ -335,13 +323,6 @@ export default function ConfigPage() { setConfig(newConfig) } - const updateMetricsSettings = (field: 'MetricsSaveInterval' | 'MetricsMaxFiles', value: number) => { - if (!config) return - const newConfig = { ...config } - newConfig[field] = value - setConfig(newConfig) - } - const handleExtensionMapEdit = (path: string, ext?: string, target?: string) => { setEditingPath(path) if (ext && target) { @@ -444,6 +425,9 @@ export default function ConfigPage() { }) } + + + const openAddPathDialog = () => { setEditingPathData(null) setNewPathData({ @@ -617,7 +601,6 @@ export default function ConfigPage() { 路径映射 压缩设置 - 指标设置 @@ -930,64 +913,6 @@ export default function ConfigPage() { - - - - - 指标设置 - - 配置指标收集和保存的相关设置 - - - -
[removed JSX block, markup not preserved in this extract: the "指标设置" settings card, containing two number inputs (className="w-24") bound to updateMetricsSettings('MetricsSaveInterval', parseInt(e.target.value) || 15) with unit label "分钟(默认:15分钟)" and helper text "系统将按此间隔定期保存统计数据到文件", and updateMetricsSettings('MetricsMaxFiles', parseInt(e.target.value) || 10) with unit label "个文件(默认:10个)" and helper text "超过此数量的旧统计文件将被自动清理"]