diff --git a/internal/constants/constants.go b/internal/constants/constants.go index 2814e9b..2e35b27 100644 --- a/internal/constants/constants.go +++ b/internal/constants/constants.go @@ -41,6 +41,12 @@ var ( // 单位常量 KB int64 = 1024 MB int64 = 1024 * KB + + // 不同类型数据的保留时间 + MetricsRetention = 90 * 24 * time.Hour // 基础指标保留90天 + StatusRetention = 30 * 24 * time.Hour // 状态码统计保留30天 + PathRetention = 7 * 24 * time.Hour // 路径统计保留7天 + RefererRetention = 7 * 24 * time.Hour // 引用来源保留7天 ) // UpdateFromConfig 从配置文件更新常量 diff --git a/internal/handler/metrics.go b/internal/handler/metrics.go index 54b1252..f8c6c53 100644 --- a/internal/handler/metrics.go +++ b/internal/handler/metrics.go @@ -392,6 +392,7 @@ var metricsTemplate = ` + gap: 20px; + align-items: center; + flex-wrap: wrap; ++ padding: 10px; + } + + #statusCodes .metric { @@ -404,6 +405,8 @@ var metricsTemplate = ` + border-radius: 20px; + margin: 0; + border: none; ++ min-width: 100px; ++ justify-content: space-between; + } + .status-badge { @@ -929,7 +932,7 @@ var metricsTemplate = ` btn.addEventListener('click', function() { document.querySelectorAll('.time-btn').forEach(b => b.classList.remove('active')); this.classList.add('active'); - loadHistoryData(parseInt(this.dataset.hours)); + loadHistoryData(parseFloat(this.dataset.hours)); }); }); @@ -1069,15 +1072,14 @@ func (h *ProxyHandler) MetricsAuthHandler(w http.ResponseWriter, r *http.Request // 添加历史数据查询接口 func (h *ProxyHandler) MetricsHistoryHandler(w http.ResponseWriter, r *http.Request) { - hours := 24 // 默认24小时 - if h := r.URL.Query().Get("hours"); h != "" { - if parsed, err := strconv.Atoi(h); err == nil && parsed > 0 { - hours = parsed - } + hours := r.URL.Query().Get("hours") + hoursFloat, err := strconv.ParseFloat(hours, 64) + if err != nil { + hoursFloat = 24.0 } collector := metrics.GetCollector() - metrics, err := collector.GetDB().GetRecentMetrics(hours) + metrics, err := collector.GetDB().GetRecentMetrics(hoursFloat) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return diff --git a/internal/handler/proxy.go b/internal/handler/proxy.go index ad6670d..7a5ea33 100644 --- a/internal/handler/proxy.go +++ b/internal/handler/proxy.go @@ -228,7 +228,12 @@ func (h *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, "Error reading response", http.StatusInternalServerError) return } - written, _ := w.Write(body) + written, err := w.Write(body) + if err != nil { + if !isConnectionClosed(err) { + log.Printf("Error writing response: %v", err) + } + } collector.RecordRequest(r.URL.Path, resp.StatusCode, time.Since(start), int64(written), utils.GetClientIP(r), r) } else { // 大响应使用流式传输 @@ -288,3 +293,13 @@ func copyHeader(dst, src http.Header) { } } } + +// 添加辅助函数判断是否是连接关闭错误 +func isConnectionClosed(err error) bool { + if err == nil { + return false + } + return strings.Contains(err.Error(), "broken pipe") || + strings.Contains(err.Error(), "connection reset by peer") || + strings.Contains(err.Error(), "protocol wrong type for socket") +} diff --git a/internal/metrics/collector.go b/internal/metrics/collector.go index b36848a..81d302b 100644 --- a/internal/metrics/collector.go +++ b/internal/metrics/collector.go @@ -1,6 +1,7 @@ package metrics import ( + "database/sql" "fmt" "log" "net/http" @@ -69,6 +70,11 @@ func InitCollector(dbPath string, config *config.Config) error { lastMetrics.TotalRequests, lastMetrics.TotalErrors, lastMetrics.TotalBytes) } + // 加载最近5分钟的统计数据 + if err := globalCollector.LoadRecentStats(); err != nil { + log.Printf("Warning: Failed to load recent stats: %v", err) + } + globalCollector.cache = cache.NewCache(constants.CacheTTL) globalCollector.monitor = monitor.NewMonitor() @@ -274,10 +280,11 @@ func (c *Collector) GetStats() map[string]interface{} { // 延迟指标 currentTotalRequests := atomic.LoadInt64(&c.totalRequests) - if currentTotalRequests > 0 { - stats["avg_latency"] = c.latencySum.Load() / currentTotalRequests - } else { + latencySum := c.latencySum.Load() + if currentTotalRequests <= 0 || latencySum <= 0 { stats["avg_latency"] = int64(0) + } else { + stats["avg_latency"] = latencySum / currentTotalRequests } // 状态码统计 @@ -300,7 +307,7 @@ func (c *Collector) GetStats() map[string]interface{} { Path: key.(string), RequestCount: stats.Requests.Load(), ErrorCount: stats.Errors.Load(), - AvgLatency: FormatDuration(time.Duration(stats.LatencySum.Load() / stats.Requests.Load())), + AvgLatency: formatAvgLatency(stats.LatencySum.Load(), stats.Requests.Load()), BytesTransferred: stats.Bytes.Load(), }) return true @@ -441,3 +448,141 @@ func (c *Collector) SaveMetrics(stats map[string]interface{}) error { return c.db.SaveMetrics(stats) } + +// LoadRecentStats 加载最近的统计数据 +func (c *Collector) LoadRecentStats() error { + // 加载最近5分钟的数据 + row := c.db.DB.QueryRow(` + SELECT + total_requests, total_errors, total_bytes, avg_latency + FROM metrics_history + WHERE timestamp >= datetime('now', '-5', 'minutes') + ORDER BY timestamp DESC + LIMIT 1 + `) + + var metrics models.HistoricalMetrics + if err := row.Scan( + &metrics.TotalRequests, + &metrics.TotalErrors, + &metrics.TotalBytes, + &metrics.AvgLatency, + ); err != nil && err != sql.ErrNoRows { + return err + } + + // 更新持久化数据 + if metrics.TotalRequests > 0 { + c.persistentStats.totalRequests.Store(metrics.TotalRequests) + c.persistentStats.totalErrors.Store(metrics.TotalErrors) + c.persistentStats.totalBytes.Store(metrics.TotalBytes) + } + + // 加载状态码统计 + rows, err := c.db.DB.Query(` + SELECT status_group, SUM(count) as count + FROM status_code_history + WHERE timestamp >= datetime('now', '-5', 'minutes') + GROUP BY status_group + `) + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var group string + var count int64 + if err := rows.Scan(&group, &count); err != nil { + return err + } + if len(group) > 0 { + idx := (int(group[0]) - '0') - 1 + if idx >= 0 && idx < len(c.statusStats) { + c.statusStats[idx].Store(count) + } + } + } + + if err := rows.Err(); err != nil { + return fmt.Errorf("error scanning status code rows: %v", err) + } + + // 加载路径统计 + rows, err = c.db.DB.Query(` + SELECT + path, + SUM(request_count) as requests, + SUM(error_count) as errors, + AVG(bytes_transferred) as bytes, + AVG(avg_latency) as latency + FROM popular_paths_history + WHERE timestamp >= datetime('now', '-5', 'minutes') + GROUP BY path + ORDER BY requests DESC + LIMIT 10 + `) + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var path string + var requests, errors, bytes int64 + var latency float64 + if err := rows.Scan(&path, &requests, &errors, &bytes, &latency); err != nil { + return err + } + stats := &models.PathStats{} + stats.Requests.Store(requests) + stats.Errors.Store(errors) + stats.Bytes.Store(bytes) + stats.LatencySum.Store(int64(latency)) + c.pathStats.Store(path, stats) + } + + if err := rows.Err(); err != nil { + return fmt.Errorf("error scanning path stats rows: %v", err) + } + + // 加载引用来源统计 + rows, err = c.db.DB.Query(` + SELECT + referer, + SUM(request_count) as requests + FROM referer_history + WHERE timestamp >= datetime('now', '-5', 'minutes') + GROUP BY referer + ORDER BY requests DESC + LIMIT 10 + `) + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var referer string + var requests int64 + if err := rows.Scan(&referer, &requests); err != nil { + return err + } + stats := &models.PathStats{} + stats.Requests.Store(requests) + c.refererStats.Store(referer, stats) + } + + if err := rows.Err(); err != nil { + return fmt.Errorf("error scanning referer stats rows: %v", err) + } + + return nil +} + +func formatAvgLatency(latencySum, requests int64) string { + if requests <= 0 || latencySum <= 0 { + return "0 ms" + } + return FormatDuration(time.Duration(latencySum / requests)) +} diff --git a/internal/models/metrics.go b/internal/models/metrics.go index da87c2a..750de89 100644 --- a/internal/models/metrics.go +++ b/internal/models/metrics.go @@ -46,7 +46,13 @@ type PathMetrics struct { } type MetricsDB struct { - DB *sql.DB + DB *sql.DB + stats struct { + queries atomic.Int64 + slowQueries atomic.Int64 + errors atomic.Int64 + lastError atomic.Value // string + } } type PerformanceMetrics struct { @@ -68,11 +74,21 @@ func NewMetricsDB(dbPath string) (*MetricsDB, error) { db.SetConnMaxLifetime(time.Hour) // 设置数据库优化参数 - db.Exec("PRAGMA busy_timeout = 5000") // 设置忙等待超时 - db.Exec("PRAGMA journal_mode = WAL") // 使用 WAL 模式提高并发性能 - db.Exec("PRAGMA synchronous = NORMAL") // 在保证安全的前提下提高性能 - db.Exec("PRAGMA cache_size = -2000") // 使用2MB缓存 - db.Exec("PRAGMA temp_store = MEMORY") // 临时表使用内存 + if _, err := db.Exec("PRAGMA busy_timeout = 5000"); err != nil { + return nil, fmt.Errorf("failed to set busy_timeout: %v", err) + } + if _, err := db.Exec("PRAGMA journal_mode = WAL"); err != nil { + return nil, fmt.Errorf("failed to set journal_mode: %v", err) + } + if _, err := db.Exec("PRAGMA synchronous = NORMAL"); err != nil { + return nil, fmt.Errorf("failed to set synchronous: %v", err) + } + if _, err := db.Exec("PRAGMA cache_size = -2000"); err != nil { + return nil, fmt.Errorf("failed to set cache_size: %v", err) + } + if _, err := db.Exec("PRAGMA temp_store = MEMORY"); err != nil { + return nil, fmt.Errorf("failed to set temp_store: %v", err) + } // 创建必要的表 if err := initTables(db); err != nil { @@ -80,11 +96,13 @@ func NewMetricsDB(dbPath string) (*MetricsDB, error) { return nil, err } - return &MetricsDB{DB: db}, nil + mdb := &MetricsDB{DB: db} + mdb.stats.lastError.Store("") + return mdb, nil } func initTables(db *sql.DB) error { - // 创建指标历史表 + // 创指标历史表 _, err := db.Exec(` CREATE TABLE IF NOT EXISTS metrics_history ( id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -135,7 +153,7 @@ func initTables(db *sql.DB) error { CREATE INDEX IF NOT EXISTS idx_status_timestamp ON status_stats(timestamp); CREATE INDEX IF NOT EXISTS idx_path_timestamp ON path_stats(timestamp); - -- 复合索引,用于优化聚合查询 + -- 复合引,用于优化聚合查询 CREATE INDEX IF NOT EXISTS idx_metrics_timestamp_values ON metrics_history( timestamp, total_requests, @@ -163,7 +181,7 @@ func initTables(db *sql.DB) error { } // 添加新的表 - _, err = db.Exec(` + if _, err := db.Exec(` -- 性能指标表 CREATE TABLE IF NOT EXISTS performance_metrics ( id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -205,7 +223,36 @@ func initTables(db *sql.DB) error { CREATE INDEX IF NOT EXISTS idx_status_code_history_timestamp ON status_code_history(timestamp); CREATE INDEX IF NOT EXISTS idx_popular_paths_history_timestamp ON popular_paths_history(timestamp); CREATE INDEX IF NOT EXISTS idx_referer_history_timestamp ON referer_history(timestamp); - `) + `); err != nil { + return err + } + + // 添加新的索引 + if _, err := db.Exec(` + -- 为性能指标表添加复合索引 + CREATE INDEX IF NOT EXISTS idx_performance_metrics_composite ON performance_metrics( + timestamp, + avg_response_time, + requests_per_second, + bytes_per_second + ); + + -- 为状态码历史表添加复合索引 + CREATE INDEX IF NOT EXISTS idx_status_code_history_composite ON status_code_history( + timestamp, + status_group, + count + ); + + -- 为热门路径历史表添加复合索引 + CREATE INDEX IF NOT EXISTS idx_popular_paths_history_composite ON popular_paths_history( + timestamp, + path, + request_count + ); + `); err != nil { + return err + } // 启动定期清理任务 go cleanupRoutine(db) @@ -226,7 +273,10 @@ func cleanupRoutine(db *sql.DB) { // 检查数据库大小 var dbSize int64 row := db.QueryRow("SELECT page_count * page_size FROM pragma_page_count, pragma_page_size") - row.Scan(&dbSize) + if err := row.Scan(&dbSize); err != nil { + log.Printf("Error getting database size: %v", err) + continue + } log.Printf("Current database size: %s", FormatBytes(uint64(dbSize))) tx, err := db.Begin() @@ -236,33 +286,49 @@ func cleanupRoutine(db *sql.DB) { } // 优化清理性能 - tx.Exec("PRAGMA synchronous = NORMAL") - tx.Exec("PRAGMA journal_mode = WAL") - tx.Exec("PRAGMA temp_store = MEMORY") - tx.Exec("PRAGMA cache_size = -2000") - - // 先清理索引 - tx.Exec("ANALYZE") - tx.Exec("PRAGMA optimize") - - cutoff := time.Now().Add(-constants.DataRetention) - tables := []string{ - "metrics_history", - "status_stats", - "path_stats", - "performance_metrics", - "status_code_history", - "popular_paths_history", - "referer_history", + if _, err := tx.Exec("PRAGMA synchronous = NORMAL"); err != nil { + log.Printf("Error setting synchronous mode: %v", err) + } + if _, err := tx.Exec("PRAGMA journal_mode = WAL"); err != nil { + log.Printf("Error setting journal mode: %v", err) + } + if _, err := tx.Exec("PRAGMA temp_store = MEMORY"); err != nil { + log.Printf("Error setting temp store: %v", err) + } + if _, err := tx.Exec("PRAGMA cache_size = -2000"); err != nil { + log.Printf("Error setting cache size: %v", err) } - for _, table := range tables { - // 使用批量删除提高性能 + // 先清理索引 + if _, err := tx.Exec("ANALYZE"); err != nil { + log.Printf("Error running ANALYZE: %v", err) + } + if _, err := tx.Exec("PRAGMA optimize"); err != nil { + log.Printf("Error running optimize: %v", err) + } + + // 使用不同的保留时间清理不同类型的数据 + cleanupTables := []struct { + table string + retention time.Duration + }{ + {"metrics_history", constants.MetricsRetention}, + {"performance_metrics", constants.MetricsRetention}, + {"status_code_history", constants.StatusRetention}, + {"status_stats", constants.StatusRetention}, + {"popular_paths_history", constants.PathRetention}, + {"path_stats", constants.PathRetention}, + {"referer_history", constants.RefererRetention}, + } + + for _, t := range cleanupTables { + cutoff := time.Now().Add(-t.retention) + // 使用批删除提高性能 for { - result, err := tx.Exec(`DELETE FROM `+table+` WHERE timestamp < ? LIMIT 1000`, cutoff) + result, err := tx.Exec(`DELETE FROM `+t.table+` WHERE timestamp < ? LIMIT 1000`, cutoff) if err != nil { tx.Rollback() - log.Printf("Error cleaning %s: %v", table, err) + log.Printf("Error cleaning %s: %v", t.table, err) break } rows, _ := result.RowsAffected() @@ -276,9 +342,15 @@ func cleanupRoutine(db *sql.DB) { if err := tx.Commit(); err != nil { log.Printf("Error committing cleanup transaction: %v", err) } else { - log.Printf("Cleaned up %d old records in %v, freed %s", - totalDeleted, time.Since(start), - FormatBytes(uint64(dbSize-getDBSize(db)))) + newSize := getDBSize(db) + if newSize == 0 { + log.Printf("Cleaned up %d old records in %v, but failed to get new DB size", + totalDeleted, time.Since(start)) + } else { + log.Printf("Cleaned up %d old records in %v, freed %s", + totalDeleted, time.Since(start), + FormatBytes(uint64(dbSize-newSize))) + } } } } @@ -287,7 +359,10 @@ func cleanupRoutine(db *sql.DB) { func getDBSize(db *sql.DB) int64 { var size int64 row := db.QueryRow("SELECT page_count * page_size FROM pragma_page_count, pragma_page_size") - row.Scan(&size) + if err := row.Scan(&size); err != nil { + log.Printf("Error getting database size: %v", err) + return 0 + } return size } @@ -298,68 +373,59 @@ func (db *MetricsDB) SaveMetrics(stats map[string]interface{}) error { } defer tx.Rollback() - // 保存基础指标 - _, err = tx.Exec(` - INSERT INTO metrics_history ( - total_requests, total_errors, total_bytes, avg_latency - ) VALUES (?, ?, ?, ?)`, - stats["total_requests"], stats["total_errors"], - stats["total_bytes"], stats["avg_latency"], - ) - if err != nil { - return err + // 设置事务优化参数 + if _, err := tx.Exec("PRAGMA synchronous = NORMAL"); err != nil { + return fmt.Errorf("failed to set synchronous mode: %v", err) + } + if _, err := tx.Exec("PRAGMA journal_mode = WAL"); err != nil { + return fmt.Errorf("failed to set journal mode: %v", err) } - // 保存状态码统计 - statusStats := stats["status_code_stats"].(map[string]int64) + // 使用预处理语句提高性能 stmt, err := tx.Prepare(` - INSERT INTO status_stats (status_group, count) - VALUES (?, ?) - `) - if err != nil { - return err - } - defer stmt.Close() - - for group, count := range statusStats { - if _, err := stmt.Exec(group, count); err != nil { - return err - } - } - - // 保存路径统计 - pathStats := stats["top_paths"].([]PathMetrics) - stmt, err = tx.Prepare(` - INSERT INTO path_stats ( - path, requests, errors, bytes, avg_latency + INSERT INTO metrics_history ( + total_requests, total_errors, total_bytes, avg_latency, error_rate ) VALUES (?, ?, ?, ?, ?) `) if err != nil { - return err + return fmt.Errorf("failed to prepare statement: %v", err) + } + defer stmt.Close() + + // 类型断言检查 + totalReqs, ok := stats["total_requests"].(int64) + if !ok { + return fmt.Errorf("invalid total_requests type") + } + totalErrs, ok := stats["total_errors"].(int64) + if !ok { + return fmt.Errorf("invalid total_errors type") + } + totalBytes, ok := stats["total_bytes"].(int64) + if !ok { + return fmt.Errorf("invalid total_bytes type") + } + avgLatency, ok := stats["avg_latency"].(float64) + if !ok { + return fmt.Errorf("invalid avg_latency type") } - for _, p := range pathStats { - if _, err := stmt.Exec( - p.Path, p.RequestCount, p.ErrorCount, - p.BytesTransferred, p.AvgLatency, - ); err != nil { - return err - } + // 计算错误率 + var errorRate float64 + if totalReqs > 0 { + errorRate = float64(totalErrs) / float64(totalReqs) } - // 同时保存性能指标 - _, err = tx.Exec(` - INSERT INTO performance_metrics ( - avg_response_time, - requests_per_second, - bytes_per_second - ) VALUES (?, ?, ?)`, - stats["avg_latency"], - stats["requests_per_second"], - stats["bytes_per_second"], + // 保存基础指标 + _, err = stmt.Exec( + totalReqs, + totalErrs, + totalBytes, + int64(avgLatency), + errorRate, ) if err != nil { - return err + return fmt.Errorf("failed to save metrics: %v", err) } return tx.Commit() @@ -369,68 +435,107 @@ func (db *MetricsDB) Close() error { return db.DB.Close() } -func (db *MetricsDB) GetRecentMetrics(hours int) ([]HistoricalMetrics, error) { - // 设置查询优化参数 - db.DB.Exec("PRAGMA temp_store = MEMORY") - db.DB.Exec("PRAGMA cache_size = -4000") // 使用4MB缓存 - db.DB.Exec("PRAGMA mmap_size = 268435456") // 使用256MB内存映射 - - var interval string - if hours <= 24 { - if hours <= 1 { - interval = "%Y-%m-%d %H:%M:00" // 按分钟分组 - } else { - interval = "%Y-%m-%d %H:00:00" // 按小时分组 - } - } else if hours <= 168 { - interval = "%Y-%m-%d %H:00:00" // 按小时分组 - } else { - interval = "%Y-%m-%d 00:00:00" // 按天分组 +func (db *MetricsDB) GetRecentMetrics(hours float64) ([]HistoricalMetrics, error) { + start := time.Now() + var queryStats struct { + rowsProcessed int + cacheHits int64 + cacheSize int64 } - // 修改查询逻辑,使用窗口函数计算差值 + // 设置查询优化参数 + tx, err := db.DB.Begin() + if err != nil { + return nil, fmt.Errorf("failed to begin transaction: %v", err) + } + defer tx.Rollback() + + if _, err := tx.Exec("PRAGMA temp_store = MEMORY"); err != nil { + return nil, fmt.Errorf("failed to set temp_store: %v", err) + } + if _, err := tx.Exec("PRAGMA cache_size = -4000"); err != nil { + return nil, fmt.Errorf("failed to set cache_size: %v", err) + } + if _, err := tx.Exec("PRAGMA mmap_size = 268435456"); err != nil { + return nil, fmt.Errorf("failed to set mmap_size: %v", err) + } + + if err := tx.Commit(); err != nil { + return nil, fmt.Errorf("failed to commit transaction: %v", err) + } + + // 计算查询时间范围 + endTime := time.Now() + startTime := endTime.Add(-time.Duration(hours * float64(time.Hour.Nanoseconds()))) + + // 格式化时间 + startTimeStr := startTime.Format("2006-01-02 15:04:05") + endTimeStr := endTime.Format("2006-01-02 15:04:05") + + // 处理小于1小时的情况 + if hours <= 0 { + hours = 0.5 // 30分钟 + } + + // 计算合适的时间间隔 + var interval string + var timeStep int + switch { + case hours <= 0.5: // 30分钟 + interval = "%Y-%m-%d %H:%M:00" + timeStep = 1 // 1分钟 + case hours <= 1: + interval = "%Y-%m-%d %H:%M:00" + timeStep = 1 // 1分钟 + case hours <= 24: + interval = "%Y-%m-%d %H:%M:00" + timeStep = 5 // 5分钟 + case hours <= 168: + interval = "%Y-%m-%d %H:00:00" + timeStep = 60 // 1小时 + default: + interval = "%Y-%m-%d 00:00:00" + timeStep = 1440 // 1天 + } + + // 获取查询统计 + row := db.DB.QueryRow("SELECT cache_hits, cache_misses, cache_size FROM pragma_stats") + var cacheMisses int64 + row.Scan(&queryStats.cacheHits, &cacheMisses, &queryStats.cacheSize) + + // 修改查询逻辑 rows, err := db.DB.Query(` - WITH time_series AS ( - -- 生成时间序列 - SELECT datetime('now', 'localtime') - - (CASE - WHEN ?2 <= 24 THEN (300 * n) -- 5分钟间隔 - WHEN ?2 <= 168 THEN (3600 * n) -- 1小时间隔 - ELSE (86400 * n) -- 1天间隔 - END) AS time_point - FROM ( - SELECT ROW_NUMBER() OVER () - 1 as n - FROM metrics_history - LIMIT (CASE - WHEN ?2 <= 24 THEN ?2 * 12 -- 5分钟间隔 - WHEN ?2 <= 168 THEN ?2 -- 1小时间隔 - ELSE ?2 / 24 -- 1天间隔 - END) - ) - WHERE time_point >= datetime('now', '-' || ?2 || ' hours', 'localtime') - AND time_point <= datetime('now', 'localtime') + WITH RECURSIVE + time_series(time_point) AS ( + SELECT datetime(?, 'localtime') + UNION ALL + SELECT datetime(time_point, '-' || ? || ' minutes') + FROM time_series + WHERE time_point > datetime(?, 'localtime') + LIMIT 1000 ), grouped_metrics AS ( SELECT - strftime(?1, timestamp, 'localtime') as group_time, + strftime(?, timestamp, 'localtime') as group_time, MAX(total_requests) as period_requests, MAX(total_errors) as period_errors, MAX(total_bytes) as period_bytes, AVG(avg_latency) as avg_latency FROM metrics_history - WHERE timestamp >= datetime('now', '-' || ?2 || ' hours', 'localtime') + WHERE timestamp BETWEEN ? AND ? GROUP BY group_time ) SELECT - strftime(?1, ts.time_point, 'localtime') as timestamp, + strftime(?, ts.time_point, 'localtime') as timestamp, COALESCE(m.period_requests - LAG(m.period_requests, 1) OVER (ORDER BY ts.time_point), 0) as total_requests, COALESCE(m.period_errors - LAG(m.period_errors, 1) OVER (ORDER BY ts.time_point), 0) as total_errors, COALESCE(m.period_bytes - LAG(m.period_bytes, 1) OVER (ORDER BY ts.time_point), 0) as total_bytes, COALESCE(m.avg_latency, 0) as avg_latency FROM time_series ts - LEFT JOIN grouped_metrics m ON strftime(?1, ts.time_point, 'localtime') = m.group_time + LEFT JOIN grouped_metrics m ON strftime(?, ts.time_point, 'localtime') = m.group_time ORDER BY timestamp DESC - `, interval, hours) + LIMIT 1000 + `, endTimeStr, timeStep, startTimeStr, interval, startTimeStr, endTimeStr, interval) if err != nil { return nil, err } @@ -468,6 +573,15 @@ func (db *MetricsDB) GetRecentMetrics(hours int) ([]HistoricalMetrics, error) { metrics = append(metrics, m) } + // 记录查询性能 + duration := time.Since(start) + if duration > time.Second { + log.Printf("Slow query warning: GetRecentMetrics(%v hours) took %v "+ + "(rows: %d, cache hits: %d, cache size: %s)", + hours, duration, queryStats.rowsProcessed, + queryStats.cacheHits, FormatBytes(uint64(queryStats.cacheSize))) + } + // 如果没有数据,返回一个空的记录 if len(metrics) == 0 { now := time.Now() @@ -485,6 +599,7 @@ func (db *MetricsDB) GetRecentMetrics(hours int) ([]HistoricalMetrics, error) { } func (db *MetricsDB) SaveFullMetrics(stats map[string]interface{}) error { + start := time.Now() tx, err := db.DB.Begin() if err != nil { return err @@ -495,40 +610,26 @@ func (db *MetricsDB) SaveFullMetrics(stats map[string]interface{}) error { startSize := getDBSize(db.DB) // 优化写入性能 - tx.Exec("PRAGMA synchronous = NORMAL") - tx.Exec("PRAGMA journal_mode = WAL") - tx.Exec("PRAGMA temp_store = MEMORY") - tx.Exec("PRAGMA cache_size = -2000") // 使用2MB缓存 - - // 使用批量插入提高性能 - const batchSize = 100 - - // 预分配语句 - stmts := make([]*sql.Stmt, 0, 4) - defer func() { - for _, stmt := range stmts { - stmt.Close() - } - }() - - // 保存性能指标 - _, err = tx.Exec(` - INSERT INTO performance_metrics ( - avg_response_time, - requests_per_second, - bytes_per_second - ) VALUES (?, ?, ?)`, - stats["avg_latency"], - stats["requests_per_second"], - stats["bytes_per_second"], - ) - if err != nil { - return err + if _, err := tx.Exec("PRAGMA synchronous = NORMAL"); err != nil { + return fmt.Errorf("failed to set synchronous mode: %v", err) + } + if _, err := tx.Exec("PRAGMA journal_mode = WAL"); err != nil { + return fmt.Errorf("failed to set journal mode: %v", err) + } + if _, err := tx.Exec("PRAGMA temp_store = MEMORY"); err != nil { + return fmt.Errorf("failed to set temp store: %v", err) + } + if _, err := tx.Exec("PRAGMA cache_size = -2000"); err != nil { + return fmt.Errorf("failed to set cache size: %v", err) } // 使用事务提高写入性能 - tx.Exec("PRAGMA synchronous = OFF") - tx.Exec("PRAGMA journal_mode = MEMORY") + if _, err := tx.Exec("PRAGMA synchronous = OFF"); err != nil { + return fmt.Errorf("failed to set synchronous mode: %v", err) + } + if _, err := tx.Exec("PRAGMA journal_mode = MEMORY"); err != nil { + return fmt.Errorf("failed to set journal mode: %v", err) + } // 保存状态码统计 statusStats := stats["status_code_stats"].(map[string]int64) @@ -589,10 +690,14 @@ func (db *MetricsDB) SaveFullMetrics(stats map[string]interface{}) error { return err } - // 记录写入的数据量 + // 记录写入的数据量和性能 endSize := getDBSize(db.DB) - log.Printf("Saved metrics: wrote %s to database", - FormatBytes(uint64(endSize-startSize))) + duration := time.Since(start) + log.Printf("Saved metrics: wrote %s to database in %v (%.2f MB/s)", + FormatBytes(uint64(endSize-startSize)), + duration, + float64(endSize-startSize)/(1024*1024)/duration.Seconds(), + ) return nil } @@ -676,3 +781,12 @@ func FormatBytes(bytes uint64) string { return fmt.Sprintf("%d Bytes", bytes) } } + +func (db *MetricsDB) GetStats() map[string]interface{} { + return map[string]interface{}{ + "total_queries": db.stats.queries.Load(), + "slow_queries": db.stats.slowQueries.Load(), + "total_errors": db.stats.errors.Load(), + "last_error": db.stats.lastError.Load(), + } +}