All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 7m17s
## 变更概述 将统一钱包系统拆分为代理钱包和卡钱包两个独立系统,实现数据表和代码层面的完全隔离。 ## 数据库变更 - 新增 6 张表:tb_agent_wallet、tb_agent_wallet_transaction、tb_agent_recharge_record、tb_card_wallet、tb_card_wallet_transaction、tb_card_recharge_record - 删除 3 张旧表:tb_wallet、tb_wallet_transaction、tb_recharge_record - 代理钱包:按 (shop_id, wallet_type) 唯一标识,支持主钱包和分佣钱包 - 卡钱包:按 (resource_type, resource_id) 唯一标识,支持物联网卡和设备 ## 代码变更 - Model 层:新增 AgentWallet、AgentWalletTransaction、AgentRechargeRecord、CardWallet、CardWalletTransaction、CardRechargeRecord 模型 - Store 层:新增 6 个独立 Store,支持事务、乐观锁、Redis 缓存 - Service 层:重构 commission_calculation、commission_withdrawal、order、recharge 等 8 个服务 - Bootstrap 层:更新 Store 和 Service 依赖注入 - 常量层:按钱包类型重新组织常量和 Redis Key 生成函数 ## 技术特性 - 乐观锁:使用 version 字段防止并发冲突 - 多租户:支持 shop_id_tag 和 enterprise_id_tag 过滤 - 事务管理:所有余额变动使用事务保证 ACID - 缓存策略:Cache-Aside 模式,余额变动后删除缓存 ## 业务影响 - 代理钱包和卡钱包业务完全隔离,互不影响 - 为独立监控、优化、扩展打下基础 - 提升代理钱包的稳定性和独立性 Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
745 lines
22 KiB
Go
745 lines
22 KiB
Go
package polling
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
|
||
"github.com/hibiken/asynq"
|
||
"github.com/redis/go-redis/v9"
|
||
"go.uber.org/zap"
|
||
"gorm.io/gorm"
|
||
|
||
"github.com/break/junhong_cmp_fiber/internal/model"
|
||
"github.com/break/junhong_cmp_fiber/internal/store/postgres"
|
||
"github.com/break/junhong_cmp_fiber/pkg/constants"
|
||
)
|
||
|
||
// Scheduler is the polling scheduler.
// It manages the periodic check tasks for IoT cards (real-name status,
// data usage, and package checks), driven by Redis sorted-set queues and
// dispatched as Asynq tasks.
type Scheduler struct {
	db               *gorm.DB
	redis            *redis.Client
	queueClient      *asynq.Client
	logger           *zap.Logger
	configStore      *postgres.PollingConfigStore
	iotCardStore     *postgres.IotCardStore
	concurrencyStore *postgres.PollingConcurrencyConfigStore

	// Task 19: package-activation check handler.
	packageActivationHandler *PackageActivationHandler
	// Task 20: data (traffic) reset scheduling handler.
	dataResetHandler *DataResetHandler

	// In-memory cache of enabled polling configs, guarded by configCacheLock.
	configCache     []*model.PollingConfig
	configCacheLock sync.RWMutex
	configCacheTime time.Time // when configCache was last refreshed

	// Progressive-initialization state.
	initProgress  *InitProgress
	initCompleted atomic.Bool // set once progressiveInit finishes successfully

	// Shutdown signaling: Stop closes stopChan and waits on wg.
	stopChan chan struct{}
	wg       sync.WaitGroup
}
|
||
|
||
// InitProgress tracks the progress of the progressive (batched) card
// initialization. All fields are guarded by mu; GetInitProgress returns a
// lock-free copy for callers.
type InitProgress struct {
	mu            sync.RWMutex
	TotalCards    int64     `json:"total_cards"`     // total number of cards to load
	LoadedCards   int64     `json:"loaded_cards"`    // number of cards loaded so far
	StartTime     time.Time `json:"start_time"`      // when initialization started
	LastBatchTime time.Time `json:"last_batch_time"` // when the latest batch finished
	Status        string    `json:"status"`          // one of: pending, running, completed, failed
	ErrorMessage  string    `json:"error_message"`   // set when Status is "failed"
}
|
||
|
||
// SchedulerConfig holds the scheduler tuning knobs.
// Design target: support on the order of 100 million cards.
type SchedulerConfig struct {
	ScheduleInterval       time.Duration // scheduling loop interval (default 1s, for high throughput)
	InitBatchSize          int           // cards loaded per batch during initialization (default 100000)
	InitBatchSleepDuration time.Duration // sleep between initialization batches (default 500ms)
	ConfigCacheTTL         time.Duration // polling-config cache TTL (default 5 minutes)
	CardCacheTTL           time.Duration // card-info cache TTL (default 7 days)
	ScheduleBatchSize      int           // cards fetched per scheduling tick (default 50000)
	MaxManualBatchSize     int           // manually-triggered cards processed per tick (default 1000)
}
|
||
|
||
// DefaultSchedulerConfig 默认调度器配置
|
||
// 单 Worker 设计吞吐:50000 张/秒,支持多 Worker 水平扩展
|
||
func DefaultSchedulerConfig() *SchedulerConfig {
|
||
return &SchedulerConfig{
|
||
ScheduleInterval: 1 * time.Second, // 1秒调度一次,提高响应速度
|
||
InitBatchSize: 100000, // 10万张/批初始化
|
||
InitBatchSleepDuration: 500 * time.Millisecond, // 500ms 间隔,加快初始化
|
||
ConfigCacheTTL: 5 * time.Minute,
|
||
CardCacheTTL: 7 * 24 * time.Hour,
|
||
ScheduleBatchSize: 50000, // 每次取 5 万张,每秒可调度 5 万张
|
||
MaxManualBatchSize: 1000, // 手动触发每次处理 1000 张
|
||
}
|
||
}
|
||
|
||
// NewScheduler creates a Scheduler instance and wires up its stores and
// handlers. The DataResetHandler is constructed without a ResetService; it
// must be injected later via SetResetService before data resets can run.
func NewScheduler(
	db *gorm.DB,
	redisClient *redis.Client,
	queueClient *asynq.Client,
	logger *zap.Logger,
) *Scheduler {
	return &Scheduler{
		db:                       db,
		redis:                    redisClient,
		queueClient:              queueClient,
		logger:                   logger,
		configStore:              postgres.NewPollingConfigStore(db),
		iotCardStore:             postgres.NewIotCardStore(db, redisClient),
		concurrencyStore:         postgres.NewPollingConcurrencyConfigStore(db),
		packageActivationHandler: NewPackageActivationHandler(db, redisClient, queueClient, nil, logger),
		dataResetHandler:         NewDataResetHandler(nil, logger), // ResetService injected later via SetResetService
		initProgress: &InitProgress{
			Status: "pending",
		},
		stopChan: make(chan struct{}),
	}
}
|
||
|
||
// Start 启动调度器
|
||
// 快速启动:10秒内完成配置加载和调度器启动
|
||
func (s *Scheduler) Start(ctx context.Context) error {
|
||
startTime := time.Now()
|
||
s.logger.Info("轮询调度器启动中...")
|
||
|
||
// 1. 加载轮询配置到缓存
|
||
if err := s.loadConfigs(ctx); err != nil {
|
||
s.logger.Error("加载轮询配置失败", zap.Error(err))
|
||
return err
|
||
}
|
||
s.logger.Info("轮询配置已加载", zap.Int("config_count", len(s.configCache)))
|
||
|
||
// 2. 初始化并发控制配置
|
||
if err := s.initConcurrencyConfigs(ctx); err != nil {
|
||
s.logger.Warn("初始化并发控制配置失败,使用默认值", zap.Error(err))
|
||
}
|
||
|
||
// 3. 启动调度循环(非阻塞)
|
||
s.wg.Add(1)
|
||
go s.scheduleLoop(ctx)
|
||
|
||
// 4. 启动后台渐进式初始化(非阻塞)
|
||
s.wg.Add(1)
|
||
go s.progressiveInit(ctx)
|
||
|
||
elapsed := time.Since(startTime)
|
||
s.logger.Info("轮询调度器已启动",
|
||
zap.Duration("startup_time", elapsed),
|
||
zap.Bool("fast_start", elapsed < 10*time.Second))
|
||
|
||
return nil
|
||
}
|
||
|
||
// Stop 停止调度器
|
||
func (s *Scheduler) Stop() {
|
||
s.logger.Info("正在停止轮询调度器...")
|
||
close(s.stopChan)
|
||
s.wg.Wait()
|
||
s.logger.Info("轮询调度器已停止")
|
||
}
|
||
|
||
// loadConfigs 加载轮询配置到缓存
|
||
func (s *Scheduler) loadConfigs(ctx context.Context) error {
|
||
configs, err := s.configStore.ListEnabled(ctx)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
s.configCacheLock.Lock()
|
||
s.configCache = configs
|
||
s.configCacheTime = time.Now()
|
||
s.configCacheLock.Unlock()
|
||
|
||
// 同步到 Redis 缓存
|
||
return s.syncConfigsToRedis(ctx, configs)
|
||
}
|
||
|
||
// syncConfigsToRedis 同步配置到 Redis
|
||
func (s *Scheduler) syncConfigsToRedis(ctx context.Context, configs []*model.PollingConfig) error {
|
||
key := constants.RedisPollingConfigsCacheKey()
|
||
|
||
// 序列化配置列表为 JSON
|
||
configData := make([]interface{}, 0, len(configs)*2)
|
||
for _, cfg := range configs {
|
||
jsonData, err := json.Marshal(cfg)
|
||
if err != nil {
|
||
s.logger.Warn("序列化轮询配置失败", zap.Uint("config_id", cfg.ID), zap.Error(err))
|
||
continue
|
||
}
|
||
configData = append(configData, cfg.ID, string(jsonData))
|
||
}
|
||
|
||
if len(configData) > 0 {
|
||
pipe := s.redis.Pipeline()
|
||
pipe.Del(ctx, key)
|
||
// 使用 HSET 存储配置
|
||
pipe.HSet(ctx, key, configData...)
|
||
pipe.Expire(ctx, key, 24*time.Hour)
|
||
_, err := pipe.Exec(ctx)
|
||
return err
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// initConcurrencyConfigs 初始化并发控制配置到 Redis
|
||
func (s *Scheduler) initConcurrencyConfigs(ctx context.Context) error {
|
||
configs, err := s.concurrencyStore.List(ctx)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
for _, cfg := range configs {
|
||
key := constants.RedisPollingConcurrencyConfigKey(cfg.TaskType)
|
||
if err := s.redis.Set(ctx, key, cfg.MaxConcurrency, 0).Err(); err != nil {
|
||
s.logger.Warn("设置并发配置失败",
|
||
zap.String("task_type", cfg.TaskType),
|
||
zap.Error(err))
|
||
}
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// scheduleLoop 调度循环
|
||
// 每 10 秒执行一次,从 Redis Sorted Set 获取到期的卡,生成 Asynq 任务
|
||
func (s *Scheduler) scheduleLoop(ctx context.Context) {
|
||
defer s.wg.Done()
|
||
|
||
config := DefaultSchedulerConfig()
|
||
ticker := time.NewTicker(config.ScheduleInterval)
|
||
defer ticker.Stop()
|
||
|
||
s.logger.Info("调度循环已启动", zap.Duration("interval", config.ScheduleInterval))
|
||
|
||
for {
|
||
select {
|
||
case <-s.stopChan:
|
||
s.logger.Info("调度循环收到停止信号")
|
||
return
|
||
case <-ticker.C:
|
||
s.processSchedule(ctx)
|
||
}
|
||
}
|
||
}
|
||
|
||
// processSchedule 处理一次调度
|
||
func (s *Scheduler) processSchedule(ctx context.Context) {
|
||
now := time.Now().Unix()
|
||
|
||
// 1. 优先处理手动触发队列
|
||
s.processManualQueue(ctx, constants.TaskTypePollingRealname)
|
||
s.processManualQueue(ctx, constants.TaskTypePollingCarddata)
|
||
s.processManualQueue(ctx, constants.TaskTypePollingPackage)
|
||
|
||
// 2. 处理定时队列
|
||
s.processTimedQueue(ctx, constants.RedisPollingQueueRealnameKey(), constants.TaskTypePollingRealname, now)
|
||
s.processTimedQueue(ctx, constants.RedisPollingQueueCarddataKey(), constants.TaskTypePollingCarddata, now)
|
||
s.processTimedQueue(ctx, constants.RedisPollingQueuePackageKey(), constants.TaskTypePollingPackage, now)
|
||
|
||
// 任务 19.6: 套餐激活检查(每次调度都执行,内部会限流)
|
||
if s.packageActivationHandler != nil {
|
||
if err := s.packageActivationHandler.HandlePackageActivationCheck(ctx); err != nil {
|
||
s.logger.Warn("套餐激活检查失败", zap.Error(err))
|
||
}
|
||
}
|
||
|
||
// 任务 20.6: 流量重置调度(每次调度都执行,内部会限流)
|
||
if s.dataResetHandler != nil {
|
||
if err := s.dataResetHandler.HandleDataReset(ctx); err != nil {
|
||
s.logger.Warn("流量重置调度失败", zap.Error(err))
|
||
}
|
||
}
|
||
}
|
||
|
||
// processManualQueue 处理手动触发队列
|
||
// 优化:批量读取和提交,提高吞吐
|
||
func (s *Scheduler) processManualQueue(ctx context.Context, taskType string) {
|
||
config := DefaultSchedulerConfig()
|
||
key := constants.RedisPollingManualQueueKey(taskType)
|
||
|
||
// 批量读取手动触发任务
|
||
cardIDs := make([]string, 0, config.MaxManualBatchSize)
|
||
for i := 0; i < config.MaxManualBatchSize; i++ {
|
||
cardIDStr, err := s.redis.LPop(ctx, key).Result()
|
||
if err != nil {
|
||
if err != redis.Nil {
|
||
s.logger.Error("读取手动触发队列失败",
|
||
zap.String("task_type", taskType),
|
||
zap.Error(err))
|
||
}
|
||
break
|
||
}
|
||
cardIDs = append(cardIDs, cardIDStr)
|
||
}
|
||
|
||
// 批量提交任务
|
||
if len(cardIDs) > 0 {
|
||
s.enqueueBatch(ctx, taskType, cardIDs, true)
|
||
}
|
||
}
|
||
|
||
// processTimedQueue 处理定时队列
|
||
// 优化:支持大批量处理,每次最多取 ScheduleBatchSize 张卡
|
||
func (s *Scheduler) processTimedQueue(ctx context.Context, queueKey, taskType string, now int64) {
|
||
config := DefaultSchedulerConfig()
|
||
|
||
// 获取所有到期的卡(score <= now)
|
||
// 使用 ZRANGEBYSCORE 获取,每次最多取 ScheduleBatchSize 张
|
||
cardIDs, err := s.redis.ZRangeByScore(ctx, queueKey, &redis.ZRangeBy{
|
||
Min: "-inf",
|
||
Max: formatInt64(now),
|
||
Count: int64(config.ScheduleBatchSize),
|
||
}).Result()
|
||
|
||
if err != nil {
|
||
if err != redis.Nil {
|
||
s.logger.Error("读取定时队列失败",
|
||
zap.String("queue_key", queueKey),
|
||
zap.Error(err))
|
||
}
|
||
return
|
||
}
|
||
|
||
if len(cardIDs) == 0 {
|
||
return
|
||
}
|
||
|
||
// 只在数量较大时打印日志,避免日志过多
|
||
if len(cardIDs) >= 1000 {
|
||
s.logger.Info("处理定时队列",
|
||
zap.String("task_type", taskType),
|
||
zap.Int("card_count", len(cardIDs)))
|
||
}
|
||
|
||
// 移除已取出的卡(使用最后一个卡的 score 作为边界,更精确)
|
||
if err := s.redis.ZRemRangeByScore(ctx, queueKey, "-inf", formatInt64(now)).Err(); err != nil {
|
||
s.logger.Error("移除已处理的卡失败", zap.Error(err))
|
||
}
|
||
|
||
// 批量提交任务(使用 goroutine 并行提交,提高吞吐)
|
||
s.enqueueBatch(ctx, taskType, cardIDs, false)
|
||
}
|
||
|
||
// enqueueBatch 批量提交任务到 Asynq 队列
|
||
// 使用多 goroutine 并行提交,提高吞吐量
|
||
func (s *Scheduler) enqueueBatch(ctx context.Context, taskType string, cardIDs []string, isManual bool) {
|
||
if len(cardIDs) == 0 {
|
||
return
|
||
}
|
||
|
||
// 分批并行提交,每批 1000 个,最多 10 个并行
|
||
batchSize := 1000
|
||
maxParallel := 10
|
||
sem := make(chan struct{}, maxParallel)
|
||
var wg sync.WaitGroup
|
||
|
||
for i := 0; i < len(cardIDs); i += batchSize {
|
||
end := i + batchSize
|
||
if end > len(cardIDs) {
|
||
end = len(cardIDs)
|
||
}
|
||
batch := cardIDs[i:end]
|
||
|
||
wg.Add(1)
|
||
sem <- struct{}{} // 获取信号量
|
||
|
||
go func(batch []string) {
|
||
defer wg.Done()
|
||
defer func() { <-sem }() // 释放信号量
|
||
|
||
for _, cardID := range batch {
|
||
if err := s.enqueueTask(ctx, taskType, cardID, isManual); err != nil {
|
||
s.logger.Warn("提交任务失败",
|
||
zap.String("task_type", taskType),
|
||
zap.String("card_id", cardID),
|
||
zap.Error(err))
|
||
}
|
||
}
|
||
}(batch)
|
||
}
|
||
|
||
wg.Wait()
|
||
}
|
||
|
||
// enqueueTask 提交任务到 Asynq 队列
|
||
func (s *Scheduler) enqueueTask(ctx context.Context, taskType, cardID string, isManual bool) error {
|
||
payload := map[string]interface{}{
|
||
"card_id": cardID,
|
||
"is_manual": isManual,
|
||
"timestamp": time.Now().Unix(),
|
||
}
|
||
|
||
task := asynq.NewTask(taskType, mustMarshal(payload),
|
||
asynq.MaxRetry(0), // 不重试,失败后重新入队
|
||
asynq.Timeout(30*time.Second), // 30秒超时
|
||
asynq.Queue(constants.QueueDefault),
|
||
)
|
||
|
||
_, err := s.queueClient.Enqueue(task)
|
||
return err
|
||
}
|
||
|
||
// progressiveInit loads card data into Redis in the background, in batches.
// Each batch loads InitBatchSize cards (100k by default) and sleeps
// InitBatchSleepDuration between batches to avoid overloading the database.
// Progress is published through s.initProgress; on success initCompleted is set.
func (s *Scheduler) progressiveInit(ctx context.Context) {
	defer s.wg.Done()

	config := DefaultSchedulerConfig()

	s.initProgress.mu.Lock()
	s.initProgress.Status = "running"
	s.initProgress.StartTime = time.Now()
	s.initProgress.mu.Unlock()

	s.logger.Info("开始渐进式初始化...")

	// Total card count, used only for progress reporting.
	var totalCards int64
	if err := s.db.Model(&model.IotCard{}).Count(&totalCards).Error; err != nil {
		s.logger.Error("获取卡总数失败", zap.Error(err))
		s.setInitError(err.Error())
		return
	}

	s.initProgress.mu.Lock()
	s.initProgress.TotalCards = totalCards
	s.initProgress.mu.Unlock()

	s.logger.Info("开始加载卡数据", zap.Int64("total_cards", totalCards))

	// Keyset pagination: iterate by ascending primary key.
	var lastID uint = 0
	batchCount := 0

	for {
		// Bail out promptly if Stop() was called.
		select {
		case <-s.stopChan:
			s.logger.Info("渐进式初始化被中断")
			return
		default:
		}

		// Load the next batch of cards after the last seen ID.
		var cards []*model.IotCard
		err := s.db.WithContext(ctx).
			Where("id > ?", lastID).
			Order("id ASC").
			Limit(config.InitBatchSize).
			Find(&cards).Error

		if err != nil {
			s.logger.Error("加载卡数据失败", zap.Error(err))
			s.setInitError(err.Error())
			return
		}

		if len(cards) == 0 {
			break
		}

		// Seed the polling queues and the card cache via a Redis pipeline;
		// a partial failure is logged but does not abort initialization.
		if err := s.initCardsBatch(ctx, cards); err != nil {
			s.logger.Warn("批量初始化卡轮询失败", zap.Error(err))
		}

		lastID = cards[len(cards)-1].ID
		batchCount++

		s.initProgress.mu.Lock()
		s.initProgress.LoadedCards += int64(len(cards))
		s.initProgress.LastBatchTime = time.Now()
		s.initProgress.mu.Unlock()

		// NOTE(review): LoadedCards is read below without holding mu. This
		// goroutine is the only writer so the read itself is benign, but
		// confirm that concurrent GetInitProgress readers tolerate it.
		s.logger.Info("完成一批卡初始化",
			zap.Int("batch", batchCount),
			zap.Int("batch_size", len(cards)),
			zap.Int64("loaded", s.initProgress.LoadedCards),
			zap.Int64("total", totalCards))

		// Sleep between batches to avoid hammering the database.
		time.Sleep(config.InitBatchSleepDuration)
	}

	s.initProgress.mu.Lock()
	s.initProgress.Status = "completed"
	s.initProgress.mu.Unlock()
	s.initCompleted.Store(true)

	s.logger.Info("渐进式初始化完成",
		zap.Int64("total_loaded", s.initProgress.LoadedCards),
		zap.Duration("duration", time.Since(s.initProgress.StartTime)))
}
|
||
|
||
// initCardsBatch seeds the polling queues and the card-info cache for a
// batch of cards using a single Redis pipeline.
// Per the original author's measurement, pipelining takes a 100k-card batch
// from ~60s down to ~5s.
func (s *Scheduler) initCardsBatch(ctx context.Context, cards []*model.IotCard) error {
	if len(cards) == 0 {
		return nil
	}

	config := DefaultSchedulerConfig()
	now := time.Now()
	pipe := s.redis.Pipeline()

	for _, card := range cards {
		// Find the first polling config whose conditions this card satisfies.
		cfg := s.MatchConfig(card)
		if cfg == nil {
			continue // no matching config: the card is not polled
		}

		// Enqueue into each polling queue whose interval is configured (> 0).
		// NOTE(review): Member here is the raw uint card.ID, while addToQueue
		// uses formatUint — go-redis should encode both as the same decimal
		// string, but confirm before relying on member-format equality.
		if cfg.RealnameCheckInterval != nil && *cfg.RealnameCheckInterval > 0 {
			nextCheck := s.calculateNextCheckTime(card.LastRealNameCheckAt, *cfg.RealnameCheckInterval)
			pipe.ZAdd(ctx, constants.RedisPollingQueueRealnameKey(), redis.Z{
				Score:  float64(nextCheck.Unix()),
				Member: card.ID,
			})
		}

		if cfg.CarddataCheckInterval != nil && *cfg.CarddataCheckInterval > 0 {
			nextCheck := s.calculateNextCheckTime(card.LastDataCheckAt, *cfg.CarddataCheckInterval)
			pipe.ZAdd(ctx, constants.RedisPollingQueueCarddataKey(), redis.Z{
				Score:  float64(nextCheck.Unix()),
				Member: card.ID,
			})
		}

		// Package checks reuse LastDataCheckAt as their reference time.
		if cfg.PackageCheckInterval != nil && *cfg.PackageCheckInterval > 0 {
			nextCheck := s.calculateNextCheckTime(card.LastDataCheckAt, *cfg.PackageCheckInterval)
			pipe.ZAdd(ctx, constants.RedisPollingQueuePackageKey(), redis.Z{
				Score:  float64(nextCheck.Unix()),
				Member: card.ID,
			})
		}

		// Cache a summary of the card in a Redis hash with CardCacheTTL.
		cacheKey := constants.RedisPollingCardInfoKey(card.ID)
		cacheData := map[string]interface{}{
			"id":               card.ID,
			"iccid":            card.ICCID,
			"card_category":    card.CardCategory,
			"real_name_status": card.RealNameStatus,
			"network_status":   card.NetworkStatus,
			"carrier_id":       card.CarrierID,
			"cached_at":        now.Unix(),
		}
		pipe.HSet(ctx, cacheKey, cacheData)
		pipe.Expire(ctx, cacheKey, config.CardCacheTTL)
	}

	// One round trip for the whole batch.
	_, err := pipe.Exec(ctx)
	return err
}
|
||
|
||
// initCardPolling 初始化单张卡的轮询(保留用于懒加载场景)
|
||
func (s *Scheduler) initCardPolling(ctx context.Context, card *model.IotCard) error {
|
||
// 匹配配置
|
||
config := s.MatchConfig(card)
|
||
if config == nil {
|
||
return nil // 无匹配配置,不需要轮询
|
||
}
|
||
|
||
now := time.Now()
|
||
|
||
// 添加到相应的轮询队列
|
||
if config.RealnameCheckInterval != nil && *config.RealnameCheckInterval > 0 {
|
||
nextCheck := s.calculateNextCheckTime(card.LastRealNameCheckAt, *config.RealnameCheckInterval)
|
||
if err := s.addToQueue(ctx, constants.RedisPollingQueueRealnameKey(), card.ID, nextCheck); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
|
||
if config.CarddataCheckInterval != nil && *config.CarddataCheckInterval > 0 {
|
||
nextCheck := s.calculateNextCheckTime(card.LastDataCheckAt, *config.CarddataCheckInterval)
|
||
if err := s.addToQueue(ctx, constants.RedisPollingQueueCarddataKey(), card.ID, nextCheck); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
|
||
if config.PackageCheckInterval != nil && *config.PackageCheckInterval > 0 {
|
||
// 套餐检查使用流量检查时间作为参考
|
||
nextCheck := s.calculateNextCheckTime(card.LastDataCheckAt, *config.PackageCheckInterval)
|
||
if err := s.addToQueue(ctx, constants.RedisPollingQueuePackageKey(), card.ID, nextCheck); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
|
||
// 缓存卡信息到 Redis
|
||
return s.cacheCardInfo(ctx, card, now)
|
||
}
|
||
|
||
// MatchConfig 匹配轮询配置
|
||
// 按优先级返回第一个匹配的配置
|
||
func (s *Scheduler) MatchConfig(card *model.IotCard) *model.PollingConfig {
|
||
s.configCacheLock.RLock()
|
||
defer s.configCacheLock.RUnlock()
|
||
|
||
for _, cfg := range s.configCache {
|
||
if s.matchConfigConditions(cfg, card) {
|
||
return cfg
|
||
}
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// matchConfigConditions 检查卡是否匹配配置条件
|
||
func (s *Scheduler) matchConfigConditions(cfg *model.PollingConfig, card *model.IotCard) bool {
|
||
// 检查卡状态条件
|
||
if cfg.CardCondition != "" {
|
||
cardCondition := s.getCardCondition(card)
|
||
if cfg.CardCondition != cardCondition {
|
||
return false
|
||
}
|
||
}
|
||
|
||
// 检查卡业务类型
|
||
if cfg.CardCategory != "" {
|
||
if cfg.CardCategory != card.CardCategory {
|
||
return false
|
||
}
|
||
}
|
||
|
||
// 检查运营商
|
||
if cfg.CarrierID != nil {
|
||
if *cfg.CarrierID != card.CarrierID {
|
||
return false
|
||
}
|
||
}
|
||
|
||
return true
|
||
}
|
||
|
||
// getCardCondition 获取卡的状态条件
|
||
func (s *Scheduler) getCardCondition(card *model.IotCard) string {
|
||
// 根据卡的实名状态和激活状态确定条件
|
||
if card.RealNameStatus == 0 || card.RealNameStatus == 1 {
|
||
return "not_real_name" // 未实名
|
||
}
|
||
if card.RealNameStatus == 2 {
|
||
if card.NetworkStatus == 1 {
|
||
return "activated" // 已激活
|
||
}
|
||
return "real_name" // 已实名但未激活
|
||
}
|
||
if card.NetworkStatus == 0 {
|
||
return "suspended" // 已停用
|
||
}
|
||
return ""
|
||
}
|
||
|
||
// calculateNextCheckTime 计算下次检查时间
|
||
func (s *Scheduler) calculateNextCheckTime(lastCheckAt *time.Time, intervalSeconds int) time.Time {
|
||
now := time.Now()
|
||
|
||
if lastCheckAt == nil {
|
||
// 首次检查,立即执行(加上随机抖动避免集中)
|
||
jitter := time.Duration(now.UnixNano()%int64(intervalSeconds)) * time.Second / 10
|
||
return now.Add(jitter)
|
||
}
|
||
|
||
// 计算下次检查时间
|
||
nextCheck := lastCheckAt.Add(time.Duration(intervalSeconds) * time.Second)
|
||
if nextCheck.Before(now) {
|
||
// 如果已过期,立即执行
|
||
return now
|
||
}
|
||
|
||
return nextCheck
|
||
}
|
||
|
||
// addToQueue 添加卡到轮询队列
|
||
func (s *Scheduler) addToQueue(ctx context.Context, queueKey string, cardID uint, nextCheck time.Time) error {
|
||
score := float64(nextCheck.Unix())
|
||
member := formatUint(cardID)
|
||
|
||
return s.redis.ZAdd(ctx, queueKey, redis.Z{
|
||
Score: score,
|
||
Member: member,
|
||
}).Err()
|
||
}
|
||
|
||
// cacheCardInfo 缓存卡信息到 Redis
|
||
func (s *Scheduler) cacheCardInfo(ctx context.Context, card *model.IotCard, cachedAt time.Time) error {
|
||
key := constants.RedisPollingCardInfoKey(card.ID)
|
||
config := DefaultSchedulerConfig()
|
||
|
||
data := map[string]interface{}{
|
||
"id": card.ID,
|
||
"iccid": card.ICCID,
|
||
"card_category": card.CardCategory,
|
||
"real_name_status": card.RealNameStatus,
|
||
"network_status": card.NetworkStatus,
|
||
"carrier_id": card.CarrierID,
|
||
"cached_at": cachedAt.Unix(),
|
||
}
|
||
|
||
pipe := s.redis.Pipeline()
|
||
pipe.HSet(ctx, key, data)
|
||
pipe.Expire(ctx, key, config.CardCacheTTL)
|
||
_, err := pipe.Exec(ctx)
|
||
|
||
return err
|
||
}
|
||
|
||
// setInitError 设置初始化错误
|
||
func (s *Scheduler) setInitError(msg string) {
|
||
s.initProgress.mu.Lock()
|
||
s.initProgress.Status = "failed"
|
||
s.initProgress.ErrorMessage = msg
|
||
s.initProgress.mu.Unlock()
|
||
}
|
||
|
||
// GetInitProgress 获取初始化进度
|
||
func (s *Scheduler) GetInitProgress() InitProgress {
|
||
s.initProgress.mu.RLock()
|
||
defer s.initProgress.mu.RUnlock()
|
||
|
||
return InitProgress{
|
||
TotalCards: s.initProgress.TotalCards,
|
||
LoadedCards: s.initProgress.LoadedCards,
|
||
StartTime: s.initProgress.StartTime,
|
||
LastBatchTime: s.initProgress.LastBatchTime,
|
||
Status: s.initProgress.Status,
|
||
ErrorMessage: s.initProgress.ErrorMessage,
|
||
}
|
||
}
|
||
|
||
// IsInitCompleted reports whether the progressive initialization has
// finished successfully.
func (s *Scheduler) IsInitCompleted() bool {
	return s.initCompleted.Load()
}
|
||
|
||
// RefreshConfigs reloads the enabled polling configs into the in-memory
// cache and re-syncs them to Redis.
func (s *Scheduler) RefreshConfigs(ctx context.Context) error {
	return s.loadConfigs(ctx)
}
|
||
|
||
// SetResetService 设置流量重置服务(用于依赖注入)
|
||
func (s *Scheduler) SetResetService(resetService interface{}) {
|
||
if rs, ok := resetService.(*DataResetHandler); ok {
|
||
s.dataResetHandler = rs
|
||
}
|
||
}
|
||
|
||
// SetActivationService injects the package-activation handler
// (dependency-injection hook).
func (s *Scheduler) SetActivationService(activationHandler *PackageActivationHandler) {
	s.packageActivationHandler = activationHandler
}
|