From c416c76790d754e29f313ead632259709ffd4c1d Mon Sep 17 00:00:00 2001 From: wood chen Date: Thu, 5 Dec 2024 06:47:39 +0800 Subject: [PATCH] feat(metrics): implement full metrics saving and enhance database schema - Added functionality to save full metrics periodically and on application shutdown, improving data persistence. - Introduced new database tables for performance metrics, status code history, popular paths, and referer history, enhancing data tracking capabilities. - Updated the GetRecentMetrics function to utilize window functions for more accurate metrics retrieval and ensure non-negative values. - Improved data handling by ensuring empty records are returned instead of null when no metrics are available. These changes significantly enhance the metrics collection and storage process, providing a more robust framework for performance monitoring. --- internal/metrics/collector.go | 26 +++++ internal/models/metrics.go | 195 ++++++++++++++++++++++++++++++---- internal/utils/signal.go | 17 +++ internal/utils/utils.go | 21 +++- 4 files changed, 237 insertions(+), 22 deletions(-) create mode 100644 internal/utils/signal.go diff --git a/internal/metrics/collector.go b/internal/metrics/collector.go index 07873f0..79946eb 100644 --- a/internal/metrics/collector.go +++ b/internal/metrics/collector.go @@ -9,6 +9,7 @@ import ( "proxy-go/internal/constants" "proxy-go/internal/models" "proxy-go/internal/monitor" + "proxy-go/internal/utils" "runtime" "sort" "sync" @@ -78,6 +79,31 @@ func InitCollector(dbPath string, config *config.Config) error { } }() + // 启动每小时保存统计数据 + go func() { + ticker := time.NewTicker(1 * time.Hour) + for range ticker.C { + stats := globalCollector.GetStats() + if err := db.SaveFullMetrics(stats); err != nil { + log.Printf("Error saving full metrics: %v", err) + } else { + log.Printf("Full metrics saved successfully") + } + } + }() + + // 设置程序退出时的处理 + utils.SetupCloseHandler(func() { + log.Println("Saving final metrics before shutdown...") + stats := globalCollector.GetStats() + if err := db.SaveFullMetrics(stats); err != nil { + log.Printf("Error saving final metrics: %v", err) + } else { + log.Printf("Final metrics saved successfully") + } + db.Close() + }) + globalCollector.statsPool = sync.Pool{ New: func() interface{} { return make(map[string]interface{}, 20) diff --git a/internal/models/metrics.go b/internal/models/metrics.go index 1ac00e0..0e1a152 100644 --- a/internal/models/metrics.go +++ b/internal/models/metrics.go @@ -141,6 +141,51 @@ func initTables(db *sql.DB) error { return err } + // 添加新的表 + _, err = db.Exec(` + -- 性能指标表 + CREATE TABLE IF NOT EXISTS performance_metrics ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, + avg_response_time INTEGER, + requests_per_second REAL, + bytes_per_second REAL + ); + + -- 状态码统计历史表 + CREATE TABLE IF NOT EXISTS status_code_history ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, + status_group TEXT, + count INTEGER + ); + + -- 热门路径历史表 + CREATE TABLE IF NOT EXISTS popular_paths_history ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, + path TEXT, + request_count INTEGER, + error_count INTEGER, + avg_latency TEXT, + bytes_transferred INTEGER + ); + + -- 引用来源历史表 + CREATE TABLE IF NOT EXISTS referer_history ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, + referer TEXT, + request_count INTEGER + ); + + -- 为新表添加索引 + CREATE INDEX IF NOT EXISTS idx_performance_timestamp ON performance_metrics(timestamp); + CREATE INDEX IF NOT EXISTS idx_status_code_history_timestamp ON status_code_history(timestamp); + CREATE INDEX IF NOT EXISTS idx_popular_paths_history_timestamp ON popular_paths_history(timestamp); + CREATE INDEX IF NOT EXISTS idx_referer_history_timestamp ON referer_history(timestamp); + `) + // 启动定期清理任务 go cleanupRoutine(db) @@ -256,37 +301,54 @@ func (db *MetricsDB) Close() error { func (db *MetricsDB) GetRecentMetrics(hours int) ([]HistoricalMetrics, error) { var interval string if hours <= 24 { - interval = "%Y-%m-%d %H:%M:00" + interval = "%Y-%m-%d %H:%M:00" // 按分钟分组 } else if hours <= 168 { - interval = "%Y-%m-%d %H:00:00" + interval = "%Y-%m-%d %H:00:00" // 按小时分组 } else { - interval = "%Y-%m-%d 00:00:00" + interval = "%Y-%m-%d 00:00:00" // 按天分组 } - // 修改查询,计算每个时间段的增量 + // 修改查询逻辑,使用窗口函数计算差值 rows, err := db.DB.Query(` - WITH grouped_metrics AS ( + WITH time_series AS ( + -- 生成时间序列 + SELECT datetime('now', '-' || ?2 || ' hours', 'localtime') + + (CASE + WHEN ?2 <= 24 THEN (300 * n) -- 5分钟间隔 + WHEN ?2 <= 168 THEN (3600 * n) -- 1小时间隔 + ELSE (86400 * n) -- 1天间隔 + END) AS time_point + FROM ( + SELECT ROW_NUMBER() OVER () - 1 as n + FROM metrics_history + LIMIT (CASE + WHEN ?2 <= 24 THEN ?2 * 12 -- 5分钟间隔 + WHEN ?2 <= 168 THEN ?2 -- 1小时间隔 + ELSE ?2 / 24 -- 1天间隔 + END) + ) + WHERE time_point <= datetime('now', 'localtime') + ), + grouped_metrics AS ( SELECT strftime(?1, timestamp, 'localtime') as group_time, - SUM(total_requests) as total_requests, - SUM(total_errors) as total_errors, - SUM(total_bytes) as total_bytes, - CAST(AVG(CAST(avg_latency AS FLOAT)) AS FLOAT) as avg_latency, - LAG(SUM(total_requests)) OVER (ORDER BY strftime(?1, timestamp, 'localtime')) as prev_requests, - LAG(SUM(total_errors)) OVER (ORDER BY strftime(?1, timestamp, 'localtime')) as prev_errors, - LAG(SUM(total_bytes)) OVER (ORDER BY strftime(?1, timestamp, 'localtime')) as prev_bytes + MAX(total_requests) as period_requests, + MAX(total_errors) as period_errors, + MAX(total_bytes) as period_bytes, + AVG(avg_latency) as avg_latency FROM metrics_history WHERE timestamp >= datetime('now', '-' || ?2 || ' hours', 'localtime') - GROUP BY group_time - ORDER BY group_time DESC + GROUP BY group_time ) SELECT - group_time as timestamp, - COALESCE(total_requests - prev_requests, total_requests) as total_requests, - COALESCE(total_errors - prev_errors, total_errors) as total_errors, - COALESCE(total_bytes - prev_bytes, total_bytes) as total_bytes, - avg_latency - FROM grouped_metrics + strftime(?1, ts.time_point, 'localtime') as timestamp, + COALESCE(m.period_requests - LAG(m.period_requests, 1) OVER (ORDER BY ts.time_point), 0) as total_requests, + COALESCE(m.period_errors - LAG(m.period_errors, 1) OVER (ORDER BY ts.time_point), 0) as total_errors, + COALESCE(m.period_bytes - LAG(m.period_bytes, 1) OVER (ORDER BY ts.time_point), 0) as total_bytes, + COALESCE(m.avg_latency, 0) as avg_latency + FROM time_series ts + LEFT JOIN grouped_metrics m ON strftime(?1, ts.time_point, 'localtime') = m.group_time + ORDER BY timestamp DESC `, interval, hours) if err != nil { return nil, err @@ -306,13 +368,26 @@ func (db *MetricsDB) GetRecentMetrics(hours int) ([]HistoricalMetrics, error) { if err != nil { return nil, err } + + // 确保数值非负 + if m.TotalRequests < 0 { + m.TotalRequests = 0 + } + if m.TotalErrors < 0 { + m.TotalErrors = 0 + } + if m.TotalBytes < 0 { + m.TotalBytes = 0 + } + + // 计算错误率 if m.TotalRequests > 0 { m.ErrorRate = float64(m.TotalErrors) / float64(m.TotalRequests) } metrics = append(metrics, m) } - // 如果没有数据,返回一个空的记录而不是 null + // 如果没有数据,返回一个空的记录 if len(metrics) == 0 { now := time.Now() metrics = append(metrics, HistoricalMetrics{ @@ -327,3 +402,81 @@ func (db *MetricsDB) GetRecentMetrics(hours int) ([]HistoricalMetrics, error) { return metrics, rows.Err() } + +func (db *MetricsDB) SaveFullMetrics(stats map[string]interface{}) error { + tx, err := db.DB.Begin() + if err != nil { + return err + } + defer tx.Rollback() + + // 保存性能指标 + _, err = tx.Exec(` + INSERT INTO performance_metrics ( + avg_response_time, + requests_per_second, + bytes_per_second + ) VALUES (?, ?, ?)`, + stats["avg_latency"], + stats["requests_per_second"], + stats["bytes_per_second"], + ) + if err != nil { + return err + } + + // 保存状态码统计 + statusStats := stats["status_code_stats"].(map[string]int64) + stmt, err := tx.Prepare(` + INSERT INTO status_code_history (status_group, count) + VALUES (?, ?) + `) + if err != nil { + return err + } + defer stmt.Close() + + for group, count := range statusStats { + if _, err := stmt.Exec(group, count); err != nil { + return err + } + } + + // 保存热门路径 + pathStats := stats["top_paths"].([]PathMetrics) + stmt, err = tx.Prepare(` + INSERT INTO popular_paths_history ( + path, request_count, error_count, avg_latency, bytes_transferred + ) VALUES (?, ?, ?, ?, ?) + `) + if err != nil { + return err + } + + for _, p := range pathStats { + if _, err := stmt.Exec( + p.Path, p.RequestCount, p.ErrorCount, + p.AvgLatency, p.BytesTransferred, + ); err != nil { + return err + } + } + + // 保存引用来源 + refererStats := stats["top_referers"].([]PathMetrics) + stmt, err = tx.Prepare(` + INSERT INTO referer_history (referer, request_count) + VALUES (?, ?) + `) + if err != nil { + return err + } + + for _, r := range refererStats { + if _, err := stmt.Exec(r.Path, r.RequestCount); err != nil { + return err + } + } + + return tx.Commit() +} diff --git a/internal/utils/signal.go b/internal/utils/signal.go new file mode 100644 index 0000000..61f985a --- /dev/null +++ b/internal/utils/signal.go @@ -0,0 +1,17 @@ +package utils + +import ( + "os" + "os/signal" + "syscall" +) + +func SetupCloseHandler(callback func()) { + c := make(chan os.Signal, 2) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go func() { + <-c + callback() + os.Exit(0) + }() +} diff --git a/internal/utils/utils.go b/internal/utils/utils.go index 082852b..e894b11 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -8,6 +8,7 @@ import ( "net/http" "path/filepath" "proxy-go/internal/config" + "sort" "strings" "sync" "time" @@ -32,12 +33,30 @@ func init() { ticker := time.NewTicker(time.Minute) for range ticker.C { now := time.Now() + var items []struct { + key interface{} + timestamp time.Time + } sizeCache.Range(func(key, value interface{}) bool { - if cache := value.(fileSizeCache); now.Sub(cache.timestamp) > cacheTTL { + cache := value.(fileSizeCache) + if now.Sub(cache.timestamp) > cacheTTL { sizeCache.Delete(key) + } else { + items = append(items, struct { + key interface{} + timestamp time.Time + }{key, cache.timestamp}) } return true }) + if len(items) > maxCacheSize { + sort.Slice(items, func(i, j int) bool { + return items[i].timestamp.Before(items[j].timestamp) + }) + for i := 0; i < len(items)/2; i++ { + sizeCache.Delete(items[i].key) + } + } } }() }