proxy-go/internal/metrics/persistence.go

333 lines
9.3 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package metrics
import (
"encoding/json"
"fmt"
"log"
"os"
"path/filepath"
"proxy-go/internal/models"
"proxy-go/internal/utils"
"runtime"
"sync"
"sync/atomic"
"time"
)
// MetricsStorage 指标存储结构
type MetricsStorage struct {
collector *Collector
saveInterval time.Duration
dataDir string
stopChan chan struct{}
wg sync.WaitGroup
lastSaveTime time.Time
mutex sync.RWMutex
metricsFile string
statusCodeFile string
refererStatsFile string
}
// NewMetricsStorage 创建新的指标存储
func NewMetricsStorage(collector *Collector, dataDir string, saveInterval time.Duration) *MetricsStorage {
if saveInterval < time.Minute {
saveInterval = time.Minute
}
return &MetricsStorage{
collector: collector,
saveInterval: saveInterval,
dataDir: dataDir,
stopChan: make(chan struct{}),
metricsFile: filepath.Join(dataDir, "metrics.json"),
statusCodeFile: filepath.Join(dataDir, "status_codes.json"),
refererStatsFile: filepath.Join(dataDir, "referer_stats.json"),
}
}
// Start 启动定时保存任务
func (ms *MetricsStorage) Start() error {
// 确保数据目录存在
if err := os.MkdirAll(ms.dataDir, 0755); err != nil {
return fmt.Errorf("创建数据目录失败: %v", err)
}
// 尝试加载现有数据
if err := ms.LoadMetrics(); err != nil {
log.Printf("[MetricsStorage] 加载指标数据失败: %v", err)
// 加载失败不影响启动
}
ms.wg.Add(1)
go ms.runSaveTask()
log.Printf("[MetricsStorage] 指标存储服务已启动,保存间隔: %v", ms.saveInterval)
return nil
}
// Stop 停止定时保存任务
func (ms *MetricsStorage) Stop() {
close(ms.stopChan)
ms.wg.Wait()
// 在停止前保存一次数据
if err := ms.SaveMetrics(); err != nil {
log.Printf("[MetricsStorage] 停止时保存指标数据失败: %v", err)
}
log.Printf("[MetricsStorage] 指标存储服务已停止")
}
// runSaveTask 运行定时保存任务
func (ms *MetricsStorage) runSaveTask() {
defer ms.wg.Done()
ticker := time.NewTicker(ms.saveInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := ms.SaveMetrics(); err != nil {
log.Printf("[MetricsStorage] 保存指标数据失败: %v", err)
}
case <-ms.stopChan:
return
}
}
}
// SaveMetrics 保存指标数据
func (ms *MetricsStorage) SaveMetrics() error {
start := time.Now()
log.Printf("[MetricsStorage] 开始保存指标数据...")
// 获取当前指标数据
stats := ms.collector.GetStats()
// 保存基本指标 - 只保存必要的字段
basicMetrics := map[string]interface{}{
"uptime": stats["uptime"],
"total_bytes": stats["total_bytes"],
"avg_response_time": stats["avg_response_time"],
"save_time": time.Now().Format(time.RFC3339),
}
// 单独保存延迟统计,避免嵌套结构导致的内存占用
if latencyStats, ok := stats["latency_stats"].(map[string]interface{}); ok {
basicMetrics["latency_min"] = latencyStats["min"]
basicMetrics["latency_max"] = latencyStats["max"]
}
if err := saveJSONToFile(ms.metricsFile, basicMetrics); err != nil {
return fmt.Errorf("保存基本指标失败: %v", err)
}
// 保存状态码统计
if err := saveJSONToFile(ms.statusCodeFile, stats["status_code_stats"]); err != nil {
return fmt.Errorf("保存状态码统计失败: %v", err)
}
// 保存引用来源统计 - 限制数量
topReferers := stats["top_referers"]
if err := saveJSONToFile(ms.refererStatsFile, topReferers); err != nil {
return fmt.Errorf("保存引用来源统计失败: %v", err)
}
// 单独保存延迟分布
if latencyStats, ok := stats["latency_stats"].(map[string]interface{}); ok {
if distribution, ok := latencyStats["distribution"]; ok {
if err := saveJSONToFile(filepath.Join(ms.dataDir, "latency_distribution.json"), distribution); err != nil {
log.Printf("[MetricsStorage] 保存延迟分布失败: %v", err)
}
}
}
ms.mutex.Lock()
ms.lastSaveTime = time.Now()
ms.mutex.Unlock()
// 强制进行一次GC
runtime.GC()
// 打印内存使用情况
var mem runtime.MemStats
runtime.ReadMemStats(&mem)
log.Printf("[MetricsStorage] 指标数据保存完成,耗时: %v, 内存使用: %s",
time.Since(start), utils.FormatBytes(int64(mem.Alloc)))
return nil
}
// LoadMetrics 加载指标数据
func (ms *MetricsStorage) LoadMetrics() error {
start := time.Now()
log.Printf("[MetricsStorage] 开始加载指标数据...")
// 检查文件是否存在
if !fileExists(ms.metricsFile) {
return fmt.Errorf("指标数据文件不存在")
}
// 加载基本指标
var basicMetrics map[string]interface{}
if err := loadJSONFromFile(ms.metricsFile, &basicMetrics); err != nil {
return fmt.Errorf("加载基本指标失败: %v", err)
}
// 将加载的数据应用到收集器
// 1. 应用总字节数
if totalBytes, ok := basicMetrics["total_bytes"].(float64); ok {
atomic.StoreInt64(&ms.collector.totalBytes, int64(totalBytes))
}
// 2. 加载状态码统计(如果文件存在)
if fileExists(ms.statusCodeFile) {
var statusCodeStats map[string]interface{}
if err := loadJSONFromFile(ms.statusCodeFile, &statusCodeStats); err != nil {
log.Printf("[MetricsStorage] 加载状态码统计失败: %v", err)
} else {
for statusCode, count := range statusCodeStats {
countValue, ok := count.(float64)
if !ok {
continue
}
// 创建或更新状态码统计
if counter, ok := ms.collector.statusCodeStats.Load(statusCode); ok {
atomic.StoreInt64(counter.(*int64), int64(countValue))
} else {
counter := new(int64)
*counter = int64(countValue)
ms.collector.statusCodeStats.Store(statusCode, counter)
}
}
log.Printf("[MetricsStorage] 加载了 %d 条状态码统计", len(statusCodeStats))
}
}
// 3. 加载引用来源统计(如果文件存在)
if fileExists(ms.refererStatsFile) {
var refererStats []map[string]interface{}
if err := loadJSONFromFile(ms.refererStatsFile, &refererStats); err != nil {
log.Printf("[MetricsStorage] 加载引用来源统计失败: %v", err)
} else {
// 只加载前20个引用来源统计
maxReferers := 20
if len(refererStats) > maxReferers {
refererStats = refererStats[:maxReferers]
}
for _, refererStat := range refererStats {
referer, ok := refererStat["path"].(string)
if !ok {
continue
}
requestCount, _ := refererStat["request_count"].(float64)
errorCount, _ := refererStat["error_count"].(float64)
bytesTransferred, _ := refererStat["bytes_transferred"].(float64)
// 创建或更新引用来源统计
var refererMetrics *models.PathMetrics
if existingMetrics, ok := ms.collector.refererStats.Load(referer); ok {
refererMetrics = existingMetrics.(*models.PathMetrics)
} else {
refererMetrics = &models.PathMetrics{Path: referer}
ms.collector.refererStats.Store(referer, refererMetrics)
}
// 设置统计值
refererMetrics.RequestCount.Store(int64(requestCount))
refererMetrics.ErrorCount.Store(int64(errorCount))
refererMetrics.BytesTransferred.Store(int64(bytesTransferred))
}
log.Printf("[MetricsStorage] 加载了 %d 条引用来源统计", len(refererStats))
}
}
// 5. 加载延迟分布(如果文件存在)
latencyDistributionFile := filepath.Join(ms.dataDir, "latency_distribution.json")
if fileExists(latencyDistributionFile) {
var distribution map[string]interface{}
if err := loadJSONFromFile(latencyDistributionFile, &distribution); err != nil {
log.Printf("[MetricsStorage] 加载延迟分布失败: %v", err)
} else {
for bucket, count := range distribution {
countValue, ok := count.(float64)
if !ok {
continue
}
if counter, ok := ms.collector.latencyBuckets.Load(bucket); ok {
atomic.StoreInt64(counter.(*int64), int64(countValue))
}
}
log.Printf("[MetricsStorage] 加载了延迟分布数据")
}
}
ms.mutex.Lock()
if saveTime, ok := basicMetrics["save_time"].(string); ok {
if t, err := time.Parse(time.RFC3339, saveTime); err == nil {
ms.lastSaveTime = t
}
}
ms.mutex.Unlock()
// 强制进行一次GC
runtime.GC()
// 打印内存使用情况
var mem runtime.MemStats
runtime.ReadMemStats(&mem)
log.Printf("[MetricsStorage] 指标数据加载完成,耗时: %v, 内存使用: %s",
time.Since(start), utils.FormatBytes(int64(mem.Alloc)))
return nil
}
// GetLastSaveTime 获取最后保存时间
func (ms *MetricsStorage) GetLastSaveTime() time.Time {
ms.mutex.RLock()
defer ms.mutex.RUnlock()
return ms.lastSaveTime
}
// 辅助函数保存JSON到文件
func saveJSONToFile(filename string, data interface{}) error {
// 创建临时文件
tempFile := filename + ".tmp"
// 将数据编码为JSON
jsonData, err := json.MarshalIndent(data, "", " ")
if err != nil {
return err
}
// 写入临时文件
if err := os.WriteFile(tempFile, jsonData, 0644); err != nil {
return err
}
// 重命名临时文件为目标文件(原子操作)
return os.Rename(tempFile, filename)
}
// 辅助函数从文件加载JSON
func loadJSONFromFile(filename string, data interface{}) error {
// 读取文件内容
jsonData, err := os.ReadFile(filename)
if err != nil {
return err
}
// 解码JSON数据
return json.Unmarshal(jsonData, data)
}
// 辅助函数:检查文件是否存在
func fileExists(filename string) bool {
_, err := os.Stat(filename)
return err == nil
}