diff --git a/internal/config/types.go b/internal/config/types.go index 579c491..4522085 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -59,6 +59,23 @@ type MetricsConfig struct { LargeLatency time.Duration `json:"LargeLatency"` // 大文件最大延迟 HugeLatency time.Duration `json:"HugeLatency"` // 超大文件最大延迟 } `json:"Latency"` + // Data-loading settings (retry count, retry interval, timeout) + Load struct { + RetryCount int `json:"retry_count"` + RetryInterval time.Duration `json:"retry_interval"` + Timeout time.Duration `json:"timeout"` + } `json:"load"` + // Data-saving settings (minimum, maximum and default save interval) + Save struct { + MinInterval time.Duration `json:"min_interval"` + MaxInterval time.Duration `json:"max_interval"` + DefaultInterval time.Duration `json:"default_interval"` + } `json:"save"` + // Data-validation settings (error-rate and deviation limits) + Validation struct { + MaxErrorRate float64 `json:"max_error_rate"` + MaxDataDeviation float64 `json:"max_data_deviation"` + } `json:"validation"` } // 添加一个辅助方法来处理字符串到 PathConfig 的转换 diff --git a/internal/constants/constants.go b/internal/constants/constants.go index 2fcdf96..7008fd1 100644 --- a/internal/constants/constants.go +++ b/internal/constants/constants.go @@ -51,6 +51,20 @@ var ( // 性能监控阈值 MaxRequestsPerMinute = 1000 // 每分钟最大请求数 MaxBytesPerMinute = 100 * 1024 * 1024 // 每分钟最大流量 (100MB) + + // Data loading + LoadRetryCount = 3 // number of load retries + LoadRetryInterval = time.Second // delay between retries + LoadTimeout = 30 * time.Second // load timeout + + // Data saving + MinSaveInterval = 5 * time.Minute // minimum save interval + MaxSaveInterval = 15 * time.Minute // maximum save interval + DefaultSaveInterval = 10 * time.Minute // default save interval + + // Data validation + MaxErrorRate = 0.8 // maximum error rate + MaxDataDeviation = 0.01 // maximum data deviation (1%) ) // UpdateFromConfig 从配置文件更新常量 @@ -97,4 +111,34 @@ func UpdateFromConfig(cfg *config.Config) { if cfg.Metrics.Latency.HugeLatency > 0 { HugeFileLatency = cfg.Metrics.Latency.HugeLatency } + + // Data loading: a zero or negative config value keeps the default + if cfg.Metrics.Load.RetryCount > 0 { + LoadRetryCount = cfg.Metrics.Load.RetryCount + } + if cfg.Metrics.Load.RetryInterval > 0 { + LoadRetryInterval = cfg.Metrics.Load.RetryInterval + } + 
if cfg.Metrics.Load.Timeout > 0 { + LoadTimeout = cfg.Metrics.Load.Timeout + } + + // Data saving: a zero or negative config value keeps the default + if cfg.Metrics.Save.MinInterval > 0 { + MinSaveInterval = cfg.Metrics.Save.MinInterval + } + if cfg.Metrics.Save.MaxInterval > 0 { + MaxSaveInterval = cfg.Metrics.Save.MaxInterval + } + if cfg.Metrics.Save.DefaultInterval > 0 { + DefaultSaveInterval = cfg.Metrics.Save.DefaultInterval + } + + // Data validation: a zero or negative config value keeps the default + if cfg.Metrics.Validation.MaxErrorRate > 0 { + MaxErrorRate = cfg.Metrics.Validation.MaxErrorRate + } + if cfg.Metrics.Validation.MaxDataDeviation > 0 { + MaxDataDeviation = cfg.Metrics.Validation.MaxDataDeviation + } } diff --git a/internal/interfaces/metrics.go b/internal/interfaces/metrics.go new file mode 100644 index 0000000..ff02770 --- /dev/null +++ b/internal/interfaces/metrics.go @@ -0,0 +1,11 @@ +package interfaces + +import ( + "time" +) + +// MetricsCollector defines the metrics collector interface +type MetricsCollector interface { + CheckDataConsistency() error + GetLastSaveTime() time.Time +} diff --git a/internal/metrics/collector.go b/internal/metrics/collector.go index 6ac81c5..1ec73be 100644 --- a/internal/metrics/collector.go +++ b/internal/metrics/collector.go @@ -2,10 +2,13 @@ package metrics import ( "database/sql" + "encoding/json" "fmt" "log" "math/rand" "net/http" + "os" + "path" "proxy-go/internal/cache" "proxy-go/internal/config" "proxy-go/internal/constants" @@ -81,7 +84,7 @@ func InitCollector(dbPath string, config *config.Config) error { } globalCollector.cache = cache.NewCache(constants.CacheTTL) - globalCollector.monitor = monitor.NewMonitor() + globalCollector.monitor = monitor.NewMonitor(globalCollector) // 如果配置了飞书webhook,则启用飞书告警 if config.Metrics.FeishuWebhook != "" { @@ -436,12 +439,21 @@ func (c *Collector) SaveMetrics(stats map[string]interface{}) error { c.persistentStats.totalErrors.Store(stats["total_errors"].(int64)) c.persistentStats.totalBytes.Store(stats["total_bytes"].(int64)) + // Capture the current values before they are reset + oldRequests := atomic.LoadInt64(&c.totalRequests) + oldErrors 
:= atomic.LoadInt64(&c.totalErrors) + oldBytes := c.totalBytes.Load() + // 重置当前会话计数器 atomic.StoreInt64(&c.totalRequests, 0) atomic.StoreInt64(&c.totalErrors, 0) c.totalBytes.Store(0) c.latencySum.Store(0) + // Log the reset together with the values captured above + log.Printf("Reset counters: requests=%d->0, errors=%d->0, bytes=%d->0", + oldRequests, oldErrors, oldBytes) + // 重置状态码统计 for i := range c.statusStats { c.statusStats[i].Store(0) @@ -459,20 +471,29 @@ func (c *Collector) SaveMetrics(stats map[string]interface{}) error { return true }) + // Record the save time. NOTE(review): this runs before db.SaveMetrics returns, so it is updated even when the save fails; lastSaveTime is a package-level variable with no synchronization visible here + lastSaveTime = time.Now() + return c.db.SaveMetrics(stats) } // LoadRecentStats 加载最近的统计数据 func (c *Collector) LoadRecentStats() error { start := time.Now() - var err error + log.Printf("Starting to load recent stats...") + var err error // 添加重试机制 for retryCount := 0; retryCount < 3; retryCount++ { if err = c.loadRecentStatsInternal(); err == nil { + // Validate what was loaded. NOTE(review): on failure the continue skips the retry log and the sleep below, so the next attempt is immediate + if err = c.validateLoadedData(); err != nil { + log.Printf("Data validation failed: %v", err) + continue + } break } - log.Printf("Retry %d: Failed to load stats: %v", retryCount+1, err) + log.Printf("Retry %d/3: Failed to load stats: %v", retryCount+1, err) time.Sleep(time.Second) } @@ -480,69 +501,32 @@ func (c *Collector) LoadRecentStats() error { return fmt.Errorf("failed to load stats after retries: %v", err) } - log.Printf("Loaded all stats in %v", time.Since(start)) + log.Printf("Successfully loaded all stats in %v", time.Since(start)) return nil } // loadRecentStatsInternal 内部加载函数 func (c *Collector) loadRecentStatsInternal() error { - // 加载最近5分钟的数据 - row := c.db.DB.QueryRow(` - SELECT - total_requests, total_errors, total_bytes, avg_latency - FROM metrics_history - WHERE timestamp >= datetime('now', '-5', 'minutes') - ORDER BY timestamp DESC - LIMIT 1 - `) - - var metrics models.HistoricalMetrics - if err := row.Scan( - &metrics.TotalRequests, - &metrics.TotalErrors, - &metrics.TotalBytes, - &metrics.AvgLatency, - ); err != nil && err != sql.ErrNoRows { - return err + loadStart := 
time.Now() + // Load the basic metrics first + if err := c.loadBasicMetrics(); err != nil { + return fmt.Errorf("failed to load basic metrics: %v", err) } + log.Printf("Loaded basic metrics in %v", time.Since(loadStart)) - // 更新持久化数据 - if metrics.TotalRequests > 0 { - c.persistentStats.totalRequests.Store(metrics.TotalRequests) - c.persistentStats.totalErrors.Store(metrics.TotalErrors) - c.persistentStats.totalBytes.Store(metrics.TotalBytes) - log.Printf("Loaded persistent stats: requests=%d, errors=%d, bytes=%d", - metrics.TotalRequests, metrics.TotalErrors, metrics.TotalBytes) + // Then load the status-code statistics + statusStart := time.Now() + if err := c.loadStatusStats(); err != nil { + return fmt.Errorf("failed to load status stats: %v", err) } + log.Printf("Loaded status codes in %v", time.Since(statusStart)) - // 加载状态码统计 - rows, err := c.db.DB.Query(` - SELECT status_group, SUM(count) as count - FROM status_code_history - WHERE timestamp >= datetime('now', '-5', 'minutes') - GROUP BY status_group - `) - if err != nil { - return err + // Finally load the path and referer statistics + pathStart := time.Now() + if err := c.loadPathAndRefererStats(); err != nil { + return fmt.Errorf("failed to load path and referer stats: %v", err) } - defer rows.Close() - - var totalStatusCodes int64 - for rows.Next() { - var group string - var count int64 - if err := rows.Scan(&group, &count); err != nil { - return err - } - if len(group) > 0 { - idx := (int(group[0]) - '0') - 1 - if idx >= 0 && idx < len(c.statusStats) { - c.statusStats[idx].Store(count) - totalStatusCodes += count - } - } - } - log.Printf("Loaded status codes stats: total=%d", totalStatusCodes) + log.Printf("Loaded path and referer stats in %v", time.Since(pathStart)) return nil } @@ -588,7 +572,7 @@ func (c *Collector) CheckDataConsistency() error { return fmt.Errorf("invalid total_requests: %d", totalReqs) } - // 检查状态码统计 + // Check the status-code statistics statusStats := stats["status_code_stats"].(map[string]int64) var statusTotal int64 for _, count := range statusStats { @@ -598,10 +582,20 @@ func (c 
*Collector) CheckDataConsistency() error { statusTotal += count } + // Check the path statistics: reject negative counts and sum the per-path requests + pathStats := stats["top_paths"].([]models.PathMetrics) + var pathTotal int64 + for _, p := range pathStats { + if p.RequestCount < 0 { + return fmt.Errorf("invalid path request count: %d", p.RequestCount) + } + pathTotal += p.RequestCount + } + // 允许一定的误差 - if abs(statusTotal-totalReqs) > totalReqs/100 { - log.Printf("Warning: status code total (%d) doesn't match total requests (%d)", - statusTotal, totalReqs) + if abs(statusTotal-totalReqs) > totalReqs/100 || abs(pathTotal-totalReqs) > totalReqs/100 { + log.Printf("Warning: status code total (%d) and path total (%d) don't match total requests (%d)", + statusTotal, pathTotal, totalReqs) } return nil @@ -613,3 +607,239 @@ func abs(x int64) int64 { } return x } + +func (c *Collector) validateLoadedData() error { + // Validate the basic totals: none may be negative + if c.persistentStats.totalRequests.Load() < 0 || + c.persistentStats.totalErrors.Load() < 0 || + c.persistentStats.totalBytes.Load() < 0 { + return fmt.Errorf("invalid persistent stats values") + } + + // The error total must not exceed the request total + if c.persistentStats.totalErrors.Load() > c.persistentStats.totalRequests.Load() { + return fmt.Errorf("total errors exceeds total requests") + } + + // Validate the status-code counters: none may be negative + for i := range c.statusStats { + if c.statusStats[i].Load() < 0 { + return fmt.Errorf("invalid status code count at index %d", i) + } + } + + // Sum the per-path request counts. NOTE(review): a negative counter only stops the Range early; no error is returned for it + var totalPathRequests int64 + c.pathStats.Range(func(_, value interface{}) bool { + stats := value.(*models.PathStats) + if stats.Requests.Load() < 0 || stats.Errors.Load() < 0 { + return false + } + totalPathRequests += stats.Requests.Load() + return true + }) + + // The summed path requests must not exceed the persistent request total + if totalPathRequests > c.persistentStats.totalRequests.Load() { + return fmt.Errorf("path stats total exceeds total requests") + } + + return nil +} + +// loadBasicMetrics loads the persistent request, error and byte totals from metrics_history +func (c *Collector) loadBasicMetrics() error { + // Load the newest metrics_history row from the last 24 hours. NOTE(review): the modifier is passed as two arguments ('-24', 'hours'); SQLite documents the single-string form '-24 hours', so confirm this filter behaves as intended + row := c.db.DB.QueryRow(` + SELECT + total_requests, total_errors, 
total_bytes, avg_latency + FROM metrics_history + WHERE timestamp >= datetime('now', '-24', 'hours') + ORDER BY timestamp DESC + LIMIT 1 + `) + + var metrics models.HistoricalMetrics + if err := row.Scan( + &metrics.TotalRequests, + &metrics.TotalErrors, + &metrics.TotalBytes, + &metrics.AvgLatency, + ); err != nil && err != sql.ErrNoRows { + return err + } + + // Update the persistent totals only when the loaded request total is positive + if metrics.TotalRequests > 0 { + c.persistentStats.totalRequests.Store(metrics.TotalRequests) + c.persistentStats.totalErrors.Store(metrics.TotalErrors) + c.persistentStats.totalBytes.Store(metrics.TotalBytes) + log.Printf("Loaded persistent stats: requests=%d, errors=%d, bytes=%d", + metrics.TotalRequests, metrics.TotalErrors, metrics.TotalBytes) + } + return nil +} + +// loadStatusStats loads the summed count per status_group; the first character of the group selects the statusStats slot +func (c *Collector) loadStatusStats() error { + rows, err := c.db.DB.Query(` + SELECT status_group, SUM(count) as count + FROM status_code_history + WHERE timestamp >= datetime('now', '-24', 'hours') + GROUP BY status_group + `) + if err != nil { + return err + } + defer rows.Close() + + var totalStatusCodes int64 + for rows.Next() { + var group string + var count int64 + if err := rows.Scan(&group, &count); err != nil { + return err + } + if len(group) > 0 { + idx := (int(group[0]) - '0') - 1 + if idx >= 0 && idx < len(c.statusStats) { + c.statusStats[idx].Store(count) + totalStatusCodes += count + } + } + } + return rows.Err() +} + +// loadPathAndRefererStats loads the top 10 paths and top 10 referers by request count into pathStats and refererStats +func (c *Collector) loadPathAndRefererStats() error { + // Load path statistics + rows, err := c.db.DB.Query(` + SELECT + path, + SUM(request_count) as requests, + SUM(error_count) as errors, + AVG(bytes_transferred) as bytes, + AVG(avg_latency) as latency + FROM popular_paths_history + WHERE timestamp >= datetime('now', '-24', 'hours') + GROUP BY path + ORDER BY requests DESC + LIMIT 10 + `) + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var path string + var requests, errors, bytes int64 + var latency float64 + 
if err := rows.Scan(&path, &requests, &errors, &bytes, &latency); err != nil { + return err + } + stats := &models.PathStats{} + stats.Requests.Store(requests) + stats.Errors.Store(errors) + stats.Bytes.Store(bytes) + stats.LatencySum.Store(int64(latency)) + c.pathStats.Store(path, stats) + } + + if err := rows.Err(); err != nil { + return err + } + + // Load referer statistics + rows, err = c.db.DB.Query(` + SELECT + referer, + SUM(request_count) as requests + FROM referer_history + WHERE timestamp >= datetime('now', '-24', 'hours') + GROUP BY referer + ORDER BY requests DESC + LIMIT 10 + `) + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var referer string + var count int64 + if err := rows.Scan(&referer, &count); err != nil { + return err + } + stats := &models.PathStats{} + stats.Requests.Store(count) + c.refererStats.Store(referer, stats) + } + + return rows.Err() +} + +// SaveBackup writes GetStats() as indented JSON to data/backup/backup_<timestamp>.json. NOTE(review): the directory is not created here; confirm it exists before this is called +func (c *Collector) SaveBackup() error { + stats := c.GetStats() + backupFile := fmt.Sprintf("backup_%s.json", time.Now().Format("20060102_150405")) + + data, err := json.MarshalIndent(stats, "", " ") + if err != nil { + return err + } + + return os.WriteFile(path.Join("data/backup", backupFile), data, 0644) +} + +// LoadBackup reads data/backup/<backupFile>, decodes it as JSON and hands the result to RestoreFromBackup. NOTE(review): backupFile is joined without validation; confirm callers pass a trusted name +func (c *Collector) LoadBackup(backupFile string) error { + data, err := os.ReadFile(path.Join("data/backup", backupFile)) + if err != nil { + return err + } + + var stats map[string]interface{} + if err := json.Unmarshal(data, &stats); err != nil { + return err + } + + return c.RestoreFromBackup(stats) +} + +// RestoreFromBackup restores the persistent totals and status-code counters from stats; entries that are missing or have another type are skipped, and the result is always nil +func (c *Collector) RestoreFromBackup(stats map[string]interface{}) error { + // Restore the basic totals. NOTE(review): LoadBackup passes a map produced by json.Unmarshal, which stores JSON numbers as float64 in interface values, so these int64 assertions presumably never match on that path; confirm + if totalReqs, ok := stats["total_requests"].(int64); ok { + c.persistentStats.totalRequests.Store(totalReqs) + } + if totalErrs, ok := stats["total_errors"].(int64); ok { + c.persistentStats.totalErrors.Store(totalErrs) + } + if totalBytes, ok := stats["total_bytes"].(int64); ok { + 
c.persistentStats.totalBytes.Store(totalBytes) + } + + // Restore the status-code counters. NOTE(review): a JSON-decoded object arrives as map[string]interface{}, not map[string]int64, so this branch is presumably skipped for data coming from LoadBackup; confirm + if statusStats, ok := stats["status_code_stats"].(map[string]int64); ok { + for group, count := range statusStats { + if len(group) > 0 { + idx := (int(group[0]) - '0') - 1 + if idx >= 0 && idx < len(c.statusStats) { + c.statusStats[idx].Store(count) + } + } + } + } + + return nil +} + +// lastSaveTime is assigned in SaveMetrics; GetLastSaveTime returns it to implement interfaces.MetricsCollector +var lastSaveTime time.Time + +func (c *Collector) GetLastSaveTime() time.Time { + return lastSaveTime +} diff --git a/internal/metrics/collector_test.go b/internal/metrics/collector_test.go new file mode 100644 index 0000000..f86b20f --- /dev/null +++ b/internal/metrics/collector_test.go @@ -0,0 +1,35 @@ +package metrics + +import ( + "sync" + "sync/atomic" + "testing" + "time" +) + +// NewTestCollector creates a new collector for testing +func NewTestCollector() *Collector { + return &Collector{ + startTime: time.Now(), + pathStats: sync.Map{}, + statusStats: [6]atomic.Int64{}, + latencyBuckets: [10]atomic.Int64{}, + } +} + +func TestDataConsistency(t *testing.T) { + c := NewTestCollector() + + // Basic metrics: after one recorded request the consistency check should pass + c.RecordRequest("/test", 200, time.Second, 1024, "127.0.0.1", nil) + if err := c.CheckDataConsistency(); err != nil { + t.Errorf("Data consistency check failed: %v", err) + } + + // Invalid data: more persistent errors than requests is expected to be rejected + c.persistentStats.totalErrors.Store(100) + c.persistentStats.totalRequests.Store(50) + if err := c.CheckDataConsistency(); err == nil { + t.Error("Expected error for invalid data") + } +} diff --git a/internal/monitor/monitor.go b/internal/monitor/monitor.go index c33a97e..00fc2aa 100644 --- a/internal/monitor/monitor.go +++ b/internal/monitor/monitor.go @@ -4,6 +4,7 @@ import ( "fmt" "log" "proxy-go/internal/constants" + "proxy-go/internal/interfaces" "sync" "sync/atomic" "time" @@ -53,12 +54,14 @@ type Monitor struct { currentWindow atomic.Int32 transferWindow [12]TransferStats currentTWindow atomic.Int32 + collector interfaces.MetricsCollector } -func NewMonitor() 
*Monitor { +func NewMonitor(collector interfaces.MetricsCollector) *Monitor { m := &Monitor{ - alerts: make(chan Alert, 100), - handlers: make([]AlertHandler, 0), + alerts: make(chan Alert, 100), + handlers: make([]AlertHandler, 0), + collector: collector, } // 初始化第一个窗口 @@ -142,6 +145,23 @@ func (m *Monitor) CheckMetrics(stats map[string]interface{}) { } } } + + // Check data consistency. NOTE(review): m.collector is whatever NewMonitor received; a nil value would panic on this call + if err := m.collector.CheckDataConsistency(); err != nil { + m.recordAlert("数据一致性告警", err.Error()) + } + + // Check the error rate against constants.MaxErrorRate. NOTE(review): the int64 type assertions are unchecked and panic if a key is missing or holds another type + totalReqs := stats["total_requests"].(int64) + totalErrs := stats["total_errors"].(int64) + if totalReqs > 0 && float64(totalErrs)/float64(totalReqs) > constants.MaxErrorRate { + m.recordAlert("错误率告警", fmt.Sprintf("错误率超过阈值: %.2f%%", float64(totalErrs)/float64(totalReqs)*100)) + } + + // Alert when the last save is older than twice MaxSaveInterval. NOTE(review): GetLastSaveTime is the zero time until the first save, so this presumably fires right after startup; confirm + if lastSaveTime := m.collector.GetLastSaveTime(); time.Since(lastSaveTime) > constants.MaxSaveInterval*2 { + m.recordAlert("数据保存告警", "数据保存间隔过长") + } } func (m *Monitor) CheckLatency(latency time.Duration, bytes int64) { @@ -264,3 +284,11 @@ func (m *Monitor) cleanupWindows() { }) } } + +func (m *Monitor) recordAlert(title, message string) { + m.alerts <- Alert{ // NOTE(review): blocking send; alerts is created with capacity 100 in NewMonitor, so the caller blocks once the buffer is full. Confirm a consumer always drains it + Level: AlertLevelError, + Message: fmt.Sprintf("%s: %s", title, message), + Time: time.Now(), + } +}