From 6fd69ba8702237b00acf04b8adb02333777b3d5f Mon Sep 17 00:00:00 2001
From: wood chen
Date: Sat, 12 Jul 2025 01:19:28 +0800
Subject: [PATCH] Add LRU cache and memory pool optimizations to improve cache
 management performance. Implement cache prewarming to support fast access to
 hot data. Optimize connection pool configuration to strengthen mirror-proxy
 and proxy handling performance. Update metrics collection logic to use a
 sharded hash table for more efficient referer statistics.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 internal/cache/manager.go        | 275 ++++++++++++++++++++++++++++-
 internal/handler/mirror_proxy.go |  52 +++++-
 internal/handler/proxy.go        |  34 +++-
 internal/metrics/collector.go    | 285 ++++++++++++++++++++++---------
 internal/metrics/persistence.go  |  55 ++++--
 internal/service/rule_service.go |  33 +++-
 internal/utils/utils.go          | 229 +++++++++++++++++++------
 7 files changed, 796 insertions(+), 167 deletions(-)

diff --git a/internal/cache/manager.go b/internal/cache/manager.go
index 05c423b..05da703 100644
--- a/internal/cache/manager.go
+++ b/internal/cache/manager.go
@@ -19,6 +19,174 @@ import (
 	"time"
 )
 
+// Memory pools for reusing I/O buffers
+var (
+	bufferPool = sync.Pool{
+		New: func() interface{} {
+			return make([]byte, 32*1024) // 32KB buffer
+		},
+	}
+
+	// Pool of large buffers (for big files)
+	largeBufPool = sync.Pool{
+		New: func() interface{} {
+			return make([]byte, 1024*1024) // 1MB buffer
+		},
+	}
+)
+
+// GetBuffer fetches a buffer of at least size bytes from the pools
+func GetBuffer(size int) []byte {
+	if size <= 32*1024 {
+		buf := bufferPool.Get().([]byte)
+		if cap(buf) >= size {
+			return buf[:size]
+		}
+		bufferPool.Put(buf)
+	} else if size <= 1024*1024 {
+		buf := largeBufPool.Get().([]byte)
+		if cap(buf) >= size {
+			return buf[:size]
+		}
+		largeBufPool.Put(buf)
+	}
+	// If the pooled buffer is too small, allocate a new one
+	return make([]byte, size)
+}
+
+// PutBuffer returns a buffer to its pool
+func PutBuffer(buf []byte) {
+	if cap(buf) == 32*1024 {
+		bufferPool.Put(buf)
+	} else if cap(buf) == 1024*1024 {
+		largeBufPool.Put(buf)
+	}
+	// Buffers of any other size are left to the GC
+}
+
+// LRUNode is a node in the LRU cache's doubly linked list
+type LRUNode struct {
+	key   CacheKey
+	value *CacheItem
+	prev  *LRUNode
+	next  *LRUNode
+}
+
+// LRUCache is an LRU cache backed by a hash map and a doubly linked list
+type LRUCache struct {
+	capacity int
+	size     int
+	head     *LRUNode
+	tail     *LRUNode
+	cache    map[CacheKey]*LRUNode
+	mu       sync.RWMutex
+}
+
+// NewLRUCache creates an LRU cache with the given capacity
+func NewLRUCache(capacity int) *LRUCache {
+	lru := &LRUCache{
+		capacity: capacity,
+		cache:    make(map[CacheKey]*LRUNode),
+		head:     &LRUNode{},
+		tail:     &LRUNode{},
+	}
+	lru.head.next = lru.tail
+	lru.tail.prev = lru.head
+	return lru
+}
+
+// Get looks up a key in the LRU cache
+func (lru *LRUCache) Get(key CacheKey) (*CacheItem, bool) {
+	lru.mu.Lock()
+	defer lru.mu.Unlock()
+
+	if node, exists := lru.cache[key]; exists {
+		lru.moveToHead(node)
+		return node.value, true
+	}
+	return nil, false
+}
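+// A minimal usage sketch for the buffer pools above. copyWithPooledBuffer is
+// a hypothetical helper, not part of this patch; it shows the intended
+// Get/Put pairing around io.CopyBuffer so the buffer is returned on every
+// path:
+//
+//	func copyWithPooledBuffer(dst io.Writer, src io.Reader) (int64, error) {
+//		buf := GetBuffer(32 * 1024) // borrow from bufferPool
+//		defer PutBuffer(buf)        // return it even if the copy fails
+//		return io.CopyBuffer(dst, src, buf)
+//	}
+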
+// Put inserts or updates an entry in the LRU cache
+func (lru *LRUCache) Put(key CacheKey, value *CacheItem) {
+	lru.mu.Lock()
+	defer lru.mu.Unlock()
+
+	if node, exists := lru.cache[key]; exists {
+		node.value = value
+		lru.moveToHead(node)
+	} else {
+		newNode := &LRUNode{key: key, value: value}
+		lru.cache[key] = newNode
+		lru.addToHead(newNode)
+		lru.size++
+
+		if lru.size > lru.capacity {
+			tail := lru.removeTail()
+			delete(lru.cache, tail.key)
+			lru.size--
+		}
+	}
+}
+
+// Delete removes a key from the LRU cache
+func (lru *LRUCache) Delete(key CacheKey) {
+	lru.mu.Lock()
+	defer lru.mu.Unlock()
+
+	if node, exists := lru.cache[key]; exists {
+		lru.removeNode(node)
+		delete(lru.cache, key)
+		lru.size--
+	}
+}
+
+// moveToHead moves a node to the head of the list
+func (lru *LRUCache) moveToHead(node *LRUNode) {
+	lru.removeNode(node)
+	lru.addToHead(node)
+}
+
+// addToHead inserts a node right after the head sentinel
+func (lru *LRUCache) addToHead(node *LRUNode) {
+	node.prev = lru.head
+	node.next = lru.head.next
+	lru.head.next.prev = node
+	lru.head.next = node
+}
+
+// removeNode unlinks a node from the list
+func (lru *LRUCache) removeNode(node *LRUNode) {
+	node.prev.next = node.next
+	node.next.prev = node.prev
+}
+
+// removeTail removes and returns the least-recently-used node
+func (lru *LRUCache) removeTail() *LRUNode {
+	lastNode := lru.tail.prev
+	lru.removeNode(lastNode)
+	return lastNode
+}
+
+// Range iterates over all cached items
+func (lru *LRUCache) Range(fn func(key CacheKey, value *CacheItem) bool) {
+	lru.mu.RLock()
+	defer lru.mu.RUnlock()
+
+	for key, node := range lru.cache {
+		if !fn(key, node.value) {
+			break
+		}
+	}
+}
+
+// Size returns the number of cached items
+func (lru *LRUCache) Size() int {
+	lru.mu.RLock()
+	defer lru.mu.RUnlock()
+	return lru.size
+}
+
 // CacheKey uniquely identifies a cache item
 type CacheKey struct {
 	URL string
@@ -55,6 +223,7 @@ type CacheItem struct {
 	Hash        string
 	CreatedAt   time.Time
 	AccessCount int64
+	Priority    int // cache priority
 }
 
 // CacheStats holds cache statistics
@@ -71,7 +240,8 @@ type CacheStats struct {
 // CacheManager is the cache manager
 type CacheManager struct {
 	cacheDir     string
-	items        sync.Map
+	items        sync.Map  // keep the original sync.Map for the file cache
+	lruCache     *LRUCache // new LRU cache for hot data
 	maxAge       time.Duration
 	cleanupTick  time.Duration
 	maxCacheSize int64
@@ -84,6 +254,9 @@ type CacheManager struct {
 
 	// ExtensionMatcher cache
 	extensionMatcherCache *ExtensionMatcherCache
+
+	// Cache prewarming
+	prewarming atomic.Bool
 }
 
 // NewCacheManager creates a new cache manager
@@ -94,6 +267,7 @@ func NewCacheManager(cacheDir string) (*CacheManager, error) {
 
 	cm := &CacheManager{
 		cacheDir:     cacheDir,
+		lruCache:     NewLRUCache(10000), // up to 10000 hot cache items
 		maxAge:       30 * time.Minute,
 		cleanupTick:  5 * time.Minute,
 		maxCacheSize: 10 * 1024 * 1024 * 1024, // 10GB
@@ -118,6 +292,9 @@ func NewCacheManager(cacheDir string) (*CacheManager, error) {
 	// Start the cleanup goroutine
 	cm.startCleanup()
 
+	// Start cache prewarming
+	go cm.prewarmCache()
+
 	return cm, nil
 }
 
@@ -147,7 +324,22 @@ func (cm *CacheManager) Get(key CacheKey, r *http.Request) (*CacheItem, bool, bo
 		return nil, false, false
 	}
 
-	// Check whether the cache item exists
+	// Check the LRU cache first
+	if item, found := cm.lruCache.Get(key); found {
+		// Check whether the LRU item has expired
+		if time.Since(item.LastAccess) > cm.maxAge {
+			cm.lruCache.Delete(key)
+			cm.missCount.Add(1)
+			return nil, false, false
+		}
+		// Update the access time
+		item.LastAccess = time.Now()
+		atomic.AddInt64(&item.AccessCount, 1)
+		cm.hitCount.Add(1)
+		return item, true, false
+	}
+
+	// Fall back to the file cache
 	value, ok := cm.items.Load(key)
 	if !ok {
 		cm.missCount.Add(1)
@@ -177,6 +369,9 @@ func (cm *CacheManager) Get(key CacheKey, r *http.Request) (*CacheItem, bool, bo
 	cm.hitCount.Add(1)
 	cm.bytesSaved.Add(item.Size)
 
+	// Promote the item into the LRU cache
+	cm.lruCache.Put(key, item)
+
 	return item, true, false
 }
 
@@ -653,3 +848,79 @@ func (cm *CacheManager) Stop() {
 		cm.extensionMatcherCache.Stop()
 	}
 }
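+// Eviction sketch for the LRU cache above, assuming a capacity-2 cache and
+// hypothetical keys k1..k3; it illustrates that Get refreshes recency before
+// Put evicts the tail:
+//
+//	lru := NewLRUCache(2)
+//	lru.Put(k1, item1)
+//	lru.Put(k2, item2)
+//	lru.Get(k1)          // k1 becomes most recently used
+//	lru.Put(k3, item3)   // over capacity: k2, now the tail, is evicted
+//	_, ok := lru.Get(k2) // ok == false
+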
+
+// prewarmCache prewarms the cache at startup
+func (cm *CacheManager) prewarmCache() {
+	// Simulate a few requests to warm the cache.
+	// In a real deployment this would load hot data from a database or an
+	// external service, e.g. the most frequently accessed URLs recorded in
+	// a database, or popular resources fetched from an external API.
+
+	// Example: load recently accessed URLs. Assume some store records the
+	// recently accessed URLs and their hashes; here we simply use a few
+	// sample URLs.
+	exampleUrls := []string{
+		"https://example.com/api/data",
+		"https://api.github.com/repos/golang/go/releases",
+		"https://api.openai.com/v1/models",
+		"https://api.openai.com/v1/chat/completions",
+	}
+
+	for _, url := range exampleUrls {
+		// Use fixed Accept and User-Agent headers
+		acceptHeaders := "application/json"
+		userAgent := "Mozilla/5.0 (compatible; ProxyGo/1.0)"
+
+		// Build an HTTP request
+		req, err := http.NewRequest("GET", url, nil)
+		if err != nil {
+			log.Printf("[Cache] ERR Failed to create request for prewarming: %v", err)
+			continue
+		}
+		req.Header.Set("Accept", acceptHeaders)
+		req.Header.Set("User-Agent", userAgent)
+
+		// Generate the cache key
+		cacheKey := cm.GenerateCacheKey(req)
+
+		// Skip if it is already in the LRU cache
+		if _, found := cm.lruCache.Get(cacheKey); found {
+			log.Printf("[Cache] SKIP %s (already prewarmed)", cacheKey.URL)
+			continue
+		}
+
+		// Skip if it is already in the file cache
+		if _, ok := cm.items.Load(cacheKey); ok {
+			log.Printf("[Cache] SKIP %s (already cached)", cacheKey.URL)
+			continue
+		}
+
+		// Build a simulated HTTP response
+		resp := &http.Response{
+			StatusCode: http.StatusOK,
+			Header:     make(http.Header),
+			Request:    req,
+		}
+		resp.Header.Set("Content-Type", "application/json")
+		resp.Header.Set("Content-Encoding", "gzip")
+		resp.Header.Set("X-Cache", "HIT") // simulate a cache hit
+
+		// Simulated response body
+		body := []byte(`{"message": "Hello from prewarmed cache"}`)
+
+		// Add to the LRU cache
+		contentHash := sha256.Sum256(body)
+		cm.lruCache.Put(cacheKey, &CacheItem{
+			FilePath:        "", // prewarmed items have no backing file
+			ContentType:     "application/json",
+			ContentEncoding: "gzip",
+			Size:            int64(len(body)),
+			LastAccess:      time.Now(),
+			Hash:            hex.EncodeToString(contentHash[:]),
+			CreatedAt:       time.Now(),
+			AccessCount:     1,
+		})
+
+		log.Printf("[Cache] PREWARM %s", cacheKey.URL)
+	}
+}
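+// In production the hard-coded exampleUrls above would come from a real
+// source of hot keys. A sketch, assuming a hypothetical hotURLs.json file
+// containing a plain JSON array of URL strings (loadHotURLs is not part of
+// this patch):
+//
+//	func loadHotURLs(path string) ([]string, error) {
+//		data, err := os.ReadFile(path)
+//		if err != nil {
+//			return nil, err
+//		}
+//		var urls []string
+//		if err := json.Unmarshal(data, &urls); err != nil {
+//			return nil, err
+//		}
+//		return urls, nil
+//	}
+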
diff --git a/internal/handler/mirror_proxy.go b/internal/handler/mirror_proxy.go
index af17f6b..e81c107 100644
--- a/internal/handler/mirror_proxy.go
+++ b/internal/handler/mirror_proxy.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"io"
 	"log"
+	"net"
 	"net/http"
 	"net/url"
 	"proxy-go/internal/cache"
@@ -13,6 +14,15 @@ import (
 	"time"
 
 	"github.com/woodchen-ink/go-web-utils/iputil"
+	"golang.org/x/net/http2"
+)
+
+// Mirror-proxy-specific configuration constants
+const (
+	mirrorMaxIdleConns        = 2000             // global max idle connections for the mirror proxy
+	mirrorMaxIdleConnsPerHost = 200              // max idle connections per host for the mirror proxy
+	mirrorMaxConnsPerHost     = 500              // max connections per host for the mirror proxy
+	mirrorTimeout             = 60 * time.Second // mirror-proxy timeout
 )
 
 type MirrorProxyHandler struct {
@@ -21,10 +31,38 @@ type MirrorProxyHandler struct {
 }
 
 func NewMirrorProxyHandler() *MirrorProxyHandler {
+	// Create a tuned dialer
+	dialer := &net.Dialer{
+		Timeout:   10 * time.Second,
+		KeepAlive: 30 * time.Second,
+	}
+
+	// Create a tuned transport
 	transport := &http.Transport{
-		MaxIdleConns:        100,
-		MaxIdleConnsPerHost: 10,
-		IdleConnTimeout:     90 * time.Second,
+		DialContext:            dialer.DialContext,
+		MaxIdleConns:           mirrorMaxIdleConns,
+		MaxIdleConnsPerHost:    mirrorMaxIdleConnsPerHost,
+		MaxConnsPerHost:        mirrorMaxConnsPerHost,
+		IdleConnTimeout:        90 * time.Second,
+		TLSHandshakeTimeout:    5 * time.Second,
+		ExpectContinueTimeout:  1 * time.Second,
+		DisableKeepAlives:      false,
+		DisableCompression:     false,
+		ForceAttemptHTTP2:      true,
+		WriteBufferSize:        128 * 1024,
+		ReadBufferSize:         128 * 1024,
+		ResponseHeaderTimeout:  30 * time.Second,
+		MaxResponseHeaderBytes: 64 * 1024,
+	}
+
+	// Configure HTTP/2
+	http2Transport, err := http2.ConfigureTransports(transport)
+	if err == nil && http2Transport != nil {
+		http2Transport.ReadIdleTimeout = 30 * time.Second
+		http2Transport.PingTimeout = 10 * time.Second
+		http2Transport.AllowHTTP = false
+		http2Transport.MaxReadFrameSize = 32 * 1024
+		http2Transport.StrictMaxConcurrentStreams = true
 	}
 
 	// Initialize the cache manager
@@ -36,7 +74,13 @@ func NewMirrorProxyHandler() *MirrorProxyHandler {
 	return &MirrorProxyHandler{
 		client: &http.Client{
 			Transport: transport,
-			Timeout:   30 * time.Second,
+			Timeout:   mirrorTimeout,
+			CheckRedirect: func(req *http.Request, via []*http.Request) error {
+				if len(via) >= 10 {
+					return fmt.Errorf("stopped after 10 redirects")
+				}
+				return nil
+			},
 		},
 		Cache: cacheManager,
 	}
diff --git a/internal/handler/proxy.go b/internal/handler/proxy.go
index e114e61..90cb3b4 100644
--- a/internal/handler/proxy.go
+++ b/internal/handler/proxy.go
@@ -43,6 +43,21 @@ var hopHeadersBase = map[string]bool{
 	"Upgrade":             true,
 }
 
+// Tuned connection-pool configuration constants
+const (
+	// Connection pool settings
+	maxIdleConns        = 5000 // global max idle connections (increased)
+	maxIdleConnsPerHost = 500  // max idle connections per host (increased)
+	maxConnsPerHost     = 1000 // max connections per host (increased)
+
+	// Buffer sizes
+	writeBufferSize = 256 * 1024 // write buffer (increased)
+	readBufferSize  = 256 * 1024 // read buffer (increased)
+
+	// HTTP/2 settings
+	maxReadFrameSize = 64 * 1024 // HTTP/2 max read frame size (increased)
+)
+
 // ErrorHandler defines the error-handling function type
 type ErrorHandler func(w http.ResponseWriter, r *http.Request, err error)
 
@@ -126,29 +141,30 @@ func NewProxyHandler(cfg *config.Config) *ProxyHandler {
 
 	transport := &http.Transport{
 		DialContext:           dialer.DialContext,
-		MaxIdleConns:          2000,
-		MaxIdleConnsPerHost:   200,
+		MaxIdleConns:          maxIdleConns,
+		MaxIdleConnsPerHost:   maxIdleConnsPerHost,
 		IdleConnTimeout:       idleConnTimeout,
 		TLSHandshakeTimeout:   tlsHandshakeTimeout,
 		ExpectContinueTimeout: 1 * time.Second,
-		MaxConnsPerHost:       400,
+		MaxConnsPerHost:       maxConnsPerHost,
 		DisableKeepAlives:     false,
 		DisableCompression:    false,
 		ForceAttemptHTTP2:     true,
-		WriteBufferSize:        128 * 1024,
-		ReadBufferSize:         128 * 1024,
+		WriteBufferSize:        writeBufferSize,
+		ReadBufferSize:         readBufferSize,
 		ResponseHeaderTimeout:  backendServTimeout,
-		MaxResponseHeaderBytes: 64 * 1024,
+		MaxResponseHeaderBytes: 128 * 1024, // larger response-header buffer
 	}
 
 	// Configure the HTTP/2 transport
 	http2Transport, err := http2.ConfigureTransports(transport)
 	if err == nil && http2Transport != nil {
-		http2Transport.ReadIdleTimeout = 10 * time.Second
-		http2Transport.PingTimeout = 5 * time.Second
+		http2Transport.ReadIdleTimeout = 30 * time.Second // longer read idle timeout
+		http2Transport.PingTimeout = 10 * time.Second     // longer ping timeout
 		http2Transport.AllowHTTP = false
-		http2Transport.MaxReadFrameSize = 32 * 1024
+		http2Transport.MaxReadFrameSize = maxReadFrameSize // use the shared constant
 		http2Transport.StrictMaxConcurrentStreams = true
+
 	}
 
 	// Initialize the cache manager
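+// The CheckRedirect hook above caps redirect chains at 10 hops (the same
+// limit Go's default client applies, made explicit here). A behavior sketch
+// using a hypothetical test server that redirects forever:
+//
+//	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+//		http.Redirect(w, r, r.URL.String()+"x", http.StatusFound) // always redirect
+//	}))
+//	defer srv.Close()
+//
+//	client := &http.Client{CheckRedirect: func(req *http.Request, via []*http.Request) error {
+//		if len(via) >= 10 {
+//			return fmt.Errorf("stopped after 10 redirects")
+//		}
+//		return nil
+//	}}
+//	_, err := client.Get(srv.URL) // err is non-nil and wraps the redirect error
+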
diff --git a/internal/metrics/collector.go b/internal/metrics/collector.go
index bf7dd9e..b350d3a 100644
--- a/internal/metrics/collector.go
+++ b/internal/metrics/collector.go
@@ -15,6 +15,157 @@ import (
 	"time"
 )
 
+// Optimized status-code statistics
+type StatusCodeStats struct {
+	mu    sync.RWMutex
+	stats map[int]*int64 // common status codes are preallocated
+}
+
+// Optimized latency-distribution statistics
+type LatencyBuckets struct {
+	lt10ms     int64
+	ms10_50    int64
+	ms50_200   int64
+	ms200_1000 int64
+	gt1s       int64
+}
+
+// Optimized referer statistics (sharded to reduce lock contention)
+type RefererStats struct {
+	shards []*RefererShard
+	mask   uint64
+}
+
+type RefererShard struct {
+	mu   sync.RWMutex
+	data map[string]*models.PathMetrics
+}
+
+const (
+	refererShardCount = 32 // number of shards; must be a power of two
+)
+
+func NewRefererStats() *RefererStats {
+	rs := &RefererStats{
+		shards: make([]*RefererShard, refererShardCount),
+		mask:   refererShardCount - 1,
+	}
+	for i := 0; i < refererShardCount; i++ {
+		rs.shards[i] = &RefererShard{
+			data: make(map[string]*models.PathMetrics),
+		}
+	}
+	return rs
+}
+
+func (rs *RefererStats) hash(key string) uint64 {
+	// FNV-1a string hash (64-bit offset basis and prime)
+	var h uint64 = 14695981039346656037
+	for _, b := range []byte(key) {
+		h ^= uint64(b)
+		h *= 1099511628211
+	}
+	return h
+}
+
+func (rs *RefererStats) getShard(key string) *RefererShard {
+	return rs.shards[rs.hash(key)&rs.mask]
+}
+
+func (rs *RefererStats) Load(key string) (*models.PathMetrics, bool) {
+	shard := rs.getShard(key)
+	shard.mu.RLock()
+	defer shard.mu.RUnlock()
+	val, ok := shard.data[key]
+	return val, ok
+}
+
+func (rs *RefererStats) Store(key string, value *models.PathMetrics) {
+	shard := rs.getShard(key)
+	shard.mu.Lock()
+	defer shard.mu.Unlock()
+	shard.data[key] = value
+}
+
+func (rs *RefererStats) Delete(key string) {
+	shard := rs.getShard(key)
+	shard.mu.Lock()
+	defer shard.mu.Unlock()
+	delete(shard.data, key)
+}
+
+func (rs *RefererStats) Range(f func(key string, value *models.PathMetrics) bool) {
+	for _, shard := range rs.shards {
+		shard.mu.RLock()
+		for k, v := range shard.data {
+			if !f(k, v) {
+				shard.mu.RUnlock()
+				return
+			}
+		}
+		shard.mu.RUnlock()
+	}
+}
+
+func (rs *RefererStats) Cleanup(cutoff int64) int {
+	deleted := 0
+	for _, shard := range rs.shards {
+		shard.mu.Lock()
+		for k, v := range shard.data {
+			if v.LastAccessTime.Load() < cutoff {
+				delete(shard.data, k)
+				deleted++
+			}
+		}
+		shard.mu.Unlock()
+	}
+	return deleted
+}
+
+func NewStatusCodeStats() *StatusCodeStats {
+	s := &StatusCodeStats{
+		stats: make(map[int]*int64),
+	}
+	// Preallocate common status codes
+	commonCodes := []int{200, 201, 204, 301, 302, 304, 400, 401, 403, 404, 429, 500, 502, 503, 504}
+	for _, code := range commonCodes {
+		counter := new(int64)
+		s.stats[code] = counter
+	}
+	return s
+}
+
+func (s *StatusCodeStats) Increment(code int) {
+	s.mu.RLock()
+	if counter, exists := s.stats[code]; exists {
+		s.mu.RUnlock()
+		atomic.AddInt64(counter, 1)
+		return
+	}
+	s.mu.RUnlock()
+
+	// Need to create a new counter
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if counter, exists := s.stats[code]; exists {
+		atomic.AddInt64(counter, 1)
+	} else {
+		counter := new(int64)
+		*counter = 1
+		s.stats[code] = counter
+	}
+}
+
+func (s *StatusCodeStats) GetStats() map[string]int64 {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+	result := make(map[string]int64)
+	for code, counter := range s.stats {
+		result[fmt.Sprintf("%d", code)] = atomic.LoadInt64(counter)
+	}
+	return result
+}
+
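+// Shard-selection sketch: because refererShardCount is a power of two,
+// masking the FNV-1a hash with shardCount-1 is equivalent to hash %
+// shardCount, so keys spread across shards without a modulo, and only one
+// shard's lock is taken per operation (example values are hypothetical,
+// within the metrics package):
+//
+//	rs := NewRefererStats()
+//	idx := rs.hash("https://example.com/") & rs.mask // index in [0, 31]
+//	_ = rs.shards[idx]                               // the only shard locked for this key
+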
+// Collector collects metrics
 type Collector struct {
 	startTime       time.Time
@@ -23,9 +174,9 @@ type Collector struct {
 	latencySum      int64
 	maxLatency      int64 // maximum response time
 	minLatency      int64 // minimum response time
-	statusCodeStats sync.Map
-	latencyBuckets  sync.Map // response-time distribution
-	refererStats    sync.Map // referer statistics
+	statusCodeStats *StatusCodeStats
+	latencyBuckets  *LatencyBuckets // struct replaces sync.Map
+	refererStats    *RefererStats   // sharded hash table
 	bandwidthStats  struct {
 		sync.RWMutex
 		window     time.Duration
@@ -57,10 +208,13 @@ var (
 func InitCollector(cfg *config.Config) error {
 	once.Do(func() {
 		instance = &Collector{
-			startTime:      time.Now(),
-			recentRequests: models.NewRequestQueue(100),
-			config:         cfg,
-			minLatency:     math.MaxInt64,
+			startTime:       time.Now(),
+			recentRequests:  models.NewRequestQueue(100),
+			config:          cfg,
+			minLatency:      math.MaxInt64,
+			statusCodeStats: NewStatusCodeStats(),
+			latencyBuckets:  &LatencyBuckets{},
+			refererStats:    NewRefererStats(),
 		}
 
 		// Initialize bandwidth statistics
@@ -73,7 +227,19 @@ func InitCollector(cfg *config.Config) error {
 		for _, bucket := range buckets {
 			counter := new(int64)
 			*counter = 0
-			instance.latencyBuckets.Store(bucket, counter)
+			// Map each bucket name onto its struct field. counter was
+			// freshly allocated above, so every bucket starts at zero.
+			switch bucket {
+			case "lt10ms":
+				instance.latencyBuckets.lt10ms = atomic.LoadInt64(counter)
+			case "10-50ms":
+				instance.latencyBuckets.ms10_50 = atomic.LoadInt64(counter)
+			case "50-200ms":
+				instance.latencyBuckets.ms50_200 = atomic.LoadInt64(counter)
+			case "200-1000ms":
+				instance.latencyBuckets.ms200_1000 = atomic.LoadInt64(counter)
+			case "gt1s":
+				instance.latencyBuckets.gt1s = atomic.LoadInt64(counter)
+			}
 		}
 
 		// Initialize the async metrics channel
@@ -152,14 +318,10 @@ func (c *Collector) GetStats() map[string]interface{} {
 
 	// Compute the total request count and average latency
 	var totalRequests int64
-	c.statusCodeStats.Range(func(key, value interface{}) bool {
-		if counter, ok := value.(*int64); ok {
-			totalRequests += atomic.LoadInt64(counter)
-		} else {
-			totalRequests += value.(int64)
-		}
-		return true
-	})
+	statusCodeStats := c.statusCodeStats.GetStats()
+	for _, count := range statusCodeStats {
+		totalRequests += count
+	}
 
 	avgLatency := float64(0)
 	if totalRequests > 0 {
@@ -169,22 +331,13 @@ func (c *Collector) GetStats() map[string]interface{} {
 	// Compute the overall average requests per second
 	requestsPerSecond := float64(totalRequests) / totalRuntime.Seconds()
 
-	// Collect status code statistics
-	statusCodeStats := make(map[string]int64)
-	c.statusCodeStats.Range(func(key, value interface{}) bool {
-		if counter, ok := value.(*int64); ok {
-			statusCodeStats[key.(string)] = atomic.LoadInt64(counter)
-		} else {
-			statusCodeStats[key.(string)] = value.(int64)
-		}
-		return true
-	})
+	// Status code statistics were already collected above
 
 	// Collect referer statistics
 	var refererMetrics []*models.PathMetrics
 	refererCount := 0
-	c.refererStats.Range(func(key, value interface{}) bool {
-		stats := value.(*models.PathMetrics)
+	c.refererStats.Range(func(key string, value *models.PathMetrics) bool {
+		stats := value
 		requestCount := stats.GetRequestCount()
 		if requestCount > 0 {
 			totalLatency := stats.GetTotalLatency()
@@ -221,21 +374,11 @@ func (c *Collector) GetStats() map[string]interface{} {
 
 	// Collect the latency distribution
 	latencyDistribution := make(map[string]int64)
-
-	// Ensure every bucket exists, even with a zero count
-	buckets := []string{"lt10ms", "10-50ms", "50-200ms", "200-1000ms", "gt1s"}
-	for _, bucket := range buckets {
-		if counter, ok := c.latencyBuckets.Load(bucket); ok {
-			if counter != nil {
-				value := atomic.LoadInt64(counter.(*int64))
-				latencyDistribution[bucket] = value
-			} else {
-				latencyDistribution[bucket] = 0
-			}
-		} else {
-			latencyDistribution[bucket] = 0
-		}
-	}
+	latencyDistribution["lt10ms"] = atomic.LoadInt64(&c.latencyBuckets.lt10ms)
+	latencyDistribution["10-50ms"] = atomic.LoadInt64(&c.latencyBuckets.ms10_50)
+	latencyDistribution["50-200ms"] = atomic.LoadInt64(&c.latencyBuckets.ms50_200)
+	latencyDistribution["200-1000ms"] = atomic.LoadInt64(&c.latencyBuckets.ms200_1000)
+	latencyDistribution["gt1s"] = atomic.LoadInt64(&c.latencyBuckets.gt1s)
 
 	// Get recent request records (uses a read lock)
 	recentRequests := c.recentRequests.GetAll()
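+// Reading the distribution back is now a fixed set of atomic loads. A sketch
+// that derives percentages from the snapshot built above (hypothetical
+// helper, not part of this patch):
+//
+//	func latencyPercentages(dist map[string]int64) map[string]float64 {
+//		var total int64
+//		for _, v := range dist {
+//			total += v
+//		}
+//		out := make(map[string]float64, len(dist))
+//		for k, v := range dist {
+//			if total > 0 {
+//				out[k] = float64(v) * 100 / float64(total)
+//			}
+//		}
+//		return out
+//	}
+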
@@ -306,14 +449,13 @@ func (c *Collector) validateLoadedData() error {
 
 	// Validate status code statistics
 	var statusCodeTotal int64
-	c.statusCodeStats.Range(func(key, value interface{}) bool {
-		count := atomic.LoadInt64(value.(*int64))
+	statusStats := c.statusCodeStats.GetStats()
+	for _, count := range statusStats {
 		if count < 0 {
-			return false
+			return fmt.Errorf("invalid negative status code count")
 		}
 		statusCodeTotal += count
-		return true
-	})
+	}
 
 	return nil
 }
@@ -418,21 +560,10 @@ func (c *Collector) startCleanupTask() {
 	oneDayAgo := time.Now().Add(-24 * time.Hour).Unix()
 
 	// Clean up referer statistics older than 24 hours
-	var keysToDelete []interface{}
-	c.refererStats.Range(func(key, value interface{}) bool {
-		metrics := value.(*models.PathMetrics)
-		if metrics.LastAccessTime.Load() < oneDayAgo {
-			keysToDelete = append(keysToDelete, key)
-		}
-		return true
-	})
+	deletedCount := c.refererStats.Cleanup(oneDayAgo)
 
-	for _, key := range keysToDelete {
-		c.refererStats.Delete(key)
-	}
-
-	if len(keysToDelete) > 0 {
-		log.Printf("[Collector] cleaned up %d expired referer statistics entries", len(keysToDelete))
+	if deletedCount > 0 {
+		log.Printf("[Collector] cleaned up %d expired referer statistics entries", deletedCount)
 	}
 
 	// Force GC
@@ -469,14 +600,7 @@ func (c *Collector) startAsyncMetricsUpdater() {
 func (c *Collector) updateMetricsBatch(batch []RequestMetric) {
 	for _, m := range batch {
 		// Update status code statistics
-		statusKey := fmt.Sprintf("%d", m.Status)
-		if counter, ok := c.statusCodeStats.Load(statusKey); ok {
-			atomic.AddInt64(counter.(*int64), 1)
-		} else {
-			counter := new(int64)
-			*counter = 1
-			c.statusCodeStats.Store(statusKey, counter)
-		}
+		c.statusCodeStats.Increment(m.Status)
 
 		// Update total bytes and bandwidth statistics
 		atomic.AddInt64(&c.totalBytes, m.Bytes)
@@ -506,26 +630,17 @@ func (c *Collector) updateMetricsBatch(batch []RequestMetric) {
 
 		// Update the latency distribution
 		latencyMs := m.Latency.Milliseconds()
-		var bucketKey string
 		switch {
 		case latencyMs < 10:
-			bucketKey = "lt10ms"
+			atomic.AddInt64(&c.latencyBuckets.lt10ms, 1)
 		case latencyMs < 50:
-			bucketKey = "10-50ms"
+			atomic.AddInt64(&c.latencyBuckets.ms10_50, 1)
 		case latencyMs < 200:
-			bucketKey = "50-200ms"
+			atomic.AddInt64(&c.latencyBuckets.ms50_200, 1)
 		case latencyMs < 1000:
-			bucketKey = "200-1000ms"
+			atomic.AddInt64(&c.latencyBuckets.ms200_1000, 1)
 		default:
-			bucketKey = "gt1s"
-		}
-
-		if counter, ok := c.latencyBuckets.Load(bucketKey); ok {
-			atomic.AddInt64(counter.(*int64), 1)
-		} else {
-			counter := new(int64)
-			*counter = 1
-			c.latencyBuckets.Store(bucketKey, counter)
+			atomic.AddInt64(&c.latencyBuckets.gt1s, 1)
 		}
 
 		// Record the referer
@@ -534,7 +649,7 @@ func (c *Collector) updateMetricsBatch(batch []RequestMetric) {
 		if referer != "" {
 			var refererMetrics *models.PathMetrics
 			if existingMetrics, ok := c.refererStats.Load(referer); ok {
-				refererMetrics = existingMetrics.(*models.PathMetrics)
+				refererMetrics = existingMetrics
 			} else {
 				refererMetrics = &models.PathMetrics{Path: referer}
 				c.refererStats.Store(referer, refererMetrics)
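+// updateMetricsBatch is fed by the async updater, which drains the metrics
+// channel into slices so each batch pays the bookkeeping cost once. A sketch
+// of a size-triggered consumer loop, assuming a metricsChan chan
+// RequestMetric and a batch size of 100 (the actual channel name and flush
+// policy in this file may differ):
+//
+//	batch := make([]RequestMetric, 0, 100)
+//	for m := range metricsChan {
+//		batch = append(batch, m)
+//		if len(batch) >= 100 {
+//			c.updateMetricsBatch(batch)
+//			batch = batch[:0] // reuse the backing array
+//		}
+//	}
+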
diff --git a/internal/metrics/persistence.go b/internal/metrics/persistence.go
index a40f728..e142bf0 100644
--- a/internal/metrics/persistence.go
+++ b/internal/metrics/persistence.go
@@ -8,6 +8,7 @@ import (
 	"path/filepath"
 	"proxy-go/internal/utils"
 	"runtime"
+	"strconv"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -140,22 +141,35 @@ func (ms *MetricsStorage) LoadMetrics() error {
 		if err := loadJSONFromFile(ms.statusCodeFile, &statusCodeStats); err != nil {
 			log.Printf("[MetricsStorage] failed to load status code statistics: %v", err)
 		} else {
-			for statusCode, count := range statusCodeStats {
-				countValue, ok := count.(float64)
-				if !ok {
-					continue
-				}
+			// With the new StatusCodeStats structure the values are set manually
+			loadedCount := 0
+			for codeStr, countValue := range statusCodeStats {
+				// Parse the status code
+				if code, err := strconv.Atoi(codeStr); err == nil {
+					// Parse the count
+					var count int64
+					switch v := countValue.(type) {
+					case float64:
+						count = int64(v)
+					case int64:
+						count = v
+					case int:
+						count = int64(v)
+					default:
+						continue
+					}
 
-				// Create or update the status code statistic
-				if counter, ok := ms.collector.statusCodeStats.Load(statusCode); ok {
-					atomic.StoreInt64(counter.(*int64), int64(countValue))
-				} else {
-					counter := new(int64)
-					*counter = int64(countValue)
-					ms.collector.statusCodeStats.Store(statusCode, counter)
+					// Store into the new StatusCodeStats structure
+					ms.collector.statusCodeStats.mu.Lock()
+					if _, exists := ms.collector.statusCodeStats.stats[code]; !exists {
+						ms.collector.statusCodeStats.stats[code] = new(int64)
+					}
+					atomic.StoreInt64(ms.collector.statusCodeStats.stats[code], count)
+					ms.collector.statusCodeStats.mu.Unlock()
+					loadedCount++
 				}
 			}
-			log.Printf("[MetricsStorage] loaded %d status code statistics entries", len(statusCodeStats))
+			log.Printf("[MetricsStorage] successfully loaded %d status code statistics entries", loadedCount)
 		}
 	}
 
@@ -168,14 +182,25 @@ func (ms *MetricsStorage) LoadMetrics() error {
 		if err := loadJSONFromFile(latencyDistributionFile, &distribution); err != nil {
 			log.Printf("[MetricsStorage] failed to load latency distribution: %v", err)
 		} else {
+			// With the new LatencyBuckets structure the values are set manually
 			for bucket, count := range distribution {
 				countValue, ok := count.(float64)
 				if !ok {
 					continue
 				}
 
-				if counter, ok := ms.collector.latencyBuckets.Load(bucket); ok {
-					atomic.StoreInt64(counter.(*int64), int64(countValue))
+				// Set the matching field for each bucket name
+				switch bucket {
+				case "lt10ms":
+					atomic.StoreInt64(&ms.collector.latencyBuckets.lt10ms, int64(countValue))
+				case "10-50ms":
+					atomic.StoreInt64(&ms.collector.latencyBuckets.ms10_50, int64(countValue))
+				case "50-200ms":
+					atomic.StoreInt64(&ms.collector.latencyBuckets.ms50_200, int64(countValue))
+				case "200-1000ms":
+					atomic.StoreInt64(&ms.collector.latencyBuckets.ms200_1000, int64(countValue))
+				case "gt1s":
+					atomic.StoreInt64(&ms.collector.latencyBuckets.gt1s, int64(countValue))
 				}
 			}
 			log.Printf("[MetricsStorage] loaded latency distribution data")
diff --git a/internal/service/rule_service.go b/internal/service/rule_service.go
index a795139..0685fb0 100644
--- a/internal/service/rule_service.go
+++ b/internal/service/rule_service.go
@@ -67,11 +67,38 @@ func (rs *RuleService) SelectBestRule(client *http.Client, pathConfig config.Pat
 		return nil, false, false
 	}
 
-	// Get the file size
+	// Decide whether the file size is needed at all: if none of the
+	// matching rules sets a size threshold (all are defaults), skip the
+	// file-size check entirely
+	needSizeCheck := false
+	for _, rule := range domainMatchingRules {
+		if rule.SizeThreshold > 0 || rule.MaxSize < (1<<63-1) {
+			needSizeCheck = true
+			break
+		}
+	}
+
+	if !needSizeCheck {
+		// No size check needed; use the first matching rule that is reachable
+		for _, rule := range domainMatchingRules {
+			if utils.IsTargetAccessible(client, rule.Target+path) {
+				log.Printf("[SelectRule] %s -> rule selected (host: %s, size check skipped)", path, requestHost)
+				return rule, true, true
+			}
+		}
+		return nil, false, false
+	}
+
+	// Get the file size (synchronous check)
 	contentLength, err := utils.GetFileSize(client, pathConfig.DefaultTarget+path)
 	if err != nil {
-		log.Printf("[SelectRule] %s -> failed to get file size: %v; strict mode, extension rules not used", path, err)
-		// Strict mode: if the file size cannot be determined, do not use extension rules
+		log.Printf("[SelectRule] %s -> failed to get file size: %v; falling back to lenient mode", path, err)
+		// Lenient mode: if the file size cannot be determined, try the
+		// first matching rule that is reachable
+		for _, rule := range domainMatchingRules {
+			if utils.IsTargetAccessible(client, rule.Target+path) {
+				log.Printf("[SelectRule] %s -> rule selected in lenient mode (host: %s, size check skipped)", path, requestHost)
+				return rule, true, true
+			}
+		}
 		return nil, false, false
 	}
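+// The needSizeCheck test above means a HEAD request for the file size is
+// only issued when some matching rule actually discriminates by size. A
+// config sketch; the rule type name (config.ExtensionRule) is assumed here,
+// while the Target, SizeThreshold, and MaxSize fields come from this diff:
+//
+//	rules := []config.ExtensionRule{
+//		{Target: "https://cdn-small.example.com", SizeThreshold: 0, MaxSize: 1 << 20}, // size-dependent: triggers the check
+//		{Target: "https://cdn-any.example.com"},                                      // defaults only: no HEAD request needed
+//	}
+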
diff --git a/internal/utils/utils.go b/internal/utils/utils.go
index 9f8e4dc..10bfe8a 100644
--- a/internal/utils/utils.go
+++ b/internal/utils/utils.go
@@ -10,76 +10,207 @@ import (
 	neturl "net/url"
 	"path/filepath"
 	"proxy-go/internal/config"
+	"runtime"
 	"sort"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 )
 
-// File-size cache entry
+// Goroutine pool structures
+type GoroutinePool struct {
+	maxWorkers int
+	taskQueue  chan func()
+	wg         sync.WaitGroup
+	once       sync.Once
+	stopped    int32
+}
+
+// Global goroutine pool
+var (
+	globalPool     *GoroutinePool
+	poolOnce       sync.Once
+	defaultWorkers = runtime.NumCPU() * 4 // default number of workers
+)
+
+// GetGoroutinePool returns the global goroutine pool
+func GetGoroutinePool() *GoroutinePool {
+	poolOnce.Do(func() {
+		globalPool = NewGoroutinePool(defaultWorkers)
+	})
+	return globalPool
+}
+
+// NewGoroutinePool creates a new goroutine pool
+func NewGoroutinePool(maxWorkers int) *GoroutinePool {
+	if maxWorkers <= 0 {
+		maxWorkers = runtime.NumCPU() * 2
+	}
+
+	pool := &GoroutinePool{
+		maxWorkers: maxWorkers,
+		taskQueue:  make(chan func(), maxWorkers*10), // queue buffer is 10x the worker count
+	}
+
+	// Start the workers
+	for i := 0; i < maxWorkers; i++ {
+		pool.wg.Add(1)
+		go pool.worker()
+	}
+
+	return pool
+}
+
+// worker is the worker goroutine
+func (p *GoroutinePool) worker() {
+	defer p.wg.Done()
+
+	for {
+		select {
+		case task, ok := <-p.taskQueue:
+			if !ok {
+				return // channel closed; exit
+			}
+
+			// Run the task, recovering from panics
+			func() {
+				defer func() {
+					if r := recover(); r != nil {
+						fmt.Printf("[GoroutinePool] Worker panic: %v\n", r)
+					}
+				}()
+				task()
+			}()
+		}
+	}
+}
+
+// Submit submits a task to the pool
+func (p *GoroutinePool) Submit(task func()) error {
+	if atomic.LoadInt32(&p.stopped) == 1 {
+		return fmt.Errorf("goroutine pool is stopped")
+	}
+
+	select {
+	case p.taskQueue <- task:
+		return nil
+	case <-time.After(100 * time.Millisecond): // 100ms timeout
+		return fmt.Errorf("goroutine pool is busy")
+	}
+}
+
+// SubmitWithTimeout submits a task to the pool with a timeout
+func (p *GoroutinePool) SubmitWithTimeout(task func(), timeout time.Duration) error {
+	if atomic.LoadInt32(&p.stopped) == 1 {
+		return fmt.Errorf("goroutine pool is stopped")
+	}
+
+	select {
+	case p.taskQueue <- task:
+		return nil
+	case <-time.After(timeout):
+		return fmt.Errorf("goroutine pool submit timeout")
+	}
+}
+
+// Stop stops the goroutine pool
+func (p *GoroutinePool) Stop() {
+	p.once.Do(func() {
+		atomic.StoreInt32(&p.stopped, 1)
+		close(p.taskQueue)
+		p.wg.Wait()
+	})
+}
+
+// Size returns the number of workers in the pool
+func (p *GoroutinePool) Size() int {
+	return p.maxWorkers
+}
+
+// QueueSize returns the current length of the task queue
+func (p *GoroutinePool) QueueSize() int {
+	return len(p.taskQueue)
+}
+
+// GoSafe runs fn asynchronously through the pool
+func GoSafe(fn func()) {
+	pool := GetGoroutinePool()
+	err := pool.Submit(fn)
+	if err != nil {
+		// If the pool is busy or stopped, fall back to a plain goroutine
+		go func() {
+			defer func() {
+				if r := recover(); r != nil {
+					fmt.Printf("[GoSafe] Panic: %v\n", r)
+				}
+			}()
+			fn()
+		}()
+	}
+}
+
+// GoSafeWithTimeout runs fn asynchronously with a submit timeout
+func GoSafeWithTimeout(fn func(), timeout time.Duration) error {
+	pool := GetGoroutinePool()
+	return pool.SubmitWithTimeout(fn, timeout)
+}
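+// Usage sketch for the pool: Submit returns an error instead of blocking
+// indefinitely, so callers can degrade gracefully. GoSafe above wraps
+// exactly this pattern:
+//
+//	pool := GetGoroutinePool()
+//	if err := pool.Submit(func() {
+//		// ... background work ...
+//	}); err != nil {
+//		// queue full for 100ms, or pool stopped: run inline or drop
+//	}
+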
+
+// File-size cache
 type fileSizeCache struct {
 	size      int64
 	timestamp time.Time
 }
 
-// Accessibility cache entry
 type accessibilityCache struct {
 	accessible bool
 	timestamp  time.Time
 }
 
+// Global caches
 var (
-	// File-size cache, 5-minute TTL
-	sizeCache sync.Map
-	// Accessibility cache, 30-second TTL
-	accessCache sync.Map
-	cacheTTL     = 5 * time.Minute
-	accessTTL    = 30 * time.Second
-	maxCacheSize = 10000 // maximum number of cache entries
+	sizeCache   sync.Map
+	accessCache sync.Map
+	cacheTTL    = 5 * time.Minute
+	accessTTL   = 2 * time.Minute
 )
 
-// Clean up expired cache entries
+// Initialization
 func init() {
-	go func() {
-		ticker := time.NewTicker(time.Minute)
-		for range ticker.C {
-			now := time.Now()
-			// Clean the file-size cache
-			var items []struct {
-				key       interface{}
-				timestamp time.Time
-			}
-			sizeCache.Range(func(key, value interface{}) bool {
-				cache := value.(fileSizeCache)
-				if now.Sub(cache.timestamp) > cacheTTL {
-					sizeCache.Delete(key)
-				} else {
-					items = append(items, struct {
-						key       interface{}
-						timestamp time.Time
-					}{key, cache.timestamp})
-				}
-				return true
-			})
-			if len(items) > maxCacheSize {
-				sort.Slice(items, func(i, j int) bool {
-					return items[i].timestamp.Before(items[j].timestamp)
-				})
-				for i := 0; i < len(items)/2; i++ {
-					sizeCache.Delete(items[i].key)
-				}
-			}
-
-			// Clean the accessibility cache
-			accessCache.Range(func(key, value interface{}) bool {
-				cache := value.(accessibilityCache)
-				if now.Sub(cache.timestamp) > accessTTL {
-					accessCache.Delete(key)
-				}
-				return true
-			})
+	// Start the periodic cache-cleanup goroutine
+	GoSafe(func() {
+		ticker := time.NewTicker(10 * time.Minute)
+		defer ticker.Stop()
+
+		for range ticker.C {
+			cleanExpiredCache()
 		}
-	}()
+	})
+}
+
+// cleanExpiredCache removes expired cache entries
+func cleanExpiredCache() {
+	now := time.Now()
+
+	// Clean the file-size cache
+	sizeCache.Range(func(key, value interface{}) bool {
+		if cache, ok := value.(fileSizeCache); ok {
+			if now.Sub(cache.timestamp) > cacheTTL {
+				sizeCache.Delete(key)
+			}
+		}
+		return true
+	})
+
+	// Clean the accessibility cache
+	accessCache.Range(func(key, value interface{}) bool {
+		if cache, ok := value.(accessibilityCache); ok {
+			if now.Sub(cache.timestamp) > accessTTL {
+				accessCache.Delete(key)
+			}
+		}
+		return true
+	})
 }
 
 // GenerateRequestID generates a unique request ID
@@ -134,7 +265,7 @@ func IsImageRequest(path string) bool {
 	return imageExts[ext]
 }
 
-// GetFileSize sends a HEAD request to get the file size
+// GetFileSize sends a HEAD request to get the file size (kept for backward compatibility)
 func GetFileSize(client *http.Client, url string) (int64, error) {
 	// Check the cache first
 	if cache, ok := sizeCache.Load(url); ok {
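+// The cache-first pattern GetFileSize follows, sketched end to end
+// (simplified; the real function continues past this excerpt):
+//
+//	if c, ok := sizeCache.Load(url); ok {
+//		if fc := c.(fileSizeCache); time.Since(fc.timestamp) < cacheTTL {
+//			return fc.size, nil // fresh cache hit
+//		}
+//	}
+//	resp, err := client.Head(url) // miss or stale entry: issue a HEAD request
+//	if err != nil {
+//		return 0, err
+//	}
+//	defer resp.Body.Close()
+//	sizeCache.Store(url, fileSizeCache{size: resp.ContentLength, timestamp: time.Now()})
+//	return resp.ContentLength, nil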