package task import ( "context" "fmt" "time" "github.com/bytedance/sonic" "github.com/hibiken/asynq" "github.com/redis/go-redis/v9" "go.uber.org/zap" "gorm.io/gorm" "github.com/break/junhong_cmp_fiber/pkg/constants" ) // SIMStatusSyncPayload SIM 卡状态同步任务载荷 type SIMStatusSyncPayload struct { RequestID string `json:"request_id"` ICCIDs []string `json:"iccids"` // ICCID 列表 ForceSync bool `json:"force_sync"` // 强制同步(忽略缓存) } // SIMHandler SIM 卡状态同步任务处理器 type SIMHandler struct { db *gorm.DB redis *redis.Client logger *zap.Logger } // NewSIMHandler 创建 SIM 卡状态同步任务处理器 func NewSIMHandler(db *gorm.DB, redis *redis.Client, logger *zap.Logger) *SIMHandler { return &SIMHandler{ db: db, redis: redis, logger: logger, } } // HandleSIMStatusSync 处理 SIM 卡状态同步任务 func (h *SIMHandler) HandleSIMStatusSync(ctx context.Context, task *asynq.Task) error { // 解析任务载荷 var payload SIMStatusSyncPayload if err := sonic.Unmarshal(task.Payload(), &payload); err != nil { h.logger.Error("解析 SIM 状态同步任务载荷失败", zap.Error(err), zap.String("task_id", task.ResultWriter().TaskID()), ) return asynq.SkipRetry } // 验证载荷 if err := h.validatePayload(&payload); err != nil { h.logger.Error("SIM 状态同步任务载荷验证失败", zap.Error(err), zap.String("request_id", payload.RequestID), ) return asynq.SkipRetry } // 幂等性检查 lockKey := constants.RedisTaskLockKey(payload.RequestID) locked, err := h.acquireLock(ctx, lockKey) if err != nil { h.logger.Error("获取任务锁失败", zap.Error(err), zap.String("request_id", payload.RequestID), ) return err } if !locked { h.logger.Info("任务已执行,跳过(幂等性)", zap.String("request_id", payload.RequestID), zap.Int("iccid_count", len(payload.ICCIDs)), ) return nil } // 记录任务开始 h.logger.Info("开始处理 SIM 卡状态同步任务", zap.String("request_id", payload.RequestID), zap.Int("iccid_count", len(payload.ICCIDs)), zap.Bool("force_sync", payload.ForceSync), ) // 执行状态同步 if err := h.syncSIMStatus(ctx, &payload); err != nil { h.logger.Error("SIM 卡状态同步失败", zap.Error(err), zap.String("request_id", payload.RequestID), ) return err } // 记录任务完成 h.logger.Info("SIM 卡状态同步成功", zap.String("request_id", payload.RequestID), zap.Int("iccid_count", len(payload.ICCIDs)), ) return nil } // validatePayload 验证 SIM 状态同步载荷 func (h *SIMHandler) validatePayload(payload *SIMStatusSyncPayload) error { if payload.RequestID == "" { return fmt.Errorf("request_id 不能为空") } if len(payload.ICCIDs) == 0 { return fmt.Errorf("iccids 不能为空") } if len(payload.ICCIDs) > 1000 { return fmt.Errorf("单次同步 ICCID 数量不能超过 1000") } return nil } // acquireLock 获取 Redis 锁 func (h *SIMHandler) acquireLock(ctx context.Context, key string) (bool, error) { result, err := h.redis.SetNX(ctx, key, "1", 24*time.Hour).Result() if err != nil { return false, fmt.Errorf("设置 Redis 锁失败: %w", err) } return result, nil } // syncSIMStatus 执行 SIM 卡状态同步 func (h *SIMHandler) syncSIMStatus(ctx context.Context, payload *SIMStatusSyncPayload) error { // TODO: 实际实现中需要调用运营商 API 获取 SIM 卡状态 // 批量处理 ICCID batchSize := 100 for i := 0; i < len(payload.ICCIDs); i += batchSize { // 检查上下文是否已取消 select { case <-ctx.Done(): return ctx.Err() default: } end := i + batchSize if end > len(payload.ICCIDs) { end = len(payload.ICCIDs) } batch := payload.ICCIDs[i:end] h.logger.Debug("同步 SIM 卡状态批次", zap.Int("batch_start", i), zap.Int("batch_end", end), zap.Int("batch_size", len(batch)), ) // 模拟调用外部 API time.Sleep(200 * time.Millisecond) // TODO: 实际实现中需要: // 1. 调用运营商 API 获取状态 // 2. 使用事务批量更新数据库 // 3. 更新 Redis 缓存 // 4. 记录同步日志 } h.logger.Info("SIM 卡状态批量同步完成", zap.Int("total_iccids", len(payload.ICCIDs)), zap.Int("batch_size", batchSize), ) return nil }