添加异步请求指标收集功能,优化请求记录逻辑,支持批量更新状态码、字节数和延迟统计,提升性能和可扩展性。

This commit is contained in:
wood chen 2025-07-11 20:38:24 +08:00
parent ef2ab55fe6
commit 7e81e90113

View File

@ -37,6 +37,17 @@ type Collector struct {
config *config.Config
}
type RequestMetric struct {
Path string
Status int
Latency time.Duration
Bytes int64
ClientIP string
Request *http.Request
}
var requestChan chan RequestMetric
var (
instance *Collector
once sync.Once
@ -65,6 +76,10 @@ func InitCollector(cfg *config.Config) error {
instance.latencyBuckets.Store(bucket, counter)
}
// 初始化异步指标收集通道
requestChan = make(chan RequestMetric, 10000)
instance.startAsyncMetricsUpdater()
// 启动数据一致性检查器
instance.startConsistencyChecker()
@ -89,97 +104,22 @@ func (c *Collector) EndRequest() {
atomic.AddInt64(&c.activeRequests, -1)
}
// RecordRequest 记录请求
// RecordRequest 记录请求异步写入channel
func (c *Collector) RecordRequest(path string, status int, latency time.Duration, bytes int64, clientIP string, r *http.Request) {
// 更新状态码统计
statusKey := fmt.Sprintf("%d", status)
if counter, ok := c.statusCodeStats.Load(statusKey); ok {
atomic.AddInt64(counter.(*int64), 1)
} else {
counter := new(int64)
*counter = 1
c.statusCodeStats.Store(statusKey, counter)
metric := RequestMetric{
Path: path,
Status: status,
Latency: latency,
Bytes: bytes,
ClientIP: clientIP,
Request: r,
}
// 更新总字节数和带宽统计
atomic.AddInt64(&c.totalBytes, bytes)
c.updateBandwidthStats(bytes)
// 更新延迟统计
atomic.AddInt64(&c.latencySum, int64(latency))
latencyNanos := int64(latency)
for {
oldMin := atomic.LoadInt64(&c.minLatency)
if oldMin <= latencyNanos {
break
}
if atomic.CompareAndSwapInt64(&c.minLatency, oldMin, latencyNanos) {
break
}
}
for {
oldMax := atomic.LoadInt64(&c.maxLatency)
if oldMax >= latencyNanos {
break
}
if atomic.CompareAndSwapInt64(&c.maxLatency, oldMax, latencyNanos) {
break
}
}
// 更新延迟分布
latencyMs := latency.Milliseconds()
var bucketKey string
switch {
case latencyMs < 10:
bucketKey = "lt10ms"
case latencyMs < 50:
bucketKey = "10-50ms"
case latencyMs < 200:
bucketKey = "50-200ms"
case latencyMs < 1000:
bucketKey = "200-1000ms"
select {
case requestChan <- metric:
// ok
default:
bucketKey = "gt1s"
// channel 满了,丢弃或降级处理
}
if counter, ok := c.latencyBuckets.Load(bucketKey); ok {
atomic.AddInt64(counter.(*int64), 1)
} else {
counter := new(int64)
*counter = 1
c.latencyBuckets.Store(bucketKey, counter)
}
// 记录引用来源
if referer := r.Referer(); referer != "" {
var refererMetrics *models.PathMetrics
if existingMetrics, ok := c.refererStats.Load(referer); ok {
refererMetrics = existingMetrics.(*models.PathMetrics)
} else {
refererMetrics = &models.PathMetrics{Path: referer}
c.refererStats.Store(referer, refererMetrics)
}
refererMetrics.AddRequest()
if status >= 400 {
refererMetrics.AddError()
}
refererMetrics.AddBytes(bytes)
refererMetrics.AddLatency(latency.Nanoseconds())
// 更新最后访问时间
refererMetrics.LastAccessTime.Store(time.Now().Unix())
}
// 更新最近请求记录
c.recentRequests.Push(models.RequestLog{
Time: time.Now(),
Path: path,
Status: status,
Latency: int64(latency),
BytesSent: bytes,
ClientIP: clientIP,
})
}
// FormatUptime 格式化运行时间
@ -500,3 +440,125 @@ func (c *Collector) startCleanupTask() {
}
}()
}
// 异步批量处理请求指标
func (c *Collector) startAsyncMetricsUpdater() {
go func() {
batch := make([]RequestMetric, 0, 1000)
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case metric := <-requestChan:
batch = append(batch, metric)
if len(batch) >= 1000 {
c.updateMetricsBatch(batch)
batch = batch[:0]
}
case <-ticker.C:
if len(batch) > 0 {
c.updateMetricsBatch(batch)
batch = batch[:0]
}
}
}
}()
}
// 批量更新指标
func (c *Collector) updateMetricsBatch(batch []RequestMetric) {
for _, m := range batch {
// 更新状态码统计
statusKey := fmt.Sprintf("%d", m.Status)
if counter, ok := c.statusCodeStats.Load(statusKey); ok {
atomic.AddInt64(counter.(*int64), 1)
} else {
counter := new(int64)
*counter = 1
c.statusCodeStats.Store(statusKey, counter)
}
// 更新总字节数和带宽统计
atomic.AddInt64(&c.totalBytes, m.Bytes)
c.updateBandwidthStats(m.Bytes)
// 更新延迟统计
atomic.AddInt64(&c.latencySum, int64(m.Latency))
latencyNanos := int64(m.Latency)
for {
oldMin := atomic.LoadInt64(&c.minLatency)
if oldMin <= latencyNanos {
break
}
if atomic.CompareAndSwapInt64(&c.minLatency, oldMin, latencyNanos) {
break
}
}
for {
oldMax := atomic.LoadInt64(&c.maxLatency)
if oldMax >= latencyNanos {
break
}
if atomic.CompareAndSwapInt64(&c.maxLatency, oldMax, latencyNanos) {
break
}
}
// 更新延迟分布
latencyMs := m.Latency.Milliseconds()
var bucketKey string
switch {
case latencyMs < 10:
bucketKey = "lt10ms"
case latencyMs < 50:
bucketKey = "10-50ms"
case latencyMs < 200:
bucketKey = "50-200ms"
case latencyMs < 1000:
bucketKey = "200-1000ms"
default:
bucketKey = "gt1s"
}
if counter, ok := c.latencyBuckets.Load(bucketKey); ok {
atomic.AddInt64(counter.(*int64), 1)
} else {
counter := new(int64)
*counter = 1
c.latencyBuckets.Store(bucketKey, counter)
}
// 记录引用来源
if m.Request != nil {
referer := m.Request.Referer()
if referer != "" {
var refererMetrics *models.PathMetrics
if existingMetrics, ok := c.refererStats.Load(referer); ok {
refererMetrics = existingMetrics.(*models.PathMetrics)
} else {
refererMetrics = &models.PathMetrics{Path: referer}
c.refererStats.Store(referer, refererMetrics)
}
refererMetrics.AddRequest()
if m.Status >= 400 {
refererMetrics.AddError()
}
refererMetrics.AddBytes(m.Bytes)
refererMetrics.AddLatency(m.Latency.Nanoseconds())
// 更新最后访问时间
refererMetrics.LastAccessTime.Store(time.Now().Unix())
}
}
// 更新最近请求记录
c.recentRequests.Push(models.RequestLog{
Time: time.Now(),
Path: m.Path,
Status: m.Status,
Latency: int64(m.Latency),
BytesSent: m.Bytes,
ClientIP: m.ClientIP,
})
}
}