新增FetchURLsWithOptions方法以支持跳过缓存的URL获取,并更新RefreshDataSource方法以实现强制刷新数据源的功能。同时,调整Preloader中的数据源刷新逻辑以使用新方法。

This commit is contained in:
wood chen 2025-06-18 17:17:44 +08:00
parent 1949c8e9f3
commit 32b1cd94ff
2 changed files with 32 additions and 10 deletions

View File

@ -61,6 +61,11 @@ func getIntConfig(key string, defaultValue int) int {
// FetchURLs 从数据源获取URL列表
func (dsf *DataSourceFetcher) FetchURLs(dataSource *model.DataSource) ([]string, error) {
return dsf.FetchURLsWithOptions(dataSource, false)
}
// FetchURLsWithOptions 从数据源获取URL列表支持跳过缓存选项
func (dsf *DataSourceFetcher) FetchURLsWithOptions(dataSource *model.DataSource, skipCache bool) ([]string, error) {
// API类型的数据源直接实时请求不使用缓存
if dataSource.Type == "api_get" || dataSource.Type == "api_post" {
log.Printf("实时请求API数据源 (类型: %s, ID: %d)", dataSource.Type, dataSource.ID)
@ -70,10 +75,14 @@ func (dsf *DataSourceFetcher) FetchURLs(dataSource *model.DataSource) ([]string,
// 构建内存缓存的key使用数据源ID
cacheKey := fmt.Sprintf("datasource_%d", dataSource.ID)
// 先检查内存缓存
if cachedURLs, exists := dsf.cacheManager.GetFromMemoryCache(cacheKey); exists && len(cachedURLs) > 0 {
log.Printf("从内存缓存获取到 %d 个URL (数据源ID: %d)", len(cachedURLs), dataSource.ID)
return cachedURLs, nil
// 如果不跳过缓存,先检查内存缓存
if !skipCache {
if cachedURLs, exists := dsf.cacheManager.GetFromMemoryCache(cacheKey); exists && len(cachedURLs) > 0 {
log.Printf("从内存缓存获取到 %d 个URL (数据源ID: %d)", len(cachedURLs), dataSource.ID)
return cachedURLs, nil
}
} else {
log.Printf("跳过缓存,强制从数据源获取最新数据 (数据源ID: %d)", dataSource.ID)
}
var urls []string
@ -196,10 +205,10 @@ func (dsf *DataSourceFetcher) fetchS3URLs(dataSource *model.DataSource) ([]strin
// updateDataSourceSyncTime 更新数据源的同步时间
func (dsf *DataSourceFetcher) updateDataSourceSyncTime(dataSource *model.DataSource) error {
// 这里需要导入database包来更新数据库
// 为了避免循环依赖,我们通过回调或者接口来处理
// 暂时先记录日志,具体实现在主服务中处理
log.Printf("需要更新数据源 %d 的同步时间", dataSource.ID)
if err := database.DB.Model(dataSource).Update("last_sync", dataSource.LastSync).Error; err != nil {
return fmt.Errorf("failed to update sync time for data source %d: %w", dataSource.ID, err)
}
log.Printf("更新数据源 %d 的同步时间", dataSource.ID)
return nil
}
@ -215,3 +224,16 @@ func (dsf *DataSourceFetcher) PreloadDataSource(dataSource *model.DataSource) er
log.Printf("数据源 %d 预加载完成", dataSource.ID)
return nil
}
// RefreshDataSource 强制刷新数据源(跳过缓存)
func (dsf *DataSourceFetcher) RefreshDataSource(dataSource *model.DataSource) error {
log.Printf("开始强制刷新数据源 (类型: %s, ID: %d)", dataSource.Type, dataSource.ID)
_, err := dsf.FetchURLsWithOptions(dataSource, true) // 跳过缓存
if err != nil {
return fmt.Errorf("failed to refresh data source %d: %w", dataSource.ID, err)
}
log.Printf("数据源 %d 强制刷新完成", dataSource.ID)
return nil
}

View File

@ -154,7 +154,7 @@ func (p *Preloader) RefreshDataSource(dataSourceID uint) error {
}
log.Printf("手动刷新数据源 %d", dataSourceID)
return p.dataSourceFetcher.PreloadDataSource(&dataSource)
return p.dataSourceFetcher.RefreshDataSource(&dataSource)
}
// RefreshEndpoint 手动刷新指定端点的所有数据源
@ -184,7 +184,7 @@ func (p *Preloader) RefreshEndpoint(endpointID uint) error {
go func(ds model.DataSource) {
defer wg.Done()
if err := p.dataSourceFetcher.PreloadDataSource(&ds); err != nil {
if err := p.dataSourceFetcher.RefreshDataSource(&ds); err != nil {
log.Printf("刷新数据源 %d 失败: %v", ds.ID, err)
lastErr = err
}