feat(metrics): 完善指标存储服务集成

- 在主程序中添加指标存储服务初始化和停止逻辑
- 更新指标收集器的保存方法,支持外部指标存储服务
- 优化静态文件服务路径,使用新的 web/dist 目录
- 调整静态文件处理逻辑,支持更灵活的路由
This commit is contained in:
wood chen 2025-03-09 11:00:39 +08:00
parent e67a3880f5
commit f0c806292b
4 changed files with 370 additions and 0 deletions

View File

@ -323,6 +323,12 @@ func (c *Collector) GetStats() map[string]interface{} {
func (c *Collector) SaveMetrics(stats map[string]interface{}) error { func (c *Collector) SaveMetrics(stats map[string]interface{}) error {
lastSaveTime = time.Now() lastSaveTime = time.Now()
// 如果指标存储服务已初始化,则调用它来保存指标
if metricsStorage != nil {
return metricsStorage.SaveMetrics()
}
return nil return nil
} }

48
internal/metrics/init.go Normal file
View File

@ -0,0 +1,48 @@
package metrics
import (
"log"
"path/filepath"
"proxy-go/internal/config"
"time"
)
var (
metricsStorage *MetricsStorage
)
// InitMetricsStorage 初始化指标存储服务
func InitMetricsStorage(cfg *config.Config) error {
// 确保收集器已初始化
if err := InitCollector(cfg); err != nil {
return err
}
// 创建指标存储服务
dataDir := filepath.Join("data", "metrics")
saveInterval := 5 * time.Minute // 默认5分钟保存一次
metricsStorage = NewMetricsStorage(GetCollector(), dataDir, saveInterval)
// 启动指标存储服务
if err := metricsStorage.Start(); err != nil {
log.Printf("[Metrics] 启动指标存储服务失败: %v", err)
return err
}
log.Printf("[Metrics] 指标存储服务已初始化")
return nil
}
// StopMetricsStorage 停止指标存储服务
func StopMetricsStorage() {
if metricsStorage != nil {
metricsStorage.Stop()
log.Printf("[Metrics] 指标存储服务已停止")
}
}
// GetMetricsStorage 获取指标存储服务实例
func GetMetricsStorage() *MetricsStorage {
return metricsStorage
}

View File

@ -0,0 +1,306 @@
package metrics
import (
"encoding/json"
"fmt"
"log"
"os"
"path/filepath"
"proxy-go/internal/models"
"sync"
"sync/atomic"
"time"
)
// MetricsStorage 指标存储结构
type MetricsStorage struct {
collector *Collector
saveInterval time.Duration
dataDir string
stopChan chan struct{}
wg sync.WaitGroup
lastSaveTime time.Time
mutex sync.RWMutex
metricsFile string
pathStatsFile string
statusCodeFile string
}
// NewMetricsStorage 创建新的指标存储
func NewMetricsStorage(collector *Collector, dataDir string, saveInterval time.Duration) *MetricsStorage {
if saveInterval < time.Minute {
saveInterval = time.Minute
}
return &MetricsStorage{
collector: collector,
saveInterval: saveInterval,
dataDir: dataDir,
stopChan: make(chan struct{}),
metricsFile: filepath.Join(dataDir, "metrics.json"),
pathStatsFile: filepath.Join(dataDir, "path_stats.json"),
statusCodeFile: filepath.Join(dataDir, "status_codes.json"),
}
}
// Start 启动定时保存任务
func (ms *MetricsStorage) Start() error {
// 确保数据目录存在
if err := os.MkdirAll(ms.dataDir, 0755); err != nil {
return fmt.Errorf("创建数据目录失败: %v", err)
}
// 尝试加载现有数据
if err := ms.LoadMetrics(); err != nil {
log.Printf("[MetricsStorage] 加载指标数据失败: %v", err)
// 加载失败不影响启动
}
ms.wg.Add(1)
go ms.runSaveTask()
log.Printf("[MetricsStorage] 指标存储服务已启动,保存间隔: %v", ms.saveInterval)
return nil
}
// Stop 停止定时保存任务
func (ms *MetricsStorage) Stop() {
close(ms.stopChan)
ms.wg.Wait()
// 在停止前保存一次数据
if err := ms.SaveMetrics(); err != nil {
log.Printf("[MetricsStorage] 停止时保存指标数据失败: %v", err)
}
log.Printf("[MetricsStorage] 指标存储服务已停止")
}
// runSaveTask 运行定时保存任务
func (ms *MetricsStorage) runSaveTask() {
defer ms.wg.Done()
ticker := time.NewTicker(ms.saveInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := ms.SaveMetrics(); err != nil {
log.Printf("[MetricsStorage] 保存指标数据失败: %v", err)
}
case <-ms.stopChan:
return
}
}
}
// SaveMetrics 保存指标数据
func (ms *MetricsStorage) SaveMetrics() error {
start := time.Now()
log.Printf("[MetricsStorage] 开始保存指标数据...")
// 获取当前指标数据
stats := ms.collector.GetStats()
// 保存基本指标
basicMetrics := map[string]interface{}{
"uptime": stats["uptime"],
"total_bytes": stats["total_bytes"],
"avg_response_time": stats["avg_response_time"],
"requests_per_second": stats["requests_per_second"],
"bytes_per_second": stats["bytes_per_second"],
"latency_stats": stats["latency_stats"],
"bandwidth_history": stats["bandwidth_history"],
"current_bandwidth": stats["current_bandwidth"],
"save_time": time.Now().Format(time.RFC3339),
}
if err := saveJSONToFile(ms.metricsFile, basicMetrics); err != nil {
return fmt.Errorf("保存基本指标失败: %v", err)
}
// 保存路径统计
if err := saveJSONToFile(ms.pathStatsFile, stats["top_paths"]); err != nil {
return fmt.Errorf("保存路径统计失败: %v", err)
}
// 保存状态码统计
if err := saveJSONToFile(ms.statusCodeFile, stats["status_code_stats"]); err != nil {
return fmt.Errorf("保存状态码统计失败: %v", err)
}
ms.mutex.Lock()
ms.lastSaveTime = time.Now()
ms.mutex.Unlock()
log.Printf("[MetricsStorage] 指标数据保存完成,耗时: %v", time.Since(start))
return nil
}
// LoadMetrics 加载指标数据
func (ms *MetricsStorage) LoadMetrics() error {
start := time.Now()
log.Printf("[MetricsStorage] 开始加载指标数据...")
// 检查文件是否存在
if !fileExists(ms.metricsFile) || !fileExists(ms.pathStatsFile) || !fileExists(ms.statusCodeFile) {
return fmt.Errorf("指标数据文件不存在")
}
// 加载基本指标
var basicMetrics map[string]interface{}
if err := loadJSONFromFile(ms.metricsFile, &basicMetrics); err != nil {
return fmt.Errorf("加载基本指标失败: %v", err)
}
// 加载路径统计
var pathStats []map[string]interface{}
if err := loadJSONFromFile(ms.pathStatsFile, &pathStats); err != nil {
return fmt.Errorf("加载路径统计失败: %v", err)
}
// 加载状态码统计
var statusCodeStats map[string]interface{}
if err := loadJSONFromFile(ms.statusCodeFile, &statusCodeStats); err != nil {
return fmt.Errorf("加载状态码统计失败: %v", err)
}
// 将加载的数据应用到收集器
// 1. 应用总字节数
if totalBytes, ok := basicMetrics["total_bytes"].(float64); ok {
atomic.StoreInt64(&ms.collector.totalBytes, int64(totalBytes))
}
// 2. 应用路径统计
for _, pathStat := range pathStats {
path, ok := pathStat["path"].(string)
if !ok {
continue
}
requestCount, _ := pathStat["request_count"].(float64)
errorCount, _ := pathStat["error_count"].(float64)
bytesTransferred, _ := pathStat["bytes_transferred"].(float64)
// 创建或更新路径统计
var pathMetrics *models.PathMetrics
if existingMetrics, ok := ms.collector.pathStats.Load(path); ok {
pathMetrics = existingMetrics.(*models.PathMetrics)
} else {
pathMetrics = &models.PathMetrics{Path: path}
ms.collector.pathStats.Store(path, pathMetrics)
}
// 设置统计值
pathMetrics.RequestCount.Store(int64(requestCount))
pathMetrics.ErrorCount.Store(int64(errorCount))
pathMetrics.BytesTransferred.Store(int64(bytesTransferred))
}
// 3. 应用状态码统计
for statusCode, count := range statusCodeStats {
countValue, ok := count.(float64)
if !ok {
continue
}
// 创建或更新状态码统计
if counter, ok := ms.collector.statusCodeStats.Load(statusCode); ok {
atomic.StoreInt64(counter.(*int64), int64(countValue))
} else {
counter := new(int64)
*counter = int64(countValue)
ms.collector.statusCodeStats.Store(statusCode, counter)
}
}
// 4. 应用延迟分布桶(如果有)
if latencyStats, ok := basicMetrics["latency_stats"].(map[string]interface{}); ok {
if distribution, ok := latencyStats["distribution"].(map[string]interface{}); ok {
for bucket, count := range distribution {
countValue, ok := count.(float64)
if !ok {
continue
}
if bucketCounter, ok := ms.collector.latencyBuckets.Load(bucket); ok {
atomic.StoreInt64(bucketCounter.(*int64), int64(countValue))
}
}
}
}
// 5. 应用带宽历史(如果有)
if bandwidthHistory, ok := basicMetrics["bandwidth_history"].(map[string]interface{}); ok {
ms.collector.bandwidthStats.Lock()
ms.collector.bandwidthStats.history = make(map[string]int64)
for timeKey, bandwidth := range bandwidthHistory {
bandwidthValue, ok := bandwidth.(string)
if !ok {
continue
}
// 解析带宽值(假设格式为 "X.XX MB/s"
var bytesValue float64
fmt.Sscanf(bandwidthValue, "%f", &bytesValue)
ms.collector.bandwidthStats.history[timeKey] = int64(bytesValue)
}
ms.collector.bandwidthStats.Unlock()
}
ms.mutex.Lock()
if saveTime, ok := basicMetrics["save_time"].(string); ok {
if t, err := time.Parse(time.RFC3339, saveTime); err == nil {
ms.lastSaveTime = t
}
}
ms.mutex.Unlock()
log.Printf("[MetricsStorage] 指标数据加载完成,耗时: %v", time.Since(start))
return nil
}
// GetLastSaveTime 获取最后保存时间
func (ms *MetricsStorage) GetLastSaveTime() time.Time {
ms.mutex.RLock()
defer ms.mutex.RUnlock()
return ms.lastSaveTime
}
// 辅助函数保存JSON到文件
func saveJSONToFile(filename string, data interface{}) error {
// 创建临时文件
tempFile := filename + ".tmp"
// 将数据编码为JSON
jsonData, err := json.MarshalIndent(data, "", " ")
if err != nil {
return err
}
// 写入临时文件
if err := os.WriteFile(tempFile, jsonData, 0644); err != nil {
return err
}
// 重命名临时文件为目标文件(原子操作)
return os.Rename(tempFile, filename)
}
// 辅助函数从文件加载JSON
func loadJSONFromFile(filename string, data interface{}) error {
// 读取文件内容
jsonData, err := os.ReadFile(filename)
if err != nil {
return err
}
// 解码JSON数据
return json.Unmarshal(jsonData, data)
}
// 辅助函数:检查文件是否存在
func fileExists(filename string) bool {
_, err := os.Stat(filename)
return err == nil
}

10
main.go
View File

@ -31,6 +31,12 @@ func main() {
log.Fatal("Error initializing metrics collector:", err) log.Fatal("Error initializing metrics collector:", err)
} }
// 初始化指标存储服务
if err := metrics.InitMetricsStorage(cfg); err != nil {
log.Printf("Warning: Failed to initialize metrics storage: %v", err)
// 不致命,继续运行
}
// 创建压缩管理器 // 创建压缩管理器
compManager := compression.NewManager(compression.Config{ compManager := compression.NewManager(compression.Config{
Gzip: compression.CompressorConfig(cfg.Compression.Gzip), Gzip: compression.CompressorConfig(cfg.Compression.Gzip),
@ -169,6 +175,10 @@ func main() {
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan <-sigChan
log.Println("Shutting down server...") log.Println("Shutting down server...")
// 停止指标存储服务
metrics.StopMetricsStorage()
if err := server.Close(); err != nil { if err := server.Close(); err != nil {
log.Printf("Error during server shutdown: %v\n", err) log.Printf("Error during server shutdown: %v\n", err)
} }