Files
junhong_cmp_fiber/internal/polling/scheduler.go
huang 18daeae65a
All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 7m17s
feat: 钱包系统分离 - 代理钱包与卡钱包完全隔离
## 变更概述
将统一钱包系统拆分为代理钱包和卡钱包两个独立系统,实现数据表和代码层面的完全隔离。

## 数据库变更
- 新增 6 张表:tb_agent_wallet、tb_agent_wallet_transaction、tb_agent_recharge_record、tb_card_wallet、tb_card_wallet_transaction、tb_card_recharge_record
- 删除 3 张旧表:tb_wallet、tb_wallet_transaction、tb_recharge_record
- 代理钱包:按 (shop_id, wallet_type) 唯一标识,支持主钱包和分佣钱包
- 卡钱包:按 (resource_type, resource_id) 唯一标识,支持物联网卡和设备

## 代码变更
- Model 层:新增 AgentWallet、AgentWalletTransaction、AgentRechargeRecord、CardWallet、CardWalletTransaction、CardRechargeRecord 模型
- Store 层:新增 6 个独立 Store,支持事务、乐观锁、Redis 缓存
- Service 层:重构 commission_calculation、commission_withdrawal、order、recharge 等 8 个服务
- Bootstrap 层:更新 Store 和 Service 依赖注入
- 常量层:按钱包类型重新组织常量和 Redis Key 生成函数

## 技术特性
- 乐观锁:使用 version 字段防止并发冲突
- 多租户:支持 shop_id_tag 和 enterprise_id_tag 过滤
- 事务管理:所有余额变动使用事务保证 ACID
- 缓存策略:Cache-Aside 模式,余额变动后删除缓存

## 业务影响
- 代理钱包和卡钱包业务完全隔离,互不影响
- 为独立监控、优化、扩展打下基础
- 提升代理钱包的稳定性和独立性

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-25 09:51:00 +08:00

745 lines
22 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package polling
import (
"context"
"encoding/json"
"sync"
"sync/atomic"
"time"
"github.com/hibiken/asynq"
"github.com/redis/go-redis/v9"
"go.uber.org/zap"
"gorm.io/gorm"
"github.com/break/junhong_cmp_fiber/internal/model"
"github.com/break/junhong_cmp_fiber/internal/store/postgres"
"github.com/break/junhong_cmp_fiber/pkg/constants"
)
// Scheduler 轮询调度器
// 负责管理 IoT 卡的定期检查任务(实名、流量、套餐)
type Scheduler struct {
db *gorm.DB
redis *redis.Client
queueClient *asynq.Client
logger *zap.Logger
configStore *postgres.PollingConfigStore
iotCardStore *postgres.IotCardStore
concurrencyStore *postgres.PollingConcurrencyConfigStore
// 任务 19: 套餐激活检查处理器
packageActivationHandler *PackageActivationHandler
// 任务 20: 流量重置调度处理器
dataResetHandler *DataResetHandler
// 配置缓存
configCache []*model.PollingConfig
configCacheLock sync.RWMutex
configCacheTime time.Time
// 初始化状态
initProgress *InitProgress
initCompleted atomic.Bool
// 控制信号
stopChan chan struct{}
wg sync.WaitGroup
}
// InitProgress 初始化进度
type InitProgress struct {
mu sync.RWMutex
TotalCards int64 `json:"total_cards"` // 总卡数
LoadedCards int64 `json:"loaded_cards"` // 已加载卡数
StartTime time.Time `json:"start_time"` // 开始时间
LastBatchTime time.Time `json:"last_batch_time"` // 最后一批处理时间
Status string `json:"status"` // 状态: pending, running, completed, failed
ErrorMessage string `json:"error_message"` // 错误信息
}
// SchedulerConfig 调度器配置
// 设计目标:支持一亿张卡规模
type SchedulerConfig struct {
ScheduleInterval time.Duration // 调度循环间隔(默认 1 秒,支持高吞吐)
InitBatchSize int // 初始化每批加载数量(默认 100000
InitBatchSleepDuration time.Duration // 初始化批次间休眠时间(默认 500ms
ConfigCacheTTL time.Duration // 配置缓存 TTL默认 5 分钟)
CardCacheTTL time.Duration // 卡信息缓存 TTL默认 7 天)
ScheduleBatchSize int // 每次调度取出的卡数(默认 50000
MaxManualBatchSize int // 手动触发每次处理数量(默认 1000
}
// DefaultSchedulerConfig 默认调度器配置
// 单 Worker 设计吞吐50000 张/秒,支持多 Worker 水平扩展
func DefaultSchedulerConfig() *SchedulerConfig {
return &SchedulerConfig{
ScheduleInterval: 1 * time.Second, // 1秒调度一次提高响应速度
InitBatchSize: 100000, // 10万张/批初始化
InitBatchSleepDuration: 500 * time.Millisecond, // 500ms 间隔,加快初始化
ConfigCacheTTL: 5 * time.Minute,
CardCacheTTL: 7 * 24 * time.Hour,
ScheduleBatchSize: 50000, // 每次取 5 万张,每秒可调度 5 万张
MaxManualBatchSize: 1000, // 手动触发每次处理 1000 张
}
}
// NewScheduler 创建调度器实例
func NewScheduler(
db *gorm.DB,
redisClient *redis.Client,
queueClient *asynq.Client,
logger *zap.Logger,
) *Scheduler {
return &Scheduler{
db: db,
redis: redisClient,
queueClient: queueClient,
logger: logger,
configStore: postgres.NewPollingConfigStore(db),
iotCardStore: postgres.NewIotCardStore(db, redisClient),
concurrencyStore: postgres.NewPollingConcurrencyConfigStore(db),
packageActivationHandler: NewPackageActivationHandler(db, redisClient, queueClient, nil, logger),
dataResetHandler: NewDataResetHandler(nil, logger), // ResetService 需要通过 SetResetService 注入
initProgress: &InitProgress{
Status: "pending",
},
stopChan: make(chan struct{}),
}
}
// Start 启动调度器
// 快速启动10秒内完成配置加载和调度器启动
func (s *Scheduler) Start(ctx context.Context) error {
startTime := time.Now()
s.logger.Info("轮询调度器启动中...")
// 1. 加载轮询配置到缓存
if err := s.loadConfigs(ctx); err != nil {
s.logger.Error("加载轮询配置失败", zap.Error(err))
return err
}
s.logger.Info("轮询配置已加载", zap.Int("config_count", len(s.configCache)))
// 2. 初始化并发控制配置
if err := s.initConcurrencyConfigs(ctx); err != nil {
s.logger.Warn("初始化并发控制配置失败,使用默认值", zap.Error(err))
}
// 3. 启动调度循环(非阻塞)
s.wg.Add(1)
go s.scheduleLoop(ctx)
// 4. 启动后台渐进式初始化(非阻塞)
s.wg.Add(1)
go s.progressiveInit(ctx)
elapsed := time.Since(startTime)
s.logger.Info("轮询调度器已启动",
zap.Duration("startup_time", elapsed),
zap.Bool("fast_start", elapsed < 10*time.Second))
return nil
}
// Stop 停止调度器
func (s *Scheduler) Stop() {
s.logger.Info("正在停止轮询调度器...")
close(s.stopChan)
s.wg.Wait()
s.logger.Info("轮询调度器已停止")
}
// loadConfigs 加载轮询配置到缓存
func (s *Scheduler) loadConfigs(ctx context.Context) error {
configs, err := s.configStore.ListEnabled(ctx)
if err != nil {
return err
}
s.configCacheLock.Lock()
s.configCache = configs
s.configCacheTime = time.Now()
s.configCacheLock.Unlock()
// 同步到 Redis 缓存
return s.syncConfigsToRedis(ctx, configs)
}
// syncConfigsToRedis 同步配置到 Redis
func (s *Scheduler) syncConfigsToRedis(ctx context.Context, configs []*model.PollingConfig) error {
key := constants.RedisPollingConfigsCacheKey()
// 序列化配置列表为 JSON
configData := make([]interface{}, 0, len(configs)*2)
for _, cfg := range configs {
jsonData, err := json.Marshal(cfg)
if err != nil {
s.logger.Warn("序列化轮询配置失败", zap.Uint("config_id", cfg.ID), zap.Error(err))
continue
}
configData = append(configData, cfg.ID, string(jsonData))
}
if len(configData) > 0 {
pipe := s.redis.Pipeline()
pipe.Del(ctx, key)
// 使用 HSET 存储配置
pipe.HSet(ctx, key, configData...)
pipe.Expire(ctx, key, 24*time.Hour)
_, err := pipe.Exec(ctx)
return err
}
return nil
}
// initConcurrencyConfigs 初始化并发控制配置到 Redis
func (s *Scheduler) initConcurrencyConfigs(ctx context.Context) error {
configs, err := s.concurrencyStore.List(ctx)
if err != nil {
return err
}
for _, cfg := range configs {
key := constants.RedisPollingConcurrencyConfigKey(cfg.TaskType)
if err := s.redis.Set(ctx, key, cfg.MaxConcurrency, 0).Err(); err != nil {
s.logger.Warn("设置并发配置失败",
zap.String("task_type", cfg.TaskType),
zap.Error(err))
}
}
return nil
}
// scheduleLoop 调度循环
// 每 10 秒执行一次,从 Redis Sorted Set 获取到期的卡,生成 Asynq 任务
func (s *Scheduler) scheduleLoop(ctx context.Context) {
defer s.wg.Done()
config := DefaultSchedulerConfig()
ticker := time.NewTicker(config.ScheduleInterval)
defer ticker.Stop()
s.logger.Info("调度循环已启动", zap.Duration("interval", config.ScheduleInterval))
for {
select {
case <-s.stopChan:
s.logger.Info("调度循环收到停止信号")
return
case <-ticker.C:
s.processSchedule(ctx)
}
}
}
// processSchedule 处理一次调度
func (s *Scheduler) processSchedule(ctx context.Context) {
now := time.Now().Unix()
// 1. 优先处理手动触发队列
s.processManualQueue(ctx, constants.TaskTypePollingRealname)
s.processManualQueue(ctx, constants.TaskTypePollingCarddata)
s.processManualQueue(ctx, constants.TaskTypePollingPackage)
// 2. 处理定时队列
s.processTimedQueue(ctx, constants.RedisPollingQueueRealnameKey(), constants.TaskTypePollingRealname, now)
s.processTimedQueue(ctx, constants.RedisPollingQueueCarddataKey(), constants.TaskTypePollingCarddata, now)
s.processTimedQueue(ctx, constants.RedisPollingQueuePackageKey(), constants.TaskTypePollingPackage, now)
// 任务 19.6: 套餐激活检查(每次调度都执行,内部会限流)
if s.packageActivationHandler != nil {
if err := s.packageActivationHandler.HandlePackageActivationCheck(ctx); err != nil {
s.logger.Warn("套餐激活检查失败", zap.Error(err))
}
}
// 任务 20.6: 流量重置调度(每次调度都执行,内部会限流)
if s.dataResetHandler != nil {
if err := s.dataResetHandler.HandleDataReset(ctx); err != nil {
s.logger.Warn("流量重置调度失败", zap.Error(err))
}
}
}
// processManualQueue 处理手动触发队列
// 优化:批量读取和提交,提高吞吐
func (s *Scheduler) processManualQueue(ctx context.Context, taskType string) {
config := DefaultSchedulerConfig()
key := constants.RedisPollingManualQueueKey(taskType)
// 批量读取手动触发任务
cardIDs := make([]string, 0, config.MaxManualBatchSize)
for i := 0; i < config.MaxManualBatchSize; i++ {
cardIDStr, err := s.redis.LPop(ctx, key).Result()
if err != nil {
if err != redis.Nil {
s.logger.Error("读取手动触发队列失败",
zap.String("task_type", taskType),
zap.Error(err))
}
break
}
cardIDs = append(cardIDs, cardIDStr)
}
// 批量提交任务
if len(cardIDs) > 0 {
s.enqueueBatch(ctx, taskType, cardIDs, true)
}
}
// processTimedQueue 处理定时队列
// 优化:支持大批量处理,每次最多取 ScheduleBatchSize 张卡
func (s *Scheduler) processTimedQueue(ctx context.Context, queueKey, taskType string, now int64) {
config := DefaultSchedulerConfig()
// 获取所有到期的卡score <= now
// 使用 ZRANGEBYSCORE 获取,每次最多取 ScheduleBatchSize 张
cardIDs, err := s.redis.ZRangeByScore(ctx, queueKey, &redis.ZRangeBy{
Min: "-inf",
Max: formatInt64(now),
Count: int64(config.ScheduleBatchSize),
}).Result()
if err != nil {
if err != redis.Nil {
s.logger.Error("读取定时队列失败",
zap.String("queue_key", queueKey),
zap.Error(err))
}
return
}
if len(cardIDs) == 0 {
return
}
// 只在数量较大时打印日志,避免日志过多
if len(cardIDs) >= 1000 {
s.logger.Info("处理定时队列",
zap.String("task_type", taskType),
zap.Int("card_count", len(cardIDs)))
}
// 移除已取出的卡(使用最后一个卡的 score 作为边界,更精确)
if err := s.redis.ZRemRangeByScore(ctx, queueKey, "-inf", formatInt64(now)).Err(); err != nil {
s.logger.Error("移除已处理的卡失败", zap.Error(err))
}
// 批量提交任务(使用 goroutine 并行提交,提高吞吐)
s.enqueueBatch(ctx, taskType, cardIDs, false)
}
// enqueueBatch 批量提交任务到 Asynq 队列
// 使用多 goroutine 并行提交,提高吞吐量
func (s *Scheduler) enqueueBatch(ctx context.Context, taskType string, cardIDs []string, isManual bool) {
if len(cardIDs) == 0 {
return
}
// 分批并行提交,每批 1000 个,最多 10 个并行
batchSize := 1000
maxParallel := 10
sem := make(chan struct{}, maxParallel)
var wg sync.WaitGroup
for i := 0; i < len(cardIDs); i += batchSize {
end := i + batchSize
if end > len(cardIDs) {
end = len(cardIDs)
}
batch := cardIDs[i:end]
wg.Add(1)
sem <- struct{}{} // 获取信号量
go func(batch []string) {
defer wg.Done()
defer func() { <-sem }() // 释放信号量
for _, cardID := range batch {
if err := s.enqueueTask(ctx, taskType, cardID, isManual); err != nil {
s.logger.Warn("提交任务失败",
zap.String("task_type", taskType),
zap.String("card_id", cardID),
zap.Error(err))
}
}
}(batch)
}
wg.Wait()
}
// enqueueTask 提交任务到 Asynq 队列
func (s *Scheduler) enqueueTask(ctx context.Context, taskType, cardID string, isManual bool) error {
payload := map[string]interface{}{
"card_id": cardID,
"is_manual": isManual,
"timestamp": time.Now().Unix(),
}
task := asynq.NewTask(taskType, mustMarshal(payload),
asynq.MaxRetry(0), // 不重试,失败后重新入队
asynq.Timeout(30*time.Second), // 30秒超时
asynq.Queue(constants.QueueDefault),
)
_, err := s.queueClient.Enqueue(task)
return err
}
// progressiveInit 渐进式初始化
// 分批加载卡数据到 Redis每批 10 万张sleep 1 秒
func (s *Scheduler) progressiveInit(ctx context.Context) {
defer s.wg.Done()
config := DefaultSchedulerConfig()
s.initProgress.mu.Lock()
s.initProgress.Status = "running"
s.initProgress.StartTime = time.Now()
s.initProgress.mu.Unlock()
s.logger.Info("开始渐进式初始化...")
// 获取总卡数
var totalCards int64
if err := s.db.Model(&model.IotCard{}).Count(&totalCards).Error; err != nil {
s.logger.Error("获取卡总数失败", zap.Error(err))
s.setInitError(err.Error())
return
}
s.initProgress.mu.Lock()
s.initProgress.TotalCards = totalCards
s.initProgress.mu.Unlock()
s.logger.Info("开始加载卡数据", zap.Int64("total_cards", totalCards))
// 使用游标分批加载
var lastID uint = 0
batchCount := 0
for {
select {
case <-s.stopChan:
s.logger.Info("渐进式初始化被中断")
return
default:
}
// 加载一批卡
var cards []*model.IotCard
err := s.db.WithContext(ctx).
Where("id > ?", lastID).
Order("id ASC").
Limit(config.InitBatchSize).
Find(&cards).Error
if err != nil {
s.logger.Error("加载卡数据失败", zap.Error(err))
s.setInitError(err.Error())
return
}
if len(cards) == 0 {
break
}
// 批量处理这批卡(使用 Pipeline 提高性能)
if err := s.initCardsBatch(ctx, cards); err != nil {
s.logger.Warn("批量初始化卡轮询失败", zap.Error(err))
}
lastID = cards[len(cards)-1].ID
batchCount++
s.initProgress.mu.Lock()
s.initProgress.LoadedCards += int64(len(cards))
s.initProgress.LastBatchTime = time.Now()
s.initProgress.mu.Unlock()
s.logger.Info("完成一批卡初始化",
zap.Int("batch", batchCount),
zap.Int("batch_size", len(cards)),
zap.Int64("loaded", s.initProgress.LoadedCards),
zap.Int64("total", totalCards))
// 批次间休眠,避免打爆数据库
time.Sleep(config.InitBatchSleepDuration)
}
s.initProgress.mu.Lock()
s.initProgress.Status = "completed"
s.initProgress.mu.Unlock()
s.initCompleted.Store(true)
s.logger.Info("渐进式初始化完成",
zap.Int64("total_loaded", s.initProgress.LoadedCards),
zap.Duration("duration", time.Since(s.initProgress.StartTime)))
}
// initCardsBatch 批量初始化卡的轮询
// 使用 Redis Pipeline 批量写入,大幅提高初始化性能
// 10万张卡从 ~60秒 优化到 ~5秒
func (s *Scheduler) initCardsBatch(ctx context.Context, cards []*model.IotCard) error {
if len(cards) == 0 {
return nil
}
config := DefaultSchedulerConfig()
now := time.Now()
pipe := s.redis.Pipeline()
for _, card := range cards {
// 匹配配置
cfg := s.MatchConfig(card)
if cfg == nil {
continue // 无匹配配置,不需要轮询
}
// 添加到相应的轮询队列
if cfg.RealnameCheckInterval != nil && *cfg.RealnameCheckInterval > 0 {
nextCheck := s.calculateNextCheckTime(card.LastRealNameCheckAt, *cfg.RealnameCheckInterval)
pipe.ZAdd(ctx, constants.RedisPollingQueueRealnameKey(), redis.Z{
Score: float64(nextCheck.Unix()),
Member: card.ID,
})
}
if cfg.CarddataCheckInterval != nil && *cfg.CarddataCheckInterval > 0 {
nextCheck := s.calculateNextCheckTime(card.LastDataCheckAt, *cfg.CarddataCheckInterval)
pipe.ZAdd(ctx, constants.RedisPollingQueueCarddataKey(), redis.Z{
Score: float64(nextCheck.Unix()),
Member: card.ID,
})
}
if cfg.PackageCheckInterval != nil && *cfg.PackageCheckInterval > 0 {
nextCheck := s.calculateNextCheckTime(card.LastDataCheckAt, *cfg.PackageCheckInterval)
pipe.ZAdd(ctx, constants.RedisPollingQueuePackageKey(), redis.Z{
Score: float64(nextCheck.Unix()),
Member: card.ID,
})
}
// 缓存卡信息到 Redis
cacheKey := constants.RedisPollingCardInfoKey(card.ID)
cacheData := map[string]interface{}{
"id": card.ID,
"iccid": card.ICCID,
"card_category": card.CardCategory,
"real_name_status": card.RealNameStatus,
"network_status": card.NetworkStatus,
"carrier_id": card.CarrierID,
"cached_at": now.Unix(),
}
pipe.HSet(ctx, cacheKey, cacheData)
pipe.Expire(ctx, cacheKey, config.CardCacheTTL)
}
// 执行 Pipeline
_, err := pipe.Exec(ctx)
return err
}
// initCardPolling 初始化单张卡的轮询(保留用于懒加载场景)
func (s *Scheduler) initCardPolling(ctx context.Context, card *model.IotCard) error {
// 匹配配置
config := s.MatchConfig(card)
if config == nil {
return nil // 无匹配配置,不需要轮询
}
now := time.Now()
// 添加到相应的轮询队列
if config.RealnameCheckInterval != nil && *config.RealnameCheckInterval > 0 {
nextCheck := s.calculateNextCheckTime(card.LastRealNameCheckAt, *config.RealnameCheckInterval)
if err := s.addToQueue(ctx, constants.RedisPollingQueueRealnameKey(), card.ID, nextCheck); err != nil {
return err
}
}
if config.CarddataCheckInterval != nil && *config.CarddataCheckInterval > 0 {
nextCheck := s.calculateNextCheckTime(card.LastDataCheckAt, *config.CarddataCheckInterval)
if err := s.addToQueue(ctx, constants.RedisPollingQueueCarddataKey(), card.ID, nextCheck); err != nil {
return err
}
}
if config.PackageCheckInterval != nil && *config.PackageCheckInterval > 0 {
// 套餐检查使用流量检查时间作为参考
nextCheck := s.calculateNextCheckTime(card.LastDataCheckAt, *config.PackageCheckInterval)
if err := s.addToQueue(ctx, constants.RedisPollingQueuePackageKey(), card.ID, nextCheck); err != nil {
return err
}
}
// 缓存卡信息到 Redis
return s.cacheCardInfo(ctx, card, now)
}
// MatchConfig 匹配轮询配置
// 按优先级返回第一个匹配的配置
func (s *Scheduler) MatchConfig(card *model.IotCard) *model.PollingConfig {
s.configCacheLock.RLock()
defer s.configCacheLock.RUnlock()
for _, cfg := range s.configCache {
if s.matchConfigConditions(cfg, card) {
return cfg
}
}
return nil
}
// matchConfigConditions 检查卡是否匹配配置条件
func (s *Scheduler) matchConfigConditions(cfg *model.PollingConfig, card *model.IotCard) bool {
// 检查卡状态条件
if cfg.CardCondition != "" {
cardCondition := s.getCardCondition(card)
if cfg.CardCondition != cardCondition {
return false
}
}
// 检查卡业务类型
if cfg.CardCategory != "" {
if cfg.CardCategory != card.CardCategory {
return false
}
}
// 检查运营商
if cfg.CarrierID != nil {
if *cfg.CarrierID != card.CarrierID {
return false
}
}
return true
}
// getCardCondition 获取卡的状态条件
func (s *Scheduler) getCardCondition(card *model.IotCard) string {
// 根据卡的实名状态和激活状态确定条件
if card.RealNameStatus == 0 || card.RealNameStatus == 1 {
return "not_real_name" // 未实名
}
if card.RealNameStatus == 2 {
if card.NetworkStatus == 1 {
return "activated" // 已激活
}
return "real_name" // 已实名但未激活
}
if card.NetworkStatus == 0 {
return "suspended" // 已停用
}
return ""
}
// calculateNextCheckTime 计算下次检查时间
func (s *Scheduler) calculateNextCheckTime(lastCheckAt *time.Time, intervalSeconds int) time.Time {
now := time.Now()
if lastCheckAt == nil {
// 首次检查,立即执行(加上随机抖动避免集中)
jitter := time.Duration(now.UnixNano()%int64(intervalSeconds)) * time.Second / 10
return now.Add(jitter)
}
// 计算下次检查时间
nextCheck := lastCheckAt.Add(time.Duration(intervalSeconds) * time.Second)
if nextCheck.Before(now) {
// 如果已过期,立即执行
return now
}
return nextCheck
}
// addToQueue 添加卡到轮询队列
func (s *Scheduler) addToQueue(ctx context.Context, queueKey string, cardID uint, nextCheck time.Time) error {
score := float64(nextCheck.Unix())
member := formatUint(cardID)
return s.redis.ZAdd(ctx, queueKey, redis.Z{
Score: score,
Member: member,
}).Err()
}
// cacheCardInfo 缓存卡信息到 Redis
func (s *Scheduler) cacheCardInfo(ctx context.Context, card *model.IotCard, cachedAt time.Time) error {
key := constants.RedisPollingCardInfoKey(card.ID)
config := DefaultSchedulerConfig()
data := map[string]interface{}{
"id": card.ID,
"iccid": card.ICCID,
"card_category": card.CardCategory,
"real_name_status": card.RealNameStatus,
"network_status": card.NetworkStatus,
"carrier_id": card.CarrierID,
"cached_at": cachedAt.Unix(),
}
pipe := s.redis.Pipeline()
pipe.HSet(ctx, key, data)
pipe.Expire(ctx, key, config.CardCacheTTL)
_, err := pipe.Exec(ctx)
return err
}
// setInitError 设置初始化错误
func (s *Scheduler) setInitError(msg string) {
s.initProgress.mu.Lock()
s.initProgress.Status = "failed"
s.initProgress.ErrorMessage = msg
s.initProgress.mu.Unlock()
}
// GetInitProgress 获取初始化进度
func (s *Scheduler) GetInitProgress() InitProgress {
s.initProgress.mu.RLock()
defer s.initProgress.mu.RUnlock()
return InitProgress{
TotalCards: s.initProgress.TotalCards,
LoadedCards: s.initProgress.LoadedCards,
StartTime: s.initProgress.StartTime,
LastBatchTime: s.initProgress.LastBatchTime,
Status: s.initProgress.Status,
ErrorMessage: s.initProgress.ErrorMessage,
}
}
// IsInitCompleted 检查初始化是否完成
func (s *Scheduler) IsInitCompleted() bool {
return s.initCompleted.Load()
}
// RefreshConfigs 刷新配置缓存
func (s *Scheduler) RefreshConfigs(ctx context.Context) error {
return s.loadConfigs(ctx)
}
// SetResetService 设置流量重置服务(用于依赖注入)
func (s *Scheduler) SetResetService(resetService interface{}) {
if rs, ok := resetService.(*DataResetHandler); ok {
s.dataResetHandler = rs
}
}
// SetActivationService 设置套餐激活服务(用于依赖注入)
func (s *Scheduler) SetActivationService(activationHandler *PackageActivationHandler) {
s.packageActivationHandler = activationHandler
}