mirror of
https://github.com/woodchen-ink/proxy-go.git
synced 2025-07-18 08:31:55 +08:00
- Implemented a new function to load recent status statistics from the database, enhancing the collector's ability to track real-time metrics. - Added error logging for failed status stats loading to improve monitoring and debugging capabilities. - Moved database transaction optimization settings outside of the transaction scope in SaveMetrics, improving performance during metric saving operations. - Updated SQL queries in GetRecentMetrics to streamline time filtering logic, ensuring accurate retrieval of recent metrics. These changes enhance the metrics collection process and improve the overall performance and reliability of the metrics dashboard.
619 lines
15 KiB
Go
619 lines
15 KiB
Go
package metrics
|
||
|
||
import (
|
||
"database/sql"
|
||
"fmt"
|
||
"log"
|
||
"net/http"
|
||
"proxy-go/internal/cache"
|
||
"proxy-go/internal/config"
|
||
"proxy-go/internal/constants"
|
||
"proxy-go/internal/models"
|
||
"proxy-go/internal/monitor"
|
||
"proxy-go/internal/utils"
|
||
"runtime"
|
||
"sort"
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
)
|
||
|
||
// Collector aggregates runtime metrics for the proxy: request/error/byte
// counters, a latency distribution, per-path and per-referer statistics, and
// a ring buffer of the most recent requests. Counters are atomics so request
// handlers can update them concurrently.
type Collector struct {
	startTime      time.Time // process start; used for uptime and per-second rates
	activeRequests int64     // in-flight request gauge (atomic.AddInt64)
	totalRequests  int64     // current-session request count (atomic)
	totalErrors    int64     // current-session error count (atomic)
	totalBytes     atomic.Int64
	latencySum     atomic.Int64 // sum of request latencies (time.Duration as int64)
	// persistentStats holds totals restored from the database so counts
	// survive restarts; GetStats adds them to the session counters.
	persistentStats struct {
		totalRequests atomic.Int64
		totalErrors   atomic.Int64
		totalBytes    atomic.Int64
	}
	pathStats      sync.Map         // path (string) -> *models.PathStats
	refererStats   sync.Map         // referer (string) -> *models.PathStats
	statusStats    [6]atomic.Int64  // status-class counters: index 0 = 1xx ... 5 = 5xx
	latencyBuckets [10]atomic.Int64 // 100ms-wide latency buckets covering 0-1s
	// recentRequests is a fixed-size ring buffer of the latest request logs.
	recentRequests struct {
		sync.RWMutex
		items  [1000]*models.RequestLog
		cursor atomic.Int64 // monotonically increasing write position (mod 1000)
	}
	db        *models.MetricsDB
	cache     *cache.Cache
	monitor   *monitor.Monitor
	statsPool sync.Pool // pool of map[string]interface{} scratch buffers
}
|
||
|
||
// globalCollector is the process-wide singleton, set by InitCollector and
// returned by GetCollector.
var globalCollector *Collector
|
||
|
||
func InitCollector(dbPath string, config *config.Config) error {
|
||
db, err := models.NewMetricsDB(dbPath)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
globalCollector = &Collector{
|
||
startTime: time.Now(),
|
||
pathStats: sync.Map{},
|
||
statusStats: [6]atomic.Int64{},
|
||
latencyBuckets: [10]atomic.Int64{},
|
||
db: db,
|
||
}
|
||
|
||
// 加载历史数据
|
||
if lastMetrics, err := db.GetLastMetrics(); err == nil && lastMetrics != nil {
|
||
globalCollector.persistentStats.totalRequests.Store(lastMetrics.TotalRequests)
|
||
globalCollector.persistentStats.totalErrors.Store(lastMetrics.TotalErrors)
|
||
globalCollector.persistentStats.totalBytes.Store(lastMetrics.TotalBytes)
|
||
if err := loadRecentStatusStats(db); err != nil {
|
||
log.Printf("Warning: Failed to load recent status stats: %v", err)
|
||
}
|
||
log.Printf("Loaded historical metrics: requests=%d, errors=%d, bytes=%d",
|
||
lastMetrics.TotalRequests, lastMetrics.TotalErrors, lastMetrics.TotalBytes)
|
||
}
|
||
|
||
// 加载最近5分钟的统计数据
|
||
if err := globalCollector.LoadRecentStats(); err != nil {
|
||
log.Printf("Warning: Failed to load recent stats: %v", err)
|
||
}
|
||
|
||
globalCollector.cache = cache.NewCache(constants.CacheTTL)
|
||
globalCollector.monitor = monitor.NewMonitor()
|
||
|
||
// 如果配置了飞书webhook,则启用飞书告警
|
||
if config.Metrics.FeishuWebhook != "" {
|
||
globalCollector.monitor.AddHandler(
|
||
monitor.NewFeishuHandler(config.Metrics.FeishuWebhook),
|
||
)
|
||
log.Printf("Feishu alert enabled")
|
||
}
|
||
|
||
// 启动定时保存
|
||
go func() {
|
||
ticker := time.NewTicker(5 * time.Minute)
|
||
for range ticker.C {
|
||
stats := globalCollector.GetStats()
|
||
if err := db.SaveMetrics(stats); err != nil {
|
||
log.Printf("Error saving metrics: %v", err)
|
||
} else {
|
||
log.Printf("Metrics saved successfully")
|
||
}
|
||
}
|
||
}()
|
||
|
||
// 启动每小时保存统计数据
|
||
go func() {
|
||
ticker := time.NewTicker(1 * time.Hour)
|
||
for range ticker.C {
|
||
stats := globalCollector.GetStats()
|
||
if err := db.SaveFullMetrics(stats); err != nil {
|
||
log.Printf("Error saving full metrics: %v", err)
|
||
} else {
|
||
log.Printf("Full metrics saved successfully")
|
||
}
|
||
}
|
||
}()
|
||
|
||
// 设置程序退出时的处理
|
||
utils.SetupCloseHandler(func() {
|
||
log.Println("Saving final metrics before shutdown...")
|
||
// 确保所有正在进行的操作完成
|
||
time.Sleep(time.Second)
|
||
|
||
stats := globalCollector.GetStats()
|
||
if err := db.SaveMetrics(stats); err != nil {
|
||
log.Printf("Error saving final metrics: %v", err)
|
||
} else {
|
||
log.Printf("Basic metrics saved successfully")
|
||
}
|
||
|
||
// 保存完整统计数据
|
||
if err := db.SaveFullMetrics(stats); err != nil {
|
||
log.Printf("Error saving full metrics: %v", err)
|
||
} else {
|
||
log.Printf("Full metrics saved successfully")
|
||
}
|
||
|
||
// 等待数据写入完成
|
||
time.Sleep(time.Second)
|
||
|
||
db.Close()
|
||
log.Println("Database closed successfully")
|
||
})
|
||
|
||
globalCollector.statsPool = sync.Pool{
|
||
New: func() interface{} {
|
||
return make(map[string]interface{}, 20)
|
||
},
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
func GetCollector() *Collector {
|
||
return globalCollector
|
||
}
|
||
|
||
func (c *Collector) BeginRequest() {
|
||
atomic.AddInt64(&c.activeRequests, 1)
|
||
}
|
||
|
||
func (c *Collector) EndRequest() {
|
||
atomic.AddInt64(&c.activeRequests, -1)
|
||
}
|
||
|
||
func (c *Collector) RecordRequest(path string, status int, latency time.Duration, bytes int64, clientIP string, r *http.Request) {
|
||
// 更新总请求数
|
||
atomic.AddInt64(&c.totalRequests, 1)
|
||
|
||
// 更新总字节数
|
||
c.totalBytes.Add(bytes)
|
||
|
||
// 更新状态码统计
|
||
if status >= 100 && status < 600 {
|
||
c.statusStats[status/100-1].Add(1)
|
||
}
|
||
|
||
// 更新错误数
|
||
if status >= 400 {
|
||
atomic.AddInt64(&c.totalErrors, 1)
|
||
}
|
||
|
||
// 更新延迟分布
|
||
bucket := int(latency.Milliseconds() / 100)
|
||
if bucket < 10 {
|
||
c.latencyBuckets[bucket].Add(1)
|
||
}
|
||
|
||
// 更新路径统计
|
||
if stats, ok := c.pathStats.Load(path); ok {
|
||
pathStats := stats.(*models.PathStats)
|
||
pathStats.Requests.Add(1)
|
||
if status >= 400 {
|
||
pathStats.Errors.Add(1)
|
||
}
|
||
pathStats.Bytes.Add(bytes)
|
||
pathStats.LatencySum.Add(int64(latency))
|
||
} else {
|
||
newStats := &models.PathStats{}
|
||
newStats.Requests.Add(1)
|
||
if status >= 400 {
|
||
newStats.Errors.Add(1)
|
||
}
|
||
newStats.Bytes.Add(bytes)
|
||
newStats.LatencySum.Add(int64(latency))
|
||
c.pathStats.Store(path, newStats)
|
||
}
|
||
|
||
// 更新引用来源统计
|
||
if referer := r.Header.Get("Referer"); referer != "" {
|
||
if stats, ok := c.refererStats.Load(referer); ok {
|
||
stats.(*models.PathStats).Requests.Add(1)
|
||
} else {
|
||
newStats := &models.PathStats{}
|
||
newStats.Requests.Add(1)
|
||
c.refererStats.Store(referer, newStats)
|
||
}
|
||
}
|
||
|
||
// 记录最近的请求
|
||
log := &models.RequestLog{
|
||
Time: time.Now(),
|
||
Path: path,
|
||
Status: status,
|
||
Latency: latency,
|
||
BytesSent: bytes,
|
||
ClientIP: clientIP,
|
||
}
|
||
|
||
c.recentRequests.Lock()
|
||
cursor := c.recentRequests.cursor.Add(1) % 1000
|
||
c.recentRequests.items[cursor] = log
|
||
c.recentRequests.Unlock()
|
||
|
||
c.latencySum.Add(int64(latency))
|
||
|
||
// 更新错误统计
|
||
if status >= 400 {
|
||
c.monitor.RecordError()
|
||
}
|
||
c.monitor.RecordRequest()
|
||
|
||
// 检查延迟
|
||
c.monitor.CheckLatency(latency, bytes)
|
||
}
|
||
|
||
func (c *Collector) GetStats() map[string]interface{} {
|
||
// 先查缓存
|
||
if stats, ok := c.cache.Get("stats"); ok {
|
||
if statsMap, ok := stats.(map[string]interface{}); ok {
|
||
return statsMap
|
||
}
|
||
}
|
||
|
||
var m runtime.MemStats
|
||
runtime.ReadMemStats(&m)
|
||
|
||
// 确保所有字段都被初始化
|
||
stats := make(map[string]interface{})
|
||
|
||
// 基础指标 - 合并当前会话和持久化的数据
|
||
currentRequests := atomic.LoadInt64(&c.totalRequests)
|
||
currentErrors := atomic.LoadInt64(&c.totalErrors)
|
||
currentBytes := c.totalBytes.Load()
|
||
|
||
totalRequests := currentRequests + c.persistentStats.totalRequests.Load()
|
||
totalErrors := currentErrors + c.persistentStats.totalErrors.Load()
|
||
totalBytes := currentBytes + c.persistentStats.totalBytes.Load()
|
||
|
||
// 计算每秒指标
|
||
uptime := time.Since(c.startTime).Seconds()
|
||
stats["requests_per_second"] = float64(currentRequests) / Max(uptime, 1)
|
||
stats["bytes_per_second"] = float64(currentBytes) / Max(uptime, 1)
|
||
|
||
stats["active_requests"] = atomic.LoadInt64(&c.activeRequests)
|
||
stats["total_requests"] = totalRequests
|
||
stats["total_errors"] = totalErrors
|
||
stats["total_bytes"] = totalBytes
|
||
|
||
// 系统指标
|
||
stats["num_goroutine"] = runtime.NumGoroutine()
|
||
stats["memory_usage"] = FormatBytes(m.Alloc)
|
||
|
||
// 延迟指标
|
||
currentTotalRequests := atomic.LoadInt64(&c.totalRequests)
|
||
latencySum := c.latencySum.Load()
|
||
if currentTotalRequests <= 0 || latencySum <= 0 {
|
||
stats["avg_latency"] = int64(0)
|
||
} else {
|
||
stats["avg_latency"] = latencySum / currentTotalRequests
|
||
}
|
||
|
||
// 状态码统计
|
||
statusStats := make(map[string]int64)
|
||
for i := range c.statusStats {
|
||
statusStats[fmt.Sprintf("%dxx", i+1)] = c.statusStats[i].Load()
|
||
}
|
||
stats["status_code_stats"] = statusStats
|
||
|
||
// 获取Top 10路径统计
|
||
var pathMetrics []models.PathMetrics
|
||
var allPaths []models.PathMetrics
|
||
|
||
c.pathStats.Range(func(key, value interface{}) bool {
|
||
stats := value.(*models.PathStats)
|
||
if stats.Requests.Load() == 0 {
|
||
return true
|
||
}
|
||
allPaths = append(allPaths, models.PathMetrics{
|
||
Path: key.(string),
|
||
RequestCount: stats.Requests.Load(),
|
||
ErrorCount: stats.Errors.Load(),
|
||
AvgLatency: formatAvgLatency(stats.LatencySum.Load(), stats.Requests.Load()),
|
||
BytesTransferred: stats.Bytes.Load(),
|
||
})
|
||
return true
|
||
})
|
||
|
||
// 按请求数排序并获取前10个
|
||
sort.Slice(allPaths, func(i, j int) bool {
|
||
return allPaths[i].RequestCount > allPaths[j].RequestCount
|
||
})
|
||
|
||
if len(allPaths) > 10 {
|
||
pathMetrics = allPaths[:10]
|
||
} else {
|
||
pathMetrics = allPaths
|
||
}
|
||
stats["top_paths"] = pathMetrics
|
||
|
||
// 获取最近请求
|
||
stats["recent_requests"] = c.getRecentRequests()
|
||
|
||
// 获取Top 10引用来源
|
||
var refererMetrics []models.PathMetrics
|
||
var allReferers []models.PathMetrics
|
||
c.refererStats.Range(func(key, value interface{}) bool {
|
||
stats := value.(*models.PathStats)
|
||
if stats.Requests.Load() == 0 {
|
||
return true
|
||
}
|
||
allReferers = append(allReferers, models.PathMetrics{
|
||
Path: key.(string),
|
||
RequestCount: stats.Requests.Load(),
|
||
})
|
||
return true
|
||
})
|
||
|
||
// 按请求数排序并获取前10个
|
||
sort.Slice(allReferers, func(i, j int) bool {
|
||
return allReferers[i].RequestCount > allReferers[j].RequestCount
|
||
})
|
||
|
||
if len(allReferers) > 10 {
|
||
refererMetrics = allReferers[:10]
|
||
} else {
|
||
refererMetrics = allReferers
|
||
}
|
||
stats["top_referers"] = refererMetrics
|
||
|
||
// 检查告警
|
||
c.monitor.CheckMetrics(stats)
|
||
|
||
// 写入缓存
|
||
c.cache.Set("stats", stats)
|
||
|
||
return stats
|
||
}
|
||
|
||
func (c *Collector) getRecentRequests() []models.RequestLog {
|
||
var recentReqs []models.RequestLog
|
||
c.recentRequests.RLock()
|
||
defer c.recentRequests.RUnlock()
|
||
|
||
cursor := c.recentRequests.cursor.Load()
|
||
for i := 0; i < 10; i++ {
|
||
idx := (cursor - int64(i) + 1000) % 1000
|
||
if c.recentRequests.items[idx] != nil {
|
||
recentReqs = append(recentReqs, *c.recentRequests.items[idx])
|
||
}
|
||
}
|
||
return recentReqs
|
||
}
|
||
|
||
// 辅助函数
|
||
func FormatDuration(d time.Duration) string {
|
||
if d < time.Millisecond {
|
||
return fmt.Sprintf("%.2f μs", float64(d.Microseconds()))
|
||
}
|
||
if d < time.Second {
|
||
return fmt.Sprintf("%.2f ms", float64(d.Milliseconds()))
|
||
}
|
||
return fmt.Sprintf("%.2f s", d.Seconds())
|
||
}
|
||
|
||
// FormatBytes renders a byte count as "x.xx MB", "x.xx KB", or "n Bytes"
// depending on magnitude. Counts of 1 GB or more still render as MB.
func FormatBytes(bytes uint64) string {
	const (
		KB = 1024
		MB = 1024 * KB
	)

	if bytes >= MB {
		return fmt.Sprintf("%.2f MB", float64(bytes)/MB)
	}
	if bytes >= KB {
		return fmt.Sprintf("%.2f KB", float64(bytes)/KB)
	}
	return fmt.Sprintf("%d Bytes", bytes)
}
|
||
|
||
// Max returns the larger of a and b. When a > b is false (including the NaN
// case) it returns b, matching the original's comparison semantics.
func Max(a, b float64) float64 {
	result := b
	if a > b {
		result = a
	}
	return result
}
|
||
|
||
func (c *Collector) GetDB() *models.MetricsDB {
|
||
return c.db
|
||
}
|
||
|
||
func (c *Collector) SaveMetrics(stats map[string]interface{}) error {
|
||
// 更新持久化数据
|
||
c.persistentStats.totalRequests.Store(stats["total_requests"].(int64))
|
||
c.persistentStats.totalErrors.Store(stats["total_errors"].(int64))
|
||
c.persistentStats.totalBytes.Store(stats["total_bytes"].(int64))
|
||
|
||
// 重置当前会话计数器
|
||
atomic.StoreInt64(&c.totalRequests, 0)
|
||
atomic.StoreInt64(&c.totalErrors, 0)
|
||
c.totalBytes.Store(0)
|
||
c.latencySum.Store(0)
|
||
|
||
// 重置状态码统计
|
||
for i := range c.statusStats {
|
||
c.statusStats[i].Store(0)
|
||
}
|
||
|
||
// 重置路径统计
|
||
c.pathStats.Range(func(key, _ interface{}) bool {
|
||
c.pathStats.Delete(key)
|
||
return true
|
||
})
|
||
|
||
// 重置引用来源统计
|
||
c.refererStats.Range(func(key, _ interface{}) bool {
|
||
c.refererStats.Delete(key)
|
||
return true
|
||
})
|
||
|
||
return c.db.SaveMetrics(stats)
|
||
}
|
||
|
||
// LoadRecentStats 加载最近的统计数据
|
||
func (c *Collector) LoadRecentStats() error {
|
||
// 加载最近5分钟的数据
|
||
row := c.db.DB.QueryRow(`
|
||
SELECT
|
||
total_requests, total_errors, total_bytes, avg_latency
|
||
FROM metrics_history
|
||
WHERE timestamp >= datetime('now', '-5', 'minutes')
|
||
ORDER BY timestamp DESC
|
||
LIMIT 1
|
||
`)
|
||
|
||
var metrics models.HistoricalMetrics
|
||
if err := row.Scan(
|
||
&metrics.TotalRequests,
|
||
&metrics.TotalErrors,
|
||
&metrics.TotalBytes,
|
||
&metrics.AvgLatency,
|
||
); err != nil && err != sql.ErrNoRows {
|
||
return err
|
||
}
|
||
|
||
// 更新持久化数据
|
||
if metrics.TotalRequests > 0 {
|
||
c.persistentStats.totalRequests.Store(metrics.TotalRequests)
|
||
c.persistentStats.totalErrors.Store(metrics.TotalErrors)
|
||
c.persistentStats.totalBytes.Store(metrics.TotalBytes)
|
||
}
|
||
|
||
// 加载状态码统计
|
||
rows, err := c.db.DB.Query(`
|
||
SELECT status_group, SUM(count) as count
|
||
FROM status_code_history
|
||
WHERE timestamp >= datetime('now', '-5', 'minutes')
|
||
GROUP BY status_group
|
||
`)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
defer rows.Close()
|
||
|
||
for rows.Next() {
|
||
var group string
|
||
var count int64
|
||
if err := rows.Scan(&group, &count); err != nil {
|
||
return err
|
||
}
|
||
if len(group) > 0 {
|
||
idx := (int(group[0]) - '0') - 1
|
||
if idx >= 0 && idx < len(c.statusStats) {
|
||
c.statusStats[idx].Store(count)
|
||
}
|
||
}
|
||
}
|
||
|
||
if err := rows.Err(); err != nil {
|
||
return fmt.Errorf("error scanning status code rows: %v", err)
|
||
}
|
||
|
||
// 加载路径统计
|
||
rows, err = c.db.DB.Query(`
|
||
SELECT
|
||
path,
|
||
SUM(request_count) as requests,
|
||
SUM(error_count) as errors,
|
||
AVG(bytes_transferred) as bytes,
|
||
AVG(avg_latency) as latency
|
||
FROM popular_paths_history
|
||
WHERE timestamp >= datetime('now', '-5', 'minutes')
|
||
GROUP BY path
|
||
ORDER BY requests DESC
|
||
LIMIT 10
|
||
`)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
defer rows.Close()
|
||
|
||
for rows.Next() {
|
||
var path string
|
||
var requests, errors, bytes int64
|
||
var latency float64
|
||
if err := rows.Scan(&path, &requests, &errors, &bytes, &latency); err != nil {
|
||
return err
|
||
}
|
||
stats := &models.PathStats{}
|
||
stats.Requests.Store(requests)
|
||
stats.Errors.Store(errors)
|
||
stats.Bytes.Store(bytes)
|
||
stats.LatencySum.Store(int64(latency))
|
||
c.pathStats.Store(path, stats)
|
||
}
|
||
|
||
if err := rows.Err(); err != nil {
|
||
return fmt.Errorf("error scanning path stats rows: %v", err)
|
||
}
|
||
|
||
// 加载引用来源统计
|
||
rows, err = c.db.DB.Query(`
|
||
SELECT
|
||
referer,
|
||
SUM(request_count) as requests
|
||
FROM referer_history
|
||
WHERE timestamp >= datetime('now', '-5', 'minutes')
|
||
GROUP BY referer
|
||
ORDER BY requests DESC
|
||
LIMIT 10
|
||
`)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
defer rows.Close()
|
||
|
||
for rows.Next() {
|
||
var referer string
|
||
var requests int64
|
||
if err := rows.Scan(&referer, &requests); err != nil {
|
||
return err
|
||
}
|
||
stats := &models.PathStats{}
|
||
stats.Requests.Store(requests)
|
||
c.refererStats.Store(referer, stats)
|
||
}
|
||
|
||
if err := rows.Err(); err != nil {
|
||
return fmt.Errorf("error scanning referer stats rows: %v", err)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
func formatAvgLatency(latencySum, requests int64) string {
|
||
if requests <= 0 || latencySum <= 0 {
|
||
return "0 ms"
|
||
}
|
||
return FormatDuration(time.Duration(latencySum / requests))
|
||
}
|
||
|
||
func loadRecentStatusStats(db *models.MetricsDB) error {
|
||
rows, err := db.DB.Query(`
|
||
SELECT status_group, count
|
||
FROM status_stats
|
||
WHERE timestamp >= datetime('now', '-5', 'minutes')
|
||
`)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
defer rows.Close()
|
||
|
||
for rows.Next() {
|
||
var group string
|
||
var count int64
|
||
if err := rows.Scan(&group, &count); err != nil {
|
||
return err
|
||
}
|
||
if len(group) > 0 {
|
||
idx := (int(group[0]) - '0') - 1
|
||
if idx >= 0 && idx < len(globalCollector.statusStats) {
|
||
globalCollector.statusStats[idx].Store(count)
|
||
}
|
||
}
|
||
}
|
||
return rows.Err()
|
||
}
|