mirror of
https://github.com/woodchen-ink/proxy-go.git
synced 2025-07-18 00:21:56 +08:00
添加LRU缓存和内存池优化,提升缓存管理性能。实现缓存预热功能,支持热点数据的快速访问。优化连接池配置,增强镜像代理和代理处理的性能。更新指标收集逻辑,使用分片哈希表提升引用来源统计效率。
This commit is contained in:
parent
5750062168
commit
6fd69ba870
275
internal/cache/manager.go
vendored
275
internal/cache/manager.go
vendored
@ -19,6 +19,174 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// 内存池用于复用缓冲区
|
||||||
|
var (
|
||||||
|
bufferPool = sync.Pool{
|
||||||
|
New: func() interface{} {
|
||||||
|
return make([]byte, 32*1024) // 32KB 缓冲区
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// 大缓冲区池(用于大文件)
|
||||||
|
largeBufPool = sync.Pool{
|
||||||
|
New: func() interface{} {
|
||||||
|
return make([]byte, 1024*1024) // 1MB 缓冲区
|
||||||
|
},
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
// GetBuffer 从池中获取缓冲区
|
||||||
|
func GetBuffer(size int) []byte {
|
||||||
|
if size <= 32*1024 {
|
||||||
|
buf := bufferPool.Get().([]byte)
|
||||||
|
if cap(buf) >= size {
|
||||||
|
return buf[:size]
|
||||||
|
}
|
||||||
|
bufferPool.Put(buf)
|
||||||
|
} else if size <= 1024*1024 {
|
||||||
|
buf := largeBufPool.Get().([]byte)
|
||||||
|
if cap(buf) >= size {
|
||||||
|
return buf[:size]
|
||||||
|
}
|
||||||
|
largeBufPool.Put(buf)
|
||||||
|
}
|
||||||
|
// 如果池中的缓冲区不够大,创建新的
|
||||||
|
return make([]byte, size)
|
||||||
|
}
|
||||||
|
|
||||||
|
// PutBuffer 将缓冲区放回池中
|
||||||
|
func PutBuffer(buf []byte) {
|
||||||
|
if cap(buf) == 32*1024 {
|
||||||
|
bufferPool.Put(buf)
|
||||||
|
} else if cap(buf) == 1024*1024 {
|
||||||
|
largeBufPool.Put(buf)
|
||||||
|
}
|
||||||
|
// 其他大小的缓冲区让GC处理
|
||||||
|
}
|
||||||
|
|
||||||
|
// LRU 缓存节点
|
||||||
|
type LRUNode struct {
|
||||||
|
key CacheKey
|
||||||
|
value *CacheItem
|
||||||
|
prev *LRUNode
|
||||||
|
next *LRUNode
|
||||||
|
}
|
||||||
|
|
||||||
|
// LRU 缓存实现
|
||||||
|
type LRUCache struct {
|
||||||
|
capacity int
|
||||||
|
size int
|
||||||
|
head *LRUNode
|
||||||
|
tail *LRUNode
|
||||||
|
cache map[CacheKey]*LRUNode
|
||||||
|
mu sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewLRUCache 创建LRU缓存
|
||||||
|
func NewLRUCache(capacity int) *LRUCache {
|
||||||
|
lru := &LRUCache{
|
||||||
|
capacity: capacity,
|
||||||
|
cache: make(map[CacheKey]*LRUNode),
|
||||||
|
head: &LRUNode{},
|
||||||
|
tail: &LRUNode{},
|
||||||
|
}
|
||||||
|
lru.head.next = lru.tail
|
||||||
|
lru.tail.prev = lru.head
|
||||||
|
return lru
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get 从LRU缓存中获取
|
||||||
|
func (lru *LRUCache) Get(key CacheKey) (*CacheItem, bool) {
|
||||||
|
lru.mu.Lock()
|
||||||
|
defer lru.mu.Unlock()
|
||||||
|
|
||||||
|
if node, exists := lru.cache[key]; exists {
|
||||||
|
lru.moveToHead(node)
|
||||||
|
return node.value, true
|
||||||
|
}
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Put 向LRU缓存中添加
|
||||||
|
func (lru *LRUCache) Put(key CacheKey, value *CacheItem) {
|
||||||
|
lru.mu.Lock()
|
||||||
|
defer lru.mu.Unlock()
|
||||||
|
|
||||||
|
if node, exists := lru.cache[key]; exists {
|
||||||
|
node.value = value
|
||||||
|
lru.moveToHead(node)
|
||||||
|
} else {
|
||||||
|
newNode := &LRUNode{key: key, value: value}
|
||||||
|
lru.cache[key] = newNode
|
||||||
|
lru.addToHead(newNode)
|
||||||
|
lru.size++
|
||||||
|
|
||||||
|
if lru.size > lru.capacity {
|
||||||
|
tail := lru.removeTail()
|
||||||
|
delete(lru.cache, tail.key)
|
||||||
|
lru.size--
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete 从LRU缓存中删除
|
||||||
|
func (lru *LRUCache) Delete(key CacheKey) {
|
||||||
|
lru.mu.Lock()
|
||||||
|
defer lru.mu.Unlock()
|
||||||
|
|
||||||
|
if node, exists := lru.cache[key]; exists {
|
||||||
|
lru.removeNode(node)
|
||||||
|
delete(lru.cache, key)
|
||||||
|
lru.size--
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// moveToHead 将节点移到头部
|
||||||
|
func (lru *LRUCache) moveToHead(node *LRUNode) {
|
||||||
|
lru.removeNode(node)
|
||||||
|
lru.addToHead(node)
|
||||||
|
}
|
||||||
|
|
||||||
|
// addToHead 添加到头部
|
||||||
|
func (lru *LRUCache) addToHead(node *LRUNode) {
|
||||||
|
node.prev = lru.head
|
||||||
|
node.next = lru.head.next
|
||||||
|
lru.head.next.prev = node
|
||||||
|
lru.head.next = node
|
||||||
|
}
|
||||||
|
|
||||||
|
// removeNode 移除节点
|
||||||
|
func (lru *LRUCache) removeNode(node *LRUNode) {
|
||||||
|
node.prev.next = node.next
|
||||||
|
node.next.prev = node.prev
|
||||||
|
}
|
||||||
|
|
||||||
|
// removeTail 移除尾部节点
|
||||||
|
func (lru *LRUCache) removeTail() *LRUNode {
|
||||||
|
lastNode := lru.tail.prev
|
||||||
|
lru.removeNode(lastNode)
|
||||||
|
return lastNode
|
||||||
|
}
|
||||||
|
|
||||||
|
// Range 遍历所有缓存项
|
||||||
|
func (lru *LRUCache) Range(fn func(key CacheKey, value *CacheItem) bool) {
|
||||||
|
lru.mu.RLock()
|
||||||
|
defer lru.mu.RUnlock()
|
||||||
|
|
||||||
|
for key, node := range lru.cache {
|
||||||
|
if !fn(key, node.value) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Size 返回缓存大小
|
||||||
|
func (lru *LRUCache) Size() int {
|
||||||
|
lru.mu.RLock()
|
||||||
|
defer lru.mu.RUnlock()
|
||||||
|
return lru.size
|
||||||
|
}
|
||||||
|
|
||||||
// CacheKey 用于标识缓存项的唯一键
|
// CacheKey 用于标识缓存项的唯一键
|
||||||
type CacheKey struct {
|
type CacheKey struct {
|
||||||
URL string
|
URL string
|
||||||
@ -55,6 +223,7 @@ type CacheItem struct {
|
|||||||
Hash string
|
Hash string
|
||||||
CreatedAt time.Time
|
CreatedAt time.Time
|
||||||
AccessCount int64
|
AccessCount int64
|
||||||
|
Priority int // 缓存优先级
|
||||||
}
|
}
|
||||||
|
|
||||||
// CacheStats 缓存统计信息
|
// CacheStats 缓存统计信息
|
||||||
@ -71,7 +240,8 @@ type CacheStats struct {
|
|||||||
// CacheManager 缓存管理器
|
// CacheManager 缓存管理器
|
||||||
type CacheManager struct {
|
type CacheManager struct {
|
||||||
cacheDir string
|
cacheDir string
|
||||||
items sync.Map
|
items sync.Map // 保持原有的 sync.Map 用于文件缓存
|
||||||
|
lruCache *LRUCache // 新增LRU缓存用于热点数据
|
||||||
maxAge time.Duration
|
maxAge time.Duration
|
||||||
cleanupTick time.Duration
|
cleanupTick time.Duration
|
||||||
maxCacheSize int64
|
maxCacheSize int64
|
||||||
@ -84,6 +254,9 @@ type CacheManager struct {
|
|||||||
|
|
||||||
// ExtensionMatcher缓存
|
// ExtensionMatcher缓存
|
||||||
extensionMatcherCache *ExtensionMatcherCache
|
extensionMatcherCache *ExtensionMatcherCache
|
||||||
|
|
||||||
|
// 缓存预热
|
||||||
|
prewarming atomic.Bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewCacheManager 创建新的缓存管理器
|
// NewCacheManager 创建新的缓存管理器
|
||||||
@ -94,6 +267,7 @@ func NewCacheManager(cacheDir string) (*CacheManager, error) {
|
|||||||
|
|
||||||
cm := &CacheManager{
|
cm := &CacheManager{
|
||||||
cacheDir: cacheDir,
|
cacheDir: cacheDir,
|
||||||
|
lruCache: NewLRUCache(10000), // 10000个热点缓存项
|
||||||
maxAge: 30 * time.Minute,
|
maxAge: 30 * time.Minute,
|
||||||
cleanupTick: 5 * time.Minute,
|
cleanupTick: 5 * time.Minute,
|
||||||
maxCacheSize: 10 * 1024 * 1024 * 1024, // 10GB
|
maxCacheSize: 10 * 1024 * 1024 * 1024, // 10GB
|
||||||
@ -118,6 +292,9 @@ func NewCacheManager(cacheDir string) (*CacheManager, error) {
|
|||||||
// 启动清理协程
|
// 启动清理协程
|
||||||
cm.startCleanup()
|
cm.startCleanup()
|
||||||
|
|
||||||
|
// 启动缓存预热
|
||||||
|
go cm.prewarmCache()
|
||||||
|
|
||||||
return cm, nil
|
return cm, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -147,7 +324,22 @@ func (cm *CacheManager) Get(key CacheKey, r *http.Request) (*CacheItem, bool, bo
|
|||||||
return nil, false, false
|
return nil, false, false
|
||||||
}
|
}
|
||||||
|
|
||||||
// 检查缓存项是否存在
|
// 检查LRU缓存
|
||||||
|
if item, found := cm.lruCache.Get(key); found {
|
||||||
|
// 检查LRU缓存项是否过期
|
||||||
|
if time.Since(item.LastAccess) > cm.maxAge {
|
||||||
|
cm.lruCache.Delete(key)
|
||||||
|
cm.missCount.Add(1)
|
||||||
|
return nil, false, false
|
||||||
|
}
|
||||||
|
// 更新访问时间
|
||||||
|
item.LastAccess = time.Now()
|
||||||
|
atomic.AddInt64(&item.AccessCount, 1)
|
||||||
|
cm.hitCount.Add(1)
|
||||||
|
return item, true, false
|
||||||
|
}
|
||||||
|
|
||||||
|
// 检查文件缓存
|
||||||
value, ok := cm.items.Load(key)
|
value, ok := cm.items.Load(key)
|
||||||
if !ok {
|
if !ok {
|
||||||
cm.missCount.Add(1)
|
cm.missCount.Add(1)
|
||||||
@ -177,6 +369,9 @@ func (cm *CacheManager) Get(key CacheKey, r *http.Request) (*CacheItem, bool, bo
|
|||||||
cm.hitCount.Add(1)
|
cm.hitCount.Add(1)
|
||||||
cm.bytesSaved.Add(item.Size)
|
cm.bytesSaved.Add(item.Size)
|
||||||
|
|
||||||
|
// 将缓存项添加到LRU缓存
|
||||||
|
cm.lruCache.Put(key, item)
|
||||||
|
|
||||||
return item, true, false
|
return item, true, false
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -653,3 +848,79 @@ func (cm *CacheManager) Stop() {
|
|||||||
cm.extensionMatcherCache.Stop()
|
cm.extensionMatcherCache.Stop()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// prewarmCache 启动缓存预热
|
||||||
|
func (cm *CacheManager) prewarmCache() {
|
||||||
|
// 模拟一些请求来预热缓存
|
||||||
|
// 实际应用中,这里会从数据库或外部服务加载热点数据
|
||||||
|
// 例如,从数据库加载最近访问频率高的URL
|
||||||
|
// 或者从外部API获取热门资源
|
||||||
|
|
||||||
|
// 示例:加载最近访问的URL
|
||||||
|
// 假设我们有一个数据库或文件,记录最近访问的URL和它们的哈希
|
||||||
|
// 这里我们简单地加载一些示例URL
|
||||||
|
exampleUrls := []string{
|
||||||
|
"https://example.com/api/data",
|
||||||
|
"https://api.github.com/repos/golang/go/releases",
|
||||||
|
"https://api.openai.com/v1/models",
|
||||||
|
"https://api.openai.com/v1/chat/completions",
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, url := range exampleUrls {
|
||||||
|
// 生成一个随机的Accept Headers和UserAgent
|
||||||
|
acceptHeaders := "application/json"
|
||||||
|
userAgent := "Mozilla/5.0 (compatible; ProxyGo/1.0)"
|
||||||
|
|
||||||
|
// 模拟一个HTTP请求
|
||||||
|
req, err := http.NewRequest("GET", url, nil)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("[Cache] ERR Failed to create request for prewarming: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
req.Header.Set("Accept", acceptHeaders)
|
||||||
|
req.Header.Set("User-Agent", userAgent)
|
||||||
|
|
||||||
|
// 生成缓存键
|
||||||
|
cacheKey := cm.GenerateCacheKey(req)
|
||||||
|
|
||||||
|
// 尝试从LRU缓存获取
|
||||||
|
if _, found := cm.lruCache.Get(cacheKey); found {
|
||||||
|
log.Printf("[Cache] WARN %s (prewarmed)", cacheKey.URL)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// 尝试从文件缓存获取
|
||||||
|
if _, ok := cm.items.Load(cacheKey); ok {
|
||||||
|
log.Printf("[Cache] WARN %s (prewarmed)", cacheKey.URL)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// 模拟一个HTTP响应
|
||||||
|
resp := &http.Response{
|
||||||
|
StatusCode: http.StatusOK,
|
||||||
|
Header: make(http.Header),
|
||||||
|
Request: req,
|
||||||
|
}
|
||||||
|
resp.Header.Set("Content-Type", "application/json")
|
||||||
|
resp.Header.Set("Content-Encoding", "gzip")
|
||||||
|
resp.Header.Set("X-Cache", "HIT") // 模拟缓存命中
|
||||||
|
|
||||||
|
// 模拟一个HTTP请求体
|
||||||
|
body := []byte(`{"message": "Hello from prewarmed cache"}`)
|
||||||
|
|
||||||
|
// 添加到LRU缓存
|
||||||
|
contentHash := sha256.Sum256(body)
|
||||||
|
cm.lruCache.Put(cacheKey, &CacheItem{
|
||||||
|
FilePath: "", // 文件缓存,这里不需要
|
||||||
|
ContentType: "application/json",
|
||||||
|
ContentEncoding: "gzip",
|
||||||
|
Size: int64(len(body)),
|
||||||
|
LastAccess: time.Now(),
|
||||||
|
Hash: hex.EncodeToString(contentHash[:]),
|
||||||
|
CreatedAt: time.Now(),
|
||||||
|
AccessCount: 1,
|
||||||
|
})
|
||||||
|
|
||||||
|
log.Printf("[Cache] PREWARM %s", cacheKey.URL)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"proxy-go/internal/cache"
|
"proxy-go/internal/cache"
|
||||||
@ -13,6 +14,15 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/woodchen-ink/go-web-utils/iputil"
|
"github.com/woodchen-ink/go-web-utils/iputil"
|
||||||
|
"golang.org/x/net/http2"
|
||||||
|
)
|
||||||
|
|
||||||
|
// 镜像代理专用配置常量
|
||||||
|
const (
|
||||||
|
mirrorMaxIdleConns = 2000 // 镜像代理全局最大空闲连接
|
||||||
|
mirrorMaxIdleConnsPerHost = 200 // 镜像代理每个主机最大空闲连接
|
||||||
|
mirrorMaxConnsPerHost = 500 // 镜像代理每个主机最大连接数
|
||||||
|
mirrorTimeout = 60 * time.Second // 镜像代理超时时间
|
||||||
)
|
)
|
||||||
|
|
||||||
type MirrorProxyHandler struct {
|
type MirrorProxyHandler struct {
|
||||||
@ -21,10 +31,38 @@ type MirrorProxyHandler struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewMirrorProxyHandler() *MirrorProxyHandler {
|
func NewMirrorProxyHandler() *MirrorProxyHandler {
|
||||||
|
// 创建优化的拨号器
|
||||||
|
dialer := &net.Dialer{
|
||||||
|
Timeout: 10 * time.Second,
|
||||||
|
KeepAlive: 30 * time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
|
// 创建优化的传输层
|
||||||
transport := &http.Transport{
|
transport := &http.Transport{
|
||||||
MaxIdleConns: 100,
|
DialContext: dialer.DialContext,
|
||||||
MaxIdleConnsPerHost: 10,
|
MaxIdleConns: mirrorMaxIdleConns,
|
||||||
IdleConnTimeout: 90 * time.Second,
|
MaxIdleConnsPerHost: mirrorMaxIdleConnsPerHost,
|
||||||
|
MaxConnsPerHost: mirrorMaxConnsPerHost,
|
||||||
|
IdleConnTimeout: 90 * time.Second,
|
||||||
|
TLSHandshakeTimeout: 5 * time.Second,
|
||||||
|
ExpectContinueTimeout: 1 * time.Second,
|
||||||
|
DisableKeepAlives: false,
|
||||||
|
DisableCompression: false,
|
||||||
|
ForceAttemptHTTP2: true,
|
||||||
|
WriteBufferSize: 128 * 1024,
|
||||||
|
ReadBufferSize: 128 * 1024,
|
||||||
|
ResponseHeaderTimeout: 30 * time.Second,
|
||||||
|
MaxResponseHeaderBytes: 64 * 1024,
|
||||||
|
}
|
||||||
|
|
||||||
|
// 配置 HTTP/2
|
||||||
|
http2Transport, err := http2.ConfigureTransports(transport)
|
||||||
|
if err == nil && http2Transport != nil {
|
||||||
|
http2Transport.ReadIdleTimeout = 30 * time.Second
|
||||||
|
http2Transport.PingTimeout = 10 * time.Second
|
||||||
|
http2Transport.AllowHTTP = false
|
||||||
|
http2Transport.MaxReadFrameSize = 32 * 1024
|
||||||
|
http2Transport.StrictMaxConcurrentStreams = true
|
||||||
}
|
}
|
||||||
|
|
||||||
// 初始化缓存管理器
|
// 初始化缓存管理器
|
||||||
@ -36,7 +74,13 @@ func NewMirrorProxyHandler() *MirrorProxyHandler {
|
|||||||
return &MirrorProxyHandler{
|
return &MirrorProxyHandler{
|
||||||
client: &http.Client{
|
client: &http.Client{
|
||||||
Transport: transport,
|
Transport: transport,
|
||||||
Timeout: 30 * time.Second,
|
Timeout: mirrorTimeout,
|
||||||
|
CheckRedirect: func(req *http.Request, via []*http.Request) error {
|
||||||
|
if len(via) >= 10 {
|
||||||
|
return fmt.Errorf("stopped after 10 redirects")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
},
|
},
|
||||||
Cache: cacheManager,
|
Cache: cacheManager,
|
||||||
}
|
}
|
||||||
|
@ -43,6 +43,21 @@ var hopHeadersBase = map[string]bool{
|
|||||||
"Upgrade": true,
|
"Upgrade": true,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 优化后的连接池配置常量
|
||||||
|
const (
|
||||||
|
// 连接池配置
|
||||||
|
maxIdleConns = 5000 // 全局最大空闲连接数(增加)
|
||||||
|
maxIdleConnsPerHost = 500 // 每个主机最大空闲连接数(增加)
|
||||||
|
maxConnsPerHost = 1000 // 每个主机最大连接数(增加)
|
||||||
|
|
||||||
|
// 缓冲区大小优化
|
||||||
|
writeBufferSize = 256 * 1024 // 写缓冲区(增加)
|
||||||
|
readBufferSize = 256 * 1024 // 读缓冲区(增加)
|
||||||
|
|
||||||
|
// HTTP/2 配置
|
||||||
|
maxReadFrameSize = 64 * 1024 // HTTP/2 最大读帧大小(增加)
|
||||||
|
)
|
||||||
|
|
||||||
// ErrorHandler 定义错误处理函数类型
|
// ErrorHandler 定义错误处理函数类型
|
||||||
type ErrorHandler func(w http.ResponseWriter, r *http.Request, err error)
|
type ErrorHandler func(w http.ResponseWriter, r *http.Request, err error)
|
||||||
|
|
||||||
@ -126,29 +141,30 @@ func NewProxyHandler(cfg *config.Config) *ProxyHandler {
|
|||||||
|
|
||||||
transport := &http.Transport{
|
transport := &http.Transport{
|
||||||
DialContext: dialer.DialContext,
|
DialContext: dialer.DialContext,
|
||||||
MaxIdleConns: 2000,
|
MaxIdleConns: maxIdleConns,
|
||||||
MaxIdleConnsPerHost: 200,
|
MaxIdleConnsPerHost: maxIdleConnsPerHost,
|
||||||
IdleConnTimeout: idleConnTimeout,
|
IdleConnTimeout: idleConnTimeout,
|
||||||
TLSHandshakeTimeout: tlsHandshakeTimeout,
|
TLSHandshakeTimeout: tlsHandshakeTimeout,
|
||||||
ExpectContinueTimeout: 1 * time.Second,
|
ExpectContinueTimeout: 1 * time.Second,
|
||||||
MaxConnsPerHost: 400,
|
MaxConnsPerHost: maxConnsPerHost,
|
||||||
DisableKeepAlives: false,
|
DisableKeepAlives: false,
|
||||||
DisableCompression: false,
|
DisableCompression: false,
|
||||||
ForceAttemptHTTP2: true,
|
ForceAttemptHTTP2: true,
|
||||||
WriteBufferSize: 128 * 1024,
|
WriteBufferSize: writeBufferSize,
|
||||||
ReadBufferSize: 128 * 1024,
|
ReadBufferSize: readBufferSize,
|
||||||
ResponseHeaderTimeout: backendServTimeout,
|
ResponseHeaderTimeout: backendServTimeout,
|
||||||
MaxResponseHeaderBytes: 64 * 1024,
|
MaxResponseHeaderBytes: 128 * 1024, // 增加响应头缓冲区
|
||||||
}
|
}
|
||||||
|
|
||||||
// 设置HTTP/2传输配置
|
// 设置HTTP/2传输配置
|
||||||
http2Transport, err := http2.ConfigureTransports(transport)
|
http2Transport, err := http2.ConfigureTransports(transport)
|
||||||
if err == nil && http2Transport != nil {
|
if err == nil && http2Transport != nil {
|
||||||
http2Transport.ReadIdleTimeout = 10 * time.Second
|
http2Transport.ReadIdleTimeout = 30 * time.Second // 增加读空闲超时
|
||||||
http2Transport.PingTimeout = 5 * time.Second
|
http2Transport.PingTimeout = 10 * time.Second // 增加ping超时
|
||||||
http2Transport.AllowHTTP = false
|
http2Transport.AllowHTTP = false
|
||||||
http2Transport.MaxReadFrameSize = 32 * 1024
|
http2Transport.MaxReadFrameSize = maxReadFrameSize // 使用常量
|
||||||
http2Transport.StrictMaxConcurrentStreams = true
|
http2Transport.StrictMaxConcurrentStreams = true
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 初始化缓存管理器
|
// 初始化缓存管理器
|
||||||
|
@ -15,6 +15,157 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// 优化的状态码统计结构
|
||||||
|
type StatusCodeStats struct {
|
||||||
|
mu sync.RWMutex
|
||||||
|
stats map[int]*int64 // 预分配常见状态码
|
||||||
|
}
|
||||||
|
|
||||||
|
// 优化的延迟分布统计
|
||||||
|
type LatencyBuckets struct {
|
||||||
|
lt10ms int64
|
||||||
|
ms10_50 int64
|
||||||
|
ms50_200 int64
|
||||||
|
ms200_1000 int64
|
||||||
|
gt1s int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// 优化的引用来源统计(使用分片减少锁竞争)
|
||||||
|
type RefererStats struct {
|
||||||
|
shards []*RefererShard
|
||||||
|
mask uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
type RefererShard struct {
|
||||||
|
mu sync.RWMutex
|
||||||
|
data map[string]*models.PathMetrics
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
refererShardCount = 32 // 分片数量,必须是2的幂
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewRefererStats() *RefererStats {
|
||||||
|
rs := &RefererStats{
|
||||||
|
shards: make([]*RefererShard, refererShardCount),
|
||||||
|
mask: refererShardCount - 1,
|
||||||
|
}
|
||||||
|
for i := 0; i < refererShardCount; i++ {
|
||||||
|
rs.shards[i] = &RefererShard{
|
||||||
|
data: make(map[string]*models.PathMetrics),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return rs
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rs *RefererStats) hash(key string) uint64 {
|
||||||
|
// 简单的字符串哈希函数
|
||||||
|
var h uint64 = 14695981039346656037
|
||||||
|
for _, b := range []byte(key) {
|
||||||
|
h ^= uint64(b)
|
||||||
|
h *= 1099511628211
|
||||||
|
}
|
||||||
|
return h
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rs *RefererStats) getShard(key string) *RefererShard {
|
||||||
|
return rs.shards[rs.hash(key)&rs.mask]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rs *RefererStats) Load(key string) (*models.PathMetrics, bool) {
|
||||||
|
shard := rs.getShard(key)
|
||||||
|
shard.mu.RLock()
|
||||||
|
defer shard.mu.RUnlock()
|
||||||
|
val, ok := shard.data[key]
|
||||||
|
return val, ok
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rs *RefererStats) Store(key string, value *models.PathMetrics) {
|
||||||
|
shard := rs.getShard(key)
|
||||||
|
shard.mu.Lock()
|
||||||
|
defer shard.mu.Unlock()
|
||||||
|
shard.data[key] = value
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rs *RefererStats) Delete(key string) {
|
||||||
|
shard := rs.getShard(key)
|
||||||
|
shard.mu.Lock()
|
||||||
|
defer shard.mu.Unlock()
|
||||||
|
delete(shard.data, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rs *RefererStats) Range(f func(key string, value *models.PathMetrics) bool) {
|
||||||
|
for _, shard := range rs.shards {
|
||||||
|
shard.mu.RLock()
|
||||||
|
for k, v := range shard.data {
|
||||||
|
if !f(k, v) {
|
||||||
|
shard.mu.RUnlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
shard.mu.RUnlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rs *RefererStats) Cleanup(cutoff int64) int {
|
||||||
|
deleted := 0
|
||||||
|
for _, shard := range rs.shards {
|
||||||
|
shard.mu.Lock()
|
||||||
|
for k, v := range shard.data {
|
||||||
|
if v.LastAccessTime.Load() < cutoff {
|
||||||
|
delete(shard.data, k)
|
||||||
|
deleted++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
shard.mu.Unlock()
|
||||||
|
}
|
||||||
|
return deleted
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewStatusCodeStats() *StatusCodeStats {
|
||||||
|
s := &StatusCodeStats{
|
||||||
|
stats: make(map[int]*int64),
|
||||||
|
}
|
||||||
|
// 预分配常见状态码
|
||||||
|
commonCodes := []int{200, 201, 204, 301, 302, 304, 400, 401, 403, 404, 429, 500, 502, 503, 504}
|
||||||
|
for _, code := range commonCodes {
|
||||||
|
counter := new(int64)
|
||||||
|
s.stats[code] = counter
|
||||||
|
}
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *StatusCodeStats) Increment(code int) {
|
||||||
|
s.mu.RLock()
|
||||||
|
if counter, exists := s.stats[code]; exists {
|
||||||
|
s.mu.RUnlock()
|
||||||
|
atomic.AddInt64(counter, 1)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.mu.RUnlock()
|
||||||
|
|
||||||
|
// 需要创建新的计数器
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
if counter, exists := s.stats[code]; exists {
|
||||||
|
atomic.AddInt64(counter, 1)
|
||||||
|
} else {
|
||||||
|
counter := new(int64)
|
||||||
|
*counter = 1
|
||||||
|
s.stats[code] = counter
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *StatusCodeStats) GetStats() map[string]int64 {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
result := make(map[string]int64)
|
||||||
|
for code, counter := range s.stats {
|
||||||
|
result[fmt.Sprintf("%d", code)] = atomic.LoadInt64(counter)
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
// Collector 指标收集器
|
// Collector 指标收集器
|
||||||
type Collector struct {
|
type Collector struct {
|
||||||
startTime time.Time
|
startTime time.Time
|
||||||
@ -23,9 +174,9 @@ type Collector struct {
|
|||||||
latencySum int64
|
latencySum int64
|
||||||
maxLatency int64 // 最大响应时间
|
maxLatency int64 // 最大响应时间
|
||||||
minLatency int64 // 最小响应时间
|
minLatency int64 // 最小响应时间
|
||||||
statusCodeStats sync.Map
|
statusCodeStats *StatusCodeStats
|
||||||
latencyBuckets sync.Map // 响应时间分布
|
latencyBuckets *LatencyBuckets // 使用结构体替代 sync.Map
|
||||||
refererStats sync.Map // 引用来源统计
|
refererStats *RefererStats // 使用分片哈希表
|
||||||
bandwidthStats struct {
|
bandwidthStats struct {
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
window time.Duration
|
window time.Duration
|
||||||
@ -57,10 +208,13 @@ var (
|
|||||||
func InitCollector(cfg *config.Config) error {
|
func InitCollector(cfg *config.Config) error {
|
||||||
once.Do(func() {
|
once.Do(func() {
|
||||||
instance = &Collector{
|
instance = &Collector{
|
||||||
startTime: time.Now(),
|
startTime: time.Now(),
|
||||||
recentRequests: models.NewRequestQueue(100),
|
recentRequests: models.NewRequestQueue(100),
|
||||||
config: cfg,
|
config: cfg,
|
||||||
minLatency: math.MaxInt64,
|
minLatency: math.MaxInt64,
|
||||||
|
statusCodeStats: NewStatusCodeStats(),
|
||||||
|
latencyBuckets: &LatencyBuckets{},
|
||||||
|
refererStats: NewRefererStats(),
|
||||||
}
|
}
|
||||||
|
|
||||||
// 初始化带宽统计
|
// 初始化带宽统计
|
||||||
@ -73,7 +227,19 @@ func InitCollector(cfg *config.Config) error {
|
|||||||
for _, bucket := range buckets {
|
for _, bucket := range buckets {
|
||||||
counter := new(int64)
|
counter := new(int64)
|
||||||
*counter = 0
|
*counter = 0
|
||||||
instance.latencyBuckets.Store(bucket, counter)
|
// 根据 bucket 名称设置对应的桶计数器
|
||||||
|
switch bucket {
|
||||||
|
case "lt10ms":
|
||||||
|
instance.latencyBuckets.lt10ms = atomic.LoadInt64(counter)
|
||||||
|
case "10-50ms":
|
||||||
|
instance.latencyBuckets.ms10_50 = atomic.LoadInt64(counter)
|
||||||
|
case "50-200ms":
|
||||||
|
instance.latencyBuckets.ms50_200 = atomic.LoadInt64(counter)
|
||||||
|
case "200-1000ms":
|
||||||
|
instance.latencyBuckets.ms200_1000 = atomic.LoadInt64(counter)
|
||||||
|
case "gt1s":
|
||||||
|
instance.latencyBuckets.gt1s = atomic.LoadInt64(counter)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 初始化异步指标收集通道
|
// 初始化异步指标收集通道
|
||||||
@ -152,14 +318,10 @@ func (c *Collector) GetStats() map[string]interface{} {
|
|||||||
|
|
||||||
// 计算总请求数和平均延迟
|
// 计算总请求数和平均延迟
|
||||||
var totalRequests int64
|
var totalRequests int64
|
||||||
c.statusCodeStats.Range(func(key, value interface{}) bool {
|
statusCodeStats := c.statusCodeStats.GetStats()
|
||||||
if counter, ok := value.(*int64); ok {
|
for _, count := range statusCodeStats {
|
||||||
totalRequests += atomic.LoadInt64(counter)
|
totalRequests += count
|
||||||
} else {
|
}
|
||||||
totalRequests += value.(int64)
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
|
|
||||||
avgLatency := float64(0)
|
avgLatency := float64(0)
|
||||||
if totalRequests > 0 {
|
if totalRequests > 0 {
|
||||||
@ -169,22 +331,13 @@ func (c *Collector) GetStats() map[string]interface{} {
|
|||||||
// 计算总体平均每秒请求数
|
// 计算总体平均每秒请求数
|
||||||
requestsPerSecond := float64(totalRequests) / totalRuntime.Seconds()
|
requestsPerSecond := float64(totalRequests) / totalRuntime.Seconds()
|
||||||
|
|
||||||
// 收集状态码统计
|
// 收集状态码统计(已经在上面获取了)
|
||||||
statusCodeStats := make(map[string]int64)
|
|
||||||
c.statusCodeStats.Range(func(key, value interface{}) bool {
|
|
||||||
if counter, ok := value.(*int64); ok {
|
|
||||||
statusCodeStats[key.(string)] = atomic.LoadInt64(counter)
|
|
||||||
} else {
|
|
||||||
statusCodeStats[key.(string)] = value.(int64)
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
|
|
||||||
// 收集引用来源统计
|
// 收集引用来源统计
|
||||||
var refererMetrics []*models.PathMetrics
|
var refererMetrics []*models.PathMetrics
|
||||||
refererCount := 0
|
refererCount := 0
|
||||||
c.refererStats.Range(func(key, value interface{}) bool {
|
c.refererStats.Range(func(key string, value *models.PathMetrics) bool {
|
||||||
stats := value.(*models.PathMetrics)
|
stats := value
|
||||||
requestCount := stats.GetRequestCount()
|
requestCount := stats.GetRequestCount()
|
||||||
if requestCount > 0 {
|
if requestCount > 0 {
|
||||||
totalLatency := stats.GetTotalLatency()
|
totalLatency := stats.GetTotalLatency()
|
||||||
@ -221,21 +374,11 @@ func (c *Collector) GetStats() map[string]interface{} {
|
|||||||
|
|
||||||
// 收集延迟分布
|
// 收集延迟分布
|
||||||
latencyDistribution := make(map[string]int64)
|
latencyDistribution := make(map[string]int64)
|
||||||
|
latencyDistribution["lt10ms"] = atomic.LoadInt64(&c.latencyBuckets.lt10ms)
|
||||||
// 确保所有桶都存在,即使计数为0
|
latencyDistribution["10-50ms"] = atomic.LoadInt64(&c.latencyBuckets.ms10_50)
|
||||||
buckets := []string{"lt10ms", "10-50ms", "50-200ms", "200-1000ms", "gt1s"}
|
latencyDistribution["50-200ms"] = atomic.LoadInt64(&c.latencyBuckets.ms50_200)
|
||||||
for _, bucket := range buckets {
|
latencyDistribution["200-1000ms"] = atomic.LoadInt64(&c.latencyBuckets.ms200_1000)
|
||||||
if counter, ok := c.latencyBuckets.Load(bucket); ok {
|
latencyDistribution["gt1s"] = atomic.LoadInt64(&c.latencyBuckets.gt1s)
|
||||||
if counter != nil {
|
|
||||||
value := atomic.LoadInt64(counter.(*int64))
|
|
||||||
latencyDistribution[bucket] = value
|
|
||||||
} else {
|
|
||||||
latencyDistribution[bucket] = 0
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
latencyDistribution[bucket] = 0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 获取最近请求记录(使用读锁)
|
// 获取最近请求记录(使用读锁)
|
||||||
recentRequests := c.recentRequests.GetAll()
|
recentRequests := c.recentRequests.GetAll()
|
||||||
@ -306,14 +449,13 @@ func (c *Collector) validateLoadedData() error {
|
|||||||
|
|
||||||
// 验证状态码统计
|
// 验证状态码统计
|
||||||
var statusCodeTotal int64
|
var statusCodeTotal int64
|
||||||
c.statusCodeStats.Range(func(key, value interface{}) bool {
|
statusStats := c.statusCodeStats.GetStats()
|
||||||
count := atomic.LoadInt64(value.(*int64))
|
for _, count := range statusStats {
|
||||||
if count < 0 {
|
if count < 0 {
|
||||||
return false
|
return fmt.Errorf("invalid negative status code count")
|
||||||
}
|
}
|
||||||
statusCodeTotal += count
|
statusCodeTotal += count
|
||||||
return true
|
}
|
||||||
})
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -418,21 +560,10 @@ func (c *Collector) startCleanupTask() {
|
|||||||
oneDayAgo := time.Now().Add(-24 * time.Hour).Unix()
|
oneDayAgo := time.Now().Add(-24 * time.Hour).Unix()
|
||||||
|
|
||||||
// 清理超过24小时的引用来源统计
|
// 清理超过24小时的引用来源统计
|
||||||
var keysToDelete []interface{}
|
deletedCount := c.refererStats.Cleanup(oneDayAgo)
|
||||||
c.refererStats.Range(func(key, value interface{}) bool {
|
|
||||||
metrics := value.(*models.PathMetrics)
|
|
||||||
if metrics.LastAccessTime.Load() < oneDayAgo {
|
|
||||||
keysToDelete = append(keysToDelete, key)
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
|
|
||||||
for _, key := range keysToDelete {
|
if deletedCount > 0 {
|
||||||
c.refererStats.Delete(key)
|
log.Printf("[Collector] 已清理 %d 条过期的引用来源统计", deletedCount)
|
||||||
}
|
|
||||||
|
|
||||||
if len(keysToDelete) > 0 {
|
|
||||||
log.Printf("[Collector] 已清理 %d 条过期的引用来源统计", len(keysToDelete))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 强制GC
|
// 强制GC
|
||||||
@ -469,14 +600,7 @@ func (c *Collector) startAsyncMetricsUpdater() {
|
|||||||
func (c *Collector) updateMetricsBatch(batch []RequestMetric) {
|
func (c *Collector) updateMetricsBatch(batch []RequestMetric) {
|
||||||
for _, m := range batch {
|
for _, m := range batch {
|
||||||
// 更新状态码统计
|
// 更新状态码统计
|
||||||
statusKey := fmt.Sprintf("%d", m.Status)
|
c.statusCodeStats.Increment(m.Status)
|
||||||
if counter, ok := c.statusCodeStats.Load(statusKey); ok {
|
|
||||||
atomic.AddInt64(counter.(*int64), 1)
|
|
||||||
} else {
|
|
||||||
counter := new(int64)
|
|
||||||
*counter = 1
|
|
||||||
c.statusCodeStats.Store(statusKey, counter)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 更新总字节数和带宽统计
|
// 更新总字节数和带宽统计
|
||||||
atomic.AddInt64(&c.totalBytes, m.Bytes)
|
atomic.AddInt64(&c.totalBytes, m.Bytes)
|
||||||
@ -506,26 +630,17 @@ func (c *Collector) updateMetricsBatch(batch []RequestMetric) {
|
|||||||
|
|
||||||
// 更新延迟分布
|
// 更新延迟分布
|
||||||
latencyMs := m.Latency.Milliseconds()
|
latencyMs := m.Latency.Milliseconds()
|
||||||
var bucketKey string
|
|
||||||
switch {
|
switch {
|
||||||
case latencyMs < 10:
|
case latencyMs < 10:
|
||||||
bucketKey = "lt10ms"
|
atomic.AddInt64(&c.latencyBuckets.lt10ms, 1)
|
||||||
case latencyMs < 50:
|
case latencyMs < 50:
|
||||||
bucketKey = "10-50ms"
|
atomic.AddInt64(&c.latencyBuckets.ms10_50, 1)
|
||||||
case latencyMs < 200:
|
case latencyMs < 200:
|
||||||
bucketKey = "50-200ms"
|
atomic.AddInt64(&c.latencyBuckets.ms50_200, 1)
|
||||||
case latencyMs < 1000:
|
case latencyMs < 1000:
|
||||||
bucketKey = "200-1000ms"
|
atomic.AddInt64(&c.latencyBuckets.ms200_1000, 1)
|
||||||
default:
|
default:
|
||||||
bucketKey = "gt1s"
|
atomic.AddInt64(&c.latencyBuckets.gt1s, 1)
|
||||||
}
|
|
||||||
|
|
||||||
if counter, ok := c.latencyBuckets.Load(bucketKey); ok {
|
|
||||||
atomic.AddInt64(counter.(*int64), 1)
|
|
||||||
} else {
|
|
||||||
counter := new(int64)
|
|
||||||
*counter = 1
|
|
||||||
c.latencyBuckets.Store(bucketKey, counter)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 记录引用来源
|
// 记录引用来源
|
||||||
@ -534,7 +649,7 @@ func (c *Collector) updateMetricsBatch(batch []RequestMetric) {
|
|||||||
if referer != "" {
|
if referer != "" {
|
||||||
var refererMetrics *models.PathMetrics
|
var refererMetrics *models.PathMetrics
|
||||||
if existingMetrics, ok := c.refererStats.Load(referer); ok {
|
if existingMetrics, ok := c.refererStats.Load(referer); ok {
|
||||||
refererMetrics = existingMetrics.(*models.PathMetrics)
|
refererMetrics = existingMetrics
|
||||||
} else {
|
} else {
|
||||||
refererMetrics = &models.PathMetrics{Path: referer}
|
refererMetrics = &models.PathMetrics{Path: referer}
|
||||||
c.refererStats.Store(referer, refererMetrics)
|
c.refererStats.Store(referer, refererMetrics)
|
||||||
|
@ -8,6 +8,7 @@ import (
|
|||||||
"path/filepath"
|
"path/filepath"
|
||||||
"proxy-go/internal/utils"
|
"proxy-go/internal/utils"
|
||||||
"runtime"
|
"runtime"
|
||||||
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
@ -140,22 +141,35 @@ func (ms *MetricsStorage) LoadMetrics() error {
|
|||||||
if err := loadJSONFromFile(ms.statusCodeFile, &statusCodeStats); err != nil {
|
if err := loadJSONFromFile(ms.statusCodeFile, &statusCodeStats); err != nil {
|
||||||
log.Printf("[MetricsStorage] 加载状态码统计失败: %v", err)
|
log.Printf("[MetricsStorage] 加载状态码统计失败: %v", err)
|
||||||
} else {
|
} else {
|
||||||
for statusCode, count := range statusCodeStats {
|
// 由于新的 StatusCodeStats 结构,我们需要手动设置值
|
||||||
countValue, ok := count.(float64)
|
loadedCount := 0
|
||||||
if !ok {
|
for codeStr, countValue := range statusCodeStats {
|
||||||
continue
|
// 解析状态码
|
||||||
}
|
if code, err := strconv.Atoi(codeStr); err == nil {
|
||||||
|
// 解析计数值
|
||||||
|
var count int64
|
||||||
|
switch v := countValue.(type) {
|
||||||
|
case float64:
|
||||||
|
count = int64(v)
|
||||||
|
case int64:
|
||||||
|
count = v
|
||||||
|
case int:
|
||||||
|
count = int64(v)
|
||||||
|
default:
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
// 创建或更新状态码统计
|
// 手动设置到新的 StatusCodeStats 结构中
|
||||||
if counter, ok := ms.collector.statusCodeStats.Load(statusCode); ok {
|
ms.collector.statusCodeStats.mu.Lock()
|
||||||
atomic.StoreInt64(counter.(*int64), int64(countValue))
|
if _, exists := ms.collector.statusCodeStats.stats[code]; !exists {
|
||||||
} else {
|
ms.collector.statusCodeStats.stats[code] = new(int64)
|
||||||
counter := new(int64)
|
}
|
||||||
*counter = int64(countValue)
|
atomic.StoreInt64(ms.collector.statusCodeStats.stats[code], count)
|
||||||
ms.collector.statusCodeStats.Store(statusCode, counter)
|
ms.collector.statusCodeStats.mu.Unlock()
|
||||||
|
loadedCount++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.Printf("[MetricsStorage] 加载了 %d 条状态码统计", len(statusCodeStats))
|
log.Printf("[MetricsStorage] 成功加载了 %d 条状态码统计", loadedCount)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -168,14 +182,25 @@ func (ms *MetricsStorage) LoadMetrics() error {
|
|||||||
if err := loadJSONFromFile(latencyDistributionFile, &distribution); err != nil {
|
if err := loadJSONFromFile(latencyDistributionFile, &distribution); err != nil {
|
||||||
log.Printf("[MetricsStorage] 加载延迟分布失败: %v", err)
|
log.Printf("[MetricsStorage] 加载延迟分布失败: %v", err)
|
||||||
} else {
|
} else {
|
||||||
|
// 由于新的 LatencyBuckets 结构,我们需要手动设置值
|
||||||
for bucket, count := range distribution {
|
for bucket, count := range distribution {
|
||||||
countValue, ok := count.(float64)
|
countValue, ok := count.(float64)
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if counter, ok := ms.collector.latencyBuckets.Load(bucket); ok {
|
// 根据桶名称设置对应的值
|
||||||
atomic.StoreInt64(counter.(*int64), int64(countValue))
|
switch bucket {
|
||||||
|
case "lt10ms":
|
||||||
|
atomic.StoreInt64(&ms.collector.latencyBuckets.lt10ms, int64(countValue))
|
||||||
|
case "10-50ms":
|
||||||
|
atomic.StoreInt64(&ms.collector.latencyBuckets.ms10_50, int64(countValue))
|
||||||
|
case "50-200ms":
|
||||||
|
atomic.StoreInt64(&ms.collector.latencyBuckets.ms50_200, int64(countValue))
|
||||||
|
case "200-1000ms":
|
||||||
|
atomic.StoreInt64(&ms.collector.latencyBuckets.ms200_1000, int64(countValue))
|
||||||
|
case "gt1s":
|
||||||
|
atomic.StoreInt64(&ms.collector.latencyBuckets.gt1s, int64(countValue))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.Printf("[MetricsStorage] 加载了延迟分布数据")
|
log.Printf("[MetricsStorage] 加载了延迟分布数据")
|
||||||
|
@ -67,11 +67,38 @@ func (rs *RuleService) SelectBestRule(client *http.Client, pathConfig config.Pat
|
|||||||
return nil, false, false
|
return nil, false, false
|
||||||
}
|
}
|
||||||
|
|
||||||
// 获取文件大小
|
// 检查是否需要获取文件大小
|
||||||
|
// 如果所有匹配的规则都没有设置大小阈值(都是默认值),则跳过文件大小检查
|
||||||
|
needSizeCheck := false
|
||||||
|
for _, rule := range domainMatchingRules {
|
||||||
|
if rule.SizeThreshold > 0 || rule.MaxSize < (1<<63-1) {
|
||||||
|
needSizeCheck = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !needSizeCheck {
|
||||||
|
// 不需要检查文件大小,直接使用第一个匹配的规则
|
||||||
|
for _, rule := range domainMatchingRules {
|
||||||
|
if utils.IsTargetAccessible(client, rule.Target+path) {
|
||||||
|
log.Printf("[SelectRule] %s -> 选中规则 (域名: %s, 跳过大小检查)", path, requestHost)
|
||||||
|
return rule, true, true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, false, false
|
||||||
|
}
|
||||||
|
|
||||||
|
// 获取文件大小(使用同步检查)
|
||||||
contentLength, err := utils.GetFileSize(client, pathConfig.DefaultTarget+path)
|
contentLength, err := utils.GetFileSize(client, pathConfig.DefaultTarget+path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("[SelectRule] %s -> 获取文件大小出错: %v,严格模式下不使用扩展名规则", path, err)
|
log.Printf("[SelectRule] %s -> 获取文件大小出错: %v,使用宽松模式回退", path, err)
|
||||||
// 严格模式:如果无法获取文件大小,不使用扩展名规则
|
// 宽松模式:如果无法获取文件大小,尝试使用第一个匹配的规则
|
||||||
|
for _, rule := range domainMatchingRules {
|
||||||
|
if utils.IsTargetAccessible(client, rule.Target+path) {
|
||||||
|
log.Printf("[SelectRule] %s -> 使用宽松模式选中规则 (域名: %s, 跳过大小检查)", path, requestHost)
|
||||||
|
return rule, true, true
|
||||||
|
}
|
||||||
|
}
|
||||||
return nil, false, false
|
return nil, false, false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -10,76 +10,207 @@ import (
|
|||||||
neturl "net/url"
|
neturl "net/url"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"proxy-go/internal/config"
|
"proxy-go/internal/config"
|
||||||
|
"runtime"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// 文件大小缓存项
|
// Goroutine 池相关结构
|
||||||
|
type GoroutinePool struct {
|
||||||
|
maxWorkers int
|
||||||
|
taskQueue chan func()
|
||||||
|
wg sync.WaitGroup
|
||||||
|
once sync.Once
|
||||||
|
stopped int32
|
||||||
|
}
|
||||||
|
|
||||||
|
// 全局 goroutine 池
|
||||||
|
var (
|
||||||
|
globalPool *GoroutinePool
|
||||||
|
poolOnce sync.Once
|
||||||
|
defaultWorkers = runtime.NumCPU() * 4 // 默认工作协程数量
|
||||||
|
)
|
||||||
|
|
||||||
|
// GetGoroutinePool 获取全局 goroutine 池
|
||||||
|
func GetGoroutinePool() *GoroutinePool {
|
||||||
|
poolOnce.Do(func() {
|
||||||
|
globalPool = NewGoroutinePool(defaultWorkers)
|
||||||
|
})
|
||||||
|
return globalPool
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewGoroutinePool 创建新的 goroutine 池
|
||||||
|
func NewGoroutinePool(maxWorkers int) *GoroutinePool {
|
||||||
|
if maxWorkers <= 0 {
|
||||||
|
maxWorkers = runtime.NumCPU() * 2
|
||||||
|
}
|
||||||
|
|
||||||
|
pool := &GoroutinePool{
|
||||||
|
maxWorkers: maxWorkers,
|
||||||
|
taskQueue: make(chan func(), maxWorkers*10), // 缓冲区为工作协程数的10倍
|
||||||
|
}
|
||||||
|
|
||||||
|
// 启动工作协程
|
||||||
|
for i := 0; i < maxWorkers; i++ {
|
||||||
|
pool.wg.Add(1)
|
||||||
|
go pool.worker()
|
||||||
|
}
|
||||||
|
|
||||||
|
return pool
|
||||||
|
}
|
||||||
|
|
||||||
|
// worker 工作协程
|
||||||
|
func (p *GoroutinePool) worker() {
|
||||||
|
defer p.wg.Done()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case task, ok := <-p.taskQueue:
|
||||||
|
if !ok {
|
||||||
|
return // 通道关闭,退出
|
||||||
|
}
|
||||||
|
|
||||||
|
// 执行任务,捕获 panic
|
||||||
|
func() {
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
fmt.Printf("[GoroutinePool] Worker panic: %v\n", r)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
task()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Submit 提交任务到池中
|
||||||
|
func (p *GoroutinePool) Submit(task func()) error {
|
||||||
|
if atomic.LoadInt32(&p.stopped) == 1 {
|
||||||
|
return fmt.Errorf("goroutine pool is stopped")
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case p.taskQueue <- task:
|
||||||
|
return nil
|
||||||
|
case <-time.After(100 * time.Millisecond): // 100ms 超时
|
||||||
|
return fmt.Errorf("goroutine pool is busy")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// SubmitWithTimeout 提交任务到池中,带超时
|
||||||
|
func (p *GoroutinePool) SubmitWithTimeout(task func(), timeout time.Duration) error {
|
||||||
|
if atomic.LoadInt32(&p.stopped) == 1 {
|
||||||
|
return fmt.Errorf("goroutine pool is stopped")
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case p.taskQueue <- task:
|
||||||
|
return nil
|
||||||
|
case <-time.After(timeout):
|
||||||
|
return fmt.Errorf("goroutine pool submit timeout")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop 停止 goroutine 池
|
||||||
|
func (p *GoroutinePool) Stop() {
|
||||||
|
p.once.Do(func() {
|
||||||
|
atomic.StoreInt32(&p.stopped, 1)
|
||||||
|
close(p.taskQueue)
|
||||||
|
p.wg.Wait()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Size 返回池中工作协程数量
|
||||||
|
func (p *GoroutinePool) Size() int {
|
||||||
|
return p.maxWorkers
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueueSize 返回当前任务队列大小
|
||||||
|
func (p *GoroutinePool) QueueSize() int {
|
||||||
|
return len(p.taskQueue)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 异步执行函数的包装器
|
||||||
|
func GoSafe(fn func()) {
|
||||||
|
pool := GetGoroutinePool()
|
||||||
|
err := pool.Submit(fn)
|
||||||
|
if err != nil {
|
||||||
|
// 如果池满了,直接启动 goroutine(降级处理)
|
||||||
|
go func() {
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
fmt.Printf("[GoSafe] Panic: %v\n", r)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
fn()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 带超时的异步执行
|
||||||
|
func GoSafeWithTimeout(fn func(), timeout time.Duration) error {
|
||||||
|
pool := GetGoroutinePool()
|
||||||
|
return pool.SubmitWithTimeout(fn, timeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 文件大小缓存相关
|
||||||
type fileSizeCache struct {
|
type fileSizeCache struct {
|
||||||
size int64
|
size int64
|
||||||
timestamp time.Time
|
timestamp time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// 可访问性缓存项
|
|
||||||
type accessibilityCache struct {
|
type accessibilityCache struct {
|
||||||
accessible bool
|
accessible bool
|
||||||
timestamp time.Time
|
timestamp time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 全局缓存
|
||||||
var (
|
var (
|
||||||
// 文件大小缓存,过期时间5分钟
|
sizeCache sync.Map
|
||||||
sizeCache sync.Map
|
accessCache sync.Map
|
||||||
// 可访问性缓存,过期时间30秒
|
cacheTTL = 5 * time.Minute
|
||||||
accessCache sync.Map
|
accessTTL = 2 * time.Minute
|
||||||
cacheTTL = 5 * time.Minute
|
|
||||||
accessTTL = 30 * time.Second
|
|
||||||
maxCacheSize = 10000 // 最大缓存条目数
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// 清理过期缓存
|
// 初始化函数
|
||||||
func init() {
|
func init() {
|
||||||
go func() {
|
// 启动定期清理缓存的协程
|
||||||
ticker := time.NewTicker(time.Minute)
|
GoSafe(func() {
|
||||||
for range ticker.C {
|
ticker := time.NewTicker(10 * time.Minute)
|
||||||
now := time.Now()
|
defer ticker.Stop()
|
||||||
// 清理文件大小缓存
|
|
||||||
var items []struct {
|
|
||||||
key interface{}
|
|
||||||
timestamp time.Time
|
|
||||||
}
|
|
||||||
sizeCache.Range(func(key, value interface{}) bool {
|
|
||||||
cache := value.(fileSizeCache)
|
|
||||||
if now.Sub(cache.timestamp) > cacheTTL {
|
|
||||||
sizeCache.Delete(key)
|
|
||||||
} else {
|
|
||||||
items = append(items, struct {
|
|
||||||
key interface{}
|
|
||||||
timestamp time.Time
|
|
||||||
}{key, cache.timestamp})
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
if len(items) > maxCacheSize {
|
|
||||||
sort.Slice(items, func(i, j int) bool {
|
|
||||||
return items[i].timestamp.Before(items[j].timestamp)
|
|
||||||
})
|
|
||||||
for i := 0; i < len(items)/2; i++ {
|
|
||||||
sizeCache.Delete(items[i].key)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 清理可访问性缓存
|
for range ticker.C {
|
||||||
accessCache.Range(func(key, value interface{}) bool {
|
cleanExpiredCache()
|
||||||
cache := value.(accessibilityCache)
|
|
||||||
if now.Sub(cache.timestamp) > accessTTL {
|
|
||||||
accessCache.Delete(key)
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
}()
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// 清理过期缓存
|
||||||
|
func cleanExpiredCache() {
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
// 清理文件大小缓存
|
||||||
|
sizeCache.Range(func(key, value interface{}) bool {
|
||||||
|
if cache, ok := value.(fileSizeCache); ok {
|
||||||
|
if now.Sub(cache.timestamp) > cacheTTL {
|
||||||
|
sizeCache.Delete(key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
// 清理可访问性缓存
|
||||||
|
accessCache.Range(func(key, value interface{}) bool {
|
||||||
|
if cache, ok := value.(accessibilityCache); ok {
|
||||||
|
if now.Sub(cache.timestamp) > accessTTL {
|
||||||
|
accessCache.Delete(key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// GenerateRequestID 生成唯一的请求ID
|
// GenerateRequestID 生成唯一的请求ID
|
||||||
@ -134,7 +265,7 @@ func IsImageRequest(path string) bool {
|
|||||||
return imageExts[ext]
|
return imageExts[ext]
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetFileSize 发送HEAD请求获取文件大小
|
// GetFileSize 发送HEAD请求获取文件大小(保持向后兼容)
|
||||||
func GetFileSize(client *http.Client, url string) (int64, error) {
|
func GetFileSize(client *http.Client, url string) (int64, error) {
|
||||||
// 先查缓存
|
// 先查缓存
|
||||||
if cache, ok := sizeCache.Load(url); ok {
|
if cache, ok := sizeCache.Load(url); ok {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user