Files
junhong_cmp_fiber/internal/polling/scheduler.go
huang 931e140e8e
All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 6m35s
feat: 实现 IoT 卡轮询系统(支持千万级卡规模)
实现功能:
- 实名状态检查轮询(可配置间隔)
- 卡流量检查轮询(支持跨月流量追踪)
- 套餐检查与超额自动停机
- 分布式并发控制(Redis 信号量)
- 手动触发轮询(单卡/批量/条件筛选)
- 数据清理配置与执行
- 告警规则与历史记录
- 实时监控统计(队列/性能/并发)

性能优化:
- Redis 缓存卡信息,减少 DB 查询
- Pipeline 批量写入 Redis
- 异步流量记录写入
- 渐进式初始化(10万卡/批)

压测工具(scripts/benchmark/):
- Mock Gateway 模拟上游服务
- 测试卡生成器
- 配置初始化脚本
- 实时监控脚本

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-05 17:32:44 +08:00

712 lines
20 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package polling
import (
"context"
"encoding/json"
"sync"
"sync/atomic"
"time"
"github.com/hibiken/asynq"
"github.com/redis/go-redis/v9"
"go.uber.org/zap"
"gorm.io/gorm"
"github.com/break/junhong_cmp_fiber/internal/model"
"github.com/break/junhong_cmp_fiber/internal/store/postgres"
"github.com/break/junhong_cmp_fiber/pkg/constants"
)
// Scheduler 轮询调度器
// 负责管理 IoT 卡的定期检查任务(实名、流量、套餐)
type Scheduler struct {
db *gorm.DB
redis *redis.Client
queueClient *asynq.Client
logger *zap.Logger
configStore *postgres.PollingConfigStore
iotCardStore *postgres.IotCardStore
concurrencyStore *postgres.PollingConcurrencyConfigStore
// 配置缓存
configCache []*model.PollingConfig
configCacheLock sync.RWMutex
configCacheTime time.Time
// 初始化状态
initProgress *InitProgress
initCompleted atomic.Bool
// 控制信号
stopChan chan struct{}
wg sync.WaitGroup
}
// InitProgress 初始化进度
type InitProgress struct {
mu sync.RWMutex
TotalCards int64 `json:"total_cards"` // 总卡数
LoadedCards int64 `json:"loaded_cards"` // 已加载卡数
StartTime time.Time `json:"start_time"` // 开始时间
LastBatchTime time.Time `json:"last_batch_time"` // 最后一批处理时间
Status string `json:"status"` // 状态: pending, running, completed, failed
ErrorMessage string `json:"error_message"` // 错误信息
}
// SchedulerConfig 调度器配置
// 设计目标:支持一亿张卡规模
type SchedulerConfig struct {
ScheduleInterval time.Duration // 调度循环间隔(默认 1 秒,支持高吞吐)
InitBatchSize int // 初始化每批加载数量(默认 100000
InitBatchSleepDuration time.Duration // 初始化批次间休眠时间(默认 500ms
ConfigCacheTTL time.Duration // 配置缓存 TTL默认 5 分钟)
CardCacheTTL time.Duration // 卡信息缓存 TTL默认 7 天)
ScheduleBatchSize int // 每次调度取出的卡数(默认 50000
MaxManualBatchSize int // 手动触发每次处理数量(默认 1000
}
// DefaultSchedulerConfig 默认调度器配置
// 单 Worker 设计吞吐50000 张/秒,支持多 Worker 水平扩展
func DefaultSchedulerConfig() *SchedulerConfig {
return &SchedulerConfig{
ScheduleInterval: 1 * time.Second, // 1秒调度一次提高响应速度
InitBatchSize: 100000, // 10万张/批初始化
InitBatchSleepDuration: 500 * time.Millisecond, // 500ms 间隔,加快初始化
ConfigCacheTTL: 5 * time.Minute,
CardCacheTTL: 7 * 24 * time.Hour,
ScheduleBatchSize: 50000, // 每次取 5 万张,每秒可调度 5 万张
MaxManualBatchSize: 1000, // 手动触发每次处理 1000 张
}
}
// NewScheduler 创建调度器实例
func NewScheduler(
db *gorm.DB,
redisClient *redis.Client,
queueClient *asynq.Client,
logger *zap.Logger,
) *Scheduler {
return &Scheduler{
db: db,
redis: redisClient,
queueClient: queueClient,
logger: logger,
configStore: postgres.NewPollingConfigStore(db),
iotCardStore: postgres.NewIotCardStore(db, redisClient),
concurrencyStore: postgres.NewPollingConcurrencyConfigStore(db),
initProgress: &InitProgress{
Status: "pending",
},
stopChan: make(chan struct{}),
}
}
// Start 启动调度器
// 快速启动10秒内完成配置加载和调度器启动
func (s *Scheduler) Start(ctx context.Context) error {
startTime := time.Now()
s.logger.Info("轮询调度器启动中...")
// 1. 加载轮询配置到缓存
if err := s.loadConfigs(ctx); err != nil {
s.logger.Error("加载轮询配置失败", zap.Error(err))
return err
}
s.logger.Info("轮询配置已加载", zap.Int("config_count", len(s.configCache)))
// 2. 初始化并发控制配置
if err := s.initConcurrencyConfigs(ctx); err != nil {
s.logger.Warn("初始化并发控制配置失败,使用默认值", zap.Error(err))
}
// 3. 启动调度循环(非阻塞)
s.wg.Add(1)
go s.scheduleLoop(ctx)
// 4. 启动后台渐进式初始化(非阻塞)
s.wg.Add(1)
go s.progressiveInit(ctx)
elapsed := time.Since(startTime)
s.logger.Info("轮询调度器已启动",
zap.Duration("startup_time", elapsed),
zap.Bool("fast_start", elapsed < 10*time.Second))
return nil
}
// Stop 停止调度器
func (s *Scheduler) Stop() {
s.logger.Info("正在停止轮询调度器...")
close(s.stopChan)
s.wg.Wait()
s.logger.Info("轮询调度器已停止")
}
// loadConfigs 加载轮询配置到缓存
func (s *Scheduler) loadConfigs(ctx context.Context) error {
configs, err := s.configStore.ListEnabled(ctx)
if err != nil {
return err
}
s.configCacheLock.Lock()
s.configCache = configs
s.configCacheTime = time.Now()
s.configCacheLock.Unlock()
// 同步到 Redis 缓存
return s.syncConfigsToRedis(ctx, configs)
}
// syncConfigsToRedis 同步配置到 Redis
func (s *Scheduler) syncConfigsToRedis(ctx context.Context, configs []*model.PollingConfig) error {
key := constants.RedisPollingConfigsCacheKey()
// 序列化配置列表为 JSON
configData := make([]interface{}, 0, len(configs)*2)
for _, cfg := range configs {
jsonData, err := json.Marshal(cfg)
if err != nil {
s.logger.Warn("序列化轮询配置失败", zap.Uint("config_id", cfg.ID), zap.Error(err))
continue
}
configData = append(configData, cfg.ID, string(jsonData))
}
if len(configData) > 0 {
pipe := s.redis.Pipeline()
pipe.Del(ctx, key)
// 使用 HSET 存储配置
pipe.HSet(ctx, key, configData...)
pipe.Expire(ctx, key, 24*time.Hour)
_, err := pipe.Exec(ctx)
return err
}
return nil
}
// initConcurrencyConfigs 初始化并发控制配置到 Redis
func (s *Scheduler) initConcurrencyConfigs(ctx context.Context) error {
configs, err := s.concurrencyStore.List(ctx)
if err != nil {
return err
}
for _, cfg := range configs {
key := constants.RedisPollingConcurrencyConfigKey(cfg.TaskType)
if err := s.redis.Set(ctx, key, cfg.MaxConcurrency, 0).Err(); err != nil {
s.logger.Warn("设置并发配置失败",
zap.String("task_type", cfg.TaskType),
zap.Error(err))
}
}
return nil
}
// scheduleLoop 调度循环
// 每 10 秒执行一次,从 Redis Sorted Set 获取到期的卡,生成 Asynq 任务
func (s *Scheduler) scheduleLoop(ctx context.Context) {
defer s.wg.Done()
config := DefaultSchedulerConfig()
ticker := time.NewTicker(config.ScheduleInterval)
defer ticker.Stop()
s.logger.Info("调度循环已启动", zap.Duration("interval", config.ScheduleInterval))
for {
select {
case <-s.stopChan:
s.logger.Info("调度循环收到停止信号")
return
case <-ticker.C:
s.processSchedule(ctx)
}
}
}
// processSchedule 处理一次调度
func (s *Scheduler) processSchedule(ctx context.Context) {
now := time.Now().Unix()
// 1. 优先处理手动触发队列
s.processManualQueue(ctx, constants.TaskTypePollingRealname)
s.processManualQueue(ctx, constants.TaskTypePollingCarddata)
s.processManualQueue(ctx, constants.TaskTypePollingPackage)
// 2. 处理定时队列
s.processTimedQueue(ctx, constants.RedisPollingQueueRealnameKey(), constants.TaskTypePollingRealname, now)
s.processTimedQueue(ctx, constants.RedisPollingQueueCarddataKey(), constants.TaskTypePollingCarddata, now)
s.processTimedQueue(ctx, constants.RedisPollingQueuePackageKey(), constants.TaskTypePollingPackage, now)
}
// processManualQueue 处理手动触发队列
// 优化:批量读取和提交,提高吞吐
func (s *Scheduler) processManualQueue(ctx context.Context, taskType string) {
config := DefaultSchedulerConfig()
key := constants.RedisPollingManualQueueKey(taskType)
// 批量读取手动触发任务
cardIDs := make([]string, 0, config.MaxManualBatchSize)
for i := 0; i < config.MaxManualBatchSize; i++ {
cardIDStr, err := s.redis.LPop(ctx, key).Result()
if err != nil {
if err != redis.Nil {
s.logger.Error("读取手动触发队列失败",
zap.String("task_type", taskType),
zap.Error(err))
}
break
}
cardIDs = append(cardIDs, cardIDStr)
}
// 批量提交任务
if len(cardIDs) > 0 {
s.enqueueBatch(ctx, taskType, cardIDs, true)
}
}
// processTimedQueue 处理定时队列
// 优化:支持大批量处理,每次最多取 ScheduleBatchSize 张卡
func (s *Scheduler) processTimedQueue(ctx context.Context, queueKey, taskType string, now int64) {
config := DefaultSchedulerConfig()
// 获取所有到期的卡score <= now
// 使用 ZRANGEBYSCORE 获取,每次最多取 ScheduleBatchSize 张
cardIDs, err := s.redis.ZRangeByScore(ctx, queueKey, &redis.ZRangeBy{
Min: "-inf",
Max: formatInt64(now),
Count: int64(config.ScheduleBatchSize),
}).Result()
if err != nil {
if err != redis.Nil {
s.logger.Error("读取定时队列失败",
zap.String("queue_key", queueKey),
zap.Error(err))
}
return
}
if len(cardIDs) == 0 {
return
}
// 只在数量较大时打印日志,避免日志过多
if len(cardIDs) >= 1000 {
s.logger.Info("处理定时队列",
zap.String("task_type", taskType),
zap.Int("card_count", len(cardIDs)))
}
// 移除已取出的卡(使用最后一个卡的 score 作为边界,更精确)
if err := s.redis.ZRemRangeByScore(ctx, queueKey, "-inf", formatInt64(now)).Err(); err != nil {
s.logger.Error("移除已处理的卡失败", zap.Error(err))
}
// 批量提交任务(使用 goroutine 并行提交,提高吞吐)
s.enqueueBatch(ctx, taskType, cardIDs, false)
}
// enqueueBatch 批量提交任务到 Asynq 队列
// 使用多 goroutine 并行提交,提高吞吐量
func (s *Scheduler) enqueueBatch(ctx context.Context, taskType string, cardIDs []string, isManual bool) {
if len(cardIDs) == 0 {
return
}
// 分批并行提交,每批 1000 个,最多 10 个并行
batchSize := 1000
maxParallel := 10
sem := make(chan struct{}, maxParallel)
var wg sync.WaitGroup
for i := 0; i < len(cardIDs); i += batchSize {
end := i + batchSize
if end > len(cardIDs) {
end = len(cardIDs)
}
batch := cardIDs[i:end]
wg.Add(1)
sem <- struct{}{} // 获取信号量
go func(batch []string) {
defer wg.Done()
defer func() { <-sem }() // 释放信号量
for _, cardID := range batch {
if err := s.enqueueTask(ctx, taskType, cardID, isManual); err != nil {
s.logger.Warn("提交任务失败",
zap.String("task_type", taskType),
zap.String("card_id", cardID),
zap.Error(err))
}
}
}(batch)
}
wg.Wait()
}
// enqueueTask 提交任务到 Asynq 队列
func (s *Scheduler) enqueueTask(ctx context.Context, taskType, cardID string, isManual bool) error {
payload := map[string]interface{}{
"card_id": cardID,
"is_manual": isManual,
"timestamp": time.Now().Unix(),
}
task := asynq.NewTask(taskType, mustMarshal(payload),
asynq.MaxRetry(0), // 不重试,失败后重新入队
asynq.Timeout(30*time.Second), // 30秒超时
asynq.Queue(constants.QueueDefault),
)
_, err := s.queueClient.Enqueue(task)
return err
}
// progressiveInit 渐进式初始化
// 分批加载卡数据到 Redis每批 10 万张sleep 1 秒
func (s *Scheduler) progressiveInit(ctx context.Context) {
defer s.wg.Done()
config := DefaultSchedulerConfig()
s.initProgress.mu.Lock()
s.initProgress.Status = "running"
s.initProgress.StartTime = time.Now()
s.initProgress.mu.Unlock()
s.logger.Info("开始渐进式初始化...")
// 获取总卡数
var totalCards int64
if err := s.db.Model(&model.IotCard{}).Count(&totalCards).Error; err != nil {
s.logger.Error("获取卡总数失败", zap.Error(err))
s.setInitError(err.Error())
return
}
s.initProgress.mu.Lock()
s.initProgress.TotalCards = totalCards
s.initProgress.mu.Unlock()
s.logger.Info("开始加载卡数据", zap.Int64("total_cards", totalCards))
// 使用游标分批加载
var lastID uint = 0
batchCount := 0
for {
select {
case <-s.stopChan:
s.logger.Info("渐进式初始化被中断")
return
default:
}
// 加载一批卡
var cards []*model.IotCard
err := s.db.WithContext(ctx).
Where("id > ?", lastID).
Order("id ASC").
Limit(config.InitBatchSize).
Find(&cards).Error
if err != nil {
s.logger.Error("加载卡数据失败", zap.Error(err))
s.setInitError(err.Error())
return
}
if len(cards) == 0 {
break
}
// 批量处理这批卡(使用 Pipeline 提高性能)
if err := s.initCardsBatch(ctx, cards); err != nil {
s.logger.Warn("批量初始化卡轮询失败", zap.Error(err))
}
lastID = cards[len(cards)-1].ID
batchCount++
s.initProgress.mu.Lock()
s.initProgress.LoadedCards += int64(len(cards))
s.initProgress.LastBatchTime = time.Now()
s.initProgress.mu.Unlock()
s.logger.Info("完成一批卡初始化",
zap.Int("batch", batchCount),
zap.Int("batch_size", len(cards)),
zap.Int64("loaded", s.initProgress.LoadedCards),
zap.Int64("total", totalCards))
// 批次间休眠,避免打爆数据库
time.Sleep(config.InitBatchSleepDuration)
}
s.initProgress.mu.Lock()
s.initProgress.Status = "completed"
s.initProgress.mu.Unlock()
s.initCompleted.Store(true)
s.logger.Info("渐进式初始化完成",
zap.Int64("total_loaded", s.initProgress.LoadedCards),
zap.Duration("duration", time.Since(s.initProgress.StartTime)))
}
// initCardsBatch 批量初始化卡的轮询
// 使用 Redis Pipeline 批量写入,大幅提高初始化性能
// 10万张卡从 ~60秒 优化到 ~5秒
func (s *Scheduler) initCardsBatch(ctx context.Context, cards []*model.IotCard) error {
if len(cards) == 0 {
return nil
}
config := DefaultSchedulerConfig()
now := time.Now()
pipe := s.redis.Pipeline()
for _, card := range cards {
// 匹配配置
cfg := s.MatchConfig(card)
if cfg == nil {
continue // 无匹配配置,不需要轮询
}
// 添加到相应的轮询队列
if cfg.RealnameCheckInterval != nil && *cfg.RealnameCheckInterval > 0 {
nextCheck := s.calculateNextCheckTime(card.LastRealNameCheckAt, *cfg.RealnameCheckInterval)
pipe.ZAdd(ctx, constants.RedisPollingQueueRealnameKey(), redis.Z{
Score: float64(nextCheck.Unix()),
Member: card.ID,
})
}
if cfg.CarddataCheckInterval != nil && *cfg.CarddataCheckInterval > 0 {
nextCheck := s.calculateNextCheckTime(card.LastDataCheckAt, *cfg.CarddataCheckInterval)
pipe.ZAdd(ctx, constants.RedisPollingQueueCarddataKey(), redis.Z{
Score: float64(nextCheck.Unix()),
Member: card.ID,
})
}
if cfg.PackageCheckInterval != nil && *cfg.PackageCheckInterval > 0 {
nextCheck := s.calculateNextCheckTime(card.LastDataCheckAt, *cfg.PackageCheckInterval)
pipe.ZAdd(ctx, constants.RedisPollingQueuePackageKey(), redis.Z{
Score: float64(nextCheck.Unix()),
Member: card.ID,
})
}
// 缓存卡信息到 Redis
cacheKey := constants.RedisPollingCardInfoKey(card.ID)
cacheData := map[string]interface{}{
"id": card.ID,
"iccid": card.ICCID,
"card_category": card.CardCategory,
"real_name_status": card.RealNameStatus,
"network_status": card.NetworkStatus,
"carrier_id": card.CarrierID,
"cached_at": now.Unix(),
}
pipe.HSet(ctx, cacheKey, cacheData)
pipe.Expire(ctx, cacheKey, config.CardCacheTTL)
}
// 执行 Pipeline
_, err := pipe.Exec(ctx)
return err
}
// initCardPolling 初始化单张卡的轮询(保留用于懒加载场景)
func (s *Scheduler) initCardPolling(ctx context.Context, card *model.IotCard) error {
// 匹配配置
config := s.MatchConfig(card)
if config == nil {
return nil // 无匹配配置,不需要轮询
}
now := time.Now()
// 添加到相应的轮询队列
if config.RealnameCheckInterval != nil && *config.RealnameCheckInterval > 0 {
nextCheck := s.calculateNextCheckTime(card.LastRealNameCheckAt, *config.RealnameCheckInterval)
if err := s.addToQueue(ctx, constants.RedisPollingQueueRealnameKey(), card.ID, nextCheck); err != nil {
return err
}
}
if config.CarddataCheckInterval != nil && *config.CarddataCheckInterval > 0 {
nextCheck := s.calculateNextCheckTime(card.LastDataCheckAt, *config.CarddataCheckInterval)
if err := s.addToQueue(ctx, constants.RedisPollingQueueCarddataKey(), card.ID, nextCheck); err != nil {
return err
}
}
if config.PackageCheckInterval != nil && *config.PackageCheckInterval > 0 {
// 套餐检查使用流量检查时间作为参考
nextCheck := s.calculateNextCheckTime(card.LastDataCheckAt, *config.PackageCheckInterval)
if err := s.addToQueue(ctx, constants.RedisPollingQueuePackageKey(), card.ID, nextCheck); err != nil {
return err
}
}
// 缓存卡信息到 Redis
return s.cacheCardInfo(ctx, card, now)
}
// MatchConfig 匹配轮询配置
// 按优先级返回第一个匹配的配置
func (s *Scheduler) MatchConfig(card *model.IotCard) *model.PollingConfig {
s.configCacheLock.RLock()
defer s.configCacheLock.RUnlock()
for _, cfg := range s.configCache {
if s.matchConfigConditions(cfg, card) {
return cfg
}
}
return nil
}
// matchConfigConditions 检查卡是否匹配配置条件
func (s *Scheduler) matchConfigConditions(cfg *model.PollingConfig, card *model.IotCard) bool {
// 检查卡状态条件
if cfg.CardCondition != "" {
cardCondition := s.getCardCondition(card)
if cfg.CardCondition != cardCondition {
return false
}
}
// 检查卡业务类型
if cfg.CardCategory != "" {
if cfg.CardCategory != card.CardCategory {
return false
}
}
// 检查运营商
if cfg.CarrierID != nil {
if *cfg.CarrierID != card.CarrierID {
return false
}
}
return true
}
// getCardCondition 获取卡的状态条件
func (s *Scheduler) getCardCondition(card *model.IotCard) string {
// 根据卡的实名状态和激活状态确定条件
if card.RealNameStatus == 0 || card.RealNameStatus == 1 {
return "not_real_name" // 未实名
}
if card.RealNameStatus == 2 {
if card.NetworkStatus == 1 {
return "activated" // 已激活
}
return "real_name" // 已实名但未激活
}
if card.NetworkStatus == 0 {
return "suspended" // 已停用
}
return ""
}
// calculateNextCheckTime 计算下次检查时间
func (s *Scheduler) calculateNextCheckTime(lastCheckAt *time.Time, intervalSeconds int) time.Time {
now := time.Now()
if lastCheckAt == nil {
// 首次检查,立即执行(加上随机抖动避免集中)
jitter := time.Duration(now.UnixNano()%int64(intervalSeconds)) * time.Second / 10
return now.Add(jitter)
}
// 计算下次检查时间
nextCheck := lastCheckAt.Add(time.Duration(intervalSeconds) * time.Second)
if nextCheck.Before(now) {
// 如果已过期,立即执行
return now
}
return nextCheck
}
// addToQueue 添加卡到轮询队列
func (s *Scheduler) addToQueue(ctx context.Context, queueKey string, cardID uint, nextCheck time.Time) error {
score := float64(nextCheck.Unix())
member := formatUint(cardID)
return s.redis.ZAdd(ctx, queueKey, redis.Z{
Score: score,
Member: member,
}).Err()
}
// cacheCardInfo 缓存卡信息到 Redis
func (s *Scheduler) cacheCardInfo(ctx context.Context, card *model.IotCard, cachedAt time.Time) error {
key := constants.RedisPollingCardInfoKey(card.ID)
config := DefaultSchedulerConfig()
data := map[string]interface{}{
"id": card.ID,
"iccid": card.ICCID,
"card_category": card.CardCategory,
"real_name_status": card.RealNameStatus,
"network_status": card.NetworkStatus,
"carrier_id": card.CarrierID,
"cached_at": cachedAt.Unix(),
}
pipe := s.redis.Pipeline()
pipe.HSet(ctx, key, data)
pipe.Expire(ctx, key, config.CardCacheTTL)
_, err := pipe.Exec(ctx)
return err
}
// setInitError 设置初始化错误
func (s *Scheduler) setInitError(msg string) {
s.initProgress.mu.Lock()
s.initProgress.Status = "failed"
s.initProgress.ErrorMessage = msg
s.initProgress.mu.Unlock()
}
// GetInitProgress 获取初始化进度
func (s *Scheduler) GetInitProgress() InitProgress {
s.initProgress.mu.RLock()
defer s.initProgress.mu.RUnlock()
return InitProgress{
TotalCards: s.initProgress.TotalCards,
LoadedCards: s.initProgress.LoadedCards,
StartTime: s.initProgress.StartTime,
LastBatchTime: s.initProgress.LastBatchTime,
Status: s.initProgress.Status,
ErrorMessage: s.initProgress.ErrorMessage,
}
}
// IsInitCompleted 检查初始化是否完成
func (s *Scheduler) IsInitCompleted() bool {
return s.initCompleted.Load()
}
// RefreshConfigs 刷新配置缓存
func (s *Scheduler) RefreshConfigs(ctx context.Context) error {
return s.loadConfigs(ctx)
}