diff --git a/internal/constants/constants.go b/internal/constants/constants.go index 2e35b27..2fcdf96 100644 --- a/internal/constants/constants.go +++ b/internal/constants/constants.go @@ -47,6 +47,10 @@ var ( StatusRetention = 30 * 24 * time.Hour // 状态码统计保留30天 PathRetention = 7 * 24 * time.Hour // 路径统计保留7天 RefererRetention = 7 * 24 * time.Hour // 引用来源保留7天 + + // 性能监控阈值 + MaxRequestsPerMinute = 1000 // 每分钟最大请求数 + MaxBytesPerMinute = 100 * 1024 * 1024 // 每分钟最大流量 (100MB) ) // UpdateFromConfig 从配置文件更新常量 diff --git a/internal/metrics/collector.go b/internal/metrics/collector.go index e41f17a..6ac81c5 100644 --- a/internal/metrics/collector.go +++ b/internal/metrics/collector.go @@ -48,6 +48,12 @@ type Collector struct { var globalCollector *Collector +const ( + // 数据变化率阈值 + highThreshold = 0.8 // 高变化率阈值 + lowThreshold = 0.2 // 低变化率阈值 +) + func InitCollector(dbPath string, config *config.Config) error { db, err := models.NewMetricsDB(dbPath) if err != nil { @@ -87,23 +93,37 @@ func InitCollector(dbPath string, config *config.Config) error { // 启动定时保存 go func() { - // 避免所有实例同时保存 time.Sleep(time.Duration(rand.Int63n(60)) * time.Second) - ticker := time.NewTicker(10 * time.Minute) + var ( + saveInterval = 10 * time.Minute + minInterval = 5 * time.Minute + maxInterval = 15 * time.Minute + ) + ticker := time.NewTicker(saveInterval) + lastChangeTime := time.Now() + for range ticker.C { stats := globalCollector.GetStats() start := time.Now() - // 保存基础指标和完整统计数据 - if err := db.SaveMetrics(stats); err != nil { - log.Printf("Error saving metrics: %v", err) - } else { - log.Printf("Basic metrics saved successfully") + // 根据数据变化频率动态调整保存间隔 + changeRate := calculateChangeRate(stats) + // 避免频繁调整 + if time.Since(lastChangeTime) > time.Minute { + if changeRate > highThreshold && saveInterval > minInterval { + saveInterval = saveInterval - time.Minute + lastChangeTime = time.Now() + } else if changeRate < lowThreshold && saveInterval < maxInterval { + saveInterval = saveInterval + time.Minute + lastChangeTime = time.Now() + } + ticker.Reset(saveInterval) + log.Printf("Adjusted save interval to %v (change rate: %.2f)", + saveInterval, changeRate) } - // 同时保存完整统计数据 - if err := db.SaveFullMetrics(stats); err != nil { - log.Printf("Error saving full metrics: %v", err) + if err := db.SaveAllMetrics(stats); err != nil { + log.Printf("Error saving metrics: %v", err) } else { log.Printf("Metrics saved in %v", time.Since(start)) } @@ -117,19 +137,12 @@ func InitCollector(dbPath string, config *config.Config) error { time.Sleep(time.Second) stats := globalCollector.GetStats() - if err := db.SaveMetrics(stats); err != nil { + if err := db.SaveAllMetrics(stats); err != nil { log.Printf("Error saving final metrics: %v", err) } else { log.Printf("Basic metrics saved successfully") } - // 保存完整统计数据 - if err := db.SaveFullMetrics(stats); err != nil { - log.Printf("Error saving full metrics: %v", err) - } else { - log.Printf("Full metrics saved successfully") - } - // 等待数据写入完成 time.Sleep(time.Second) @@ -451,6 +464,28 @@ func (c *Collector) SaveMetrics(stats map[string]interface{}) error { // LoadRecentStats 加载最近的统计数据 func (c *Collector) LoadRecentStats() error { + start := time.Now() + var err error + + // 添加重试机制 + for retryCount := 0; retryCount < 3; retryCount++ { + if err = c.loadRecentStatsInternal(); err == nil { + break + } + log.Printf("Retry %d: Failed to load stats: %v", retryCount+1, err) + time.Sleep(time.Second) + } + + if err != nil { + return fmt.Errorf("failed to load stats after retries: %v", err) + } + + log.Printf("Loaded all stats in %v", time.Since(start)) + return nil +} + +// loadRecentStatsInternal 内部加载函数 +func (c *Collector) loadRecentStatsInternal() error { // 加载最近5分钟的数据 row := c.db.DB.QueryRow(` SELECT @@ -476,6 +511,8 @@ func (c *Collector) LoadRecentStats() error { c.persistentStats.totalRequests.Store(metrics.TotalRequests) c.persistentStats.totalErrors.Store(metrics.TotalErrors) c.persistentStats.totalBytes.Store(metrics.TotalBytes) + log.Printf("Loaded persistent stats: requests=%d, errors=%d, bytes=%d", + metrics.TotalRequests, metrics.TotalErrors, metrics.TotalBytes) } // 加载状态码统计 @@ -490,6 +527,7 @@ func (c *Collector) LoadRecentStats() error { } defer rows.Close() + var totalStatusCodes int64 for rows.Next() { var group string var count int64 @@ -500,82 +538,11 @@ func (c *Collector) LoadRecentStats() error { idx := (int(group[0]) - '0') - 1 if idx >= 0 && idx < len(c.statusStats) { c.statusStats[idx].Store(count) + totalStatusCodes += count } } } - - if err := rows.Err(); err != nil { - return fmt.Errorf("error scanning status code rows: %v", err) - } - - // 加载路径统计 - rows, err = c.db.DB.Query(` - SELECT - path, - SUM(request_count) as requests, - SUM(error_count) as errors, - AVG(bytes_transferred) as bytes, - AVG(avg_latency) as latency - FROM popular_paths_history - WHERE timestamp >= datetime('now', '-5', 'minutes') - GROUP BY path - ORDER BY requests DESC - LIMIT 10 - `) - if err != nil { - return err - } - defer rows.Close() - - for rows.Next() { - var path string - var requests, errors, bytes int64 - var latency float64 - if err := rows.Scan(&path, &requests, &errors, &bytes, &latency); err != nil { - return err - } - stats := &models.PathStats{} - stats.Requests.Store(requests) - stats.Errors.Store(errors) - stats.Bytes.Store(bytes) - stats.LatencySum.Store(int64(latency)) - c.pathStats.Store(path, stats) - } - - if err := rows.Err(); err != nil { - return fmt.Errorf("error scanning path stats rows: %v", err) - } - - // 加载引用来源统计 - rows, err = c.db.DB.Query(` - SELECT - referer, - SUM(request_count) as requests - FROM referer_history - WHERE timestamp >= datetime('now', '-5', 'minutes') - GROUP BY referer - ORDER BY requests DESC - LIMIT 10 - `) - if err != nil { - return err - } - defer rows.Close() - - for rows.Next() { - var referer string - var requests int64 - if err := rows.Scan(&referer, &requests); err != nil { - return err - } - stats := &models.PathStats{} - stats.Requests.Store(requests) - c.refererStats.Store(referer, stats) - } - - if err := rows.Err(); err != nil { - return fmt.Errorf("error scanning referer stats rows: %v", err) - } + log.Printf("Loaded status codes stats: total=%d", totalStatusCodes) return nil } @@ -586,3 +553,63 @@ func formatAvgLatency(latencySum, requests int64) string { } return FormatDuration(time.Duration(latencySum / requests)) } + +// calculateChangeRate 计算数据变化率 +func calculateChangeRate(stats map[string]interface{}) float64 { + // 获取当前值 + currentReqs, _ := stats["total_requests"].(int64) + currentErrs, _ := stats["total_errors"].(int64) + currentBytes, _ := stats["total_bytes"].(int64) + + // 计算变化率 (可以根据实际需求调整计算方法) + var changeRate float64 + if currentReqs > 0 { + // 计算请求数的变化率 + reqRate := float64(currentReqs) / float64(constants.MaxRequestsPerMinute) + // 计算错误率的变化 + errRate := float64(currentErrs) / float64(currentReqs) + // 计算流量的变化率 + bytesRate := float64(currentBytes) / float64(constants.MaxBytesPerMinute) + + // 综合评分 + changeRate = (reqRate + errRate + bytesRate) / 3 + } + + return changeRate +} + +// CheckDataConsistency 检查数据一致性 +func (c *Collector) CheckDataConsistency() error { + stats := c.GetStats() + + // 检查基础指标 + totalReqs := stats["total_requests"].(int64) + if totalReqs < 0 { + return fmt.Errorf("invalid total_requests: %d", totalReqs) + } + + // 检查状态码统计 + statusStats := stats["status_code_stats"].(map[string]int64) + var statusTotal int64 + for _, count := range statusStats { + if count < 0 { + return fmt.Errorf("invalid status code count: %d", count) + } + statusTotal += count + } + + // 允许一定的误差 + if abs(statusTotal-totalReqs) > totalReqs/100 { + log.Printf("Warning: status code total (%d) doesn't match total requests (%d)", + statusTotal, totalReqs) + } + + return nil +} + +func abs(x int64) int64 { + if x < 0 { + return -x + } + return x +} diff --git a/internal/models/metrics.go b/internal/models/metrics.go index 1cfd7d3..caeb422 100644 --- a/internal/models/metrics.go +++ b/internal/models/metrics.go @@ -369,21 +369,13 @@ func getDBSize(db *sql.DB) int64 { } func (db *MetricsDB) SaveMetrics(stats map[string]interface{}) error { - // 设置事务优化参数 - 移到事务外 - if _, err := db.DB.Exec("PRAGMA synchronous = NORMAL"); err != nil { - return fmt.Errorf("failed to set synchronous mode: %v", err) - } - if _, err := db.DB.Exec("PRAGMA journal_mode = WAL"); err != nil { - return fmt.Errorf("failed to set journal mode: %v", err) - } - tx, err := db.DB.Begin() if err != nil { return err } defer tx.Rollback() - // 使用预处理语句提高性能 + // 保存基础指标 stmt, err := tx.Prepare(` INSERT INTO metrics_history ( total_requests, total_errors, total_bytes, avg_latency, error_rate @@ -430,6 +422,61 @@ func (db *MetricsDB) SaveMetrics(stats map[string]interface{}) error { return fmt.Errorf("failed to save metrics: %v", err) } + // 保存状态码统计 + statusStats := stats["status_code_stats"].(map[string]int64) + values := make([]string, 0, len(statusStats)) + args := make([]interface{}, 0, len(statusStats)*2) + + for group, count := range statusStats { + values = append(values, "(?, ?)") + args = append(args, group, count) + } + + query := "INSERT INTO status_code_history (status_group, count) VALUES " + + strings.Join(values, ",") + + if _, err := tx.Exec(query, args...); err != nil { + return err + } + + // 保存热门路径 + pathStats := stats["top_paths"].([]PathMetrics) + pathStmt, err := tx.Prepare(` + INSERT INTO popular_paths_history ( + path, request_count, error_count, avg_latency, bytes_transferred + ) VALUES (?, ?, ?, ?, ?) + `) + if err != nil { + return err + } + defer pathStmt.Close() + + for _, p := range pathStats { + if _, err := pathStmt.Exec( + p.Path, p.RequestCount, p.ErrorCount, + p.AvgLatency, p.BytesTransferred, + ); err != nil { + return err + } + } + + // 保存引用来源 + refererStats := stats["top_referers"].([]PathMetrics) + refererStmt, err := tx.Prepare(` + INSERT INTO referer_history (referer, request_count) + VALUES (?, ?) + `) + if err != nil { + return err + } + defer refererStmt.Close() + + for _, r := range refererStats { + if _, err := refererStmt.Exec(r.Path, r.RequestCount); err != nil { + return err + } + } + return tx.Commit() } @@ -885,3 +932,117 @@ func (db *MetricsDB) LoadRecentStats(statusStats *[6]atomic.Int64, pathStats *sy return nil } + +// SaveAllMetrics 合并所有指标的保存 +func (db *MetricsDB) SaveAllMetrics(stats map[string]interface{}) error { + start := time.Now() + tx, err := db.DB.Begin() + if err != nil { + return err + } + defer tx.Rollback() + + // 保存基础指标 + stmt, err := tx.Prepare(` + INSERT INTO metrics_history ( + total_requests, total_errors, total_bytes, avg_latency, error_rate + ) VALUES (?, ?, ?, ?, ?) + `) + if err != nil { + return fmt.Errorf("failed to prepare statement: %v", err) + } + defer stmt.Close() + + // 类型断言检查 + totalReqs, ok := stats["total_requests"].(int64) + if !ok { + return fmt.Errorf("invalid total_requests type") + } + totalErrs, ok := stats["total_errors"].(int64) + if !ok { + return fmt.Errorf("invalid total_errors type") + } + totalBytes, ok := stats["total_bytes"].(int64) + if !ok { + return fmt.Errorf("invalid total_bytes type") + } + avgLatency, ok := stats["avg_latency"].(int64) + if !ok { + return fmt.Errorf("invalid avg_latency type") + } + + // 计算错误率 + var errorRate float64 + if totalReqs > 0 { + errorRate = float64(totalErrs) / float64(totalReqs) + } + + // 保存基础指标 + _, err = stmt.Exec(totalReqs, totalErrs, totalBytes, avgLatency, errorRate) + if err != nil { + return fmt.Errorf("failed to save basic metrics: %v", err) + } + + // 保存状态码统计 + statusStats := stats["status_code_stats"].(map[string]int64) + values := make([]string, 0, len(statusStats)) + args := make([]interface{}, 0, len(statusStats)*2) + for group, count := range statusStats { + values = append(values, "(?, ?)") + args = append(args, group, count) + } + if len(values) > 0 { + query := "INSERT INTO status_code_history (status_group, count) VALUES " + + strings.Join(values, ",") + if _, err := tx.Exec(query, args...); err != nil { + return fmt.Errorf("failed to save status stats: %v", err) + } + } + + // 保存路径统计 + if pathStats, ok := stats["top_paths"].([]PathMetrics); ok && len(pathStats) > 0 { + pathStmt, err := tx.Prepare(` + INSERT INTO popular_paths_history ( + path, request_count, error_count, avg_latency, bytes_transferred + ) VALUES (?, ?, ?, ?, ?) + `) + if err != nil { + return fmt.Errorf("failed to prepare path statement: %v", err) + } + defer pathStmt.Close() + + for _, p := range pathStats { + if _, err := pathStmt.Exec( + p.Path, p.RequestCount, p.ErrorCount, + p.AvgLatency, p.BytesTransferred, + ); err != nil { + return fmt.Errorf("failed to save path stats: %v", err) + } + } + } + + // 保存引用来源 + if refererStats, ok := stats["top_referers"].([]PathMetrics); ok && len(refererStats) > 0 { + refererStmt, err := tx.Prepare(` + INSERT INTO referer_history (referer, request_count) + VALUES (?, ?) + `) + if err != nil { + return fmt.Errorf("failed to prepare referer statement: %v", err) + } + defer refererStmt.Close() + + for _, r := range refererStats { + if _, err := refererStmt.Exec(r.Path, r.RequestCount); err != nil { + return fmt.Errorf("failed to save referer stats: %v", err) + } + } + } + + if err := tx.Commit(); err != nil { + return fmt.Errorf("failed to commit transaction: %v", err) + } + + log.Printf("Saved all metrics in %v", time.Since(start)) + return nil +}