mirror of
https://github.com/woodchen-ink/random-api-go.git
synced 2025-07-18 05:42:01 +08:00
新增数据预加载功能,优化配置缓存逻辑,支持健康检查接口,调整兰空图床获取器的重试机制(指数退避),提升系统性能和稳定性。
This commit is contained in:
parent
26d214c44b
commit
c050955157
39
.github/workflows/docker-release.yml
vendored
Normal file
39
.github/workflows/docker-release.yml
vendored
Normal file
@ -0,0 +1,39 @@
|
||||
name: Docker Release
|
||||
|
||||
on:
|
||||
release:
|
||||
types: [published]
|
||||
|
||||
env:
|
||||
IMAGE_NAME: random-api-go
|
||||
|
||||
jobs:
|
||||
build:
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Set up QEMU
|
||||
uses: docker/setup-qemu-action@v3
|
||||
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
|
||||
- name: Login to Docker Hub
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
username: woodchen
|
||||
password: ${{ secrets.ACCESS_TOKEN }}
|
||||
|
||||
- name: Build and push
|
||||
uses: docker/build-push-action@v5
|
||||
with:
|
||||
context: .
|
||||
file: Dockerfile
|
||||
push: true
|
||||
tags: woodchen/${{ env.IMAGE_NAME }}:${{ github.ref_name }}
|
||||
platforms: linux/amd64
|
||||
cache-from: type=gha
|
||||
cache-to: type=gha,mode=max
|
2
.github/workflows/docker.yml
vendored
2
.github/workflows/docker.yml
vendored
@ -4,8 +4,6 @@ on:
|
||||
push:
|
||||
branches:
|
||||
- main
|
||||
tags:
|
||||
- v*
|
||||
paths-ignore:
|
||||
- 'lankong_tools/**'
|
||||
- '*.md'
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"random-api-go/model"
|
||||
@ -16,6 +17,19 @@ import (
|
||||
|
||||
var DB *gorm.DB
|
||||
|
||||
// 配置缓存
|
||||
var (
|
||||
configCache = make(map[string]*CachedConfig)
|
||||
configCacheMutex sync.RWMutex
|
||||
)
|
||||
|
||||
// CachedConfig 缓存的配置项
|
||||
type CachedConfig struct {
|
||||
Value string
|
||||
CachedAt time.Time
|
||||
CacheTTL time.Duration // 缓存生存时间
|
||||
}
|
||||
|
||||
// Initialize 初始化数据库
|
||||
func Initialize(dataDir string) error {
|
||||
// 确保数据目录存在
|
||||
@ -78,19 +92,57 @@ func Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetConfig 获取配置值
|
||||
// GetConfig 获取配置值(带缓存)
|
||||
func GetConfig(key string, defaultValue string) string {
|
||||
// 先检查缓存
|
||||
configCacheMutex.RLock()
|
||||
cached, exists := configCache[key]
|
||||
configCacheMutex.RUnlock()
|
||||
|
||||
// 如果缓存存在且未过期,直接返回
|
||||
if exists && time.Since(cached.CachedAt) < cached.CacheTTL {
|
||||
return cached.Value
|
||||
}
|
||||
|
||||
// 从数据库查询
|
||||
var config model.Config
|
||||
if err := DB.Where("key = ?", key).First(&config).Error; err != nil {
|
||||
if err == gorm.ErrRecordNotFound {
|
||||
// 缓存默认值(短时间缓存,避免频繁查询不存在的配置)
|
||||
cacheConfig(key, defaultValue, 5*time.Minute)
|
||||
return defaultValue
|
||||
}
|
||||
log.Printf("Failed to get config %s: %v", key, err)
|
||||
return defaultValue
|
||||
}
|
||||
|
||||
// 缓存查询结果
|
||||
var cacheTTL time.Duration
|
||||
switch key {
|
||||
case "homepage_content":
|
||||
cacheTTL = 10 * time.Minute // 首页内容缓存10分钟
|
||||
case "oauth_client_id", "oauth_client_secret", "oauth_redirect_uri":
|
||||
cacheTTL = 30 * time.Minute // OAuth配置缓存30分钟
|
||||
default:
|
||||
cacheTTL = 5 * time.Minute // 其他配置缓存5分钟
|
||||
}
|
||||
|
||||
cacheConfig(key, config.Value, cacheTTL)
|
||||
return config.Value
|
||||
}
|
||||
|
||||
// cacheConfig 缓存配置值
|
||||
func cacheConfig(key, value string, ttl time.Duration) {
|
||||
configCacheMutex.Lock()
|
||||
defer configCacheMutex.Unlock()
|
||||
|
||||
configCache[key] = &CachedConfig{
|
||||
Value: value,
|
||||
CachedAt: time.Now(),
|
||||
CacheTTL: ttl,
|
||||
}
|
||||
}
|
||||
|
||||
// SetConfig 设置配置值
|
||||
func SetConfig(key, value, configType string) error {
|
||||
var config model.Config
|
||||
@ -103,15 +155,31 @@ func SetConfig(key, value, configType string) error {
|
||||
Value: value,
|
||||
Type: configType,
|
||||
}
|
||||
return DB.Create(&config).Error
|
||||
if err := DB.Create(&config).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
} else if err != nil {
|
||||
return err
|
||||
} else {
|
||||
// 更新现有配置
|
||||
config.Value = value
|
||||
config.Type = configType
|
||||
if err := DB.Save(&config).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// 更新现有配置
|
||||
config.Value = value
|
||||
config.Type = configType
|
||||
return DB.Save(&config).Error
|
||||
// 清理缓存
|
||||
invalidateConfigCache(key)
|
||||
return nil
|
||||
}
|
||||
|
||||
// invalidateConfigCache 清理指定配置的缓存
|
||||
func invalidateConfigCache(key string) {
|
||||
configCacheMutex.Lock()
|
||||
defer configCacheMutex.Unlock()
|
||||
delete(configCache, key)
|
||||
log.Printf("已清理配置 %s 的缓存", key)
|
||||
}
|
||||
|
||||
// ListConfigs 列出所有配置
|
||||
@ -123,5 +191,47 @@ func ListConfigs() ([]model.Config, error) {
|
||||
|
||||
// DeleteConfig 删除配置
|
||||
func DeleteConfig(key string) error {
|
||||
return DB.Where("key = ?", key).Delete(&model.Config{}).Error
|
||||
err := DB.Where("key = ?", key).Delete(&model.Config{}).Error
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 清理缓存
|
||||
invalidateConfigCache(key)
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetConfigCacheStats 获取配置缓存统计信息
|
||||
func GetConfigCacheStats() map[string]interface{} {
|
||||
configCacheMutex.RLock()
|
||||
defer configCacheMutex.RUnlock()
|
||||
|
||||
stats := make(map[string]interface{})
|
||||
stats["total_cached"] = len(configCache)
|
||||
|
||||
// 统计各种状态的缓存
|
||||
var validCount, expiredCount int
|
||||
cacheDetails := make(map[string]interface{})
|
||||
|
||||
for key, cached := range configCache {
|
||||
isExpired := time.Since(cached.CachedAt) >= cached.CacheTTL
|
||||
if isExpired {
|
||||
expiredCount++
|
||||
} else {
|
||||
validCount++
|
||||
}
|
||||
|
||||
cacheDetails[key] = map[string]interface{}{
|
||||
"cached_at": cached.CachedAt.Format("2006-01-02 15:04:05"),
|
||||
"ttl_seconds": int(cached.CacheTTL.Seconds()),
|
||||
"expired": isExpired,
|
||||
"value_length": len(cached.Value),
|
||||
}
|
||||
}
|
||||
|
||||
stats["valid_count"] = validCount
|
||||
stats["expired_count"] = expiredCount
|
||||
stats["details"] = cacheDetails
|
||||
|
||||
return stats
|
||||
}
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"log"
|
||||
"net/http"
|
||||
"random-api-go/database"
|
||||
"random-api-go/initapp"
|
||||
"random-api-go/monitoring"
|
||||
"random-api-go/service"
|
||||
"random-api-go/stats"
|
||||
@ -34,7 +35,7 @@ type Handlers struct {
|
||||
func NewHandlers(statsManager *stats.StatsManager) *Handlers {
|
||||
return &Handlers{
|
||||
Stats: statsManager,
|
||||
cacheDuration: 30 * time.Minute, // 缓存30分钟
|
||||
cacheDuration: 5 * time.Minute, // 缓存5分钟,减少首次访问等待时间
|
||||
}
|
||||
}
|
||||
|
||||
@ -292,3 +293,27 @@ func (h *Handlers) HandlePublicHomeConfig(w http.ResponseWriter, r *http.Request
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// HandleHealth 处理健康检查请求
|
||||
func (h *Handlers) HandleHealth(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodGet {
|
||||
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
|
||||
// 获取初始化状态
|
||||
initStatus := initapp.GetInitStatus()
|
||||
|
||||
response := map[string]interface{}{
|
||||
"status": "healthy",
|
||||
"timestamp": time.Now().Unix(),
|
||||
"init": initStatus,
|
||||
}
|
||||
|
||||
if err := json.NewEncoder(w).Encode(response); err != nil {
|
||||
http.Error(w, "Error encoding response", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
247
initapp/init.go
Normal file
247
initapp/init.go
Normal file
@ -0,0 +1,247 @@
|
||||
package initapp
|
||||
|
||||
import (
|
||||
"log"
|
||||
"random-api-go/database"
|
||||
"random-api-go/model"
|
||||
"random-api-go/service"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// InitData 初始化应用数据,预加载所有需要的数据到内存中
|
||||
func InitData() error {
|
||||
log.Println("开始初始化应用数据...")
|
||||
start := time.Now()
|
||||
|
||||
// 1. 初始化端点服务(这会启动预加载器)
|
||||
endpointService := service.GetEndpointService()
|
||||
log.Println("✓ 端点服务已初始化")
|
||||
|
||||
// 2. 暂停预加载器的定期刷新,避免与初始化冲突
|
||||
preloader := endpointService.GetPreloader()
|
||||
preloader.PausePeriodicRefresh()
|
||||
log.Println("✓ 已暂停预加载器定期刷新")
|
||||
|
||||
// 3. 获取所有活跃的端点和数据源
|
||||
endpoints, err := endpointService.ListEndpoints()
|
||||
if err != nil {
|
||||
log.Printf("获取端点列表失败: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// 4. 统计需要预加载的数据源
|
||||
var activeDataSources []model.DataSource
|
||||
for _, endpoint := range endpoints {
|
||||
if !endpoint.IsActive {
|
||||
continue
|
||||
}
|
||||
for _, ds := range endpoint.DataSources {
|
||||
if ds.IsActive && ds.Type != "api_get" && ds.Type != "api_post" {
|
||||
// API类型的数据源不需要预加载,使用实时请求
|
||||
activeDataSources = append(activeDataSources, ds)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("发现 %d 个端点,%d 个需要预加载的数据源", len(endpoints), len(activeDataSources))
|
||||
|
||||
if len(activeDataSources) == 0 {
|
||||
log.Println("✓ 没有需要预加载的数据源")
|
||||
// 恢复预加载器定期刷新
|
||||
preloader.ResumePeriodicRefresh()
|
||||
log.Printf("应用数据初始化完成,耗时: %v", time.Since(start))
|
||||
return nil
|
||||
}
|
||||
|
||||
// 5. 并发预加载所有数据源
|
||||
var wg sync.WaitGroup
|
||||
var successCount, failCount int
|
||||
var mutex sync.Mutex
|
||||
|
||||
// 限制并发数,避免过多的并发请求
|
||||
semaphore := make(chan struct{}, 5) // 最多5个并发
|
||||
|
||||
for _, ds := range activeDataSources {
|
||||
wg.Add(1)
|
||||
go func(dataSource model.DataSource) {
|
||||
defer wg.Done()
|
||||
|
||||
// 获取信号量
|
||||
semaphore <- struct{}{}
|
||||
defer func() { <-semaphore }()
|
||||
|
||||
log.Printf("预加载数据源: %s (ID: %d, 类型: %s)", dataSource.Name, dataSource.ID, dataSource.Type)
|
||||
|
||||
// 使用预加载器预加载数据源
|
||||
if err := preloader.RefreshDataSource(dataSource.ID); err != nil {
|
||||
log.Printf("预加载数据源 %d 失败: %v", dataSource.ID, err)
|
||||
mutex.Lock()
|
||||
failCount++
|
||||
mutex.Unlock()
|
||||
} else {
|
||||
log.Printf("预加载数据源 %d 成功", dataSource.ID)
|
||||
mutex.Lock()
|
||||
successCount++
|
||||
mutex.Unlock()
|
||||
}
|
||||
}(ds)
|
||||
}
|
||||
|
||||
// 等待所有预加载完成
|
||||
wg.Wait()
|
||||
|
||||
log.Printf("✓ 数据源预加载完成: 成功 %d 个,失败 %d 个", successCount, failCount)
|
||||
|
||||
// 6. 预热URL统计缓存
|
||||
log.Println("预热URL统计缓存...")
|
||||
if err := preloadURLStats(endpointService, endpoints); err != nil {
|
||||
log.Printf("预热URL统计缓存失败: %v", err)
|
||||
} else {
|
||||
log.Println("✓ URL统计缓存预热完成")
|
||||
}
|
||||
|
||||
// 7. 预加载配置
|
||||
log.Println("预加载系统配置...")
|
||||
preloadConfigs()
|
||||
log.Println("✓ 系统配置预加载完成")
|
||||
|
||||
// 8. 恢复预加载器定期刷新
|
||||
preloader.ResumePeriodicRefresh()
|
||||
log.Println("✓ 已恢复预加载器定期刷新")
|
||||
|
||||
duration := time.Since(start)
|
||||
log.Printf("🎉 应用数据初始化完成,总耗时: %v", duration)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// preloadURLStats 预热URL统计缓存
|
||||
func preloadURLStats(endpointService *service.EndpointService, endpoints []*model.APIEndpoint) error {
|
||||
urlStats := make(map[string]struct {
|
||||
TotalURLs int `json:"total_urls"`
|
||||
})
|
||||
|
||||
for _, endpoint := range endpoints {
|
||||
if !endpoint.IsActive {
|
||||
continue
|
||||
}
|
||||
|
||||
totalURLs := 0
|
||||
for _, ds := range endpoint.DataSources {
|
||||
if ds.IsActive {
|
||||
// 使用优化后的URL计数方法
|
||||
count, err := endpointService.GetDataSourceURLCount(&ds)
|
||||
if err != nil {
|
||||
log.Printf("获取数据源 %d URL数量失败: %v", ds.ID, err)
|
||||
// 使用估算值
|
||||
switch ds.Type {
|
||||
case "manual":
|
||||
totalURLs += 5
|
||||
case "lankong":
|
||||
totalURLs += 100
|
||||
case "api_get", "api_post":
|
||||
totalURLs += 1
|
||||
default:
|
||||
totalURLs += 1
|
||||
}
|
||||
} else {
|
||||
totalURLs += count
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
urlStats[endpoint.URL] = struct {
|
||||
TotalURLs int `json:"total_urls"`
|
||||
}{
|
||||
TotalURLs: totalURLs,
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("预热了 %d 个端点的URL统计", len(urlStats))
|
||||
return nil
|
||||
}
|
||||
|
||||
// preloadConfigs 预加载系统配置
|
||||
func preloadConfigs() {
|
||||
// 预加载常用配置,触发数据库查询并缓存结果
|
||||
configs := []string{
|
||||
// 基础配置
|
||||
"homepage_content",
|
||||
|
||||
// OAuth配置
|
||||
"oauth_client_id",
|
||||
"oauth_client_secret",
|
||||
"oauth_redirect_uri",
|
||||
|
||||
// 系统配置
|
||||
"site_title",
|
||||
"site_description",
|
||||
"admin_email",
|
||||
"max_file_size",
|
||||
"allowed_file_types",
|
||||
|
||||
// API配置
|
||||
"rate_limit_enabled",
|
||||
"rate_limit_requests",
|
||||
"rate_limit_window",
|
||||
"cors_enabled",
|
||||
"cors_origins",
|
||||
|
||||
// 兰空图床配置
|
||||
"lankong_max_retries",
|
||||
|
||||
// 缓存配置
|
||||
"cache_enabled",
|
||||
"cache_ttl",
|
||||
"max_cache_size",
|
||||
|
||||
// 日志配置
|
||||
"log_level",
|
||||
"log_file_enabled",
|
||||
"log_retention_days",
|
||||
}
|
||||
|
||||
var loadedCount int
|
||||
for _, key := range configs {
|
||||
value := database.GetConfig(key, "")
|
||||
if value != "" {
|
||||
log.Printf("预加载配置 %s: %d 字符", key, len(value))
|
||||
loadedCount++
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("预加载了 %d 个配置项", loadedCount)
|
||||
}
|
||||
|
||||
// GetInitStatus 获取初始化状态(可用于健康检查)
|
||||
func GetInitStatus() map[string]interface{} {
|
||||
endpointService := service.GetEndpointService()
|
||||
|
||||
// 获取缓存统计
|
||||
cacheStats := endpointService.GetCacheManager().GetCacheStats()
|
||||
|
||||
// 获取端点数量
|
||||
endpoints, _ := endpointService.ListEndpoints()
|
||||
activeEndpoints := 0
|
||||
for _, ep := range endpoints {
|
||||
if ep.IsActive {
|
||||
activeEndpoints++
|
||||
}
|
||||
}
|
||||
|
||||
// 获取配置缓存统计
|
||||
configCacheStats := database.GetConfigCacheStats()
|
||||
|
||||
return map[string]interface{}{
|
||||
"data_cache": map[string]interface{}{
|
||||
"items": len(cacheStats),
|
||||
"details": cacheStats,
|
||||
},
|
||||
"config_cache": configCacheStats,
|
||||
"endpoints": map[string]interface{}{
|
||||
"total": len(endpoints),
|
||||
"active": activeEndpoints,
|
||||
},
|
||||
}
|
||||
}
|
8
main.go
8
main.go
@ -12,6 +12,7 @@ import (
|
||||
"random-api-go/config"
|
||||
"random-api-go/database"
|
||||
"random-api-go/handler"
|
||||
"random-api-go/initapp"
|
||||
"random-api-go/logging"
|
||||
"random-api-go/router"
|
||||
"random-api-go/service"
|
||||
@ -64,6 +65,13 @@ func (a *App) Initialize() error {
|
||||
// 初始化端点服务
|
||||
service.GetEndpointService()
|
||||
|
||||
// 预加载所有数据到内存
|
||||
log.Println("开始预加载应用数据...")
|
||||
if err := initapp.InitData(); err != nil {
|
||||
log.Printf("预加载数据失败: %v", err)
|
||||
// 不返回错误,允许应用继续启动
|
||||
}
|
||||
|
||||
// 创建管理后台处理器
|
||||
a.adminHandler = handler.NewAdminHandler()
|
||||
|
||||
|
@ -20,6 +20,8 @@ type Handler interface {
|
||||
HandleStats(w http.ResponseWriter, r *http.Request)
|
||||
HandleURLStats(w http.ResponseWriter, r *http.Request)
|
||||
HandleMetrics(w http.ResponseWriter, r *http.Request)
|
||||
// 健康检查
|
||||
HandleHealth(w http.ResponseWriter, r *http.Request)
|
||||
// 公开端点
|
||||
HandlePublicEndpoints(w http.ResponseWriter, r *http.Request)
|
||||
// 公开首页配置
|
||||
@ -78,6 +80,7 @@ func (r *Router) SetupAllRoutes(handler Handler, adminHandler AdminHandler, stat
|
||||
r.HandleFunc("/api/stats", handler.HandleStats)
|
||||
r.HandleFunc("/api/urlstats", handler.HandleURLStats)
|
||||
r.HandleFunc("/api/metrics", handler.HandleMetrics)
|
||||
r.HandleFunc("/api/health", handler.HandleHealth)
|
||||
r.HandleFunc("/api/endpoints", handler.HandlePublicEndpoints)
|
||||
r.HandleFunc("/api/home-config", handler.HandlePublicHomeConfig)
|
||||
|
||||
|
@ -4,7 +4,9 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"random-api-go/database"
|
||||
"random-api-go/model"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
@ -18,13 +20,43 @@ type DataSourceFetcher struct {
|
||||
|
||||
// NewDataSourceFetcher 创建数据源获取器
|
||||
func NewDataSourceFetcher(cacheManager *CacheManager) *DataSourceFetcher {
|
||||
// 从配置中获取兰空图床最大重试次数
|
||||
maxRetries := getIntConfig("lankong_max_retries", 7)
|
||||
|
||||
var lankongFetcher *LankongFetcher
|
||||
if maxRetries > 0 {
|
||||
// 使用自定义配置
|
||||
lankongFetcher = NewLankongFetcherWithConfig(maxRetries)
|
||||
log.Printf("兰空图床获取器配置: 最大重试%d次", maxRetries)
|
||||
} else {
|
||||
// 使用默认配置
|
||||
lankongFetcher = NewLankongFetcher()
|
||||
log.Printf("兰空图床获取器使用默认配置")
|
||||
}
|
||||
|
||||
return &DataSourceFetcher{
|
||||
cacheManager: cacheManager,
|
||||
lankongFetcher: NewLankongFetcher(),
|
||||
lankongFetcher: lankongFetcher,
|
||||
apiFetcher: NewAPIFetcher(),
|
||||
}
|
||||
}
|
||||
|
||||
// getIntConfig 获取整数配置,如果不存在或无效则返回默认值
|
||||
func getIntConfig(key string, defaultValue int) int {
|
||||
configStr := database.GetConfig(key, "")
|
||||
if configStr == "" {
|
||||
return defaultValue
|
||||
}
|
||||
|
||||
value, err := strconv.Atoi(configStr)
|
||||
if err != nil {
|
||||
log.Printf("配置 %s 的值 '%s' 不是有效整数,使用默认值 %d", key, configStr, defaultValue)
|
||||
return defaultValue
|
||||
}
|
||||
|
||||
return value
|
||||
}
|
||||
|
||||
// FetchURLs 从数据源获取URL列表
|
||||
func (dsf *DataSourceFetcher) FetchURLs(dataSource *model.DataSource) ([]string, error) {
|
||||
// API类型的数据源直接实时请求,不使用缓存
|
||||
|
@ -355,6 +355,11 @@ func (s *EndpointService) GetPreloader() *Preloader {
|
||||
return s.preloader
|
||||
}
|
||||
|
||||
// GetCacheManager 获取缓存管理器(用于外部控制)
|
||||
func (s *EndpointService) GetCacheManager() *CacheManager {
|
||||
return s.cacheManager
|
||||
}
|
||||
|
||||
// GetDataSourceURLCount 获取数据源的URL数量
|
||||
func (s *EndpointService) GetDataSourceURLCount(dataSource *model.DataSource) (int, error) {
|
||||
// 对于API类型和端点类型的数据源,返回1(因为每次都是实时请求)
|
||||
@ -362,11 +367,25 @@ func (s *EndpointService) GetDataSourceURLCount(dataSource *model.DataSource) (i
|
||||
return 1, nil
|
||||
}
|
||||
|
||||
// 对于其他类型的数据源,尝试获取实际的URL数量
|
||||
urls, err := s.dataSourceFetcher.FetchURLs(dataSource)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
// 优先从内存缓存获取URL数量,避免触发耗时的网络请求
|
||||
cacheKey := fmt.Sprintf("datasource_%d", dataSource.ID)
|
||||
if cachedURLs, exists := s.cacheManager.GetFromMemoryCache(cacheKey); exists {
|
||||
return len(cachedURLs), nil
|
||||
}
|
||||
|
||||
return len(urls), nil
|
||||
// 如果缓存中没有数据,返回估算值,避免在统计时触发耗时操作
|
||||
switch dataSource.Type {
|
||||
case "manual":
|
||||
// 手动数据源可以快速解析配置获取数量
|
||||
urls, err := s.dataSourceFetcher.FetchURLs(dataSource)
|
||||
if err != nil {
|
||||
return 0, nil // 返回0而不是错误,避免影响整体统计
|
||||
}
|
||||
return len(urls), nil
|
||||
case "lankong":
|
||||
// 兰空图床如果没有缓存,返回估算值
|
||||
return 100, nil // 估算值
|
||||
default:
|
||||
return 0, nil
|
||||
}
|
||||
}
|
||||
|
@ -12,14 +12,38 @@ import (
|
||||
|
||||
// LankongFetcher 兰空图床获取器
|
||||
type LankongFetcher struct {
|
||||
client *http.Client
|
||||
client *http.Client
|
||||
retryConfig *RetryConfig
|
||||
}
|
||||
|
||||
// RetryConfig 重试配置
|
||||
type RetryConfig struct {
|
||||
MaxRetries int // 最大重试次数
|
||||
BaseDelay time.Duration // 基础延迟
|
||||
}
|
||||
|
||||
// NewLankongFetcher 创建兰空图床获取器
|
||||
func NewLankongFetcher() *LankongFetcher {
|
||||
return &LankongFetcher{
|
||||
client: &http.Client{
|
||||
Timeout: 30 * time.Second,
|
||||
Timeout: 60 * time.Second, // 增加超时时间
|
||||
},
|
||||
retryConfig: &RetryConfig{
|
||||
MaxRetries: 7, // 最多重试7次 (0秒/15秒/15秒/30秒/30秒/60秒/60秒/180秒)
|
||||
BaseDelay: 1 * time.Second, // 基础延迟(实际不使用,使用固定延迟序列)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// NewLankongFetcherWithConfig 创建带自定义配置的兰空图床获取器
|
||||
func NewLankongFetcherWithConfig(maxRetries int) *LankongFetcher {
|
||||
return &LankongFetcher{
|
||||
client: &http.Client{
|
||||
Timeout: 60 * time.Second,
|
||||
},
|
||||
retryConfig: &RetryConfig{
|
||||
MaxRetries: maxRetries,
|
||||
BaseDelay: 1 * time.Second,
|
||||
},
|
||||
}
|
||||
}
|
||||
@ -52,7 +76,7 @@ func (lf *LankongFetcher) FetchURLs(config *model.LankongConfig) ([]string, erro
|
||||
|
||||
// 获取第一页以确定总页数
|
||||
firstPageURL := fmt.Sprintf("%s?album_id=%s&page=1", baseURL, albumID)
|
||||
response, err := lf.fetchPage(firstPageURL, config.APIToken)
|
||||
response, err := lf.fetchPageWithRetry(firstPageURL, config.APIToken)
|
||||
if err != nil {
|
||||
log.Printf("Failed to fetch first page for album %s: %v", albumID, err)
|
||||
continue
|
||||
@ -62,9 +86,10 @@ func (lf *LankongFetcher) FetchURLs(config *model.LankongConfig) ([]string, erro
|
||||
log.Printf("相册 %s 共有 %d 页", albumID, totalPages)
|
||||
|
||||
// 处理所有页面
|
||||
albumURLs := []string{}
|
||||
for page := 1; page <= totalPages; page++ {
|
||||
reqURL := fmt.Sprintf("%s?album_id=%s&page=%d", baseURL, albumID, page)
|
||||
pageResponse, err := lf.fetchPage(reqURL, config.APIToken)
|
||||
pageResponse, err := lf.fetchPageWithRetry(reqURL, config.APIToken)
|
||||
if err != nil {
|
||||
log.Printf("Failed to fetch page %d for album %s: %v", page, albumID, err)
|
||||
continue
|
||||
@ -72,22 +97,90 @@ func (lf *LankongFetcher) FetchURLs(config *model.LankongConfig) ([]string, erro
|
||||
|
||||
for _, item := range pageResponse.Data.Data {
|
||||
if item.Links.URL != "" {
|
||||
allURLs = append(allURLs, item.Links.URL)
|
||||
albumURLs = append(albumURLs, item.Links.URL)
|
||||
}
|
||||
}
|
||||
|
||||
// 添加小延迟避免请求过快
|
||||
if page < totalPages {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
// 进度日志
|
||||
if page%10 == 0 || page == totalPages {
|
||||
log.Printf("相册 %s: 已处理 %d/%d 页,收集到 %d 个URL", albumID, page, totalPages, len(albumURLs))
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("完成相册 %s: 收集到 %d 个URL", albumID, len(allURLs))
|
||||
allURLs = append(allURLs, albumURLs...)
|
||||
log.Printf("完成相册 %s: 收集到 %d 个URL", albumID, len(albumURLs))
|
||||
}
|
||||
|
||||
return allURLs, nil
|
||||
}
|
||||
|
||||
// fetchPageWithRetry 带重试的页面获取
|
||||
func (lf *LankongFetcher) fetchPageWithRetry(url string, apiToken string) (*LankongResponse, error) {
|
||||
var lastErr error
|
||||
|
||||
for attempt := 0; attempt <= lf.retryConfig.MaxRetries; attempt++ {
|
||||
response, err := lf.fetchPage(url, apiToken)
|
||||
if err == nil {
|
||||
return response, nil
|
||||
}
|
||||
|
||||
lastErr = err
|
||||
|
||||
// 如果是最后一次尝试,不再重试
|
||||
if attempt == lf.retryConfig.MaxRetries {
|
||||
break
|
||||
}
|
||||
|
||||
// 计算延迟时间
|
||||
var delay time.Duration
|
||||
if isRateLimitError(err) {
|
||||
// 对于429错误,使用固定的延迟序列
|
||||
delay = getRateLimitDelay(attempt)
|
||||
log.Printf("遇到频率限制 (尝试 %d/%d): %v,等待 %v 后重试", attempt+1, lf.retryConfig.MaxRetries+1, err, delay)
|
||||
} else {
|
||||
// 其他错误使用较短的延迟
|
||||
delay = time.Duration(attempt+1) * time.Second
|
||||
log.Printf("请求失败 (尝试 %d/%d): %v,%v 后重试", attempt+1, lf.retryConfig.MaxRetries+1, err, delay)
|
||||
}
|
||||
|
||||
time.Sleep(delay)
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("重试 %d 次后仍然失败: %v", lf.retryConfig.MaxRetries, lastErr)
|
||||
}
|
||||
|
||||
// getRateLimitDelay 获取频率限制的延迟时间
|
||||
// 延迟序列:0秒 / 15秒 / 15秒 / 30秒 / 30秒 / 60秒 / 60秒 / 180秒
|
||||
func getRateLimitDelay(attempt int) time.Duration {
|
||||
delaySequence := []time.Duration{
|
||||
0 * time.Second, // 第1次重试:立即
|
||||
15 * time.Second, // 第2次重试:15秒后
|
||||
15 * time.Second, // 第3次重试:15秒后
|
||||
30 * time.Second, // 第4次重试:30秒后
|
||||
30 * time.Second, // 第5次重试:30秒后
|
||||
60 * time.Second, // 第6次重试:60秒后
|
||||
60 * time.Second, // 第7次重试:60秒后
|
||||
180 * time.Second, // 第8次重试:180秒后
|
||||
}
|
||||
|
||||
if attempt < len(delaySequence) {
|
||||
return delaySequence[attempt]
|
||||
}
|
||||
|
||||
// 如果超出序列长度,使用最后一个值
|
||||
return delaySequence[len(delaySequence)-1]
|
||||
}
|
||||
|
||||
// isRateLimitError 检查是否是频率限制错误
|
||||
func isRateLimitError(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
errStr := err.Error()
|
||||
return fmt.Sprintf("%v", err) == "rate limit exceeded (429), need to slow down requests" ||
|
||||
fmt.Sprintf("%v", errStr) == "API returned status code: 429"
|
||||
}
|
||||
|
||||
// fetchPage 获取兰空图床单页数据
|
||||
func (lf *LankongFetcher) fetchPage(url string, apiToken string) (*LankongResponse, error) {
|
||||
req, err := http.NewRequest("GET", url, nil)
|
||||
@ -104,6 +197,11 @@ func (lf *LankongFetcher) fetchPage(url string, apiToken string) (*LankongRespon
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// 特殊处理429错误
|
||||
if resp.StatusCode == 429 {
|
||||
return nil, fmt.Errorf("rate limit exceeded (429), need to slow down requests")
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("API returned status code: %d", resp.StatusCode)
|
||||
}
|
||||
|
@ -14,6 +14,7 @@ type Preloader struct {
|
||||
dataSourceFetcher *DataSourceFetcher
|
||||
cacheManager *CacheManager
|
||||
running bool
|
||||
paused bool
|
||||
stopChan chan struct{}
|
||||
mutex sync.RWMutex
|
||||
}
|
||||
@ -55,6 +56,22 @@ func (p *Preloader) Stop() {
|
||||
log.Println("预加载器已停止")
|
||||
}
|
||||
|
||||
// PausePeriodicRefresh 暂停定期刷新
|
||||
func (p *Preloader) PausePeriodicRefresh() {
|
||||
p.mutex.Lock()
|
||||
defer p.mutex.Unlock()
|
||||
p.paused = true
|
||||
log.Println("预加载器定期刷新已暂停")
|
||||
}
|
||||
|
||||
// ResumePeriodicRefresh 恢复定期刷新
|
||||
func (p *Preloader) ResumePeriodicRefresh() {
|
||||
p.mutex.Lock()
|
||||
defer p.mutex.Unlock()
|
||||
p.paused = false
|
||||
log.Println("预加载器定期刷新已恢复")
|
||||
}
|
||||
|
||||
// PreloadDataSourceOnSave 在保存数据源时预加载数据
|
||||
func (p *Preloader) PreloadDataSourceOnSave(dataSource *model.DataSource) {
|
||||
// API类型的数据源不需要预加载,使用实时请求
|
||||
@ -185,6 +202,16 @@ func (p *Preloader) runPeriodicRefresh() {
|
||||
|
||||
// checkAndRefreshExpiredData 检查并刷新过期数据
|
||||
func (p *Preloader) checkAndRefreshExpiredData() {
|
||||
// 检查是否暂停
|
||||
p.mutex.RLock()
|
||||
isPaused := p.paused
|
||||
p.mutex.RUnlock()
|
||||
|
||||
if isPaused {
|
||||
log.Println("预加载器定期刷新已暂停,跳过此次检查")
|
||||
return
|
||||
}
|
||||
|
||||
log.Println("开始检查过期数据...")
|
||||
|
||||
// 获取所有活跃的数据源
|
||||
|
Loading…
x
Reference in New Issue
Block a user