Compare commits

..

2 Commits

7 changed files with 804 additions and 168 deletions

View File

@@ -19,6 +19,174 @@ import (
"time"
)
// Buffer pools for reusing byte slices
var (
bufferPool = sync.Pool{
New: func() interface{} {
return make([]byte, 32*1024) // 32KB buffer
},
}
// Large buffer pool (for big files)
largeBufPool = sync.Pool{
New: func() interface{} {
return make([]byte, 1024*1024) // 1MB buffer
},
}
)
// GetBuffer fetches a buffer of at least size bytes from the pool
func GetBuffer(size int) []byte {
if size <= 32*1024 {
buf := bufferPool.Get().([]byte)
if cap(buf) >= size {
return buf[:size]
}
bufferPool.Put(buf)
} else if size <= 1024*1024 {
buf := largeBufPool.Get().([]byte)
if cap(buf) >= size {
return buf[:size]
}
largeBufPool.Put(buf)
}
// Pooled buffers are too small; allocate a new one
return make([]byte, size)
}
// PutBuffer returns a buffer to the appropriate pool
func PutBuffer(buf []byte) {
if cap(buf) == 32*1024 {
bufferPool.Put(buf)
} else if cap(buf) == 1024*1024 {
largeBufPool.Put(buf)
}
// Buffers of other sizes are left to the GC
}
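Callers are expected to pair GetBuffer with a deferred PutBuffer. A minimal usage sketch, not part of this commit (copyWithPool is a hypothetical helper; io is the standard library package):

func copyWithPool(dst io.Writer, src io.Reader) (int64, error) {
	buf := GetBuffer(32 * 1024)
	// Only buffers with exactly 32KB or 1MB capacity are recycled;
	// everything else is left to the GC, matching PutBuffer above.
	defer PutBuffer(buf)
	return io.CopyBuffer(dst, src, buf)
}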
// LRUNode is a node in the LRU cache's doubly linked list
type LRUNode struct {
key CacheKey
value *CacheItem
prev *LRUNode
next *LRUNode
}
// LRUCache is the LRU cache implementation
type LRUCache struct {
capacity int
size int
head *LRUNode
tail *LRUNode
cache map[CacheKey]*LRUNode
mu sync.RWMutex
}
// NewLRUCache creates an LRU cache with the given capacity
func NewLRUCache(capacity int) *LRUCache {
lru := &LRUCache{
capacity: capacity,
cache: make(map[CacheKey]*LRUNode),
head: &LRUNode{},
tail: &LRUNode{},
}
lru.head.next = lru.tail
lru.tail.prev = lru.head
return lru
}
// Get looks a key up in the LRU cache
func (lru *LRUCache) Get(key CacheKey) (*CacheItem, bool) {
lru.mu.Lock()
defer lru.mu.Unlock()
if node, exists := lru.cache[key]; exists {
lru.moveToHead(node)
return node.value, true
}
return nil, false
}
// Put inserts or updates an entry in the LRU cache
func (lru *LRUCache) Put(key CacheKey, value *CacheItem) {
lru.mu.Lock()
defer lru.mu.Unlock()
if node, exists := lru.cache[key]; exists {
node.value = value
lru.moveToHead(node)
} else {
newNode := &LRUNode{key: key, value: value}
lru.cache[key] = newNode
lru.addToHead(newNode)
lru.size++
if lru.size > lru.capacity {
tail := lru.removeTail()
delete(lru.cache, tail.key)
lru.size--
}
}
}
// Delete removes an entry from the LRU cache
func (lru *LRUCache) Delete(key CacheKey) {
lru.mu.Lock()
defer lru.mu.Unlock()
if node, exists := lru.cache[key]; exists {
lru.removeNode(node)
delete(lru.cache, key)
lru.size--
}
}
// moveToHead moves a node to the head (most recently used)
func (lru *LRUCache) moveToHead(node *LRUNode) {
lru.removeNode(node)
lru.addToHead(node)
}
// addToHead inserts a node right after the head sentinel
func (lru *LRUCache) addToHead(node *LRUNode) {
node.prev = lru.head
node.next = lru.head.next
lru.head.next.prev = node
lru.head.next = node
}
// removeNode unlinks a node from the list
func (lru *LRUCache) removeNode(node *LRUNode) {
node.prev.next = node.next
node.next.prev = node.prev
}
// removeTail unlinks and returns the least recently used node
func (lru *LRUCache) removeTail() *LRUNode {
lastNode := lru.tail.prev
lru.removeNode(lastNode)
return lastNode
}
// Range iterates over all cached items (iteration order is undefined)
func (lru *LRUCache) Range(fn func(key CacheKey, value *CacheItem) bool) {
lru.mu.RLock()
defer lru.mu.RUnlock()
for key, node := range lru.cache {
if !fn(key, node.value) {
break
}
}
}
// Size returns the number of cached items
func (lru *LRUCache) Size() int {
lru.mu.RLock()
defer lru.mu.RUnlock()
return lru.size
}
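Together, the map and the sentinel-bounded list give O(1) Get, Put, and Delete with eviction from the tail. A brief behavioral sketch, not part of this commit (only the URL field of CacheKey is set; the remaining fields take their zero values):

lru := NewLRUCache(2)
lru.Put(CacheKey{URL: "/a"}, &CacheItem{})
lru.Put(CacheKey{URL: "/b"}, &CacheItem{})
lru.Get(CacheKey{URL: "/a"})               // promotes /a to most recently used
lru.Put(CacheKey{URL: "/c"}, &CacheItem{}) // evicts /b, the least recently used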
// CacheKey uniquely identifies a cache item
type CacheKey struct {
URL string
@@ -55,6 +223,7 @@ type CacheItem struct {
Hash string
CreatedAt time.Time
AccessCount int64
Priority int // cache priority
}
// CacheStats holds cache statistics
@@ -71,7 +240,8 @@ type CacheStats struct {
// CacheManager manages the cache
type CacheManager struct {
cacheDir string
items sync.Map
items sync.Map // keep the existing sync.Map for the file cache
lruCache *LRUCache // new LRU cache for hot data
maxAge time.Duration
cleanupTick time.Duration
maxCacheSize int64
@@ -84,6 +254,9 @@ type CacheManager struct {
// ExtensionMatcher cache
extensionMatcherCache *ExtensionMatcherCache
// Cache prewarming
prewarming atomic.Bool
}
// NewCacheManager creates a new cache manager
@@ -94,6 +267,7 @@ func NewCacheManager(cacheDir string) (*CacheManager, error) {
cm := &CacheManager{
cacheDir: cacheDir,
lruCache: NewLRUCache(10000), // up to 10000 hot cache items
maxAge: 30 * time.Minute,
cleanupTick: 5 * time.Minute,
maxCacheSize: 10 * 1024 * 1024 * 1024, // 10GB
@@ -118,6 +292,9 @@ func NewCacheManager(cacheDir string) (*CacheManager, error) {
// Start the cleanup goroutine
cm.startCleanup()
// Start cache prewarming
go cm.prewarmCache()
return cm, nil
}
@@ -147,7 +324,22 @@ func (cm *CacheManager) Get(key CacheKey, r *http.Request) (*CacheItem, bool, bo
return nil, false, false
}
// Check whether the cache item exists
// Check the LRU cache first
if item, found := cm.lruCache.Get(key); found {
// Check whether the LRU entry has expired
if time.Since(item.LastAccess) > cm.maxAge {
cm.lruCache.Delete(key)
cm.missCount.Add(1)
return nil, false, false
}
// Update the access time
item.LastAccess = time.Now()
atomic.AddInt64(&item.AccessCount, 1)
cm.hitCount.Add(1)
return item, true, false
}
// Check the file cache
value, ok := cm.items.Load(key)
if !ok {
cm.missCount.Add(1)
@@ -177,6 +369,9 @@ func (cm *CacheManager) Get(key CacheKey, r *http.Request) (*CacheItem, bool, bo
cm.hitCount.Add(1)
cm.bytesSaved.Add(item.Size)
// Add the cache item to the LRU cache
cm.lruCache.Put(key, item)
return item, true, false
}
@@ -230,7 +425,11 @@ func (cm *CacheManager) Put(key CacheKey, resp *http.Response, body []byte) (*Ca
}
cm.items.Store(key, item)
log.Printf("[Cache] NEW %s %s (%s) from %s", resp.Request.Method, key.URL, formatBytes(item.Size), utils.GetRequestSource(resp.Request))
method := "GET"
if resp.Request != nil {
method = resp.Request.Method
}
log.Printf("[Cache] NEW %s %s (%s) from %s", method, key.URL, formatBytes(item.Size), utils.GetRequestSource(resp.Request))
return item, nil
}
@@ -649,3 +848,79 @@ func (cm *CacheManager) Stop() {
cm.extensionMatcherCache.Stop()
}
}
// prewarmCache warms the cache at startup
func (cm *CacheManager) prewarmCache() {
// Simulate a few requests to warm the cache.
// In production this would load hot data from a database or an external
// service, e.g. the most frequently accessed recent URLs from a database,
// or popular resources fetched from an external API.
// Example: load recently accessed URLs. Assuming a database or file
// records recently accessed URLs and their hashes, we simply load a few
// sample URLs here.
exampleUrls := []string{
"https://example.com/api/data",
"https://api.github.com/repos/golang/go/releases",
"https://api.openai.com/v1/models",
"https://api.openai.com/v1/chat/completions",
}
for _, url := range exampleUrls {
// Use a fixed Accept header and User-Agent
acceptHeaders := "application/json"
userAgent := "Mozilla/5.0 (compatible; ProxyGo/1.0)"
// Build a synthetic HTTP request
req, err := http.NewRequest("GET", url, nil)
if err != nil {
log.Printf("[Cache] ERR Failed to create request for prewarming: %v", err)
continue
}
req.Header.Set("Accept", acceptHeaders)
req.Header.Set("User-Agent", userAgent)
// Generate the cache key
cacheKey := cm.GenerateCacheKey(req)
// Try the LRU cache first
if _, found := cm.lruCache.Get(cacheKey); found {
log.Printf("[Cache] WARN %s (prewarmed)", cacheKey.URL)
continue
}
// Then try the file cache
if _, ok := cm.items.Load(cacheKey); ok {
log.Printf("[Cache] WARN %s (prewarmed)", cacheKey.URL)
continue
}
// Build a synthetic HTTP response
resp := &http.Response{
StatusCode: http.StatusOK,
Header: make(http.Header),
Request: req,
}
resp.Header.Set("Content-Type", "application/json")
resp.Header.Set("Content-Encoding", "gzip")
resp.Header.Set("X-Cache", "HIT") // 模拟缓存命中
// 模拟一个HTTP请求体
body := []byte(`{"message": "Hello from prewarmed cache"}`)
// 添加到LRU缓存
contentHash := sha256.Sum256(body)
cm.lruCache.Put(cacheKey, &CacheItem{
FilePath: "", // 文件缓存,这里不需要
ContentType: "application/json",
ContentEncoding: "gzip",
Size: int64(len(body)),
LastAccess: time.Now(),
Hash: hex.EncodeToString(contentHash[:]),
CreatedAt: time.Now(),
AccessCount: 1,
})
log.Printf("[Cache] PREWARM %s", cacheKey.URL)
}
}
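Note that the prewarming atomic.Bool added to CacheManager is never referenced in the hunks shown. A minimal guard that would keep concurrent callers from warming the cache twice, offered as a sketch rather than as part of this commit:

func (cm *CacheManager) prewarmOnce() {
	// CompareAndSwap lets only the first caller through.
	if !cm.prewarming.CompareAndSwap(false, true) {
		return
	}
	// Reset on completion so a later explicit re-warm stays possible.
	defer cm.prewarming.Store(false)
	cm.prewarmCache()
}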

View File

@@ -4,6 +4,7 @@ import (
"fmt"
"io"
"log"
"net"
"net/http"
"net/url"
"proxy-go/internal/cache"
@@ -13,6 +14,15 @@ import (
"time"
"github.com/woodchen-ink/go-web-utils/iputil"
"golang.org/x/net/http2"
)
// Mirror-proxy-specific configuration constants
const (
mirrorMaxIdleConns = 2000 // global max idle connections for the mirror proxy
mirrorMaxIdleConnsPerHost = 200 // max idle connections per host
mirrorMaxConnsPerHost = 500 // max connections per host
mirrorTimeout = 60 * time.Second // mirror proxy timeout
)
type MirrorProxyHandler struct {
@@ -21,10 +31,38 @@ type MirrorProxyHandler struct {
}
func NewMirrorProxyHandler() *MirrorProxyHandler {
// Create a tuned dialer
dialer := &net.Dialer{
Timeout: 10 * time.Second,
KeepAlive: 30 * time.Second,
}
// Create a tuned transport
transport := &http.Transport{
MaxIdleConns: 100,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 90 * time.Second,
DialContext: dialer.DialContext,
MaxIdleConns: mirrorMaxIdleConns,
MaxIdleConnsPerHost: mirrorMaxIdleConnsPerHost,
MaxConnsPerHost: mirrorMaxConnsPerHost,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 5 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
DisableKeepAlives: false,
DisableCompression: false,
ForceAttemptHTTP2: true,
WriteBufferSize: 128 * 1024,
ReadBufferSize: 128 * 1024,
ResponseHeaderTimeout: 30 * time.Second,
MaxResponseHeaderBytes: 64 * 1024,
}
// Configure HTTP/2
http2Transport, err := http2.ConfigureTransports(transport)
if err == nil && http2Transport != nil {
http2Transport.ReadIdleTimeout = 30 * time.Second
http2Transport.PingTimeout = 10 * time.Second
http2Transport.AllowHTTP = false
http2Transport.MaxReadFrameSize = 32 * 1024
http2Transport.StrictMaxConcurrentStreams = true
}
// Initialize the cache manager
@@ -36,7 +74,13 @@ func NewMirrorProxyHandler() *MirrorProxyHandler {
return &MirrorProxyHandler{
client: &http.Client{
Transport: transport,
Timeout: 30 * time.Second,
Timeout: mirrorTimeout,
CheckRedirect: func(req *http.Request, via []*http.Request) error {
if len(via) >= 10 {
return fmt.Errorf("stopped after 10 redirects")
}
return nil
},
},
Cache: cacheManager,
}

View File

@@ -43,6 +43,21 @@ var hopHeadersBase = map[string]bool{
"Upgrade": true,
}
// Tuned connection pool configuration constants
const (
// Connection pool settings
maxIdleConns = 5000 // global max idle connections (increased)
maxIdleConnsPerHost = 500 // max idle connections per host (increased)
maxConnsPerHost = 1000 // max connections per host (increased)
// Buffer size tuning
writeBufferSize = 256 * 1024 // write buffer (increased)
readBufferSize = 256 * 1024 // read buffer (increased)
// HTTP/2 settings
maxReadFrameSize = 64 * 1024 // HTTP/2 max read frame size (increased)
)
// ErrorHandler defines the error-handling function type
type ErrorHandler func(w http.ResponseWriter, r *http.Request, err error)
@@ -126,29 +141,30 @@ func NewProxyHandler(cfg *config.Config) *ProxyHandler {
transport := &http.Transport{
DialContext: dialer.DialContext,
MaxIdleConns: 2000,
MaxIdleConnsPerHost: 200,
MaxIdleConns: maxIdleConns,
MaxIdleConnsPerHost: maxIdleConnsPerHost,
IdleConnTimeout: idleConnTimeout,
TLSHandshakeTimeout: tlsHandshakeTimeout,
ExpectContinueTimeout: 1 * time.Second,
MaxConnsPerHost: 400,
MaxConnsPerHost: maxConnsPerHost,
DisableKeepAlives: false,
DisableCompression: false,
ForceAttemptHTTP2: true,
WriteBufferSize: 128 * 1024,
ReadBufferSize: 128 * 1024,
WriteBufferSize: writeBufferSize,
ReadBufferSize: readBufferSize,
ResponseHeaderTimeout: backendServTimeout,
MaxResponseHeaderBytes: 64 * 1024,
MaxResponseHeaderBytes: 128 * 1024, // larger response header buffer
}
// Configure the HTTP/2 transport
http2Transport, err := http2.ConfigureTransports(transport)
if err == nil && http2Transport != nil {
http2Transport.ReadIdleTimeout = 10 * time.Second
http2Transport.PingTimeout = 5 * time.Second
http2Transport.ReadIdleTimeout = 30 * time.Second // longer read idle timeout
http2Transport.PingTimeout = 10 * time.Second // longer ping timeout
http2Transport.AllowHTTP = false
http2Transport.MaxReadFrameSize = 32 * 1024
http2Transport.MaxReadFrameSize = maxReadFrameSize // use the constant
http2Transport.StrictMaxConcurrentStreams = true
}
// Initialize the cache manager
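Here, as in the mirror handler above, a non-nil error from http2.ConfigureTransports is silently discarded and the client quietly falls back to HTTP/1.1. Surfacing the failure is cheap; a sketch of the same block with the error logged (assuming the existing log import), not part of this commit:

http2Transport, err := http2.ConfigureTransports(transport)
if err != nil {
	log.Printf("[Proxy] HTTP/2 configuration failed, falling back to HTTP/1.1: %v", err)
} else if http2Transport != nil {
	http2Transport.ReadIdleTimeout = 30 * time.Second
	http2Transport.PingTimeout = 10 * time.Second
}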

View File

@@ -15,6 +15,157 @@ import (
"time"
)
// StatusCodeStats is an optimized status code counter
type StatusCodeStats struct {
mu sync.RWMutex
stats map[int]*int64 // counters for common status codes are preallocated
}
// LatencyBuckets holds the optimized latency distribution counters
type LatencyBuckets struct {
lt10ms int64
ms10_50 int64
ms50_200 int64
ms200_1000 int64
gt1s int64
}
// RefererStats tracks referer statistics (sharded to reduce lock contention)
type RefererStats struct {
shards []*RefererShard
mask uint64
}
type RefererShard struct {
mu sync.RWMutex
data map[string]*models.PathMetrics
}
const (
refererShardCount = 32 // shard count must be a power of two
)
func NewRefererStats() *RefererStats {
rs := &RefererStats{
shards: make([]*RefererShard, refererShardCount),
mask: refererShardCount - 1,
}
for i := 0; i < refererShardCount; i++ {
rs.shards[i] = &RefererShard{
data: make(map[string]*models.PathMetrics),
}
}
return rs
}
func (rs *RefererStats) hash(key string) uint64 {
// FNV-1a 64-bit string hash
var h uint64 = 14695981039346656037
for _, b := range []byte(key) {
h ^= uint64(b)
h *= 1099511628211
}
return h
}
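The two magic numbers are the 64-bit FNV-1a offset basis (14695981039346656037) and prime (1099511628211), so this loop is equivalent to the standard library's hash/fnv; the hand-rolled form just avoids allocating a hasher on every call to this hot path. The stdlib equivalent, for comparison only (not part of this commit):

import "hash/fnv"

func fnvHash(key string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return h.Sum64()
}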
func (rs *RefererStats) getShard(key string) *RefererShard {
return rs.shards[rs.hash(key)&rs.mask]
}
func (rs *RefererStats) Load(key string) (*models.PathMetrics, bool) {
shard := rs.getShard(key)
shard.mu.RLock()
defer shard.mu.RUnlock()
val, ok := shard.data[key]
return val, ok
}
func (rs *RefererStats) Store(key string, value *models.PathMetrics) {
shard := rs.getShard(key)
shard.mu.Lock()
defer shard.mu.Unlock()
shard.data[key] = value
}
func (rs *RefererStats) Delete(key string) {
shard := rs.getShard(key)
shard.mu.Lock()
defer shard.mu.Unlock()
delete(shard.data, key)
}
func (rs *RefererStats) Range(f func(key string, value *models.PathMetrics) bool) {
for _, shard := range rs.shards {
shard.mu.RLock()
for k, v := range shard.data {
if !f(k, v) {
shard.mu.RUnlock()
return
}
}
shard.mu.RUnlock()
}
}
func (rs *RefererStats) Cleanup(cutoff int64) int {
deleted := 0
for _, shard := range rs.shards {
shard.mu.Lock()
for k, v := range shard.data {
if v.LastAccessTime.Load() < cutoff {
delete(shard.data, k)
deleted++
}
}
shard.mu.Unlock()
}
return deleted
}
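A quick exercise of the sharded map, not part of this commit (the models.PathMetrics usage mirrors the collector code below):

rs := NewRefererStats()
rs.Store("https://example.com/", &models.PathMetrics{Path: "https://example.com/"})
if m, ok := rs.Load("https://example.com/"); ok {
	fmt.Println(m.Path) // both calls touch a single shard; no global lock is taken
}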
func NewStatusCodeStats() *StatusCodeStats {
s := &StatusCodeStats{
stats: make(map[int]*int64),
}
// Preallocate counters for common status codes
commonCodes := []int{200, 201, 204, 301, 302, 304, 400, 401, 403, 404, 429, 500, 502, 503, 504}
for _, code := range commonCodes {
counter := new(int64)
s.stats[code] = counter
}
return s
}
func (s *StatusCodeStats) Increment(code int) {
s.mu.RLock()
if counter, exists := s.stats[code]; exists {
s.mu.RUnlock()
atomic.AddInt64(counter, 1)
return
}
s.mu.RUnlock()
// Need to create a new counter
s.mu.Lock()
defer s.mu.Unlock()
if counter, exists := s.stats[code]; exists {
atomic.AddInt64(counter, 1)
} else {
counter := new(int64)
*counter = 1
s.stats[code] = counter
}
}
func (s *StatusCodeStats) GetStats() map[string]int64 {
s.mu.RLock()
defer s.mu.RUnlock()
result := make(map[string]int64)
for code, counter := range s.stats {
result[fmt.Sprintf("%d", code)] = atomic.LoadInt64(counter)
}
return result
}
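Increment keeps the common path on a read lock plus one atomic add, and only takes the write lock the first time an unknown status code appears. A usage sketch, not part of this commit:

s := NewStatusCodeStats()
s.Increment(200) // fast path: counter was preallocated
s.Increment(418) // slow path once: allocates a counter under the write lock
fmt.Println(s.GetStats()["200"]) // keys are stringified codes; prints 1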
// Collector is the metrics collector
type Collector struct {
startTime time.Time
@@ -23,9 +174,9 @@ type Collector struct {
latencySum int64
maxLatency int64 // max response time
minLatency int64 // min response time
statusCodeStats sync.Map
latencyBuckets sync.Map // latency distribution
refererStats sync.Map // referer statistics
statusCodeStats *StatusCodeStats
latencyBuckets *LatencyBuckets // struct replaces sync.Map
refererStats *RefererStats // sharded hash map
bandwidthStats struct {
sync.RWMutex
window time.Duration
@@ -57,10 +208,13 @@ var (
func InitCollector(cfg *config.Config) error {
once.Do(func() {
instance = &Collector{
startTime: time.Now(),
recentRequests: models.NewRequestQueue(100),
config: cfg,
minLatency: math.MaxInt64,
startTime: time.Now(),
recentRequests: models.NewRequestQueue(100),
config: cfg,
minLatency: math.MaxInt64,
statusCodeStats: NewStatusCodeStats(),
latencyBuckets: &LatencyBuckets{},
refererStats: NewRefererStats(),
}
// Initialize bandwidth statistics
@@ -73,7 +227,19 @@ func InitCollector(cfg *config.Config) error {
for _, bucket := range buckets {
counter := new(int64)
*counter = 0
instance.latencyBuckets.Store(bucket, counter)
// Map each bucket name onto its counter field
switch bucket {
case "lt10ms":
instance.latencyBuckets.lt10ms = atomic.LoadInt64(counter)
case "10-50ms":
instance.latencyBuckets.ms10_50 = atomic.LoadInt64(counter)
case "50-200ms":
instance.latencyBuckets.ms50_200 = atomic.LoadInt64(counter)
case "200-1000ms":
instance.latencyBuckets.ms200_1000 = atomic.LoadInt64(counter)
case "gt1s":
instance.latencyBuckets.gt1s = atomic.LoadInt64(counter)
}
}
// Initialize the async metrics channel
@@ -152,14 +318,10 @@ func (c *Collector) GetStats() map[string]interface{} {
// Compute total requests and average latency
var totalRequests int64
c.statusCodeStats.Range(func(key, value interface{}) bool {
if counter, ok := value.(*int64); ok {
totalRequests += atomic.LoadInt64(counter)
} else {
totalRequests += value.(int64)
}
return true
})
statusCodeStats := c.statusCodeStats.GetStats()
for _, count := range statusCodeStats {
totalRequests += count
}
avgLatency := float64(0)
if totalRequests > 0 {
@@ -169,22 +331,13 @@ func (c *Collector) GetStats() map[string]interface{} {
// Compute the overall average requests per second
requestsPerSecond := float64(totalRequests) / totalRuntime.Seconds()
// Collect status code statistics
statusCodeStats := make(map[string]int64)
c.statusCodeStats.Range(func(key, value interface{}) bool {
if counter, ok := value.(*int64); ok {
statusCodeStats[key.(string)] = atomic.LoadInt64(counter)
} else {
statusCodeStats[key.(string)] = value.(int64)
}
return true
})
// Status code statistics were already collected above
// Collect referer statistics
var refererMetrics []*models.PathMetrics
refererCount := 0
c.refererStats.Range(func(key, value interface{}) bool {
stats := value.(*models.PathMetrics)
c.refererStats.Range(func(key string, value *models.PathMetrics) bool {
stats := value
requestCount := stats.GetRequestCount()
if requestCount > 0 {
totalLatency := stats.GetTotalLatency()
@@ -221,21 +374,11 @@ func (c *Collector) GetStats() map[string]interface{} {
// Collect the latency distribution
latencyDistribution := make(map[string]int64)
// Make sure every bucket exists even when its count is 0
buckets := []string{"lt10ms", "10-50ms", "50-200ms", "200-1000ms", "gt1s"}
for _, bucket := range buckets {
if counter, ok := c.latencyBuckets.Load(bucket); ok {
if counter != nil {
value := atomic.LoadInt64(counter.(*int64))
latencyDistribution[bucket] = value
} else {
latencyDistribution[bucket] = 0
}
} else {
latencyDistribution[bucket] = 0
}
}
latencyDistribution["lt10ms"] = atomic.LoadInt64(&c.latencyBuckets.lt10ms)
latencyDistribution["10-50ms"] = atomic.LoadInt64(&c.latencyBuckets.ms10_50)
latencyDistribution["50-200ms"] = atomic.LoadInt64(&c.latencyBuckets.ms50_200)
latencyDistribution["200-1000ms"] = atomic.LoadInt64(&c.latencyBuckets.ms200_1000)
latencyDistribution["gt1s"] = atomic.LoadInt64(&c.latencyBuckets.gt1s)
// Fetch the recent request records (under a read lock)
recentRequests := c.recentRequests.GetAll()
@@ -306,14 +449,13 @@ func (c *Collector) validateLoadedData() error {
// Validate status code statistics
var statusCodeTotal int64
c.statusCodeStats.Range(func(key, value interface{}) bool {
count := atomic.LoadInt64(value.(*int64))
statusStats := c.statusCodeStats.GetStats()
for _, count := range statusStats {
if count < 0 {
return false
return fmt.Errorf("invalid negative status code count")
}
statusCodeTotal += count
return true
})
}
return nil
}
@@ -418,21 +560,10 @@ func (c *Collector) startCleanupTask() {
oneDayAgo := time.Now().Add(-24 * time.Hour).Unix()
// Clean up referer statistics older than 24 hours
var keysToDelete []interface{}
c.refererStats.Range(func(key, value interface{}) bool {
metrics := value.(*models.PathMetrics)
if metrics.LastAccessTime.Load() < oneDayAgo {
keysToDelete = append(keysToDelete, key)
}
return true
})
deletedCount := c.refererStats.Cleanup(oneDayAgo)
for _, key := range keysToDelete {
c.refererStats.Delete(key)
}
if len(keysToDelete) > 0 {
log.Printf("[Collector] 已清理 %d 条过期的引用来源统计", len(keysToDelete))
if deletedCount > 0 {
log.Printf("[Collector] 已清理 %d 条过期的引用来源统计", deletedCount)
}
// Force a GC
@@ -469,14 +600,7 @@ func (c *Collector) startAsyncMetricsUpdater() {
func (c *Collector) updateMetricsBatch(batch []RequestMetric) {
for _, m := range batch {
// Update status code statistics
statusKey := fmt.Sprintf("%d", m.Status)
if counter, ok := c.statusCodeStats.Load(statusKey); ok {
atomic.AddInt64(counter.(*int64), 1)
} else {
counter := new(int64)
*counter = 1
c.statusCodeStats.Store(statusKey, counter)
}
c.statusCodeStats.Increment(m.Status)
// Update total bytes and bandwidth statistics
atomic.AddInt64(&c.totalBytes, m.Bytes)
@@ -506,26 +630,17 @@ func (c *Collector) updateMetricsBatch(batch []RequestMetric) {
// Update the latency distribution
latencyMs := m.Latency.Milliseconds()
var bucketKey string
switch {
case latencyMs < 10:
bucketKey = "lt10ms"
atomic.AddInt64(&c.latencyBuckets.lt10ms, 1)
case latencyMs < 50:
bucketKey = "10-50ms"
atomic.AddInt64(&c.latencyBuckets.ms10_50, 1)
case latencyMs < 200:
bucketKey = "50-200ms"
atomic.AddInt64(&c.latencyBuckets.ms50_200, 1)
case latencyMs < 1000:
bucketKey = "200-1000ms"
atomic.AddInt64(&c.latencyBuckets.ms200_1000, 1)
default:
bucketKey = "gt1s"
}
if counter, ok := c.latencyBuckets.Load(bucketKey); ok {
atomic.AddInt64(counter.(*int64), 1)
} else {
counter := new(int64)
*counter = 1
c.latencyBuckets.Store(bucketKey, counter)
atomic.AddInt64(&c.latencyBuckets.gt1s, 1)
}
// Record the referer
@@ -534,7 +649,7 @@ func (c *Collector) updateMetricsBatch(batch []RequestMetric) {
if referer != "" {
var refererMetrics *models.PathMetrics
if existingMetrics, ok := c.refererStats.Load(referer); ok {
refererMetrics = existingMetrics.(*models.PathMetrics)
refererMetrics = existingMetrics
} else {
refererMetrics = &models.PathMetrics{Path: referer}
c.refererStats.Store(referer, refererMetrics)

View File

@@ -8,6 +8,7 @@ import (
"path/filepath"
"proxy-go/internal/utils"
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"
@@ -140,22 +141,35 @@ func (ms *MetricsStorage) LoadMetrics() error {
if err := loadJSONFromFile(ms.statusCodeFile, &statusCodeStats); err != nil {
log.Printf("[MetricsStorage] failed to load status code stats: %v", err)
} else {
for statusCode, count := range statusCodeStats {
countValue, ok := count.(float64)
if !ok {
continue
}
// With the new StatusCodeStats structure the values have to be set manually
loadedCount := 0
for codeStr, countValue := range statusCodeStats {
// Parse the status code
if code, err := strconv.Atoi(codeStr); err == nil {
// Parse the count
var count int64
switch v := countValue.(type) {
case float64:
count = int64(v)
case int64:
count = v
case int:
count = int64(v)
default:
continue
}
// Create or update the status code counter
if counter, ok := ms.collector.statusCodeStats.Load(statusCode); ok {
atomic.StoreInt64(counter.(*int64), int64(countValue))
} else {
counter := new(int64)
*counter = int64(countValue)
ms.collector.statusCodeStats.Store(statusCode, counter)
// Set the value directly in the new StatusCodeStats structure
ms.collector.statusCodeStats.mu.Lock()
if _, exists := ms.collector.statusCodeStats.stats[code]; !exists {
ms.collector.statusCodeStats.stats[code] = new(int64)
}
atomic.StoreInt64(ms.collector.statusCodeStats.stats[code], count)
ms.collector.statusCodeStats.mu.Unlock()
loadedCount++
}
}
log.Printf("[MetricsStorage] 加载了 %d 条状态码统计", len(statusCodeStats))
log.Printf("[MetricsStorage] 成功加载了 %d 条状态码统计", loadedCount)
}
}
@@ -168,14 +182,25 @@ func (ms *MetricsStorage) LoadMetrics() error {
if err := loadJSONFromFile(latencyDistributionFile, &distribution); err != nil {
log.Printf("[MetricsStorage] failed to load latency distribution: %v", err)
} else {
// With the new LatencyBuckets structure the values have to be set manually
for bucket, count := range distribution {
countValue, ok := count.(float64)
if !ok {
continue
}
if counter, ok := ms.collector.latencyBuckets.Load(bucket); ok {
atomic.StoreInt64(counter.(*int64), int64(countValue))
// Set the matching field for each bucket name
switch bucket {
case "lt10ms":
atomic.StoreInt64(&ms.collector.latencyBuckets.lt10ms, int64(countValue))
case "10-50ms":
atomic.StoreInt64(&ms.collector.latencyBuckets.ms10_50, int64(countValue))
case "50-200ms":
atomic.StoreInt64(&ms.collector.latencyBuckets.ms50_200, int64(countValue))
case "200-1000ms":
atomic.StoreInt64(&ms.collector.latencyBuckets.ms200_1000, int64(countValue))
case "gt1s":
atomic.StoreInt64(&ms.collector.latencyBuckets.gt1s, int64(countValue))
}
}
log.Printf("[MetricsStorage] 加载了延迟分布数据")

View File

@@ -67,11 +67,38 @@ func (rs *RuleService) SelectBestRule(client *http.Client, pathConfig config.Pat
return nil, false, false
}
// Get the file size
// Check whether the file size needs to be fetched at all.
// If none of the matching rules set a size threshold (all at their defaults), skip the size check.
needSizeCheck := false
for _, rule := range domainMatchingRules {
if rule.SizeThreshold > 0 || rule.MaxSize < (1<<63-1) {
needSizeCheck = true
break
}
}
if !needSizeCheck {
// No size check needed; use the first matching accessible rule
for _, rule := range domainMatchingRules {
if utils.IsTargetAccessible(client, rule.Target+path) {
log.Printf("[SelectRule] %s -> 选中规则 (域名: %s, 跳过大小检查)", path, requestHost)
return rule, true, true
}
}
return nil, false, false
}
// Get the file size (synchronous check)
contentLength, err := utils.GetFileSize(client, pathConfig.DefaultTarget+path)
if err != nil {
log.Printf("[SelectRule] %s -> 获取文件大小出错: %v严格模式下不使用扩展名规则", path, err)
// 严格模式:如果无法获取文件大小,不使用扩展名规则
log.Printf("[SelectRule] %s -> 获取文件大小出错: %v使用宽松模式回退", path, err)
// 宽松模式:如果无法获取文件大小,尝试使用第一个匹配的规则
for _, rule := range domainMatchingRules {
if utils.IsTargetAccessible(client, rule.Target+path) {
log.Printf("[SelectRule] %s -> 使用宽松模式选中规则 (域名: %s, 跳过大小检查)", path, requestHost)
return rule, true, true
}
}
return nil, false, false
}
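For reference, the gate above treats a rule as size-sensitive only when it narrows the defaults. A condensed restatement, not part of this commit (Rule stands in for the real rule type; only the two field names come from the code above):

type Rule struct {
	SizeThreshold int64 // lower bound; 0 means unset
	MaxSize       int64 // upper bound; 1<<63 - 1 (math.MaxInt64) means unset
}

func needsSizeCheck(rules []Rule) bool {
	for _, r := range rules {
		if r.SizeThreshold > 0 || r.MaxSize < (1<<63-1) {
			return true
		}
	}
	return false
}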

View File

@@ -10,76 +10,207 @@ import (
neturl "net/url"
"path/filepath"
"proxy-go/internal/config"
"runtime"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
)
// File size cache entry
// Goroutine pool structures
type GoroutinePool struct {
maxWorkers int
taskQueue chan func()
wg sync.WaitGroup
once sync.Once
stopped int32
}
// Global goroutine pool
var (
globalPool *GoroutinePool
poolOnce sync.Once
defaultWorkers = runtime.NumCPU() * 4 // default number of workers
)
// GetGoroutinePool returns the global goroutine pool
func GetGoroutinePool() *GoroutinePool {
poolOnce.Do(func() {
globalPool = NewGoroutinePool(defaultWorkers)
})
return globalPool
}
// NewGoroutinePool creates a new goroutine pool
func NewGoroutinePool(maxWorkers int) *GoroutinePool {
if maxWorkers <= 0 {
maxWorkers = runtime.NumCPU() * 2
}
pool := &GoroutinePool{
maxWorkers: maxWorkers,
taskQueue: make(chan func(), maxWorkers*10), // queue buffer is 10x the worker count
}
// Start the workers
for i := 0; i < maxWorkers; i++ {
pool.wg.Add(1)
go pool.worker()
}
return pool
}
// worker runs tasks from the queue
func (p *GoroutinePool) worker() {
defer p.wg.Done()
for {
select {
case task, ok := <-p.taskQueue:
if !ok {
return // channel closed, exit
}
// Run the task, recovering from panics
func() {
defer func() {
if r := recover(); r != nil {
fmt.Printf("[GoroutinePool] Worker panic: %v\n", r)
}
}()
task()
}()
}
}
}
// Submit submits a task to the pool
func (p *GoroutinePool) Submit(task func()) error {
if atomic.LoadInt32(&p.stopped) == 1 {
return fmt.Errorf("goroutine pool is stopped")
}
select {
case p.taskQueue <- task:
return nil
case <-time.After(100 * time.Millisecond): // 100ms 超时
return fmt.Errorf("goroutine pool is busy")
}
}
// SubmitWithTimeout submits a task to the pool with a timeout
func (p *GoroutinePool) SubmitWithTimeout(task func(), timeout time.Duration) error {
if atomic.LoadInt32(&p.stopped) == 1 {
return fmt.Errorf("goroutine pool is stopped")
}
select {
case p.taskQueue <- task:
return nil
case <-time.After(timeout):
return fmt.Errorf("goroutine pool submit timeout")
}
}
// Stop shuts down the goroutine pool
func (p *GoroutinePool) Stop() {
p.once.Do(func() {
atomic.StoreInt32(&p.stopped, 1)
close(p.taskQueue)
p.wg.Wait()
})
}
// Size returns the number of workers in the pool
func (p *GoroutinePool) Size() int {
return p.maxWorkers
}
// QueueSize returns the current task queue length
func (p *GoroutinePool) QueueSize() int {
return len(p.taskQueue)
}
// GoSafe is a wrapper for running a function asynchronously
func GoSafe(fn func()) {
pool := GetGoroutinePool()
err := pool.Submit(fn)
if err != nil {
// If the pool is full, fall back to launching a goroutine directly (degraded mode)
go func() {
defer func() {
if r := recover(); r != nil {
fmt.Printf("[GoSafe] Panic: %v\n", r)
}
}()
fn()
}()
}
}
// GoSafeWithTimeout runs a function asynchronously with a submit timeout
func GoSafeWithTimeout(fn func(), timeout time.Duration) error {
pool := GetGoroutinePool()
return pool.SubmitWithTimeout(fn, timeout)
}
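GoSafe degrades to a bare goroutine (still panic-protected) when the pool rejects a task, so callers never block for more than the 100ms submit timeout. A usage sketch, not part of this commit:

// Fire-and-forget with panic recovery and bounded concurrency.
GoSafe(func() {
	cleanExpiredCache()
})

// Or fail fast instead of degrading when the queue is full:
if err := GoSafeWithTimeout(func() { cleanExpiredCache() }, 50*time.Millisecond); err != nil {
	fmt.Printf("[Pool] task rejected: %v\n", err)
}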
// File size cache
type fileSizeCache struct {
size int64
timestamp time.Time
}
// Accessibility cache entry
type accessibilityCache struct {
accessible bool
timestamp time.Time
}
// Global caches
var (
// File size cache, TTL: 5 minutes
sizeCache sync.Map
// Accessibility cache, TTL: 30 seconds
accessCache sync.Map
cacheTTL = 5 * time.Minute
accessTTL = 30 * time.Second
maxCacheSize = 10000 // max number of cache entries
sizeCache sync.Map
accessCache sync.Map
cacheTTL = 5 * time.Minute
accessTTL = 2 * time.Minute
)
// Clean up expired cache entries
// Package init
func init() {
go func() {
ticker := time.NewTicker(time.Minute)
for range ticker.C {
now := time.Now()
// Clean the file size cache
var items []struct {
key interface{}
timestamp time.Time
}
sizeCache.Range(func(key, value interface{}) bool {
cache := value.(fileSizeCache)
if now.Sub(cache.timestamp) > cacheTTL {
sizeCache.Delete(key)
} else {
items = append(items, struct {
key interface{}
timestamp time.Time
}{key, cache.timestamp})
}
return true
})
if len(items) > maxCacheSize {
sort.Slice(items, func(i, j int) bool {
return items[i].timestamp.Before(items[j].timestamp)
})
for i := 0; i < len(items)/2; i++ {
sizeCache.Delete(items[i].key)
}
}
// Start the periodic cache cleanup goroutine
GoSafe(func() {
ticker := time.NewTicker(10 * time.Minute)
defer ticker.Stop()
// Clean the accessibility cache
accessCache.Range(func(key, value interface{}) bool {
cache := value.(accessibilityCache)
if now.Sub(cache.timestamp) > accessTTL {
accessCache.Delete(key)
}
return true
})
for range ticker.C {
cleanExpiredCache()
}
}()
})
}
// cleanExpiredCache removes expired entries from both caches
func cleanExpiredCache() {
now := time.Now()
// Clean the file size cache
sizeCache.Range(func(key, value interface{}) bool {
if cache, ok := value.(fileSizeCache); ok {
if now.Sub(cache.timestamp) > cacheTTL {
sizeCache.Delete(key)
}
}
return true
})
// Clean the accessibility cache
accessCache.Range(func(key, value interface{}) bool {
if cache, ok := value.(accessibilityCache); ok {
if now.Sub(cache.timestamp) > accessTTL {
accessCache.Delete(key)
}
}
return true
})
}
// GenerateRequestID generates a unique request ID
@@ -94,6 +225,9 @@ func GenerateRequestID() string {
// GetRequestSource returns a human-readable request source
func GetRequestSource(r *http.Request) string {
if r == nil {
return ""
}
referer := r.Header.Get("Referer")
if referer != "" {
return fmt.Sprintf(" (from: %s)", referer)
@@ -131,7 +265,7 @@ func IsImageRequest(path string) bool {
return imageExts[ext]
}
// GetFileSize issues a HEAD request to get the file size
// GetFileSize issues a HEAD request to get the file size (kept for backward compatibility)
func GetFileSize(client *http.Client, url string) (int64, error) {
// Check the cache first
if cache, ok := sizeCache.Load(url); ok {