package polling import ( "context" "encoding/json" "sync" "sync/atomic" "time" "github.com/hibiken/asynq" "github.com/redis/go-redis/v9" "go.uber.org/zap" "gorm.io/gorm" "github.com/break/junhong_cmp_fiber/internal/model" "github.com/break/junhong_cmp_fiber/internal/store/postgres" "github.com/break/junhong_cmp_fiber/pkg/constants" ) // Scheduler 轮询调度器 // 负责管理 IoT 卡的定期检查任务(实名、流量、套餐) type Scheduler struct { db *gorm.DB redis *redis.Client queueClient *asynq.Client logger *zap.Logger configStore *postgres.PollingConfigStore iotCardStore *postgres.IotCardStore concurrencyStore *postgres.PollingConcurrencyConfigStore // 任务 19: 套餐激活检查处理器 packageActivationHandler *PackageActivationHandler // 任务 20: 流量重置调度处理器 dataResetHandler *DataResetHandler // 配置缓存 configCache []*model.PollingConfig configCacheLock sync.RWMutex configCacheTime time.Time // 初始化状态 initProgress *InitProgress initCompleted atomic.Bool // 控制信号 stopChan chan struct{} wg sync.WaitGroup } // InitProgress 初始化进度 type InitProgress struct { mu sync.RWMutex TotalCards int64 `json:"total_cards"` // 总卡数 LoadedCards int64 `json:"loaded_cards"` // 已加载卡数 StartTime time.Time `json:"start_time"` // 开始时间 LastBatchTime time.Time `json:"last_batch_time"` // 最后一批处理时间 Status string `json:"status"` // 状态: pending, running, completed, failed ErrorMessage string `json:"error_message"` // 错误信息 } // SchedulerConfig 调度器配置 // 设计目标:支持一亿张卡规模 type SchedulerConfig struct { ScheduleInterval time.Duration // 调度循环间隔(默认 1 秒,支持高吞吐) InitBatchSize int // 初始化每批加载数量(默认 100000) InitBatchSleepDuration time.Duration // 初始化批次间休眠时间(默认 500ms) ConfigCacheTTL time.Duration // 配置缓存 TTL(默认 5 分钟) CardCacheTTL time.Duration // 卡信息缓存 TTL(默认 7 天) ScheduleBatchSize int // 每次调度取出的卡数(默认 50000) MaxManualBatchSize int // 手动触发每次处理数量(默认 1000) } // DefaultSchedulerConfig 默认调度器配置 // 单 Worker 设计吞吐:50000 张/秒,支持多 Worker 水平扩展 func DefaultSchedulerConfig() *SchedulerConfig { return &SchedulerConfig{ ScheduleInterval: 1 * time.Second, // 1秒调度一次,提高响应速度 InitBatchSize: 100000, // 10万张/批初始化 InitBatchSleepDuration: 500 * time.Millisecond, // 500ms 间隔,加快初始化 ConfigCacheTTL: 5 * time.Minute, CardCacheTTL: 7 * 24 * time.Hour, ScheduleBatchSize: 50000, // 每次取 5 万张,每秒可调度 5 万张 MaxManualBatchSize: 1000, // 手动触发每次处理 1000 张 } } // NewScheduler 创建调度器实例 func NewScheduler( db *gorm.DB, redisClient *redis.Client, queueClient *asynq.Client, logger *zap.Logger, ) *Scheduler { return &Scheduler{ db: db, redis: redisClient, queueClient: queueClient, logger: logger, configStore: postgres.NewPollingConfigStore(db), iotCardStore: postgres.NewIotCardStore(db, redisClient), concurrencyStore: postgres.NewPollingConcurrencyConfigStore(db), packageActivationHandler: NewPackageActivationHandler(db, redisClient, queueClient, nil, logger), dataResetHandler: NewDataResetHandler(nil, logger), // ResetService 需要通过 SetResetService 注入 initProgress: &InitProgress{ Status: "pending", }, stopChan: make(chan struct{}), } } // Start 启动调度器 // 快速启动:10秒内完成配置加载和调度器启动 func (s *Scheduler) Start(ctx context.Context) error { startTime := time.Now() s.logger.Info("轮询调度器启动中...") // 1. 加载轮询配置到缓存 if err := s.loadConfigs(ctx); err != nil { s.logger.Error("加载轮询配置失败", zap.Error(err)) return err } s.logger.Info("轮询配置已加载", zap.Int("config_count", len(s.configCache))) // 2. 初始化并发控制配置 if err := s.initConcurrencyConfigs(ctx); err != nil { s.logger.Warn("初始化并发控制配置失败,使用默认值", zap.Error(err)) } // 3. 启动调度循环(非阻塞) s.wg.Add(1) go s.scheduleLoop(ctx) // 4. 启动后台渐进式初始化(非阻塞) s.wg.Add(1) go s.progressiveInit(ctx) elapsed := time.Since(startTime) s.logger.Info("轮询调度器已启动", zap.Duration("startup_time", elapsed), zap.Bool("fast_start", elapsed < 10*time.Second)) return nil } // Stop 停止调度器 func (s *Scheduler) Stop() { s.logger.Info("正在停止轮询调度器...") close(s.stopChan) s.wg.Wait() s.logger.Info("轮询调度器已停止") } // loadConfigs 加载轮询配置到缓存 func (s *Scheduler) loadConfigs(ctx context.Context) error { configs, err := s.configStore.ListEnabled(ctx) if err != nil { return err } s.configCacheLock.Lock() s.configCache = configs s.configCacheTime = time.Now() s.configCacheLock.Unlock() // 同步到 Redis 缓存 return s.syncConfigsToRedis(ctx, configs) } // syncConfigsToRedis 同步配置到 Redis func (s *Scheduler) syncConfigsToRedis(ctx context.Context, configs []*model.PollingConfig) error { key := constants.RedisPollingConfigsCacheKey() // 序列化配置列表为 JSON configData := make([]interface{}, 0, len(configs)*2) for _, cfg := range configs { jsonData, err := json.Marshal(cfg) if err != nil { s.logger.Warn("序列化轮询配置失败", zap.Uint("config_id", cfg.ID), zap.Error(err)) continue } configData = append(configData, cfg.ID, string(jsonData)) } if len(configData) > 0 { pipe := s.redis.Pipeline() pipe.Del(ctx, key) // 使用 HSET 存储配置 pipe.HSet(ctx, key, configData...) pipe.Expire(ctx, key, 24*time.Hour) _, err := pipe.Exec(ctx) return err } return nil } // initConcurrencyConfigs 初始化并发控制配置到 Redis func (s *Scheduler) initConcurrencyConfigs(ctx context.Context) error { configs, err := s.concurrencyStore.List(ctx) if err != nil { return err } for _, cfg := range configs { key := constants.RedisPollingConcurrencyConfigKey(cfg.TaskType) if err := s.redis.Set(ctx, key, cfg.MaxConcurrency, 0).Err(); err != nil { s.logger.Warn("设置并发配置失败", zap.String("task_type", cfg.TaskType), zap.Error(err)) } } return nil } // scheduleLoop 调度循环 // 每 10 秒执行一次,从 Redis Sorted Set 获取到期的卡,生成 Asynq 任务 func (s *Scheduler) scheduleLoop(ctx context.Context) { defer s.wg.Done() config := DefaultSchedulerConfig() ticker := time.NewTicker(config.ScheduleInterval) defer ticker.Stop() s.logger.Info("调度循环已启动", zap.Duration("interval", config.ScheduleInterval)) for { select { case <-s.stopChan: s.logger.Info("调度循环收到停止信号") return case <-ticker.C: s.processSchedule(ctx) } } } // processSchedule 处理一次调度 func (s *Scheduler) processSchedule(ctx context.Context) { now := time.Now().Unix() // 1. 优先处理手动触发队列 s.processManualQueue(ctx, constants.TaskTypePollingRealname) s.processManualQueue(ctx, constants.TaskTypePollingCarddata) s.processManualQueue(ctx, constants.TaskTypePollingPackage) // 2. 处理定时队列 s.processTimedQueue(ctx, constants.RedisPollingQueueRealnameKey(), constants.TaskTypePollingRealname, now) s.processTimedQueue(ctx, constants.RedisPollingQueueCarddataKey(), constants.TaskTypePollingCarddata, now) s.processTimedQueue(ctx, constants.RedisPollingQueuePackageKey(), constants.TaskTypePollingPackage, now) // 任务 19.6: 套餐激活检查(每次调度都执行,内部会限流) if s.packageActivationHandler != nil { if err := s.packageActivationHandler.HandlePackageActivationCheck(ctx); err != nil { s.logger.Warn("套餐激活检查失败", zap.Error(err)) } } // 任务 20.6: 流量重置调度(每次调度都执行,内部会限流) if s.dataResetHandler != nil { if err := s.dataResetHandler.HandleDataReset(ctx); err != nil { s.logger.Warn("流量重置调度失败", zap.Error(err)) } } } // processManualQueue 处理手动触发队列 // 优化:批量读取和提交,提高吞吐 func (s *Scheduler) processManualQueue(ctx context.Context, taskType string) { config := DefaultSchedulerConfig() key := constants.RedisPollingManualQueueKey(taskType) // 批量读取手动触发任务 cardIDs := make([]string, 0, config.MaxManualBatchSize) for i := 0; i < config.MaxManualBatchSize; i++ { cardIDStr, err := s.redis.LPop(ctx, key).Result() if err != nil { if err != redis.Nil { s.logger.Error("读取手动触发队列失败", zap.String("task_type", taskType), zap.Error(err)) } break } cardIDs = append(cardIDs, cardIDStr) } // 批量提交任务 if len(cardIDs) > 0 { s.enqueueBatch(ctx, taskType, cardIDs, true) } } // processTimedQueue 处理定时队列 // 优化:支持大批量处理,每次最多取 ScheduleBatchSize 张卡 func (s *Scheduler) processTimedQueue(ctx context.Context, queueKey, taskType string, now int64) { config := DefaultSchedulerConfig() // 获取所有到期的卡(score <= now) // 使用 ZRANGEBYSCORE 获取,每次最多取 ScheduleBatchSize 张 cardIDs, err := s.redis.ZRangeByScore(ctx, queueKey, &redis.ZRangeBy{ Min: "-inf", Max: formatInt64(now), Count: int64(config.ScheduleBatchSize), }).Result() if err != nil { if err != redis.Nil { s.logger.Error("读取定时队列失败", zap.String("queue_key", queueKey), zap.Error(err)) } return } if len(cardIDs) == 0 { return } // 只在数量较大时打印日志,避免日志过多 if len(cardIDs) >= 1000 { s.logger.Info("处理定时队列", zap.String("task_type", taskType), zap.Int("card_count", len(cardIDs))) } // 移除已取出的卡(使用最后一个卡的 score 作为边界,更精确) if err := s.redis.ZRemRangeByScore(ctx, queueKey, "-inf", formatInt64(now)).Err(); err != nil { s.logger.Error("移除已处理的卡失败", zap.Error(err)) } // 批量提交任务(使用 goroutine 并行提交,提高吞吐) s.enqueueBatch(ctx, taskType, cardIDs, false) } // enqueueBatch 批量提交任务到 Asynq 队列 // 使用多 goroutine 并行提交,提高吞吐量 func (s *Scheduler) enqueueBatch(ctx context.Context, taskType string, cardIDs []string, isManual bool) { if len(cardIDs) == 0 { return } // 分批并行提交,每批 1000 个,最多 10 个并行 batchSize := 1000 maxParallel := 10 sem := make(chan struct{}, maxParallel) var wg sync.WaitGroup for i := 0; i < len(cardIDs); i += batchSize { end := i + batchSize if end > len(cardIDs) { end = len(cardIDs) } batch := cardIDs[i:end] wg.Add(1) sem <- struct{}{} // 获取信号量 go func(batch []string) { defer wg.Done() defer func() { <-sem }() // 释放信号量 for _, cardID := range batch { if err := s.enqueueTask(ctx, taskType, cardID, isManual); err != nil { s.logger.Warn("提交任务失败", zap.String("task_type", taskType), zap.String("card_id", cardID), zap.Error(err)) } } }(batch) } wg.Wait() } // enqueueTask 提交任务到 Asynq 队列 func (s *Scheduler) enqueueTask(ctx context.Context, taskType, cardID string, isManual bool) error { payload := map[string]interface{}{ "card_id": cardID, "is_manual": isManual, "timestamp": time.Now().Unix(), } task := asynq.NewTask(taskType, mustMarshal(payload), asynq.MaxRetry(0), // 不重试,失败后重新入队 asynq.Timeout(30*time.Second), // 30秒超时 asynq.Queue(constants.QueueDefault), ) _, err := s.queueClient.Enqueue(task) return err } // progressiveInit 渐进式初始化 // 分批加载卡数据到 Redis,每批 10 万张,sleep 1 秒 func (s *Scheduler) progressiveInit(ctx context.Context) { defer s.wg.Done() config := DefaultSchedulerConfig() s.initProgress.mu.Lock() s.initProgress.Status = "running" s.initProgress.StartTime = time.Now() s.initProgress.mu.Unlock() s.logger.Info("开始渐进式初始化...") // 获取总卡数 var totalCards int64 if err := s.db.Model(&model.IotCard{}).Count(&totalCards).Error; err != nil { s.logger.Error("获取卡总数失败", zap.Error(err)) s.setInitError(err.Error()) return } s.initProgress.mu.Lock() s.initProgress.TotalCards = totalCards s.initProgress.mu.Unlock() s.logger.Info("开始加载卡数据", zap.Int64("total_cards", totalCards)) // 使用游标分批加载 var lastID uint = 0 batchCount := 0 for { select { case <-s.stopChan: s.logger.Info("渐进式初始化被中断") return default: } // 加载一批卡 var cards []*model.IotCard err := s.db.WithContext(ctx). Where("id > ?", lastID). Order("id ASC"). Limit(config.InitBatchSize). Find(&cards).Error if err != nil { s.logger.Error("加载卡数据失败", zap.Error(err)) s.setInitError(err.Error()) return } if len(cards) == 0 { break } // 批量处理这批卡(使用 Pipeline 提高性能) if err := s.initCardsBatch(ctx, cards); err != nil { s.logger.Warn("批量初始化卡轮询失败", zap.Error(err)) } lastID = cards[len(cards)-1].ID batchCount++ s.initProgress.mu.Lock() s.initProgress.LoadedCards += int64(len(cards)) s.initProgress.LastBatchTime = time.Now() s.initProgress.mu.Unlock() s.logger.Info("完成一批卡初始化", zap.Int("batch", batchCount), zap.Int("batch_size", len(cards)), zap.Int64("loaded", s.initProgress.LoadedCards), zap.Int64("total", totalCards)) // 批次间休眠,避免打爆数据库 time.Sleep(config.InitBatchSleepDuration) } s.initProgress.mu.Lock() s.initProgress.Status = "completed" s.initProgress.mu.Unlock() s.initCompleted.Store(true) s.logger.Info("渐进式初始化完成", zap.Int64("total_loaded", s.initProgress.LoadedCards), zap.Duration("duration", time.Since(s.initProgress.StartTime))) } // initCardsBatch 批量初始化卡的轮询 // 使用 Redis Pipeline 批量写入,大幅提高初始化性能 // 10万张卡从 ~60秒 优化到 ~5秒 func (s *Scheduler) initCardsBatch(ctx context.Context, cards []*model.IotCard) error { if len(cards) == 0 { return nil } config := DefaultSchedulerConfig() now := time.Now() pipe := s.redis.Pipeline() for _, card := range cards { // 匹配配置 cfg := s.MatchConfig(card) if cfg == nil { continue // 无匹配配置,不需要轮询 } // 添加到相应的轮询队列 if cfg.RealnameCheckInterval != nil && *cfg.RealnameCheckInterval > 0 { nextCheck := s.calculateNextCheckTime(card.LastRealNameCheckAt, *cfg.RealnameCheckInterval) pipe.ZAdd(ctx, constants.RedisPollingQueueRealnameKey(), redis.Z{ Score: float64(nextCheck.Unix()), Member: card.ID, }) } if cfg.CarddataCheckInterval != nil && *cfg.CarddataCheckInterval > 0 { nextCheck := s.calculateNextCheckTime(card.LastDataCheckAt, *cfg.CarddataCheckInterval) pipe.ZAdd(ctx, constants.RedisPollingQueueCarddataKey(), redis.Z{ Score: float64(nextCheck.Unix()), Member: card.ID, }) } if cfg.PackageCheckInterval != nil && *cfg.PackageCheckInterval > 0 { nextCheck := s.calculateNextCheckTime(card.LastDataCheckAt, *cfg.PackageCheckInterval) pipe.ZAdd(ctx, constants.RedisPollingQueuePackageKey(), redis.Z{ Score: float64(nextCheck.Unix()), Member: card.ID, }) } // 缓存卡信息到 Redis cacheKey := constants.RedisPollingCardInfoKey(card.ID) cacheData := map[string]interface{}{ "id": card.ID, "iccid": card.ICCID, "card_category": card.CardCategory, "real_name_status": card.RealNameStatus, "network_status": card.NetworkStatus, "carrier_id": card.CarrierID, "cached_at": now.Unix(), } pipe.HSet(ctx, cacheKey, cacheData) pipe.Expire(ctx, cacheKey, config.CardCacheTTL) } // 执行 Pipeline _, err := pipe.Exec(ctx) return err } // initCardPolling 初始化单张卡的轮询(保留用于懒加载场景) func (s *Scheduler) initCardPolling(ctx context.Context, card *model.IotCard) error { // 匹配配置 config := s.MatchConfig(card) if config == nil { return nil // 无匹配配置,不需要轮询 } now := time.Now() // 添加到相应的轮询队列 if config.RealnameCheckInterval != nil && *config.RealnameCheckInterval > 0 { nextCheck := s.calculateNextCheckTime(card.LastRealNameCheckAt, *config.RealnameCheckInterval) if err := s.addToQueue(ctx, constants.RedisPollingQueueRealnameKey(), card.ID, nextCheck); err != nil { return err } } if config.CarddataCheckInterval != nil && *config.CarddataCheckInterval > 0 { nextCheck := s.calculateNextCheckTime(card.LastDataCheckAt, *config.CarddataCheckInterval) if err := s.addToQueue(ctx, constants.RedisPollingQueueCarddataKey(), card.ID, nextCheck); err != nil { return err } } if config.PackageCheckInterval != nil && *config.PackageCheckInterval > 0 { // 套餐检查使用流量检查时间作为参考 nextCheck := s.calculateNextCheckTime(card.LastDataCheckAt, *config.PackageCheckInterval) if err := s.addToQueue(ctx, constants.RedisPollingQueuePackageKey(), card.ID, nextCheck); err != nil { return err } } // 缓存卡信息到 Redis return s.cacheCardInfo(ctx, card, now) } // MatchConfig 匹配轮询配置 // 按优先级返回第一个匹配的配置 func (s *Scheduler) MatchConfig(card *model.IotCard) *model.PollingConfig { s.configCacheLock.RLock() defer s.configCacheLock.RUnlock() for _, cfg := range s.configCache { if s.matchConfigConditions(cfg, card) { return cfg } } return nil } // matchConfigConditions 检查卡是否匹配配置条件 func (s *Scheduler) matchConfigConditions(cfg *model.PollingConfig, card *model.IotCard) bool { // 检查卡状态条件 if cfg.CardCondition != "" { cardCondition := s.getCardCondition(card) if cfg.CardCondition != cardCondition { return false } } // 检查卡业务类型 if cfg.CardCategory != "" { if cfg.CardCategory != card.CardCategory { return false } } // 检查运营商 if cfg.CarrierID != nil { if *cfg.CarrierID != card.CarrierID { return false } } return true } // getCardCondition 获取卡的状态条件 func (s *Scheduler) getCardCondition(card *model.IotCard) string { // 根据卡的实名状态和激活状态确定条件 if card.RealNameStatus == 0 || card.RealNameStatus == 1 { return "not_real_name" // 未实名 } if card.RealNameStatus == 2 { if card.NetworkStatus == 1 { return "activated" // 已激活 } return "real_name" // 已实名但未激活 } if card.NetworkStatus == 0 { return "suspended" // 已停用 } return "" } // calculateNextCheckTime 计算下次检查时间 func (s *Scheduler) calculateNextCheckTime(lastCheckAt *time.Time, intervalSeconds int) time.Time { now := time.Now() if lastCheckAt == nil { // 首次检查,立即执行(加上随机抖动避免集中) jitter := time.Duration(now.UnixNano()%int64(intervalSeconds)) * time.Second / 10 return now.Add(jitter) } // 计算下次检查时间 nextCheck := lastCheckAt.Add(time.Duration(intervalSeconds) * time.Second) if nextCheck.Before(now) { // 如果已过期,立即执行 return now } return nextCheck } // addToQueue 添加卡到轮询队列 func (s *Scheduler) addToQueue(ctx context.Context, queueKey string, cardID uint, nextCheck time.Time) error { score := float64(nextCheck.Unix()) member := formatUint(cardID) return s.redis.ZAdd(ctx, queueKey, redis.Z{ Score: score, Member: member, }).Err() } // cacheCardInfo 缓存卡信息到 Redis func (s *Scheduler) cacheCardInfo(ctx context.Context, card *model.IotCard, cachedAt time.Time) error { key := constants.RedisPollingCardInfoKey(card.ID) config := DefaultSchedulerConfig() data := map[string]interface{}{ "id": card.ID, "iccid": card.ICCID, "card_category": card.CardCategory, "real_name_status": card.RealNameStatus, "network_status": card.NetworkStatus, "carrier_id": card.CarrierID, "cached_at": cachedAt.Unix(), } pipe := s.redis.Pipeline() pipe.HSet(ctx, key, data) pipe.Expire(ctx, key, config.CardCacheTTL) _, err := pipe.Exec(ctx) return err } // setInitError 设置初始化错误 func (s *Scheduler) setInitError(msg string) { s.initProgress.mu.Lock() s.initProgress.Status = "failed" s.initProgress.ErrorMessage = msg s.initProgress.mu.Unlock() } // GetInitProgress 获取初始化进度 func (s *Scheduler) GetInitProgress() InitProgress { s.initProgress.mu.RLock() defer s.initProgress.mu.RUnlock() return InitProgress{ TotalCards: s.initProgress.TotalCards, LoadedCards: s.initProgress.LoadedCards, StartTime: s.initProgress.StartTime, LastBatchTime: s.initProgress.LastBatchTime, Status: s.initProgress.Status, ErrorMessage: s.initProgress.ErrorMessage, } } // IsInitCompleted 检查初始化是否完成 func (s *Scheduler) IsInitCompleted() bool { return s.initCompleted.Load() } // RefreshConfigs 刷新配置缓存 func (s *Scheduler) RefreshConfigs(ctx context.Context) error { return s.loadConfigs(ctx) } // SetResetService 设置流量重置服务(用于依赖注入) func (s *Scheduler) SetResetService(resetService interface{}) { if rs, ok := resetService.(*DataResetHandler); ok { s.dataResetHandler = rs } } // SetActivationService 设置套餐激活服务(用于依赖注入) func (s *Scheduler) SetActivationService(activationHandler *PackageActivationHandler) { s.packageActivationHandler = activationHandler }