diff --git a/data/config.json b/data/config.json
index 7ca302e..c02e22b 100644
--- a/data/config.json
+++ b/data/config.json
@@ -56,6 +56,15 @@
"MediumLatency": "8s",
"LargeLatency": "30s",
"HugeLatency": "300s"
+ },
+ "Performance": {
+ "MaxRequestsPerMinute": 1000,
+ "MaxBytesPerMinute": 104857600,
+ "MaxSaveInterval": "15m"
+ },
+ "Validation": {
+ "max_error_rate": 0.8,
+ "max_data_deviation": 0.01
}
}
}
\ No newline at end of file
diff --git a/go.mod b/go.mod
index 1ee96c3..caca1c4 100644
--- a/go.mod
+++ b/go.mod
@@ -5,21 +5,4 @@ go 1.23.1
require (
github.com/andybalholm/brotli v1.1.1
golang.org/x/time v0.8.0
- modernc.org/sqlite v1.34.2
-)
-
-require (
- github.com/dustin/go-humanize v1.0.1 // indirect
- github.com/google/uuid v1.6.0 // indirect
- github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
- github.com/mattn/go-isatty v0.0.20 // indirect
- github.com/ncruces/go-strftime v0.1.9 // indirect
- github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
- golang.org/x/sys v0.22.0 // indirect
- modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6 // indirect
- modernc.org/libc v1.55.3 // indirect
- modernc.org/mathutil v1.6.0 // indirect
- modernc.org/memory v1.8.0 // indirect
- modernc.org/strutil v1.2.0 // indirect
- modernc.org/token v1.1.0 // indirect
)
diff --git a/go.sum b/go.sum
index 395259e..69caaa3 100644
--- a/go.sum
+++ b/go.sum
@@ -1,55 +1,6 @@
github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA=
github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA=
-github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
-github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
-github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd h1:gbpYu9NMq8jhDVbvlGkMFWCjLFlqqEZjEmObmhUy6Vo=
-github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw=
-github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
-github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
-github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
-github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
-github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
-github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
-github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4=
-github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
-github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
-github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
-github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
-github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU=
github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E=
-golang.org/x/mod v0.16.0 h1:QX4fJ0Rr5cPQCF7O9lh9Se4pmwfwskqZfq5moyldzic=
-golang.org/x/mod v0.16.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
-golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
-golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg=
golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
-golang.org/x/tools v0.19.0 h1:tfGCXNR1OsFG+sVdLAitlpjAvD/I6dHDKnYrpEZUHkw=
-golang.org/x/tools v0.19.0/go.mod h1:qoJWxmGSIBmAeriMx19ogtrEPrGtDbPK634QFIcLAhc=
-modernc.org/cc/v4 v4.21.4 h1:3Be/Rdo1fpr8GrQ7IVw9OHtplU4gWbb+wNgeoBMmGLQ=
-modernc.org/cc/v4 v4.21.4/go.mod h1:HM7VJTZbUCR3rV8EYBi9wxnJ0ZBRiGE5OeGXNA0IsLQ=
-modernc.org/ccgo/v4 v4.19.2 h1:lwQZgvboKD0jBwdaeVCTouxhxAyN6iawF3STraAal8Y=
-modernc.org/ccgo/v4 v4.19.2/go.mod h1:ysS3mxiMV38XGRTTcgo0DQTeTmAO4oCmJl1nX9VFI3s=
-modernc.org/fileutil v1.3.0 h1:gQ5SIzK3H9kdfai/5x41oQiKValumqNTDXMvKo62HvE=
-modernc.org/fileutil v1.3.0/go.mod h1:XatxS8fZi3pS8/hKG2GH/ArUogfxjpEKs3Ku3aK4JyQ=
-modernc.org/gc/v2 v2.4.1 h1:9cNzOqPyMJBvrUipmynX0ZohMhcxPtMccYgGOJdOiBw=
-modernc.org/gc/v2 v2.4.1/go.mod h1:wzN5dK1AzVGoH6XOzc3YZ+ey/jPgYHLuVckd62P0GYU=
-modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6 h1:5D53IMaUuA5InSeMu9eJtlQXS2NxAhyWQvkKEgXZhHI=
-modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6/go.mod h1:Qz0X07sNOR1jWYCrJMEnbW/X55x206Q7Vt4mz6/wHp4=
-modernc.org/libc v1.55.3 h1:AzcW1mhlPNrRtjS5sS+eW2ISCgSOLLNyFzRh/V3Qj/U=
-modernc.org/libc v1.55.3/go.mod h1:qFXepLhz+JjFThQ4kzwzOjA/y/artDeg+pcYnY+Q83w=
-modernc.org/mathutil v1.6.0 h1:fRe9+AmYlaej+64JsEEhoWuAYBkOtQiMEU7n/XgfYi4=
-modernc.org/mathutil v1.6.0/go.mod h1:Ui5Q9q1TR2gFm0AQRqQUaBWFLAhQpCwNcuhBOSedWPo=
-modernc.org/memory v1.8.0 h1:IqGTL6eFMaDZZhEWwcREgeMXYwmW83LYW8cROZYkg+E=
-modernc.org/memory v1.8.0/go.mod h1:XPZ936zp5OMKGWPqbD3JShgd/ZoQ7899TUuQqxY+peU=
-modernc.org/opt v0.1.3 h1:3XOZf2yznlhC+ibLltsDGzABUGVx8J6pnFMS3E4dcq4=
-modernc.org/opt v0.1.3/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0=
-modernc.org/sortutil v1.2.0 h1:jQiD3PfS2REGJNzNCMMaLSp/wdMNieTbKX920Cqdgqc=
-modernc.org/sortutil v1.2.0/go.mod h1:TKU2s7kJMf1AE84OoiGppNHJwvB753OYfNl2WRb++Ss=
-modernc.org/sqlite v1.34.2 h1:J9n76TPsfYYkFkZ9Uy1QphILYifiVEwwOT7yP5b++2Y=
-modernc.org/sqlite v1.34.2/go.mod h1:dnR723UrTtjKpoHCAMN0Q/gZ9MT4r+iRvIBb9umWFkU=
-modernc.org/strutil v1.2.0 h1:agBi9dp1I+eOnxXeiZawM8F4LawKv4NzGWSaLfyeNZA=
-modernc.org/strutil v1.2.0/go.mod h1:/mdcBmfOibveCTBxUl5B5l6W+TTH1FXPLHZE6bTosX0=
-modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y=
-modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=
diff --git a/internal/config/types.go b/internal/config/types.go
index 4522085..b900677 100644
--- a/internal/config/types.go
+++ b/internal/config/types.go
@@ -59,6 +59,12 @@ type MetricsConfig struct {
LargeLatency time.Duration `json:"LargeLatency"` // 大文件最大延迟
HugeLatency time.Duration `json:"HugeLatency"` // 超大文件最大延迟
} `json:"Latency"`
+ // 性能监控配置
+ Performance struct {
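+		// When set to values > 0, these override the defaults in internal/constants via constants.UpdateFromConfig.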
+ MaxRequestsPerMinute int64 `json:"MaxRequestsPerMinute"`
+ MaxBytesPerMinute int64 `json:"MaxBytesPerMinute"`
+ MaxSaveInterval time.Duration `json:"MaxSaveInterval"`
+ } `json:"Performance"`
// 加载配置
Load struct {
RetryCount int `json:"retry_count"`
diff --git a/internal/constants/constants.go b/internal/constants/constants.go
index 7008fd1..fe2d4da 100644
--- a/internal/constants/constants.go
+++ b/internal/constants/constants.go
@@ -10,11 +10,6 @@ var (
CacheTTL = 5 * time.Minute // 缓存过期时间
MaxCacheSize = 10000 // 最大缓存大小
- // 数据库相关
- CleanupInterval = 24 * time.Hour // 清理间隔
- DataRetention = 90 * 24 * time.Hour // 数据保留时间
- BatchSize = 100 // 批量写入大小
-
// 指标相关
MetricsInterval = 5 * time.Minute // 指标收集间隔
MaxPathsStored = 1000 // 最大存储路径数
@@ -42,29 +37,14 @@ var (
KB int64 = 1024
MB int64 = 1024 * KB
- // 不同类型数据的保留时间
- MetricsRetention = 90 * 24 * time.Hour // 基础指标保留90天
- StatusRetention = 30 * 24 * time.Hour // 状态码统计保留30天
- PathRetention = 7 * 24 * time.Hour // 路径统计保留7天
- RefererRetention = 7 * 24 * time.Hour // 引用来源保留7天
-
- // 性能监控阈值
- MaxRequestsPerMinute = 1000 // 每分钟最大请求数
- MaxBytesPerMinute = 100 * 1024 * 1024 // 每分钟最大流量 (100MB)
-
- // 数据加载相关
- LoadRetryCount = 3 // 加载重试次数
- LoadRetryInterval = time.Second // 重试间隔
- LoadTimeout = 30 * time.Second // 加载超时时间
-
- // 数据保存相关
- MinSaveInterval = 5 * time.Minute // 最小保存间隔
- MaxSaveInterval = 15 * time.Minute // 最大保存间隔
- DefaultSaveInterval = 10 * time.Minute // 默认保存间隔
-
// 数据验证相关
MaxErrorRate = 0.8 // 最大错误率
MaxDataDeviation = 0.01 // 最大数据偏差(1%)
+
+ // 性能监控阈值
+ MaxRequestsPerMinute int64 = 1000 // 每分钟最大请求数
+ MaxBytesPerMinute int64 = 100 * 1024 * 1024 // 每分钟最大流量 (100MB)
+ MaxSaveInterval = 15 * time.Minute // 最大保存间隔
)
// UpdateFromConfig 从配置文件更新常量
@@ -112,28 +92,6 @@ func UpdateFromConfig(cfg *config.Config) {
HugeFileLatency = cfg.Metrics.Latency.HugeLatency
}
- // 数据加载相关
- if cfg.Metrics.Load.RetryCount > 0 {
- LoadRetryCount = cfg.Metrics.Load.RetryCount
- }
- if cfg.Metrics.Load.RetryInterval > 0 {
- LoadRetryInterval = cfg.Metrics.Load.RetryInterval
- }
- if cfg.Metrics.Load.Timeout > 0 {
- LoadTimeout = cfg.Metrics.Load.Timeout
- }
-
- // 数据保存相关
- if cfg.Metrics.Save.MinInterval > 0 {
- MinSaveInterval = cfg.Metrics.Save.MinInterval
- }
- if cfg.Metrics.Save.MaxInterval > 0 {
- MaxSaveInterval = cfg.Metrics.Save.MaxInterval
- }
- if cfg.Metrics.Save.DefaultInterval > 0 {
- DefaultSaveInterval = cfg.Metrics.Save.DefaultInterval
- }
-
// 数据验证相关
if cfg.Metrics.Validation.MaxErrorRate > 0 {
MaxErrorRate = cfg.Metrics.Validation.MaxErrorRate
@@ -141,4 +99,15 @@ func UpdateFromConfig(cfg *config.Config) {
if cfg.Metrics.Validation.MaxDataDeviation > 0 {
MaxDataDeviation = cfg.Metrics.Validation.MaxDataDeviation
}
+
+ // 性能监控阈值
+ if cfg.Metrics.Performance.MaxRequestsPerMinute > 0 {
+ MaxRequestsPerMinute = cfg.Metrics.Performance.MaxRequestsPerMinute
+ }
+ if cfg.Metrics.Performance.MaxBytesPerMinute > 0 {
+ MaxBytesPerMinute = cfg.Metrics.Performance.MaxBytesPerMinute
+ }
+ if cfg.Metrics.Performance.MaxSaveInterval > 0 {
+ MaxSaveInterval = cfg.Metrics.Performance.MaxSaveInterval
+ }
}
diff --git a/internal/errors/errors.go b/internal/errors/errors.go
index a54c576..15adeab 100644
--- a/internal/errors/errors.go
+++ b/internal/errors/errors.go
@@ -3,8 +3,7 @@ package errors
type ErrorCode int
const (
- ErrDatabase ErrorCode = iota + 1
- ErrInvalidConfig
+ ErrInvalidConfig ErrorCode = iota + 1
ErrRateLimit
ErrMetricsCollection
)
diff --git a/internal/handler/metrics.go b/internal/handler/metrics.go
index d3275cb..a976638 100644
--- a/internal/handler/metrics.go
+++ b/internal/handler/metrics.go
@@ -2,11 +2,11 @@ package handler
import (
"encoding/json"
+ "fmt"
"log"
"net/http"
"proxy-go/internal/metrics"
"proxy-go/internal/models"
- "strconv"
"strings"
"time"
)
@@ -533,7 +533,7 @@ var metricsTemplate = `
@@ -600,40 +600,8 @@ var metricsTemplate = `
导出数据
             请求数
             错误率%
             流量MB
+
+ 显示最近30天的数据
@@ -912,7 +880,7 @@ var metricsTemplate = `
}
function updateChart(canvasId, chartKey, labels, data, label, valueGetter, color, options) {
- // 确保 canvas 元素存在
+            // 确保 canvas 元素存在
const canvas = document.getElementById(canvasId);
if (!canvas) {
console.error('Canvas element ' + canvasId + ' not found');
@@ -1090,23 +1058,37 @@ func (h *ProxyHandler) MetricsAuthHandler(w http.ResponseWriter, r *http.Request
// 添加历史数据查询接口
func (h *ProxyHandler) MetricsHistoryHandler(w http.ResponseWriter, r *http.Request) {
- hours := r.URL.Query().Get("hours")
- hoursFloat, err := strconv.ParseFloat(hours, 64)
- if err != nil {
- hoursFloat = 24.0
- }
-
collector := metrics.GetCollector()
- metrics, err := collector.GetDB().GetRecentMetrics(hoursFloat)
- if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
+ metrics := collector.GetHistoricalData()
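+	// Historical data now lives in memory only: it is reset on restart, and the former "hours" query parameter is no longer honored.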
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(metrics)
}
+// 辅助函数:解析延迟字符串
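+// NOTE: an identical copy of this helper lives in internal/metrics/collector.go; keep the two in sync.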
+func parseLatency(latency string) (float64, error) {
+ var value float64
+ var unit string
+ _, err := fmt.Sscanf(latency, "%f %s", &value, &unit)
+ if err != nil {
+ return 0, err
+ }
+
+ // 根据单位转换为毫秒
+ switch unit {
+ case "μs":
+ value = value / 1000 // 微秒转毫秒
+ case "ms":
+ // 已经是毫秒
+ case "s":
+ value = value * 1000 // 秒转毫秒
+ default:
+ return 0, fmt.Errorf("unknown unit: %s", unit)
+ }
+
+ return value, nil
+}
+
// 添加安全的类型转换辅助函数
func safeStatusCodeStats(v interface{}) map[string]int64 {
if v == nil {
diff --git a/internal/metrics/collector.go b/internal/metrics/collector.go
index b0d9b1f..155d9b6 100644
--- a/internal/metrics/collector.go
+++ b/internal/metrics/collector.go
@@ -1,11 +1,9 @@
package metrics
import (
- "database/sql"
"encoding/json"
"fmt"
"log"
- "math/rand"
"net/http"
"os"
"path"
@@ -14,7 +12,6 @@ import (
"proxy-go/internal/constants"
"proxy-go/internal/models"
"proxy-go/internal/monitor"
- "proxy-go/internal/utils"
"runtime"
"sort"
"sync"
@@ -23,17 +20,12 @@ import (
)
type Collector struct {
- startTime time.Time
- activeRequests int64
- totalRequests int64
- totalErrors int64
- totalBytes atomic.Int64
- latencySum atomic.Int64
- persistentStats struct {
- totalRequests atomic.Int64
- totalErrors atomic.Int64
- totalBytes atomic.Int64
- }
+ startTime time.Time
+ activeRequests int64
+ totalRequests int64
+ totalErrors int64
+ totalBytes atomic.Int64
+ latencySum atomic.Int64
pathStats sync.Map
refererStats sync.Map
statusStats [6]atomic.Int64
@@ -43,10 +35,15 @@ type Collector struct {
items [1000]*models.RequestLog
cursor atomic.Int64
}
- db *models.MetricsDB
cache *cache.Cache
monitor *monitor.Monitor
statsPool sync.Pool
+
+ // 添加历史数据存储
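+	// Sampled every 5 minutes by recordHistoricalData and pruned to the last 30 days by cleanHistoricalData.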
+ historicalData struct {
+ sync.RWMutex
+ items []models.HistoricalMetrics
+ }
}
var globalCollector *Collector
@@ -57,27 +54,21 @@ const (
lowThreshold = 0.2 // 低变化率阈值
)
-func InitCollector(dbPath string, config *config.Config) error {
- db, err := models.NewMetricsDB(dbPath)
- if err != nil {
- return err
- }
-
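+// InitCollector initializes the global in-memory collector; the SQLite store and its dbPath argument have been removed.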
+func InitCollector(config *config.Config) error {
globalCollector = &Collector{
startTime: time.Now(),
pathStats: sync.Map{},
statusStats: [6]atomic.Int64{},
latencyBuckets: [10]atomic.Int64{},
- db: db,
}
- // 1. 先初始化 cache
+ // 初始化 cache
globalCollector.cache = cache.NewCache(constants.CacheTTL)
- // 2. 初始化监控器
+ // 初始化监控器
globalCollector.monitor = monitor.NewMonitor(globalCollector)
- // 3. 如果配置了飞书webhook,则启用飞书告警
+ // 如果配置了飞书webhook,则启用飞书告警
if config.Metrics.FeishuWebhook != "" {
globalCollector.monitor.AddHandler(
monitor.NewFeishuHandler(config.Metrics.FeishuWebhook),
@@ -85,93 +76,24 @@ func InitCollector(dbPath string, config *config.Config) error {
log.Printf("Feishu alert enabled")
}
- // 4. 初始化对象池
+ // 初始化对象池
globalCollector.statsPool = sync.Pool{
New: func() interface{} {
return make(map[string]interface{}, 20)
},
}
- // 5. 设置最后保存时间
+ // 设置最后保存时间
lastSaveTime = time.Now()
- // 6. 加载历史数据
- if lastMetrics, err := db.GetLastMetrics(); err == nil && lastMetrics != nil {
- globalCollector.persistentStats.totalRequests.Store(lastMetrics.TotalRequests)
- globalCollector.persistentStats.totalErrors.Store(lastMetrics.TotalErrors)
- globalCollector.persistentStats.totalBytes.Store(lastMetrics.TotalBytes)
+ // 初始化历史数据存储
+ globalCollector.historicalData.items = make([]models.HistoricalMetrics, 0, 1000)
- // 确保在加载历史数据后立即保存一次,以更新所有统计信息
- stats := globalCollector.GetStats()
- if err := db.SaveAllMetrics(stats); err != nil {
- log.Printf("Warning: Failed to save initial metrics: %v", err)
- }
+ // 启动定期保存历史数据的goroutine
+ go globalCollector.recordHistoricalData()
- if err := globalCollector.LoadRecentStats(); err != nil {
- log.Printf("Warning: Failed to load recent stats: %v", err)
- }
- log.Printf("Loaded historical metrics: requests=%d, errors=%d, bytes=%d",
- lastMetrics.TotalRequests, lastMetrics.TotalErrors, lastMetrics.TotalBytes)
- }
-
- // 7. 启动定时保存
- go func() {
- time.Sleep(time.Duration(rand.Int63n(60)) * time.Second)
- var (
- saveInterval = 10 * time.Minute
- minInterval = 5 * time.Minute
- maxInterval = 15 * time.Minute
- )
- ticker := time.NewTicker(saveInterval)
- lastChangeTime := time.Now()
-
- for range ticker.C {
- stats := globalCollector.GetStats()
- start := time.Now()
-
- // 根据数据变化频率动态调整保存间隔
- changeRate := calculateChangeRate(stats)
- // 避免频繁调整
- if time.Since(lastChangeTime) > time.Minute {
- if changeRate > highThreshold && saveInterval > minInterval {
- saveInterval = saveInterval - time.Minute
- lastChangeTime = time.Now()
- } else if changeRate < lowThreshold && saveInterval < maxInterval {
- saveInterval = saveInterval + time.Minute
- lastChangeTime = time.Now()
- }
- ticker.Reset(saveInterval)
- log.Printf("Adjusted save interval to %v (change rate: %.2f)",
- saveInterval, changeRate)
- }
-
- if err := db.SaveAllMetrics(stats); err != nil {
- log.Printf("Error saving metrics: %v", err)
- } else {
- log.Printf("Metrics saved in %v", time.Since(start))
- }
- }
- }()
-
- // 设置程序退出时的处理
- utils.SetupCloseHandler(func() {
- log.Println("Saving final metrics before shutdown...")
- // 确保所有正在进行的操作完成
- time.Sleep(time.Second)
-
- stats := globalCollector.GetStats()
- if err := db.SaveAllMetrics(stats); err != nil {
- log.Printf("Error saving final metrics: %v", err)
- } else {
- log.Printf("Basic metrics saved successfully")
- }
-
- // 等待数据写入完成
- time.Sleep(time.Second)
-
- db.Close()
- log.Println("Database closed successfully")
- })
+ // 启动定期清理历史数据的goroutine
+ go globalCollector.cleanHistoricalData()
return nil
}
@@ -277,36 +199,30 @@ func (c *Collector) GetStats() map[string]interface{} {
}
}
- // 确保所有字段都被初始化
stats := c.statsPool.Get().(map[string]interface{})
defer c.statsPool.Put(stats)
var m runtime.MemStats
runtime.ReadMemStats(&m)
- // 基础指标
uptime := time.Since(c.startTime)
currentRequests := atomic.LoadInt64(&c.totalRequests)
currentErrors := atomic.LoadInt64(&c.totalErrors)
currentBytes := c.totalBytes.Load()
- totalRequests := currentRequests + c.persistentStats.totalRequests.Load()
- totalErrors := currentErrors + c.persistentStats.totalErrors.Load()
- totalBytes := currentBytes + c.persistentStats.totalBytes.Load()
-
// 计算错误率
var errorRate float64
- if totalRequests > 0 {
- errorRate = float64(totalErrors) / float64(totalRequests)
+ if currentRequests > 0 {
+ errorRate = float64(currentErrors) / float64(currentRequests)
}
// 基础指标
stats["uptime"] = uptime.String()
stats["active_requests"] = atomic.LoadInt64(&c.activeRequests)
- stats["total_requests"] = totalRequests
- stats["total_errors"] = totalErrors
+ stats["total_requests"] = currentRequests
+ stats["total_errors"] = currentErrors
stats["error_rate"] = errorRate
- stats["total_bytes"] = totalBytes
+ stats["total_bytes"] = currentBytes
stats["bytes_per_second"] = float64(currentBytes) / Max(uptime.Seconds(), 1)
stats["requests_per_second"] = float64(currentRequests) / Max(uptime.Seconds(), 1)
@@ -444,112 +360,60 @@ func Max(a, b float64) float64 {
return b
}
-func (c *Collector) GetDB() *models.MetricsDB {
- return c.db
-}
-
func (c *Collector) SaveMetrics(stats map[string]interface{}) error {
- // 更新持久化数据
- if totalReqs, ok := stats["total_requests"].(int64); ok {
- c.persistentStats.totalRequests.Store(totalReqs)
- }
- if totalErrs, ok := stats["total_errors"].(int64); ok {
- c.persistentStats.totalErrors.Store(totalErrs)
- }
- if totalBytes, ok := stats["total_bytes"].(int64); ok {
- c.persistentStats.totalBytes.Store(totalBytes)
- }
-
- // 在重置前记录当前值用于日志
- oldRequests := atomic.LoadInt64(&c.totalRequests)
- oldErrors := atomic.LoadInt64(&c.totalErrors)
- oldBytes := c.totalBytes.Load()
-
- // 重置当前会话计数器
- atomic.StoreInt64(&c.totalRequests, 0)
- atomic.StoreInt64(&c.totalErrors, 0)
- c.totalBytes.Store(0)
- c.latencySum.Store(0)
-
- // 重置状态码统计
- for i := range c.statusStats {
- c.statusStats[i].Store(0)
- }
-
- // 重置路径统计
- c.pathStats.Range(func(key, _ interface{}) bool {
- c.pathStats.Delete(key)
- return true
- })
-
- // 重置引用来源统计
- c.refererStats.Range(func(key, _ interface{}) bool {
- c.refererStats.Delete(key)
- return true
- })
-
- // 保存到数据库
- err := c.db.SaveMetrics(stats)
- if err == nil {
- // 记录重置日志
- log.Printf("Reset counters: requests=%d->0, errors=%d->0, bytes=%d->0",
- oldRequests, oldErrors, oldBytes)
- lastSaveTime = time.Now() // 更新最后保存时间
- }
-
- return err
-}
-
-// LoadRecentStats 加载最近的统计数据
-func (c *Collector) LoadRecentStats() error {
- start := time.Now()
- log.Printf("Starting to load recent stats...")
-
- var err error
- // 添加重试机制
- for retryCount := 0; retryCount < 3; retryCount++ {
- if err = c.loadRecentStatsInternal(); err == nil {
- // 添加数据验证
- if err = c.validateLoadedData(); err != nil {
- log.Printf("Data validation failed: %v", err)
- continue
- }
- break
- }
- log.Printf("Retry %d/3: Failed to load stats: %v", retryCount+1, err)
- time.Sleep(time.Second)
- }
-
- if err != nil {
- return fmt.Errorf("failed to load stats after retries: %v", err)
- }
-
- log.Printf("Successfully loaded all stats in %v", time.Since(start))
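+	// Database persistence has been removed; the stats argument is no longer stored and only the save time is recorded.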
+ lastSaveTime = time.Now()
return nil
}
-// loadRecentStatsInternal 内部加载函数
-func (c *Collector) loadRecentStatsInternal() error {
- loadStart := time.Now()
- // 先加载基础指标
- if err := c.loadBasicMetrics(); err != nil {
- return fmt.Errorf("failed to load basic metrics: %v", err)
- }
- log.Printf("Loaded basic metrics in %v", time.Since(loadStart))
+// LoadRecentStats 简化为只进行数据验证
+func (c *Collector) LoadRecentStats() error {
+ start := time.Now()
+ log.Printf("Starting to validate stats...")
- // 再加载状态码统计
- statusStart := time.Now()
- if err := c.loadStatusStats(); err != nil {
- return fmt.Errorf("failed to load status stats: %v", err)
+ if err := c.validateLoadedData(); err != nil {
+ return fmt.Errorf("data validation failed: %v", err)
}
- log.Printf("Loaded status codes in %v", time.Since(statusStart))
- // 最后加载路径和引用来源统计
- pathStart := time.Now()
- if err := c.loadPathAndRefererStats(); err != nil {
- return fmt.Errorf("failed to load path and referer stats: %v", err)
+ log.Printf("Successfully validated stats in %v", time.Since(start))
+ return nil
+}
+
+// validateLoadedData 验证当前数据的有效性
+func (c *Collector) validateLoadedData() error {
+ // 验证基础指标
+ if atomic.LoadInt64(&c.totalRequests) < 0 ||
+ atomic.LoadInt64(&c.totalErrors) < 0 ||
+ c.totalBytes.Load() < 0 {
+ return fmt.Errorf("invalid stats values")
+ }
+
+ // 验证错误数不能大于总请求数
+ if atomic.LoadInt64(&c.totalErrors) > atomic.LoadInt64(&c.totalRequests) {
+ return fmt.Errorf("total errors exceeds total requests")
+ }
+
+ // 验证状态码统计
+ for i := range c.statusStats {
+ if c.statusStats[i].Load() < 0 {
+ return fmt.Errorf("invalid status code count at index %d", i)
+ }
+ }
+
+ // 验证路径统计
+ var totalPathRequests int64
+ c.pathStats.Range(func(_, value interface{}) bool {
+ stats := value.(*models.PathStats)
+ if stats.Requests.Load() < 0 || stats.Errors.Load() < 0 {
+ return false
+ }
+ totalPathRequests += stats.Requests.Load()
+ return true
+ })
+
+ // 验证总数一致性
+ if totalPathRequests > atomic.LoadInt64(&c.totalRequests) {
+ return fmt.Errorf("path stats total exceeds total requests")
}
- log.Printf("Loaded path and referer stats in %v", time.Since(pathStart))
return nil
}
@@ -587,7 +451,7 @@ func calculateChangeRate(stats map[string]interface{}) float64 {
// CheckDataConsistency 检查数据一致性
func (c *Collector) CheckDataConsistency() error {
- totalReqs := c.persistentStats.totalRequests.Load() + atomic.LoadInt64(&c.totalRequests)
+ totalReqs := atomic.LoadInt64(&c.totalRequests)
// 检查状态码统计
var statusTotal int64
@@ -636,179 +500,6 @@ func abs(x int64) int64 {
return x
}
-func (c *Collector) validateLoadedData() error {
- // 验证基础指标
- if c.persistentStats.totalRequests.Load() < 0 ||
- c.persistentStats.totalErrors.Load() < 0 ||
- c.persistentStats.totalBytes.Load() < 0 {
- return fmt.Errorf("invalid persistent stats values")
- }
-
- // 验证错误数不能大于总请求数
- if c.persistentStats.totalErrors.Load() > c.persistentStats.totalRequests.Load() {
- return fmt.Errorf("total errors exceeds total requests")
- }
-
- // 验证状态码统计
- for i := range c.statusStats {
- if c.statusStats[i].Load() < 0 {
- return fmt.Errorf("invalid status code count at index %d", i)
- }
- }
-
- // 验证路径统计
- var totalPathRequests int64
- c.pathStats.Range(func(_, value interface{}) bool {
- stats := value.(*models.PathStats)
- if stats.Requests.Load() < 0 || stats.Errors.Load() < 0 {
- return false
- }
- totalPathRequests += stats.Requests.Load()
- return true
- })
-
- // 验证总数一致性
- if totalPathRequests > c.persistentStats.totalRequests.Load() {
- return fmt.Errorf("path stats total exceeds total requests")
- }
-
- return nil
-}
-
-// loadBasicMetrics 加载基础指标
-func (c *Collector) loadBasicMetrics() error {
- // 加载最近5分钟的数据
- row := c.db.DB.QueryRow(`
- SELECT
- total_requests, total_errors, total_bytes, avg_latency
- FROM metrics_history
- WHERE timestamp >= datetime('now', '-24', 'hours')
- ORDER BY timestamp DESC
- LIMIT 1
- `)
-
- var metrics models.HistoricalMetrics
- if err := row.Scan(
- &metrics.TotalRequests,
- &metrics.TotalErrors,
- &metrics.TotalBytes,
- &metrics.AvgLatency,
- ); err != nil && err != sql.ErrNoRows {
- return err
- }
-
- // 更新持久化数据
- if metrics.TotalRequests > 0 {
- c.persistentStats.totalRequests.Store(metrics.TotalRequests)
- c.persistentStats.totalErrors.Store(metrics.TotalErrors)
- c.persistentStats.totalBytes.Store(metrics.TotalBytes)
- log.Printf("Loaded persistent stats: requests=%d, errors=%d, bytes=%d",
- metrics.TotalRequests, metrics.TotalErrors, metrics.TotalBytes)
- }
- return nil
-}
-
-// loadStatusStats 加载状态码统计
-func (c *Collector) loadStatusStats() error {
- rows, err := c.db.DB.Query(`
- SELECT status_group, SUM(count) as count
- FROM status_code_history
- WHERE timestamp >= datetime('now', '-24', 'hours')
- GROUP BY status_group
- `)
- if err != nil {
- return err
- }
- defer rows.Close()
-
- var totalStatusCodes int64
- for rows.Next() {
- var group string
- var count int64
- if err := rows.Scan(&group, &count); err != nil {
- return err
- }
- if len(group) > 0 {
- idx := (int(group[0]) - '0') - 1
- if idx >= 0 && idx < len(c.statusStats) {
- c.statusStats[idx].Store(count)
- totalStatusCodes += count
- }
- }
- }
- return rows.Err()
-}
-
-// loadPathAndRefererStats 加载路径和引用来源统计
-func (c *Collector) loadPathAndRefererStats() error {
- // 加载路径统计
- rows, err := c.db.DB.Query(`
- SELECT
- path,
- SUM(request_count) as requests,
- SUM(error_count) as errors,
- AVG(bytes_transferred) as bytes,
- AVG(avg_latency) as latency
- FROM popular_paths_history
- WHERE timestamp >= datetime('now', '-24', 'hours')
- GROUP BY path
- ORDER BY requests DESC
- LIMIT 10
- `)
- if err != nil {
- return err
- }
- defer rows.Close()
-
- for rows.Next() {
- var path string
- var requests, errors, bytes int64
- var latency float64
- if err := rows.Scan(&path, &requests, &errors, &bytes, &latency); err != nil {
- return err
- }
- stats := &models.PathStats{}
- stats.Requests.Store(requests)
- stats.Errors.Store(errors)
- stats.Bytes.Store(bytes)
- stats.LatencySum.Store(int64(latency))
- c.pathStats.Store(path, stats)
- }
-
- if err := rows.Err(); err != nil {
- return err
- }
-
- // 加载引用来源统计
- rows, err = c.db.DB.Query(`
- SELECT
- referer,
- SUM(request_count) as requests
- FROM referer_history
- WHERE timestamp >= datetime('now', '-24', 'hours')
- GROUP BY referer
- ORDER BY requests DESC
- LIMIT 10
- `)
- if err != nil {
- return err
- }
- defer rows.Close()
-
- for rows.Next() {
- var referer string
- var count int64
- if err := rows.Scan(&referer, &count); err != nil {
- return err
- }
- stats := &models.PathStats{}
- stats.Requests.Store(count)
- c.refererStats.Store(referer, stats)
- }
-
- return rows.Err()
-}
-
// SaveBackup 保存数据备份
func (c *Collector) SaveBackup() error {
stats := c.GetStats()
@@ -841,13 +532,13 @@ func (c *Collector) LoadBackup(backupFile string) error {
func (c *Collector) RestoreFromBackup(stats map[string]interface{}) error {
// 恢复基础指标
if totalReqs, ok := stats["total_requests"].(int64); ok {
- c.persistentStats.totalRequests.Store(totalReqs)
+ atomic.StoreInt64(&c.totalRequests, totalReqs)
}
if totalErrs, ok := stats["total_errors"].(int64); ok {
- c.persistentStats.totalErrors.Store(totalErrs)
+ atomic.StoreInt64(&c.totalErrors, totalErrs)
}
if totalBytes, ok := stats["total_bytes"].(int64); ok {
- c.persistentStats.totalBytes.Store(totalBytes)
+ c.totalBytes.Store(totalBytes)
}
// 恢复状态码统计
@@ -871,3 +562,82 @@ var lastSaveTime time.Time
func (c *Collector) GetLastSaveTime() time.Time {
return lastSaveTime
}
+
+// 定期记录历史数据
+func (c *Collector) recordHistoricalData() {
+ ticker := time.NewTicker(5 * time.Minute)
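+	// The ticker is never stopped; this goroutine is expected to run for the lifetime of the process.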
+ for range ticker.C {
+ stats := c.GetStats()
+
+ metric := models.HistoricalMetrics{
+ Timestamp: time.Now().Format("2006-01-02 15:04:05"),
+ TotalRequests: stats["total_requests"].(int64),
+ TotalErrors: stats["total_errors"].(int64),
+ TotalBytes: stats["total_bytes"].(int64),
+ ErrorRate: stats["error_rate"].(float64),
+ }
+
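+	// The avg_response_time entry is a formatted duration string; parse it into milliseconds before storing.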
+ if avgLatencyStr, ok := stats["avg_response_time"].(string); ok {
+ if d, err := parseLatency(avgLatencyStr); err == nil {
+ metric.AvgLatency = d
+ }
+ }
+
+ c.historicalData.Lock()
+ c.historicalData.items = append(c.historicalData.items, metric)
+ c.historicalData.Unlock()
+ }
+}
+
+// 定期清理30天前的数据
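+// Entries whose timestamps fail to parse are dropped as well.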
+func (c *Collector) cleanHistoricalData() {
+ ticker := time.NewTicker(1 * time.Hour)
+ for range ticker.C {
+ threshold := time.Now().Add(-30 * 24 * time.Hour)
+
+ c.historicalData.Lock()
+ newItems := make([]models.HistoricalMetrics, 0)
+ for _, item := range c.historicalData.items {
+ timestamp, err := time.Parse("2006-01-02 15:04:05", item.Timestamp)
+ if err == nil && timestamp.After(threshold) {
+ newItems = append(newItems, item)
+ }
+ }
+ c.historicalData.items = newItems
+ c.historicalData.Unlock()
+ }
+}
+
+// GetHistoricalData 获取历史数据
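+// The returned slice is a copy, so it can be used safely after the lock is released.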
+func (c *Collector) GetHistoricalData() []models.HistoricalMetrics {
+ c.historicalData.RLock()
+ defer c.historicalData.RUnlock()
+
+ result := make([]models.HistoricalMetrics, len(c.historicalData.items))
+ copy(result, c.historicalData.items)
+ return result
+}
+
+// parseLatency 解析延迟字符串
+func parseLatency(latency string) (float64, error) {
+ var value float64
+ var unit string
+ _, err := fmt.Sscanf(latency, "%f %s", &value, &unit)
+ if err != nil {
+ return 0, err
+ }
+
+ // 根据单位转换为毫秒
+ switch unit {
+ case "μs":
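+		// Note: if the input comes from time.Duration.String(), microseconds are rendered with the micro sign "µs" (U+00B5), which may not be the same rune matched here.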
+ value = value / 1000 // 微秒转毫秒
+ case "ms":
+ // 已经是毫秒
+ case "s":
+ value = value * 1000 // 秒转毫秒
+ default:
+ return 0, fmt.Errorf("unknown unit: %s", unit)
+ }
+
+ return value, nil
+}
diff --git a/internal/metrics/collector_test.go b/internal/metrics/collector_test.go
deleted file mode 100644
index f86b20f..0000000
--- a/internal/metrics/collector_test.go
+++ /dev/null
@@ -1,35 +0,0 @@
-package metrics
-
-import (
- "sync"
- "sync/atomic"
- "testing"
- "time"
-)
-
-// NewTestCollector creates a new collector for testing
-func NewTestCollector() *Collector {
- return &Collector{
- startTime: time.Now(),
- pathStats: sync.Map{},
- statusStats: [6]atomic.Int64{},
- latencyBuckets: [10]atomic.Int64{},
- }
-}
-
-func TestDataConsistency(t *testing.T) {
- c := NewTestCollector()
-
- // 测试基础指标
- c.RecordRequest("/test", 200, time.Second, 1024, "127.0.0.1", nil)
- if err := c.CheckDataConsistency(); err != nil {
- t.Errorf("Data consistency check failed: %v", err)
- }
-
- // 测试错误数据
- c.persistentStats.totalErrors.Store(100)
- c.persistentStats.totalRequests.Store(50)
- if err := c.CheckDataConsistency(); err == nil {
- t.Error("Expected error for invalid data")
- }
-}
diff --git a/internal/models/metrics.go b/internal/models/metrics.go
index caeb422..7a0a3eb 100644
--- a/internal/models/metrics.go
+++ b/internal/models/metrics.go
@@ -1,17 +1,8 @@
package models
import (
- "context"
- "database/sql"
- "fmt"
- "log"
- "proxy-go/internal/constants"
- "strings"
- "sync"
"sync/atomic"
"time"
-
- _ "modernc.org/sqlite"
)
type RequestLog struct {
@@ -46,1003 +37,3 @@ type PathMetrics struct {
AvgLatency string `json:"avg_latency"`
BytesTransferred int64 `json:"bytes_transferred"`
}
-
-type MetricsDB struct {
- DB *sql.DB
- stats struct {
- queries atomic.Int64
- slowQueries atomic.Int64
- errors atomic.Int64
- lastError atomic.Value // string
- }
-}
-
-type PerformanceMetrics struct {
- Timestamp string `json:"timestamp"`
- AvgResponseTime int64 `json:"avg_response_time"`
- RequestsPerSecond float64 `json:"requests_per_second"`
- BytesPerSecond float64 `json:"bytes_per_second"`
-}
-
-func NewMetricsDB(dbPath string) (*MetricsDB, error) {
- db, err := sql.Open("sqlite", dbPath)
- if err != nil {
- return nil, err
- }
-
- // 设置连接池参数
- db.SetMaxOpenConns(1) // SQLite 只支持一个写连接
- db.SetMaxIdleConns(1)
- db.SetConnMaxLifetime(time.Hour)
-
- // 设置数据库优化参数
- if _, err := db.Exec("PRAGMA busy_timeout = 5000"); err != nil {
- return nil, fmt.Errorf("failed to set busy_timeout: %v", err)
- }
- if _, err := db.Exec("PRAGMA journal_mode = WAL"); err != nil {
- return nil, fmt.Errorf("failed to set journal_mode: %v", err)
- }
- if _, err := db.Exec("PRAGMA synchronous = NORMAL"); err != nil {
- return nil, fmt.Errorf("failed to set synchronous: %v", err)
- }
- if _, err := db.Exec("PRAGMA cache_size = -2000"); err != nil {
- return nil, fmt.Errorf("failed to set cache_size: %v", err)
- }
- if _, err := db.Exec("PRAGMA temp_store = MEMORY"); err != nil {
- return nil, fmt.Errorf("failed to set temp_store: %v", err)
- }
-
- // 创建必要的表
- if err := initTables(db); err != nil {
- db.Close()
- return nil, err
- }
-
- mdb := &MetricsDB{DB: db}
- mdb.stats.lastError.Store("")
- return mdb, nil
-}
-
-func initTables(db *sql.DB) error {
- // 创指标历史表
- _, err := db.Exec(`
- CREATE TABLE IF NOT EXISTS metrics_history (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
- total_requests INTEGER,
- total_errors INTEGER,
- total_bytes INTEGER,
- avg_latency INTEGER,
- error_rate REAL
- )
- `)
- if err != nil {
- return err
- }
-
- // 创建状态码统计表
- _, err = db.Exec(`
- CREATE TABLE IF NOT EXISTS status_stats (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
- status_group TEXT,
- count INTEGER
- )
- `)
- if err != nil {
- return err
- }
-
- // 创建路径统计表
- _, err = db.Exec(`
- CREATE TABLE IF NOT EXISTS path_stats (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
- path TEXT,
- requests INTEGER,
- errors INTEGER,
- bytes INTEGER,
- avg_latency INTEGER
- )
- `)
- if err != nil {
- return err
- }
-
- // 添加索引以提高查询性能
- _, err = db.Exec(`
- CREATE INDEX IF NOT EXISTS idx_metrics_timestamp ON metrics_history(timestamp);
- CREATE INDEX IF NOT EXISTS idx_status_timestamp ON status_stats(timestamp);
- CREATE INDEX IF NOT EXISTS idx_path_timestamp ON path_stats(timestamp);
-
- -- 复合引,用于优化聚合查询
- CREATE INDEX IF NOT EXISTS idx_metrics_timestamp_values ON metrics_history(
- timestamp,
- total_requests,
- total_errors,
- total_bytes,
- avg_latency
- );
-
- -- 路径统计的复合索引
- CREATE INDEX IF NOT EXISTS idx_path_stats_composite ON path_stats(
- timestamp,
- path,
- requests
- );
-
- -- 状态码统计的复合索引
- CREATE INDEX IF NOT EXISTS idx_status_stats_composite ON status_stats(
- timestamp,
- status_group,
- count
- );
- `)
- if err != nil {
- return err
- }
-
- // 添加新的表
- if _, err := db.Exec(`
- -- 性能指标表
- CREATE TABLE IF NOT EXISTS performance_metrics (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
- avg_response_time INTEGER,
- requests_per_second REAL,
- bytes_per_second REAL
- );
-
- -- 状态码统计历史表
- CREATE TABLE IF NOT EXISTS status_code_history (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
- status_group TEXT,
- count INTEGER
- );
-
- -- 热门路径历史表
- CREATE TABLE IF NOT EXISTS popular_paths_history (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
- path TEXT,
- request_count INTEGER,
- error_count INTEGER,
- avg_latency TEXT,
- bytes_transferred INTEGER
- );
-
- -- 引用来源历史表
- CREATE TABLE IF NOT EXISTS referer_history (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
- referer TEXT,
- request_count INTEGER
- );
-
- -- 为新表添加索引
- CREATE INDEX IF NOT EXISTS idx_performance_timestamp ON performance_metrics(timestamp);
- CREATE INDEX IF NOT EXISTS idx_status_code_history_timestamp ON status_code_history(timestamp);
- CREATE INDEX IF NOT EXISTS idx_popular_paths_history_timestamp ON popular_paths_history(timestamp);
- CREATE INDEX IF NOT EXISTS idx_referer_history_timestamp ON referer_history(timestamp);
- `); err != nil {
- return err
- }
-
- // 添加新的索引
- if _, err := db.Exec(`
- -- 为性能指标表添加复合索引
- CREATE INDEX IF NOT EXISTS idx_performance_metrics_composite ON performance_metrics(
- timestamp,
- avg_response_time,
- requests_per_second,
- bytes_per_second
- );
-
- -- 为状态码历史表添加复合索引
- CREATE INDEX IF NOT EXISTS idx_status_code_history_composite ON status_code_history(
- timestamp,
- status_group,
- count
- );
-
- -- 为热门路径历史表添加复合索引
- CREATE INDEX IF NOT EXISTS idx_popular_paths_history_composite ON popular_paths_history(
- timestamp,
- path,
- request_count
- );
- `); err != nil {
- return err
- }
-
- // 启动定期清理任务
- go cleanupRoutine(db)
-
- return nil
-}
-
-// 定期清理旧数据
-func cleanupRoutine(db *sql.DB) {
- // 避免在启动时就立即清理
- time.Sleep(5 * time.Minute)
-
- ticker := time.NewTicker(constants.CleanupInterval)
- for range ticker.C {
- start := time.Now()
- var totalDeleted int64
-
- // 检查数据库大小
- var dbSize int64
- row := db.QueryRow("SELECT page_count * page_size FROM pragma_page_count, pragma_page_size")
- if err := row.Scan(&dbSize); err != nil {
- log.Printf("Error getting database size: %v", err)
- continue
- }
- log.Printf("Current database size: %s", FormatBytes(uint64(dbSize)))
-
- tx, err := db.Begin()
- if err != nil {
- log.Printf("Error starting cleanup transaction: %v", err)
- continue
- }
-
- // 优化理性能
- if _, err := tx.Exec("PRAGMA synchronous = NORMAL"); err != nil {
- log.Printf("Error setting synchronous mode: %v", err)
- }
- if _, err := tx.Exec("PRAGMA journal_mode = WAL"); err != nil {
- log.Printf("Error setting journal mode: %v", err)
- }
- if _, err := tx.Exec("PRAGMA temp_store = MEMORY"); err != nil {
- log.Printf("Error setting temp store: %v", err)
- }
- if _, err := tx.Exec("PRAGMA cache_size = -2000"); err != nil {
- log.Printf("Error setting cache size: %v", err)
- }
-
- // 先清理索引
- if _, err := tx.Exec("ANALYZE"); err != nil {
- log.Printf("Error running ANALYZE: %v", err)
- }
- if _, err := tx.Exec("PRAGMA optimize"); err != nil {
- log.Printf("Error running optimize: %v", err)
- }
-
- // 使用不同的保留时间清理不同类型的数据
- cleanupTables := []struct {
- table string
- retention time.Duration
- }{
- {"metrics_history", constants.MetricsRetention},
- {"performance_metrics", constants.MetricsRetention},
- {"status_code_history", constants.StatusRetention},
- {"status_stats", constants.StatusRetention},
- {"popular_paths_history", constants.PathRetention},
- {"path_stats", constants.PathRetention},
- {"referer_history", constants.RefererRetention},
- }
-
- for _, t := range cleanupTables {
- cutoff := time.Now().Add(-t.retention)
- // 使用批删除提高性能
- for {
- result, err := tx.Exec(`DELETE FROM `+t.table+` WHERE timestamp < ? LIMIT 1000`, cutoff)
- if err != nil {
- tx.Rollback()
- log.Printf("Error cleaning %s: %v", t.table, err)
- break
- }
- rows, _ := result.RowsAffected()
- totalDeleted += rows
- if rows < 1000 {
- break
- }
- }
- }
-
- if err := tx.Commit(); err != nil {
- log.Printf("Error committing cleanup transaction: %v", err)
- } else {
- newSize := getDBSize(db)
- if newSize == 0 {
- log.Printf("Cleaned up %d old records in %v, but failed to get new DB size",
- totalDeleted, time.Since(start))
- } else {
- log.Printf("Cleaned up %d old records in %v, freed %s",
- totalDeleted, time.Since(start),
- FormatBytes(uint64(dbSize-newSize)))
- }
- }
- }
-}
-
-// 获取数据库大小
-func getDBSize(db *sql.DB) int64 {
- var size int64
- row := db.QueryRow("SELECT page_count * page_size FROM pragma_page_count, pragma_page_size")
- if err := row.Scan(&size); err != nil {
- log.Printf("Error getting database size: %v", err)
- return 0
- }
- return size
-}
-
-func (db *MetricsDB) SaveMetrics(stats map[string]interface{}) error {
- tx, err := db.DB.Begin()
- if err != nil {
- return err
- }
- defer tx.Rollback()
-
- // 保存基础指标
- stmt, err := tx.Prepare(`
- INSERT INTO metrics_history (
- total_requests, total_errors, total_bytes, avg_latency, error_rate
- ) VALUES (?, ?, ?, ?, ?)
- `)
- if err != nil {
- return fmt.Errorf("failed to prepare statement: %v", err)
- }
- defer stmt.Close()
-
- // 类型断言检查
- totalReqs, ok := stats["total_requests"].(int64)
- if !ok {
- return fmt.Errorf("invalid total_requests type")
- }
- totalErrs, ok := stats["total_errors"].(int64)
- if !ok {
- return fmt.Errorf("invalid total_errors type")
- }
- totalBytes, ok := stats["total_bytes"].(int64)
- if !ok {
- return fmt.Errorf("invalid total_bytes type")
- }
- avgLatency, ok := stats["avg_latency"].(int64)
- if !ok {
- return fmt.Errorf("invalid avg_latency type")
- }
-
- // 计算错误率
- var errorRate float64
- if totalReqs > 0 {
- errorRate = float64(totalErrs) / float64(totalReqs)
- }
-
- // 保存基础指标
- _, err = stmt.Exec(
- totalReqs,
- totalErrs,
- totalBytes,
- avgLatency,
- errorRate,
- )
- if err != nil {
- return fmt.Errorf("failed to save metrics: %v", err)
- }
-
- // 保存状态码统计
- statusStats := stats["status_code_stats"].(map[string]int64)
- values := make([]string, 0, len(statusStats))
- args := make([]interface{}, 0, len(statusStats)*2)
-
- for group, count := range statusStats {
- values = append(values, "(?, ?)")
- args = append(args, group, count)
- }
-
- query := "INSERT INTO status_code_history (status_group, count) VALUES " +
- strings.Join(values, ",")
-
- if _, err := tx.Exec(query, args...); err != nil {
- return err
- }
-
- // 保存热门路径
- pathStats := stats["top_paths"].([]PathMetrics)
- pathStmt, err := tx.Prepare(`
- INSERT INTO popular_paths_history (
- path, request_count, error_count, avg_latency, bytes_transferred
- ) VALUES (?, ?, ?, ?, ?)
- `)
- if err != nil {
- return err
- }
- defer pathStmt.Close()
-
- for _, p := range pathStats {
- if _, err := pathStmt.Exec(
- p.Path, p.RequestCount, p.ErrorCount,
- p.AvgLatency, p.BytesTransferred,
- ); err != nil {
- return err
- }
- }
-
- // 保存引用来源
- refererStats := stats["top_referers"].([]PathMetrics)
- refererStmt, err := tx.Prepare(`
- INSERT INTO referer_history (referer, request_count)
- VALUES (?, ?)
- `)
- if err != nil {
- return err
- }
- defer refererStmt.Close()
-
- for _, r := range refererStats {
- if _, err := refererStmt.Exec(r.Path, r.RequestCount); err != nil {
- return err
- }
- }
-
- return tx.Commit()
-}
-
-func (db *MetricsDB) Close() error {
- return db.DB.Close()
-}
-
-func (db *MetricsDB) GetRecentMetrics(hours float64) ([]HistoricalMetrics, error) {
- start := time.Now()
- var queryStats struct {
- rowsProcessed int
- cacheHits int64
- cacheSize int64
- }
-
- // 添加查询超时
- ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
- defer cancel()
-
- // 处理小于1小时的情况
- if hours <= 0 {
- hours = 0.5 // 30分钟
- }
-
- // 计算合适的时间间隔
- var interval string
- var timeStep int
- switch {
- case hours <= 0.5: // 30分钟
- interval = "%Y-%m-%d %H:%M:00"
- timeStep = 1 // 1分钟
- case hours <= 1:
- interval = "%Y-%m-%d %H:%M:00"
- timeStep = 1 // 1分钟
- case hours <= 24:
- interval = "%Y-%m-%d %H:%M:00"
- timeStep = 5 // 5分钟
- case hours <= 168:
- interval = "%Y-%m-%d %H:00:00"
- timeStep = 60 // 1小时
- default:
- interval = "%Y-%m-%d 00:00:00"
- timeStep = 1440 // 1天
- }
-
- // 修改查询逻辑,优化性能
- rows, err := db.DB.QueryContext(ctx, `
- WITH RECURSIVE
- time_points(ts) AS (
- SELECT strftime(?, datetime('now', 'localtime'))
- UNION ALL
- SELECT strftime(?, datetime(ts, '-' || ? || ' minutes'))
- FROM time_points
- WHERE ts > strftime(?, datetime('now', '-' || ? || ' hours', 'localtime'))
- LIMIT ?
- ),
- base_metrics AS (
- -- 获取每个时间点的累计值
- SELECT
- strftime(?, timestamp) as group_time,
- total_requests,
- total_errors,
- total_bytes,
- avg_latency
- FROM metrics_history
- WHERE timestamp >= datetime('now', '-' || ? || ' hours', 'localtime')
- AND timestamp < datetime('now', 'localtime')
- ),
- grouped_metrics AS (
- -- 获取每个时间点的最大值
- SELECT
- group_time,
- MAX(total_requests) as period_requests,
- MAX(total_errors) as period_errors,
- MAX(total_bytes) as period_bytes,
- AVG(avg_latency) as avg_latency
- FROM base_metrics
- GROUP BY group_time
- )
- SELECT
- tp.ts as timestamp,
- -- 计算每个时间点的增量
- COALESCE(
- CASE
- WHEN LAG(m.period_requests) OVER w IS NULL THEN m.period_requests
- ELSE m.period_requests - LAG(m.period_requests) OVER w
- END,
- 0
- ) as total_requests,
- COALESCE(
- CASE
- WHEN LAG(m.period_errors) OVER w IS NULL THEN m.period_errors
- ELSE m.period_errors - LAG(m.period_errors) OVER w
- END,
- 0
- ) as total_errors,
- COALESCE(
- CASE
- WHEN LAG(m.period_bytes) OVER w IS NULL THEN m.period_bytes
- ELSE m.period_bytes - LAG(m.period_bytes) OVER w
- END,
- 0
- ) as total_bytes,
- COALESCE(m.avg_latency, 0) as avg_latency
- FROM time_points tp
- LEFT JOIN grouped_metrics m ON tp.ts = m.group_time
- WINDOW w AS (ORDER BY tp.ts)
- ORDER BY timestamp DESC
- LIMIT ?
- `, interval, interval, timeStep, interval, hours, 1000, interval, hours, 1000)
- if err != nil {
- return nil, err
- }
- defer rows.Close()
-
- var metrics []HistoricalMetrics
- for rows.Next() {
- var m HistoricalMetrics
- err := rows.Scan(
- &m.Timestamp,
- &m.TotalRequests,
- &m.TotalErrors,
- &m.TotalBytes,
- &m.AvgLatency,
- )
- if err != nil {
- return nil, err
- }
-
- // 确保数值非负
- if m.TotalRequests < 0 {
- m.TotalRequests = 0
- }
- if m.TotalErrors < 0 {
- m.TotalErrors = 0
- }
- if m.TotalBytes < 0 {
- m.TotalBytes = 0
- }
-
- // 计算错误率
- if m.TotalRequests > 0 {
- m.ErrorRate = float64(m.TotalErrors) / float64(m.TotalRequests)
- }
- metrics = append(metrics, m)
- }
-
- // 记录查询性能
- duration := time.Since(start)
- if duration > time.Second {
- log.Printf("Slow query warning: GetRecentMetrics(%v hours) took %v "+
- "(rows: %d, cache hits: %d, cache size: %s)",
- hours, duration, queryStats.rowsProcessed,
- queryStats.cacheHits, FormatBytes(uint64(queryStats.cacheSize)))
- }
-
- // 如果没有数据,返回一个空的记录
- if len(metrics) == 0 {
- now := time.Now()
- metrics = append(metrics, HistoricalMetrics{
- Timestamp: now.Format("2006-01-02 15:04:05"),
- TotalRequests: 0,
- TotalErrors: 0,
- TotalBytes: 0,
- ErrorRate: 0,
- AvgLatency: 0,
- })
- }
-
- return metrics, rows.Err()
-}
-
-func (db *MetricsDB) SaveFullMetrics(stats map[string]interface{}) error {
- start := time.Now()
- tx, err := db.DB.Begin()
- if err != nil {
- return err
- }
- defer tx.Rollback()
-
- // 开始时记录数据库大小
- startSize := getDBSize(db.DB)
-
- // 优化写入性能
- if _, err := tx.Exec("PRAGMA synchronous = NORMAL"); err != nil {
- return fmt.Errorf("failed to set synchronous mode: %v", err)
- }
- if _, err := tx.Exec("PRAGMA journal_mode = WAL"); err != nil {
- return fmt.Errorf("failed to set journal mode: %v", err)
- }
- if _, err := tx.Exec("PRAGMA temp_store = MEMORY"); err != nil {
- return fmt.Errorf("failed to set temp store: %v", err)
- }
- if _, err := tx.Exec("PRAGMA cache_size = -2000"); err != nil {
- return fmt.Errorf("failed to set cache size: %v", err)
- }
-
- // 使用事务提高写入性能
- if _, err := tx.Exec("PRAGMA synchronous = OFF"); err != nil {
- return fmt.Errorf("failed to set synchronous mode: %v", err)
- }
- if _, err := tx.Exec("PRAGMA journal_mode = MEMORY"); err != nil {
- return fmt.Errorf("failed to set journal mode: %v", err)
- }
-
- // 保存状态码统计
- statusStats := stats["status_code_stats"].(map[string]int64)
- values := make([]string, 0, len(statusStats))
- args := make([]interface{}, 0, len(statusStats)*2)
-
- for group, count := range statusStats {
- values = append(values, "(?, ?)")
- args = append(args, group, count)
- }
-
- query := "INSERT INTO status_code_history (status_group, count) VALUES " +
- strings.Join(values, ",")
-
- if _, err := tx.Exec(query, args...); err != nil {
- return err
- }
-
- // 保存热门路径
- pathStats := stats["top_paths"].([]PathMetrics)
- pathStmt, err := tx.Prepare(`
- INSERT INTO popular_paths_history (
- path, request_count, error_count, avg_latency, bytes_transferred
- ) VALUES (?, ?, ?, ?, ?)
- `)
- if err != nil {
- return err
- }
- defer pathStmt.Close()
-
- for _, p := range pathStats {
- if _, err := pathStmt.Exec(
- p.Path, p.RequestCount, p.ErrorCount,
- p.AvgLatency, p.BytesTransferred,
- ); err != nil {
- return err
- }
- }
-
- // 保存引用来源
- refererStats := stats["top_referers"].([]PathMetrics)
- refererStmt, err := tx.Prepare(`
- INSERT INTO referer_history (referer, request_count)
- VALUES (?, ?)
- `)
- if err != nil {
- return err
- }
- defer refererStmt.Close()
-
- for _, r := range refererStats {
- if _, err := refererStmt.Exec(r.Path, r.RequestCount); err != nil {
- return err
- }
- }
-
- if err := tx.Commit(); err != nil {
- return err
- }
-
- // 记录写入的数据量和性能
- endSize := getDBSize(db.DB)
- duration := time.Since(start)
- log.Printf("Saved metrics: wrote %s to database in %v (%.2f MB/s)",
- FormatBytes(uint64(endSize-startSize)),
- duration,
- float64(endSize-startSize)/(1024*1024)/duration.Seconds(),
- )
-
- return nil
-}
-
-func (db *MetricsDB) GetLastMetrics() (*HistoricalMetrics, error) {
- row := db.DB.QueryRow(`
- SELECT
- total_requests,
- total_errors,
- total_bytes,
- avg_latency
- FROM metrics_history
- ORDER BY timestamp DESC
- LIMIT 1
- `)
-
- var metrics HistoricalMetrics
- err := row.Scan(
- &metrics.TotalRequests,
- &metrics.TotalErrors,
- &metrics.TotalBytes,
- &metrics.AvgLatency,
- )
- if err == sql.ErrNoRows {
- return nil, nil
- }
- if err != nil {
- return nil, err
- }
- return &metrics, nil
-}
-
-func (db *MetricsDB) GetRecentPerformanceMetrics(hours int) ([]PerformanceMetrics, error) {
- rows, err := db.DB.Query(`
- SELECT
- strftime('%Y-%m-%d %H:%M:00', timestamp, 'localtime') as ts,
- AVG(avg_response_time) as avg_response_time,
- AVG(requests_per_second) as requests_per_second,
- AVG(bytes_per_second) as bytes_per_second
- FROM performance_metrics
- WHERE timestamp >= datetime('now', '-' || ? || ' hours', 'localtime')
- GROUP BY ts
- ORDER BY ts DESC
- `, hours)
- if err != nil {
- return nil, err
- }
- defer rows.Close()
-
- var metrics []PerformanceMetrics
- for rows.Next() {
- var m PerformanceMetrics
- err := rows.Scan(
- &m.Timestamp,
- &m.AvgResponseTime,
- &m.RequestsPerSecond,
- &m.BytesPerSecond,
- )
- if err != nil {
- return nil, err
- }
- metrics = append(metrics, m)
- }
-
- return metrics, rows.Err()
-}
-
-// FormatBytes 格式化字节大小
-func FormatBytes(bytes uint64) string {
- const (
- MB = 1024 * 1024
- KB = 1024
- )
-
- switch {
- case bytes >= MB:
- return fmt.Sprintf("%.2f MB", float64(bytes)/MB)
- case bytes >= KB:
- return fmt.Sprintf("%.2f KB", float64(bytes)/KB)
- default:
- return fmt.Sprintf("%d Bytes", bytes)
- }
-}
-
-func (db *MetricsDB) GetStats() map[string]interface{} {
- return map[string]interface{}{
- "total_queries": db.stats.queries.Load(),
- "slow_queries": db.stats.slowQueries.Load(),
- "total_errors": db.stats.errors.Load(),
- "last_error": db.stats.lastError.Load(),
- }
-}
-
-func (db *MetricsDB) LoadRecentStats(statusStats *[6]atomic.Int64, pathStats *sync.Map, refererStats *sync.Map) error {
- // 加载状态码统计
- rows, err := db.DB.Query(`
- SELECT status_group, SUM(count) as count
- FROM status_code_history
- WHERE timestamp >= datetime('now', '-5', 'minutes')
- GROUP BY status_group
- `)
- if err != nil {
- return err
- }
- defer rows.Close()
-
- for rows.Next() {
- var group string
- var count int64
- if err := rows.Scan(&group, &count); err != nil {
- return err
- }
- if len(group) > 0 {
- idx := (int(group[0]) - '0') - 1
- if idx >= 0 && idx < len(statusStats) {
- statusStats[idx].Store(count)
- }
- }
- }
-
- // 加载路径统计
- rows, err = db.DB.Query(`
- SELECT
- path,
- SUM(request_count) as requests,
- SUM(error_count) as errors,
- AVG(bytes_transferred) as bytes,
- AVG(avg_latency) as latency
- FROM popular_paths_history
- WHERE timestamp >= datetime('now', '-5', 'minutes')
- GROUP BY path
- ORDER BY requests DESC
- LIMIT 10
- `)
- if err != nil {
- return err
- }
- defer rows.Close()
-
- for rows.Next() {
- var path string
- var requests, errors, bytes int64
- var latency float64
- if err := rows.Scan(&path, &requests, &errors, &bytes, &latency); err != nil {
- return err
- }
- stats := &PathStats{}
- stats.Requests.Store(requests)
- stats.Errors.Store(errors)
- stats.Bytes.Store(bytes)
- stats.LatencySum.Store(int64(latency))
- pathStats.Store(path, stats)
- }
-
- // 加载引用来源统计
- rows, err = db.DB.Query(`
- SELECT
- referer,
- SUM(request_count) as requests
- FROM referer_history
- WHERE timestamp >= datetime('now', '-5', 'minutes')
- GROUP BY referer
- ORDER BY requests DESC
- LIMIT 10
- `)
- if err != nil {
- return err
- }
- defer rows.Close()
-
- for rows.Next() {
- var referer string
- var requests int64
- if err := rows.Scan(&referer, &requests); err != nil {
- return err
- }
- stats := &PathStats{}
- stats.Requests.Store(requests)
- refererStats.Store(referer, stats)
- }
-
- return nil
-}
-
-// SaveAllMetrics 合并所有指标的保存
-func (db *MetricsDB) SaveAllMetrics(stats map[string]interface{}) error {
- start := time.Now()
- tx, err := db.DB.Begin()
- if err != nil {
- return err
- }
- defer tx.Rollback()
-
- // 保存基础指标
- stmt, err := tx.Prepare(`
- INSERT INTO metrics_history (
- total_requests, total_errors, total_bytes, avg_latency, error_rate
- ) VALUES (?, ?, ?, ?, ?)
- `)
- if err != nil {
- return fmt.Errorf("failed to prepare statement: %v", err)
- }
- defer stmt.Close()
-
- // 类型断言检查
- totalReqs, ok := stats["total_requests"].(int64)
- if !ok {
- return fmt.Errorf("invalid total_requests type")
- }
- totalErrs, ok := stats["total_errors"].(int64)
- if !ok {
- return fmt.Errorf("invalid total_errors type")
- }
- totalBytes, ok := stats["total_bytes"].(int64)
- if !ok {
- return fmt.Errorf("invalid total_bytes type")
- }
- avgLatency, ok := stats["avg_latency"].(int64)
- if !ok {
- return fmt.Errorf("invalid avg_latency type")
- }
-
- // 计算错误率
- var errorRate float64
- if totalReqs > 0 {
- errorRate = float64(totalErrs) / float64(totalReqs)
- }
-
- // 保存基础指标
- _, err = stmt.Exec(totalReqs, totalErrs, totalBytes, avgLatency, errorRate)
- if err != nil {
- return fmt.Errorf("failed to save basic metrics: %v", err)
- }
-
- // 保存状态码统计
- statusStats := stats["status_code_stats"].(map[string]int64)
- values := make([]string, 0, len(statusStats))
- args := make([]interface{}, 0, len(statusStats)*2)
- for group, count := range statusStats {
- values = append(values, "(?, ?)")
- args = append(args, group, count)
- }
- if len(values) > 0 {
- query := "INSERT INTO status_code_history (status_group, count) VALUES " +
- strings.Join(values, ",")
- if _, err := tx.Exec(query, args...); err != nil {
- return fmt.Errorf("failed to save status stats: %v", err)
- }
- }
-
- // 保存路径统计
- if pathStats, ok := stats["top_paths"].([]PathMetrics); ok && len(pathStats) > 0 {
- pathStmt, err := tx.Prepare(`
- INSERT INTO popular_paths_history (
- path, request_count, error_count, avg_latency, bytes_transferred
- ) VALUES (?, ?, ?, ?, ?)
- `)
- if err != nil {
- return fmt.Errorf("failed to prepare path statement: %v", err)
- }
- defer pathStmt.Close()
-
- for _, p := range pathStats {
- if _, err := pathStmt.Exec(
- p.Path, p.RequestCount, p.ErrorCount,
- p.AvgLatency, p.BytesTransferred,
- ); err != nil {
- return fmt.Errorf("failed to save path stats: %v", err)
- }
- }
- }
-
- // 保存引用来源
- if refererStats, ok := stats["top_referers"].([]PathMetrics); ok && len(refererStats) > 0 {
- refererStmt, err := tx.Prepare(`
- INSERT INTO referer_history (referer, request_count)
- VALUES (?, ?)
- `)
- if err != nil {
- return fmt.Errorf("failed to prepare referer statement: %v", err)
- }
- defer refererStmt.Close()
-
- for _, r := range refererStats {
- if _, err := refererStmt.Exec(r.Path, r.RequestCount); err != nil {
- return fmt.Errorf("failed to save referer stats: %v", err)
- }
- }
- }
-
- if err := tx.Commit(); err != nil {
- return fmt.Errorf("failed to commit transaction: %v", err)
- }
-
- log.Printf("Saved all metrics in %v", time.Since(start))
- return nil
-}
diff --git a/internal/storage/db.go b/internal/storage/db.go
deleted file mode 100644
index 90b2e0d..0000000
--- a/internal/storage/db.go
+++ /dev/null
@@ -1,125 +0,0 @@
-package storage
-
-import (
- "database/sql"
- "log"
-
- _ "modernc.org/sqlite"
-)
-
-func InitDB(db *sql.DB) error {
- // 优化SQLite配置
- _, err := db.Exec(`
- PRAGMA journal_mode = WAL;
- PRAGMA synchronous = NORMAL;
- PRAGMA cache_size = 1000000;
- PRAGMA temp_store = MEMORY;
- `)
- if err != nil {
- return err
- }
-
- // 创建表
- if err := initTables(db); err != nil {
- return err
- }
-
- // 启动定期清理
- go cleanupRoutine(db)
-
- return nil
-}
-
-func initTables(db *sql.DB) error {
- // 基础指标表
- _, err := db.Exec(`
- CREATE TABLE IF NOT EXISTS metrics_history (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
- total_requests INTEGER,
- total_errors INTEGER,
- total_bytes INTEGER,
- avg_latency INTEGER
- )
- `)
- if err != nil {
- return err
- }
-
- // 状态码统计表
- _, err = db.Exec(`
- CREATE TABLE IF NOT EXISTS status_stats (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
- status_group TEXT,
- count INTEGER
- )
- `)
- if err != nil {
- return err
- }
-
- // 路径统计表
- _, err = db.Exec(`
- CREATE TABLE IF NOT EXISTS path_stats (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
- path TEXT,
- requests INTEGER,
- errors INTEGER,
- bytes INTEGER,
- avg_latency INTEGER
- )
- `)
- if err != nil {
- return err
- }
-
- // 添加索引
- _, err = db.Exec(`
- CREATE INDEX IF NOT EXISTS idx_timestamp ON metrics_history(timestamp);
- CREATE INDEX IF NOT EXISTS idx_path ON path_stats(path);
- `)
- return err
-}
-
-func cleanupRoutine(db *sql.DB) {
- // 批量删除而不是单条删除
- tx, err := db.Begin()
- if err != nil {
- log.Printf("Error starting transaction: %v", err)
- return
- }
- defer tx.Rollback()
-
- // 保留90天的数据
- _, err = tx.Exec(`
- DELETE FROM metrics_history
- WHERE timestamp < datetime('now', '-90 days')
- `)
- if err != nil {
- log.Printf("Error cleaning old data: %v", err)
- }
-
- // 清理状态码统计
- _, err = tx.Exec(`
- DELETE FROM status_stats
- WHERE timestamp < datetime('now', '-90 days')
- `)
- if err != nil {
- log.Printf("Error cleaning old data: %v", err)
- }
-
- // 清理路径统计
- _, err = tx.Exec(`
- DELETE FROM path_stats
- WHERE timestamp < datetime('now', '-90 days')
- `)
- if err != nil {
- log.Printf("Error cleaning old data: %v", err)
- }
-
- if err := tx.Commit(); err != nil {
- log.Printf("Error committing transaction: %v", err)
- }
-}
diff --git a/main.go b/main.go
index 4b3f5cd..b873bb2 100644
--- a/main.go
+++ b/main.go
@@ -26,7 +26,7 @@ func main() {
constants.UpdateFromConfig(cfg)
// 初始化指标收集器
- if err := metrics.InitCollector("data/metrics.db", cfg); err != nil {
+ if err := metrics.InitCollector(cfg); err != nil {
log.Fatal("Error initializing metrics collector:", err)
}