mirror of
https://github.com/woodchen-ink/proxy-go.git
synced 2025-07-18 00:21:56 +08:00
Compare commits
2 Commits
818dd11dda
...
6fd69ba870
Author | SHA1 | Date | |
---|---|---|---|
6fd69ba870 | |||
5750062168 |
281
internal/cache/manager.go
vendored
281
internal/cache/manager.go
vendored
@ -19,6 +19,174 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// 内存池用于复用缓冲区
|
||||
var (
|
||||
bufferPool = sync.Pool{
|
||||
New: func() interface{} {
|
||||
return make([]byte, 32*1024) // 32KB 缓冲区
|
||||
},
|
||||
}
|
||||
|
||||
// 大缓冲区池(用于大文件)
|
||||
largeBufPool = sync.Pool{
|
||||
New: func() interface{} {
|
||||
return make([]byte, 1024*1024) // 1MB 缓冲区
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
// GetBuffer 从池中获取缓冲区
|
||||
func GetBuffer(size int) []byte {
|
||||
if size <= 32*1024 {
|
||||
buf := bufferPool.Get().([]byte)
|
||||
if cap(buf) >= size {
|
||||
return buf[:size]
|
||||
}
|
||||
bufferPool.Put(buf)
|
||||
} else if size <= 1024*1024 {
|
||||
buf := largeBufPool.Get().([]byte)
|
||||
if cap(buf) >= size {
|
||||
return buf[:size]
|
||||
}
|
||||
largeBufPool.Put(buf)
|
||||
}
|
||||
// 如果池中的缓冲区不够大,创建新的
|
||||
return make([]byte, size)
|
||||
}
|
||||
|
||||
// PutBuffer 将缓冲区放回池中
|
||||
func PutBuffer(buf []byte) {
|
||||
if cap(buf) == 32*1024 {
|
||||
bufferPool.Put(buf)
|
||||
} else if cap(buf) == 1024*1024 {
|
||||
largeBufPool.Put(buf)
|
||||
}
|
||||
// 其他大小的缓冲区让GC处理
|
||||
}
|
||||
|
||||
// LRU 缓存节点
|
||||
type LRUNode struct {
|
||||
key CacheKey
|
||||
value *CacheItem
|
||||
prev *LRUNode
|
||||
next *LRUNode
|
||||
}
|
||||
|
||||
// LRU 缓存实现
|
||||
type LRUCache struct {
|
||||
capacity int
|
||||
size int
|
||||
head *LRUNode
|
||||
tail *LRUNode
|
||||
cache map[CacheKey]*LRUNode
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
// NewLRUCache 创建LRU缓存
|
||||
func NewLRUCache(capacity int) *LRUCache {
|
||||
lru := &LRUCache{
|
||||
capacity: capacity,
|
||||
cache: make(map[CacheKey]*LRUNode),
|
||||
head: &LRUNode{},
|
||||
tail: &LRUNode{},
|
||||
}
|
||||
lru.head.next = lru.tail
|
||||
lru.tail.prev = lru.head
|
||||
return lru
|
||||
}
|
||||
|
||||
// Get 从LRU缓存中获取
|
||||
func (lru *LRUCache) Get(key CacheKey) (*CacheItem, bool) {
|
||||
lru.mu.Lock()
|
||||
defer lru.mu.Unlock()
|
||||
|
||||
if node, exists := lru.cache[key]; exists {
|
||||
lru.moveToHead(node)
|
||||
return node.value, true
|
||||
}
|
||||
return nil, false
|
||||
}
|
||||
|
||||
// Put 向LRU缓存中添加
|
||||
func (lru *LRUCache) Put(key CacheKey, value *CacheItem) {
|
||||
lru.mu.Lock()
|
||||
defer lru.mu.Unlock()
|
||||
|
||||
if node, exists := lru.cache[key]; exists {
|
||||
node.value = value
|
||||
lru.moveToHead(node)
|
||||
} else {
|
||||
newNode := &LRUNode{key: key, value: value}
|
||||
lru.cache[key] = newNode
|
||||
lru.addToHead(newNode)
|
||||
lru.size++
|
||||
|
||||
if lru.size > lru.capacity {
|
||||
tail := lru.removeTail()
|
||||
delete(lru.cache, tail.key)
|
||||
lru.size--
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Delete 从LRU缓存中删除
|
||||
func (lru *LRUCache) Delete(key CacheKey) {
|
||||
lru.mu.Lock()
|
||||
defer lru.mu.Unlock()
|
||||
|
||||
if node, exists := lru.cache[key]; exists {
|
||||
lru.removeNode(node)
|
||||
delete(lru.cache, key)
|
||||
lru.size--
|
||||
}
|
||||
}
|
||||
|
||||
// moveToHead 将节点移到头部
|
||||
func (lru *LRUCache) moveToHead(node *LRUNode) {
|
||||
lru.removeNode(node)
|
||||
lru.addToHead(node)
|
||||
}
|
||||
|
||||
// addToHead 添加到头部
|
||||
func (lru *LRUCache) addToHead(node *LRUNode) {
|
||||
node.prev = lru.head
|
||||
node.next = lru.head.next
|
||||
lru.head.next.prev = node
|
||||
lru.head.next = node
|
||||
}
|
||||
|
||||
// removeNode 移除节点
|
||||
func (lru *LRUCache) removeNode(node *LRUNode) {
|
||||
node.prev.next = node.next
|
||||
node.next.prev = node.prev
|
||||
}
|
||||
|
||||
// removeTail 移除尾部节点
|
||||
func (lru *LRUCache) removeTail() *LRUNode {
|
||||
lastNode := lru.tail.prev
|
||||
lru.removeNode(lastNode)
|
||||
return lastNode
|
||||
}
|
||||
|
||||
// Range 遍历所有缓存项
|
||||
func (lru *LRUCache) Range(fn func(key CacheKey, value *CacheItem) bool) {
|
||||
lru.mu.RLock()
|
||||
defer lru.mu.RUnlock()
|
||||
|
||||
for key, node := range lru.cache {
|
||||
if !fn(key, node.value) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Size 返回缓存大小
|
||||
func (lru *LRUCache) Size() int {
|
||||
lru.mu.RLock()
|
||||
defer lru.mu.RUnlock()
|
||||
return lru.size
|
||||
}
|
||||
|
||||
// CacheKey 用于标识缓存项的唯一键
|
||||
type CacheKey struct {
|
||||
URL string
|
||||
@ -55,6 +223,7 @@ type CacheItem struct {
|
||||
Hash string
|
||||
CreatedAt time.Time
|
||||
AccessCount int64
|
||||
Priority int // 缓存优先级
|
||||
}
|
||||
|
||||
// CacheStats 缓存统计信息
|
||||
@ -71,7 +240,8 @@ type CacheStats struct {
|
||||
// CacheManager 缓存管理器
|
||||
type CacheManager struct {
|
||||
cacheDir string
|
||||
items sync.Map
|
||||
items sync.Map // 保持原有的 sync.Map 用于文件缓存
|
||||
lruCache *LRUCache // 新增LRU缓存用于热点数据
|
||||
maxAge time.Duration
|
||||
cleanupTick time.Duration
|
||||
maxCacheSize int64
|
||||
@ -84,6 +254,9 @@ type CacheManager struct {
|
||||
|
||||
// ExtensionMatcher缓存
|
||||
extensionMatcherCache *ExtensionMatcherCache
|
||||
|
||||
// 缓存预热
|
||||
prewarming atomic.Bool
|
||||
}
|
||||
|
||||
// NewCacheManager 创建新的缓存管理器
|
||||
@ -94,6 +267,7 @@ func NewCacheManager(cacheDir string) (*CacheManager, error) {
|
||||
|
||||
cm := &CacheManager{
|
||||
cacheDir: cacheDir,
|
||||
lruCache: NewLRUCache(10000), // 10000个热点缓存项
|
||||
maxAge: 30 * time.Minute,
|
||||
cleanupTick: 5 * time.Minute,
|
||||
maxCacheSize: 10 * 1024 * 1024 * 1024, // 10GB
|
||||
@ -118,6 +292,9 @@ func NewCacheManager(cacheDir string) (*CacheManager, error) {
|
||||
// 启动清理协程
|
||||
cm.startCleanup()
|
||||
|
||||
// 启动缓存预热
|
||||
go cm.prewarmCache()
|
||||
|
||||
return cm, nil
|
||||
}
|
||||
|
||||
@ -147,7 +324,22 @@ func (cm *CacheManager) Get(key CacheKey, r *http.Request) (*CacheItem, bool, bo
|
||||
return nil, false, false
|
||||
}
|
||||
|
||||
// 检查缓存项是否存在
|
||||
// 检查LRU缓存
|
||||
if item, found := cm.lruCache.Get(key); found {
|
||||
// 检查LRU缓存项是否过期
|
||||
if time.Since(item.LastAccess) > cm.maxAge {
|
||||
cm.lruCache.Delete(key)
|
||||
cm.missCount.Add(1)
|
||||
return nil, false, false
|
||||
}
|
||||
// 更新访问时间
|
||||
item.LastAccess = time.Now()
|
||||
atomic.AddInt64(&item.AccessCount, 1)
|
||||
cm.hitCount.Add(1)
|
||||
return item, true, false
|
||||
}
|
||||
|
||||
// 检查文件缓存
|
||||
value, ok := cm.items.Load(key)
|
||||
if !ok {
|
||||
cm.missCount.Add(1)
|
||||
@ -177,6 +369,9 @@ func (cm *CacheManager) Get(key CacheKey, r *http.Request) (*CacheItem, bool, bo
|
||||
cm.hitCount.Add(1)
|
||||
cm.bytesSaved.Add(item.Size)
|
||||
|
||||
// 将缓存项添加到LRU缓存
|
||||
cm.lruCache.Put(key, item)
|
||||
|
||||
return item, true, false
|
||||
}
|
||||
|
||||
@ -230,7 +425,11 @@ func (cm *CacheManager) Put(key CacheKey, resp *http.Response, body []byte) (*Ca
|
||||
}
|
||||
|
||||
cm.items.Store(key, item)
|
||||
log.Printf("[Cache] NEW %s %s (%s) from %s", resp.Request.Method, key.URL, formatBytes(item.Size), utils.GetRequestSource(resp.Request))
|
||||
method := "GET"
|
||||
if resp.Request != nil {
|
||||
method = resp.Request.Method
|
||||
}
|
||||
log.Printf("[Cache] NEW %s %s (%s) from %s", method, key.URL, formatBytes(item.Size), utils.GetRequestSource(resp.Request))
|
||||
return item, nil
|
||||
}
|
||||
|
||||
@ -649,3 +848,79 @@ func (cm *CacheManager) Stop() {
|
||||
cm.extensionMatcherCache.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
// prewarmCache 启动缓存预热
|
||||
func (cm *CacheManager) prewarmCache() {
|
||||
// 模拟一些请求来预热缓存
|
||||
// 实际应用中,这里会从数据库或外部服务加载热点数据
|
||||
// 例如,从数据库加载最近访问频率高的URL
|
||||
// 或者从外部API获取热门资源
|
||||
|
||||
// 示例:加载最近访问的URL
|
||||
// 假设我们有一个数据库或文件,记录最近访问的URL和它们的哈希
|
||||
// 这里我们简单地加载一些示例URL
|
||||
exampleUrls := []string{
|
||||
"https://example.com/api/data",
|
||||
"https://api.github.com/repos/golang/go/releases",
|
||||
"https://api.openai.com/v1/models",
|
||||
"https://api.openai.com/v1/chat/completions",
|
||||
}
|
||||
|
||||
for _, url := range exampleUrls {
|
||||
// 生成一个随机的Accept Headers和UserAgent
|
||||
acceptHeaders := "application/json"
|
||||
userAgent := "Mozilla/5.0 (compatible; ProxyGo/1.0)"
|
||||
|
||||
// 模拟一个HTTP请求
|
||||
req, err := http.NewRequest("GET", url, nil)
|
||||
if err != nil {
|
||||
log.Printf("[Cache] ERR Failed to create request for prewarming: %v", err)
|
||||
continue
|
||||
}
|
||||
req.Header.Set("Accept", acceptHeaders)
|
||||
req.Header.Set("User-Agent", userAgent)
|
||||
|
||||
// 生成缓存键
|
||||
cacheKey := cm.GenerateCacheKey(req)
|
||||
|
||||
// 尝试从LRU缓存获取
|
||||
if _, found := cm.lruCache.Get(cacheKey); found {
|
||||
log.Printf("[Cache] WARN %s (prewarmed)", cacheKey.URL)
|
||||
continue
|
||||
}
|
||||
|
||||
// 尝试从文件缓存获取
|
||||
if _, ok := cm.items.Load(cacheKey); ok {
|
||||
log.Printf("[Cache] WARN %s (prewarmed)", cacheKey.URL)
|
||||
continue
|
||||
}
|
||||
|
||||
// 模拟一个HTTP响应
|
||||
resp := &http.Response{
|
||||
StatusCode: http.StatusOK,
|
||||
Header: make(http.Header),
|
||||
Request: req,
|
||||
}
|
||||
resp.Header.Set("Content-Type", "application/json")
|
||||
resp.Header.Set("Content-Encoding", "gzip")
|
||||
resp.Header.Set("X-Cache", "HIT") // 模拟缓存命中
|
||||
|
||||
// 模拟一个HTTP请求体
|
||||
body := []byte(`{"message": "Hello from prewarmed cache"}`)
|
||||
|
||||
// 添加到LRU缓存
|
||||
contentHash := sha256.Sum256(body)
|
||||
cm.lruCache.Put(cacheKey, &CacheItem{
|
||||
FilePath: "", // 文件缓存,这里不需要
|
||||
ContentType: "application/json",
|
||||
ContentEncoding: "gzip",
|
||||
Size: int64(len(body)),
|
||||
LastAccess: time.Now(),
|
||||
Hash: hex.EncodeToString(contentHash[:]),
|
||||
CreatedAt: time.Now(),
|
||||
AccessCount: 1,
|
||||
})
|
||||
|
||||
log.Printf("[Cache] PREWARM %s", cacheKey.URL)
|
||||
}
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"proxy-go/internal/cache"
|
||||
@ -13,6 +14,15 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/woodchen-ink/go-web-utils/iputil"
|
||||
"golang.org/x/net/http2"
|
||||
)
|
||||
|
||||
// 镜像代理专用配置常量
|
||||
const (
|
||||
mirrorMaxIdleConns = 2000 // 镜像代理全局最大空闲连接
|
||||
mirrorMaxIdleConnsPerHost = 200 // 镜像代理每个主机最大空闲连接
|
||||
mirrorMaxConnsPerHost = 500 // 镜像代理每个主机最大连接数
|
||||
mirrorTimeout = 60 * time.Second // 镜像代理超时时间
|
||||
)
|
||||
|
||||
type MirrorProxyHandler struct {
|
||||
@ -21,10 +31,38 @@ type MirrorProxyHandler struct {
|
||||
}
|
||||
|
||||
func NewMirrorProxyHandler() *MirrorProxyHandler {
|
||||
// 创建优化的拨号器
|
||||
dialer := &net.Dialer{
|
||||
Timeout: 10 * time.Second,
|
||||
KeepAlive: 30 * time.Second,
|
||||
}
|
||||
|
||||
// 创建优化的传输层
|
||||
transport := &http.Transport{
|
||||
MaxIdleConns: 100,
|
||||
MaxIdleConnsPerHost: 10,
|
||||
DialContext: dialer.DialContext,
|
||||
MaxIdleConns: mirrorMaxIdleConns,
|
||||
MaxIdleConnsPerHost: mirrorMaxIdleConnsPerHost,
|
||||
MaxConnsPerHost: mirrorMaxConnsPerHost,
|
||||
IdleConnTimeout: 90 * time.Second,
|
||||
TLSHandshakeTimeout: 5 * time.Second,
|
||||
ExpectContinueTimeout: 1 * time.Second,
|
||||
DisableKeepAlives: false,
|
||||
DisableCompression: false,
|
||||
ForceAttemptHTTP2: true,
|
||||
WriteBufferSize: 128 * 1024,
|
||||
ReadBufferSize: 128 * 1024,
|
||||
ResponseHeaderTimeout: 30 * time.Second,
|
||||
MaxResponseHeaderBytes: 64 * 1024,
|
||||
}
|
||||
|
||||
// 配置 HTTP/2
|
||||
http2Transport, err := http2.ConfigureTransports(transport)
|
||||
if err == nil && http2Transport != nil {
|
||||
http2Transport.ReadIdleTimeout = 30 * time.Second
|
||||
http2Transport.PingTimeout = 10 * time.Second
|
||||
http2Transport.AllowHTTP = false
|
||||
http2Transport.MaxReadFrameSize = 32 * 1024
|
||||
http2Transport.StrictMaxConcurrentStreams = true
|
||||
}
|
||||
|
||||
// 初始化缓存管理器
|
||||
@ -36,7 +74,13 @@ func NewMirrorProxyHandler() *MirrorProxyHandler {
|
||||
return &MirrorProxyHandler{
|
||||
client: &http.Client{
|
||||
Transport: transport,
|
||||
Timeout: 30 * time.Second,
|
||||
Timeout: mirrorTimeout,
|
||||
CheckRedirect: func(req *http.Request, via []*http.Request) error {
|
||||
if len(via) >= 10 {
|
||||
return fmt.Errorf("stopped after 10 redirects")
|
||||
}
|
||||
return nil
|
||||
},
|
||||
},
|
||||
Cache: cacheManager,
|
||||
}
|
||||
|
@ -43,6 +43,21 @@ var hopHeadersBase = map[string]bool{
|
||||
"Upgrade": true,
|
||||
}
|
||||
|
||||
// 优化后的连接池配置常量
|
||||
const (
|
||||
// 连接池配置
|
||||
maxIdleConns = 5000 // 全局最大空闲连接数(增加)
|
||||
maxIdleConnsPerHost = 500 // 每个主机最大空闲连接数(增加)
|
||||
maxConnsPerHost = 1000 // 每个主机最大连接数(增加)
|
||||
|
||||
// 缓冲区大小优化
|
||||
writeBufferSize = 256 * 1024 // 写缓冲区(增加)
|
||||
readBufferSize = 256 * 1024 // 读缓冲区(增加)
|
||||
|
||||
// HTTP/2 配置
|
||||
maxReadFrameSize = 64 * 1024 // HTTP/2 最大读帧大小(增加)
|
||||
)
|
||||
|
||||
// ErrorHandler 定义错误处理函数类型
|
||||
type ErrorHandler func(w http.ResponseWriter, r *http.Request, err error)
|
||||
|
||||
@ -126,29 +141,30 @@ func NewProxyHandler(cfg *config.Config) *ProxyHandler {
|
||||
|
||||
transport := &http.Transport{
|
||||
DialContext: dialer.DialContext,
|
||||
MaxIdleConns: 2000,
|
||||
MaxIdleConnsPerHost: 200,
|
||||
MaxIdleConns: maxIdleConns,
|
||||
MaxIdleConnsPerHost: maxIdleConnsPerHost,
|
||||
IdleConnTimeout: idleConnTimeout,
|
||||
TLSHandshakeTimeout: tlsHandshakeTimeout,
|
||||
ExpectContinueTimeout: 1 * time.Second,
|
||||
MaxConnsPerHost: 400,
|
||||
MaxConnsPerHost: maxConnsPerHost,
|
||||
DisableKeepAlives: false,
|
||||
DisableCompression: false,
|
||||
ForceAttemptHTTP2: true,
|
||||
WriteBufferSize: 128 * 1024,
|
||||
ReadBufferSize: 128 * 1024,
|
||||
WriteBufferSize: writeBufferSize,
|
||||
ReadBufferSize: readBufferSize,
|
||||
ResponseHeaderTimeout: backendServTimeout,
|
||||
MaxResponseHeaderBytes: 64 * 1024,
|
||||
MaxResponseHeaderBytes: 128 * 1024, // 增加响应头缓冲区
|
||||
}
|
||||
|
||||
// 设置HTTP/2传输配置
|
||||
http2Transport, err := http2.ConfigureTransports(transport)
|
||||
if err == nil && http2Transport != nil {
|
||||
http2Transport.ReadIdleTimeout = 10 * time.Second
|
||||
http2Transport.PingTimeout = 5 * time.Second
|
||||
http2Transport.ReadIdleTimeout = 30 * time.Second // 增加读空闲超时
|
||||
http2Transport.PingTimeout = 10 * time.Second // 增加ping超时
|
||||
http2Transport.AllowHTTP = false
|
||||
http2Transport.MaxReadFrameSize = 32 * 1024
|
||||
http2Transport.MaxReadFrameSize = maxReadFrameSize // 使用常量
|
||||
http2Transport.StrictMaxConcurrentStreams = true
|
||||
|
||||
}
|
||||
|
||||
// 初始化缓存管理器
|
||||
|
@ -15,6 +15,157 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// 优化的状态码统计结构
|
||||
type StatusCodeStats struct {
|
||||
mu sync.RWMutex
|
||||
stats map[int]*int64 // 预分配常见状态码
|
||||
}
|
||||
|
||||
// 优化的延迟分布统计
|
||||
type LatencyBuckets struct {
|
||||
lt10ms int64
|
||||
ms10_50 int64
|
||||
ms50_200 int64
|
||||
ms200_1000 int64
|
||||
gt1s int64
|
||||
}
|
||||
|
||||
// 优化的引用来源统计(使用分片减少锁竞争)
|
||||
type RefererStats struct {
|
||||
shards []*RefererShard
|
||||
mask uint64
|
||||
}
|
||||
|
||||
type RefererShard struct {
|
||||
mu sync.RWMutex
|
||||
data map[string]*models.PathMetrics
|
||||
}
|
||||
|
||||
const (
|
||||
refererShardCount = 32 // 分片数量,必须是2的幂
|
||||
)
|
||||
|
||||
func NewRefererStats() *RefererStats {
|
||||
rs := &RefererStats{
|
||||
shards: make([]*RefererShard, refererShardCount),
|
||||
mask: refererShardCount - 1,
|
||||
}
|
||||
for i := 0; i < refererShardCount; i++ {
|
||||
rs.shards[i] = &RefererShard{
|
||||
data: make(map[string]*models.PathMetrics),
|
||||
}
|
||||
}
|
||||
return rs
|
||||
}
|
||||
|
||||
func (rs *RefererStats) hash(key string) uint64 {
|
||||
// 简单的字符串哈希函数
|
||||
var h uint64 = 14695981039346656037
|
||||
for _, b := range []byte(key) {
|
||||
h ^= uint64(b)
|
||||
h *= 1099511628211
|
||||
}
|
||||
return h
|
||||
}
|
||||
|
||||
func (rs *RefererStats) getShard(key string) *RefererShard {
|
||||
return rs.shards[rs.hash(key)&rs.mask]
|
||||
}
|
||||
|
||||
func (rs *RefererStats) Load(key string) (*models.PathMetrics, bool) {
|
||||
shard := rs.getShard(key)
|
||||
shard.mu.RLock()
|
||||
defer shard.mu.RUnlock()
|
||||
val, ok := shard.data[key]
|
||||
return val, ok
|
||||
}
|
||||
|
||||
func (rs *RefererStats) Store(key string, value *models.PathMetrics) {
|
||||
shard := rs.getShard(key)
|
||||
shard.mu.Lock()
|
||||
defer shard.mu.Unlock()
|
||||
shard.data[key] = value
|
||||
}
|
||||
|
||||
func (rs *RefererStats) Delete(key string) {
|
||||
shard := rs.getShard(key)
|
||||
shard.mu.Lock()
|
||||
defer shard.mu.Unlock()
|
||||
delete(shard.data, key)
|
||||
}
|
||||
|
||||
func (rs *RefererStats) Range(f func(key string, value *models.PathMetrics) bool) {
|
||||
for _, shard := range rs.shards {
|
||||
shard.mu.RLock()
|
||||
for k, v := range shard.data {
|
||||
if !f(k, v) {
|
||||
shard.mu.RUnlock()
|
||||
return
|
||||
}
|
||||
}
|
||||
shard.mu.RUnlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (rs *RefererStats) Cleanup(cutoff int64) int {
|
||||
deleted := 0
|
||||
for _, shard := range rs.shards {
|
||||
shard.mu.Lock()
|
||||
for k, v := range shard.data {
|
||||
if v.LastAccessTime.Load() < cutoff {
|
||||
delete(shard.data, k)
|
||||
deleted++
|
||||
}
|
||||
}
|
||||
shard.mu.Unlock()
|
||||
}
|
||||
return deleted
|
||||
}
|
||||
|
||||
func NewStatusCodeStats() *StatusCodeStats {
|
||||
s := &StatusCodeStats{
|
||||
stats: make(map[int]*int64),
|
||||
}
|
||||
// 预分配常见状态码
|
||||
commonCodes := []int{200, 201, 204, 301, 302, 304, 400, 401, 403, 404, 429, 500, 502, 503, 504}
|
||||
for _, code := range commonCodes {
|
||||
counter := new(int64)
|
||||
s.stats[code] = counter
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *StatusCodeStats) Increment(code int) {
|
||||
s.mu.RLock()
|
||||
if counter, exists := s.stats[code]; exists {
|
||||
s.mu.RUnlock()
|
||||
atomic.AddInt64(counter, 1)
|
||||
return
|
||||
}
|
||||
s.mu.RUnlock()
|
||||
|
||||
// 需要创建新的计数器
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if counter, exists := s.stats[code]; exists {
|
||||
atomic.AddInt64(counter, 1)
|
||||
} else {
|
||||
counter := new(int64)
|
||||
*counter = 1
|
||||
s.stats[code] = counter
|
||||
}
|
||||
}
|
||||
|
||||
func (s *StatusCodeStats) GetStats() map[string]int64 {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
result := make(map[string]int64)
|
||||
for code, counter := range s.stats {
|
||||
result[fmt.Sprintf("%d", code)] = atomic.LoadInt64(counter)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// Collector 指标收集器
|
||||
type Collector struct {
|
||||
startTime time.Time
|
||||
@ -23,9 +174,9 @@ type Collector struct {
|
||||
latencySum int64
|
||||
maxLatency int64 // 最大响应时间
|
||||
minLatency int64 // 最小响应时间
|
||||
statusCodeStats sync.Map
|
||||
latencyBuckets sync.Map // 响应时间分布
|
||||
refererStats sync.Map // 引用来源统计
|
||||
statusCodeStats *StatusCodeStats
|
||||
latencyBuckets *LatencyBuckets // 使用结构体替代 sync.Map
|
||||
refererStats *RefererStats // 使用分片哈希表
|
||||
bandwidthStats struct {
|
||||
sync.RWMutex
|
||||
window time.Duration
|
||||
@ -61,6 +212,9 @@ func InitCollector(cfg *config.Config) error {
|
||||
recentRequests: models.NewRequestQueue(100),
|
||||
config: cfg,
|
||||
minLatency: math.MaxInt64,
|
||||
statusCodeStats: NewStatusCodeStats(),
|
||||
latencyBuckets: &LatencyBuckets{},
|
||||
refererStats: NewRefererStats(),
|
||||
}
|
||||
|
||||
// 初始化带宽统计
|
||||
@ -73,7 +227,19 @@ func InitCollector(cfg *config.Config) error {
|
||||
for _, bucket := range buckets {
|
||||
counter := new(int64)
|
||||
*counter = 0
|
||||
instance.latencyBuckets.Store(bucket, counter)
|
||||
// 根据 bucket 名称设置对应的桶计数器
|
||||
switch bucket {
|
||||
case "lt10ms":
|
||||
instance.latencyBuckets.lt10ms = atomic.LoadInt64(counter)
|
||||
case "10-50ms":
|
||||
instance.latencyBuckets.ms10_50 = atomic.LoadInt64(counter)
|
||||
case "50-200ms":
|
||||
instance.latencyBuckets.ms50_200 = atomic.LoadInt64(counter)
|
||||
case "200-1000ms":
|
||||
instance.latencyBuckets.ms200_1000 = atomic.LoadInt64(counter)
|
||||
case "gt1s":
|
||||
instance.latencyBuckets.gt1s = atomic.LoadInt64(counter)
|
||||
}
|
||||
}
|
||||
|
||||
// 初始化异步指标收集通道
|
||||
@ -152,14 +318,10 @@ func (c *Collector) GetStats() map[string]interface{} {
|
||||
|
||||
// 计算总请求数和平均延迟
|
||||
var totalRequests int64
|
||||
c.statusCodeStats.Range(func(key, value interface{}) bool {
|
||||
if counter, ok := value.(*int64); ok {
|
||||
totalRequests += atomic.LoadInt64(counter)
|
||||
} else {
|
||||
totalRequests += value.(int64)
|
||||
statusCodeStats := c.statusCodeStats.GetStats()
|
||||
for _, count := range statusCodeStats {
|
||||
totalRequests += count
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
avgLatency := float64(0)
|
||||
if totalRequests > 0 {
|
||||
@ -169,22 +331,13 @@ func (c *Collector) GetStats() map[string]interface{} {
|
||||
// 计算总体平均每秒请求数
|
||||
requestsPerSecond := float64(totalRequests) / totalRuntime.Seconds()
|
||||
|
||||
// 收集状态码统计
|
||||
statusCodeStats := make(map[string]int64)
|
||||
c.statusCodeStats.Range(func(key, value interface{}) bool {
|
||||
if counter, ok := value.(*int64); ok {
|
||||
statusCodeStats[key.(string)] = atomic.LoadInt64(counter)
|
||||
} else {
|
||||
statusCodeStats[key.(string)] = value.(int64)
|
||||
}
|
||||
return true
|
||||
})
|
||||
// 收集状态码统计(已经在上面获取了)
|
||||
|
||||
// 收集引用来源统计
|
||||
var refererMetrics []*models.PathMetrics
|
||||
refererCount := 0
|
||||
c.refererStats.Range(func(key, value interface{}) bool {
|
||||
stats := value.(*models.PathMetrics)
|
||||
c.refererStats.Range(func(key string, value *models.PathMetrics) bool {
|
||||
stats := value
|
||||
requestCount := stats.GetRequestCount()
|
||||
if requestCount > 0 {
|
||||
totalLatency := stats.GetTotalLatency()
|
||||
@ -221,21 +374,11 @@ func (c *Collector) GetStats() map[string]interface{} {
|
||||
|
||||
// 收集延迟分布
|
||||
latencyDistribution := make(map[string]int64)
|
||||
|
||||
// 确保所有桶都存在,即使计数为0
|
||||
buckets := []string{"lt10ms", "10-50ms", "50-200ms", "200-1000ms", "gt1s"}
|
||||
for _, bucket := range buckets {
|
||||
if counter, ok := c.latencyBuckets.Load(bucket); ok {
|
||||
if counter != nil {
|
||||
value := atomic.LoadInt64(counter.(*int64))
|
||||
latencyDistribution[bucket] = value
|
||||
} else {
|
||||
latencyDistribution[bucket] = 0
|
||||
}
|
||||
} else {
|
||||
latencyDistribution[bucket] = 0
|
||||
}
|
||||
}
|
||||
latencyDistribution["lt10ms"] = atomic.LoadInt64(&c.latencyBuckets.lt10ms)
|
||||
latencyDistribution["10-50ms"] = atomic.LoadInt64(&c.latencyBuckets.ms10_50)
|
||||
latencyDistribution["50-200ms"] = atomic.LoadInt64(&c.latencyBuckets.ms50_200)
|
||||
latencyDistribution["200-1000ms"] = atomic.LoadInt64(&c.latencyBuckets.ms200_1000)
|
||||
latencyDistribution["gt1s"] = atomic.LoadInt64(&c.latencyBuckets.gt1s)
|
||||
|
||||
// 获取最近请求记录(使用读锁)
|
||||
recentRequests := c.recentRequests.GetAll()
|
||||
@ -306,14 +449,13 @@ func (c *Collector) validateLoadedData() error {
|
||||
|
||||
// 验证状态码统计
|
||||
var statusCodeTotal int64
|
||||
c.statusCodeStats.Range(func(key, value interface{}) bool {
|
||||
count := atomic.LoadInt64(value.(*int64))
|
||||
statusStats := c.statusCodeStats.GetStats()
|
||||
for _, count := range statusStats {
|
||||
if count < 0 {
|
||||
return false
|
||||
return fmt.Errorf("invalid negative status code count")
|
||||
}
|
||||
statusCodeTotal += count
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -418,21 +560,10 @@ func (c *Collector) startCleanupTask() {
|
||||
oneDayAgo := time.Now().Add(-24 * time.Hour).Unix()
|
||||
|
||||
// 清理超过24小时的引用来源统计
|
||||
var keysToDelete []interface{}
|
||||
c.refererStats.Range(func(key, value interface{}) bool {
|
||||
metrics := value.(*models.PathMetrics)
|
||||
if metrics.LastAccessTime.Load() < oneDayAgo {
|
||||
keysToDelete = append(keysToDelete, key)
|
||||
}
|
||||
return true
|
||||
})
|
||||
deletedCount := c.refererStats.Cleanup(oneDayAgo)
|
||||
|
||||
for _, key := range keysToDelete {
|
||||
c.refererStats.Delete(key)
|
||||
}
|
||||
|
||||
if len(keysToDelete) > 0 {
|
||||
log.Printf("[Collector] 已清理 %d 条过期的引用来源统计", len(keysToDelete))
|
||||
if deletedCount > 0 {
|
||||
log.Printf("[Collector] 已清理 %d 条过期的引用来源统计", deletedCount)
|
||||
}
|
||||
|
||||
// 强制GC
|
||||
@ -469,14 +600,7 @@ func (c *Collector) startAsyncMetricsUpdater() {
|
||||
func (c *Collector) updateMetricsBatch(batch []RequestMetric) {
|
||||
for _, m := range batch {
|
||||
// 更新状态码统计
|
||||
statusKey := fmt.Sprintf("%d", m.Status)
|
||||
if counter, ok := c.statusCodeStats.Load(statusKey); ok {
|
||||
atomic.AddInt64(counter.(*int64), 1)
|
||||
} else {
|
||||
counter := new(int64)
|
||||
*counter = 1
|
||||
c.statusCodeStats.Store(statusKey, counter)
|
||||
}
|
||||
c.statusCodeStats.Increment(m.Status)
|
||||
|
||||
// 更新总字节数和带宽统计
|
||||
atomic.AddInt64(&c.totalBytes, m.Bytes)
|
||||
@ -506,26 +630,17 @@ func (c *Collector) updateMetricsBatch(batch []RequestMetric) {
|
||||
|
||||
// 更新延迟分布
|
||||
latencyMs := m.Latency.Milliseconds()
|
||||
var bucketKey string
|
||||
switch {
|
||||
case latencyMs < 10:
|
||||
bucketKey = "lt10ms"
|
||||
atomic.AddInt64(&c.latencyBuckets.lt10ms, 1)
|
||||
case latencyMs < 50:
|
||||
bucketKey = "10-50ms"
|
||||
atomic.AddInt64(&c.latencyBuckets.ms10_50, 1)
|
||||
case latencyMs < 200:
|
||||
bucketKey = "50-200ms"
|
||||
atomic.AddInt64(&c.latencyBuckets.ms50_200, 1)
|
||||
case latencyMs < 1000:
|
||||
bucketKey = "200-1000ms"
|
||||
atomic.AddInt64(&c.latencyBuckets.ms200_1000, 1)
|
||||
default:
|
||||
bucketKey = "gt1s"
|
||||
}
|
||||
|
||||
if counter, ok := c.latencyBuckets.Load(bucketKey); ok {
|
||||
atomic.AddInt64(counter.(*int64), 1)
|
||||
} else {
|
||||
counter := new(int64)
|
||||
*counter = 1
|
||||
c.latencyBuckets.Store(bucketKey, counter)
|
||||
atomic.AddInt64(&c.latencyBuckets.gt1s, 1)
|
||||
}
|
||||
|
||||
// 记录引用来源
|
||||
@ -534,7 +649,7 @@ func (c *Collector) updateMetricsBatch(batch []RequestMetric) {
|
||||
if referer != "" {
|
||||
var refererMetrics *models.PathMetrics
|
||||
if existingMetrics, ok := c.refererStats.Load(referer); ok {
|
||||
refererMetrics = existingMetrics.(*models.PathMetrics)
|
||||
refererMetrics = existingMetrics
|
||||
} else {
|
||||
refererMetrics = &models.PathMetrics{Path: referer}
|
||||
c.refererStats.Store(referer, refererMetrics)
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
"path/filepath"
|
||||
"proxy-go/internal/utils"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@ -140,22 +141,35 @@ func (ms *MetricsStorage) LoadMetrics() error {
|
||||
if err := loadJSONFromFile(ms.statusCodeFile, &statusCodeStats); err != nil {
|
||||
log.Printf("[MetricsStorage] 加载状态码统计失败: %v", err)
|
||||
} else {
|
||||
for statusCode, count := range statusCodeStats {
|
||||
countValue, ok := count.(float64)
|
||||
if !ok {
|
||||
// 由于新的 StatusCodeStats 结构,我们需要手动设置值
|
||||
loadedCount := 0
|
||||
for codeStr, countValue := range statusCodeStats {
|
||||
// 解析状态码
|
||||
if code, err := strconv.Atoi(codeStr); err == nil {
|
||||
// 解析计数值
|
||||
var count int64
|
||||
switch v := countValue.(type) {
|
||||
case float64:
|
||||
count = int64(v)
|
||||
case int64:
|
||||
count = v
|
||||
case int:
|
||||
count = int64(v)
|
||||
default:
|
||||
continue
|
||||
}
|
||||
|
||||
// 创建或更新状态码统计
|
||||
if counter, ok := ms.collector.statusCodeStats.Load(statusCode); ok {
|
||||
atomic.StoreInt64(counter.(*int64), int64(countValue))
|
||||
} else {
|
||||
counter := new(int64)
|
||||
*counter = int64(countValue)
|
||||
ms.collector.statusCodeStats.Store(statusCode, counter)
|
||||
// 手动设置到新的 StatusCodeStats 结构中
|
||||
ms.collector.statusCodeStats.mu.Lock()
|
||||
if _, exists := ms.collector.statusCodeStats.stats[code]; !exists {
|
||||
ms.collector.statusCodeStats.stats[code] = new(int64)
|
||||
}
|
||||
atomic.StoreInt64(ms.collector.statusCodeStats.stats[code], count)
|
||||
ms.collector.statusCodeStats.mu.Unlock()
|
||||
loadedCount++
|
||||
}
|
||||
}
|
||||
log.Printf("[MetricsStorage] 加载了 %d 条状态码统计", len(statusCodeStats))
|
||||
log.Printf("[MetricsStorage] 成功加载了 %d 条状态码统计", loadedCount)
|
||||
}
|
||||
}
|
||||
|
||||
@ -168,14 +182,25 @@ func (ms *MetricsStorage) LoadMetrics() error {
|
||||
if err := loadJSONFromFile(latencyDistributionFile, &distribution); err != nil {
|
||||
log.Printf("[MetricsStorage] 加载延迟分布失败: %v", err)
|
||||
} else {
|
||||
// 由于新的 LatencyBuckets 结构,我们需要手动设置值
|
||||
for bucket, count := range distribution {
|
||||
countValue, ok := count.(float64)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
if counter, ok := ms.collector.latencyBuckets.Load(bucket); ok {
|
||||
atomic.StoreInt64(counter.(*int64), int64(countValue))
|
||||
// 根据桶名称设置对应的值
|
||||
switch bucket {
|
||||
case "lt10ms":
|
||||
atomic.StoreInt64(&ms.collector.latencyBuckets.lt10ms, int64(countValue))
|
||||
case "10-50ms":
|
||||
atomic.StoreInt64(&ms.collector.latencyBuckets.ms10_50, int64(countValue))
|
||||
case "50-200ms":
|
||||
atomic.StoreInt64(&ms.collector.latencyBuckets.ms50_200, int64(countValue))
|
||||
case "200-1000ms":
|
||||
atomic.StoreInt64(&ms.collector.latencyBuckets.ms200_1000, int64(countValue))
|
||||
case "gt1s":
|
||||
atomic.StoreInt64(&ms.collector.latencyBuckets.gt1s, int64(countValue))
|
||||
}
|
||||
}
|
||||
log.Printf("[MetricsStorage] 加载了延迟分布数据")
|
||||
|
@ -67,11 +67,38 @@ func (rs *RuleService) SelectBestRule(client *http.Client, pathConfig config.Pat
|
||||
return nil, false, false
|
||||
}
|
||||
|
||||
// 获取文件大小
|
||||
// 检查是否需要获取文件大小
|
||||
// 如果所有匹配的规则都没有设置大小阈值(都是默认值),则跳过文件大小检查
|
||||
needSizeCheck := false
|
||||
for _, rule := range domainMatchingRules {
|
||||
if rule.SizeThreshold > 0 || rule.MaxSize < (1<<63-1) {
|
||||
needSizeCheck = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !needSizeCheck {
|
||||
// 不需要检查文件大小,直接使用第一个匹配的规则
|
||||
for _, rule := range domainMatchingRules {
|
||||
if utils.IsTargetAccessible(client, rule.Target+path) {
|
||||
log.Printf("[SelectRule] %s -> 选中规则 (域名: %s, 跳过大小检查)", path, requestHost)
|
||||
return rule, true, true
|
||||
}
|
||||
}
|
||||
return nil, false, false
|
||||
}
|
||||
|
||||
// 获取文件大小(使用同步检查)
|
||||
contentLength, err := utils.GetFileSize(client, pathConfig.DefaultTarget+path)
|
||||
if err != nil {
|
||||
log.Printf("[SelectRule] %s -> 获取文件大小出错: %v,严格模式下不使用扩展名规则", path, err)
|
||||
// 严格模式:如果无法获取文件大小,不使用扩展名规则
|
||||
log.Printf("[SelectRule] %s -> 获取文件大小出错: %v,使用宽松模式回退", path, err)
|
||||
// 宽松模式:如果无法获取文件大小,尝试使用第一个匹配的规则
|
||||
for _, rule := range domainMatchingRules {
|
||||
if utils.IsTargetAccessible(client, rule.Target+path) {
|
||||
log.Printf("[SelectRule] %s -> 使用宽松模式选中规则 (域名: %s, 跳过大小检查)", path, requestHost)
|
||||
return rule, true, true
|
||||
}
|
||||
}
|
||||
return nil, false, false
|
||||
}
|
||||
|
||||
|
@ -10,76 +10,207 @@ import (
|
||||
neturl "net/url"
|
||||
"path/filepath"
|
||||
"proxy-go/internal/config"
|
||||
"runtime"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
// 文件大小缓存项
|
||||
// Goroutine 池相关结构
|
||||
type GoroutinePool struct {
|
||||
maxWorkers int
|
||||
taskQueue chan func()
|
||||
wg sync.WaitGroup
|
||||
once sync.Once
|
||||
stopped int32
|
||||
}
|
||||
|
||||
// 全局 goroutine 池
|
||||
var (
|
||||
globalPool *GoroutinePool
|
||||
poolOnce sync.Once
|
||||
defaultWorkers = runtime.NumCPU() * 4 // 默认工作协程数量
|
||||
)
|
||||
|
||||
// GetGoroutinePool 获取全局 goroutine 池
|
||||
func GetGoroutinePool() *GoroutinePool {
|
||||
poolOnce.Do(func() {
|
||||
globalPool = NewGoroutinePool(defaultWorkers)
|
||||
})
|
||||
return globalPool
|
||||
}
|
||||
|
||||
// NewGoroutinePool 创建新的 goroutine 池
|
||||
func NewGoroutinePool(maxWorkers int) *GoroutinePool {
|
||||
if maxWorkers <= 0 {
|
||||
maxWorkers = runtime.NumCPU() * 2
|
||||
}
|
||||
|
||||
pool := &GoroutinePool{
|
||||
maxWorkers: maxWorkers,
|
||||
taskQueue: make(chan func(), maxWorkers*10), // 缓冲区为工作协程数的10倍
|
||||
}
|
||||
|
||||
// 启动工作协程
|
||||
for i := 0; i < maxWorkers; i++ {
|
||||
pool.wg.Add(1)
|
||||
go pool.worker()
|
||||
}
|
||||
|
||||
return pool
|
||||
}
|
||||
|
||||
// worker 工作协程
|
||||
func (p *GoroutinePool) worker() {
|
||||
defer p.wg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case task, ok := <-p.taskQueue:
|
||||
if !ok {
|
||||
return // 通道关闭,退出
|
||||
}
|
||||
|
||||
// 执行任务,捕获 panic
|
||||
func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
fmt.Printf("[GoroutinePool] Worker panic: %v\n", r)
|
||||
}
|
||||
}()
|
||||
task()
|
||||
}()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Submit 提交任务到池中
|
||||
func (p *GoroutinePool) Submit(task func()) error {
|
||||
if atomic.LoadInt32(&p.stopped) == 1 {
|
||||
return fmt.Errorf("goroutine pool is stopped")
|
||||
}
|
||||
|
||||
select {
|
||||
case p.taskQueue <- task:
|
||||
return nil
|
||||
case <-time.After(100 * time.Millisecond): // 100ms 超时
|
||||
return fmt.Errorf("goroutine pool is busy")
|
||||
}
|
||||
}
|
||||
|
||||
// SubmitWithTimeout 提交任务到池中,带超时
|
||||
func (p *GoroutinePool) SubmitWithTimeout(task func(), timeout time.Duration) error {
|
||||
if atomic.LoadInt32(&p.stopped) == 1 {
|
||||
return fmt.Errorf("goroutine pool is stopped")
|
||||
}
|
||||
|
||||
select {
|
||||
case p.taskQueue <- task:
|
||||
return nil
|
||||
case <-time.After(timeout):
|
||||
return fmt.Errorf("goroutine pool submit timeout")
|
||||
}
|
||||
}
|
||||
|
||||
// Stop 停止 goroutine 池
|
||||
func (p *GoroutinePool) Stop() {
|
||||
p.once.Do(func() {
|
||||
atomic.StoreInt32(&p.stopped, 1)
|
||||
close(p.taskQueue)
|
||||
p.wg.Wait()
|
||||
})
|
||||
}
|
||||
|
||||
// Size 返回池中工作协程数量
|
||||
func (p *GoroutinePool) Size() int {
|
||||
return p.maxWorkers
|
||||
}
|
||||
|
||||
// QueueSize 返回当前任务队列大小
|
||||
func (p *GoroutinePool) QueueSize() int {
|
||||
return len(p.taskQueue)
|
||||
}
|
||||
|
||||
// 异步执行函数的包装器
|
||||
func GoSafe(fn func()) {
|
||||
pool := GetGoroutinePool()
|
||||
err := pool.Submit(fn)
|
||||
if err != nil {
|
||||
// 如果池满了,直接启动 goroutine(降级处理)
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
fmt.Printf("[GoSafe] Panic: %v\n", r)
|
||||
}
|
||||
}()
|
||||
fn()
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// 带超时的异步执行
|
||||
func GoSafeWithTimeout(fn func(), timeout time.Duration) error {
|
||||
pool := GetGoroutinePool()
|
||||
return pool.SubmitWithTimeout(fn, timeout)
|
||||
}
|
||||
|
||||
// 文件大小缓存相关
|
||||
type fileSizeCache struct {
|
||||
size int64
|
||||
timestamp time.Time
|
||||
}
|
||||
|
||||
// 可访问性缓存项
|
||||
type accessibilityCache struct {
|
||||
accessible bool
|
||||
timestamp time.Time
|
||||
}
|
||||
|
||||
// 全局缓存
|
||||
var (
|
||||
// 文件大小缓存,过期时间5分钟
|
||||
sizeCache sync.Map
|
||||
// 可访问性缓存,过期时间30秒
|
||||
accessCache sync.Map
|
||||
cacheTTL = 5 * time.Minute
|
||||
accessTTL = 30 * time.Second
|
||||
maxCacheSize = 10000 // 最大缓存条目数
|
||||
accessTTL = 2 * time.Minute
|
||||
)
|
||||
|
||||
// 清理过期缓存
|
||||
// 初始化函数
|
||||
func init() {
|
||||
go func() {
|
||||
ticker := time.NewTicker(time.Minute)
|
||||
// 启动定期清理缓存的协程
|
||||
GoSafe(func() {
|
||||
ticker := time.NewTicker(10 * time.Minute)
|
||||
defer ticker.Stop()
|
||||
|
||||
for range ticker.C {
|
||||
now := time.Now()
|
||||
// 清理文件大小缓存
|
||||
var items []struct {
|
||||
key interface{}
|
||||
timestamp time.Time
|
||||
cleanExpiredCache()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// 清理过期缓存
|
||||
func cleanExpiredCache() {
|
||||
now := time.Now()
|
||||
|
||||
// 清理文件大小缓存
|
||||
sizeCache.Range(func(key, value interface{}) bool {
|
||||
cache := value.(fileSizeCache)
|
||||
if cache, ok := value.(fileSizeCache); ok {
|
||||
if now.Sub(cache.timestamp) > cacheTTL {
|
||||
sizeCache.Delete(key)
|
||||
} else {
|
||||
items = append(items, struct {
|
||||
key interface{}
|
||||
timestamp time.Time
|
||||
}{key, cache.timestamp})
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
if len(items) > maxCacheSize {
|
||||
sort.Slice(items, func(i, j int) bool {
|
||||
return items[i].timestamp.Before(items[j].timestamp)
|
||||
})
|
||||
for i := 0; i < len(items)/2; i++ {
|
||||
sizeCache.Delete(items[i].key)
|
||||
}
|
||||
}
|
||||
|
||||
// 清理可访问性缓存
|
||||
accessCache.Range(func(key, value interface{}) bool {
|
||||
cache := value.(accessibilityCache)
|
||||
if cache, ok := value.(accessibilityCache); ok {
|
||||
if now.Sub(cache.timestamp) > accessTTL {
|
||||
accessCache.Delete(key)
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// GenerateRequestID 生成唯一的请求ID
|
||||
@ -94,6 +225,9 @@ func GenerateRequestID() string {
|
||||
|
||||
// 获取请求来源
|
||||
func GetRequestSource(r *http.Request) string {
|
||||
if r == nil {
|
||||
return ""
|
||||
}
|
||||
referer := r.Header.Get("Referer")
|
||||
if referer != "" {
|
||||
return fmt.Sprintf(" (from: %s)", referer)
|
||||
@ -131,7 +265,7 @@ func IsImageRequest(path string) bool {
|
||||
return imageExts[ext]
|
||||
}
|
||||
|
||||
// GetFileSize 发送HEAD请求获取文件大小
|
||||
// GetFileSize 发送HEAD请求获取文件大小(保持向后兼容)
|
||||
func GetFileSize(client *http.Client, url string) (int64, error) {
|
||||
// 先查缓存
|
||||
if cache, ok := sizeCache.Load(url); ok {
|
||||
|
Loading…
x
Reference in New Issue
Block a user