package sync import ( "context" "fmt" "github.com/break/junhong_cmp_fiber/internal/task" "github.com/break/junhong_cmp_fiber/pkg/constants" "github.com/break/junhong_cmp_fiber/pkg/queue" "github.com/bytedance/sonic" "github.com/hibiken/asynq" "go.uber.org/zap" ) // Service 同步服务 type Service struct { queueClient *queue.Client logger *zap.Logger } // NewService 创建同步服务实例 func NewService(queueClient *queue.Client, logger *zap.Logger) *Service { return &Service{ queueClient: queueClient, logger: logger, } } // SyncSIMStatus 同步 SIM 卡状态(异步) func (s *Service) SyncSIMStatus(ctx context.Context, iccids []string, forceSync bool) error { // 构造任务载荷 payload := &task.SIMStatusSyncPayload{ RequestID: fmt.Sprintf("sim-sync-%d", getCurrentTimestamp()), ICCIDs: iccids, ForceSync: forceSync, } payloadBytes, err := sonic.Marshal(payload) if err != nil { s.logger.Error("序列化 SIM 状态同步任务载荷失败", zap.Int("iccid_count", len(iccids)), zap.Bool("force_sync", forceSync), zap.Error(err)) return fmt.Errorf("序列化 SIM 状态同步任务载荷失败: %w", err) } // 提交任务到队列(高优先级) err = s.queueClient.EnqueueTask( ctx, constants.TaskTypeSIMStatusSync, payloadBytes, asynq.Queue(constants.QueueCritical), // SIM 状态同步使用高优先级队列 asynq.MaxRetry(constants.DefaultRetryMax), ) if err != nil { s.logger.Error("提交 SIM 状态同步任务失败", zap.Int("iccid_count", len(iccids)), zap.Error(err)) return fmt.Errorf("提交 SIM 状态同步任务失败: %w", err) } s.logger.Info("SIM 状态同步任务已提交", zap.Int("iccid_count", len(iccids)), zap.Bool("force_sync", forceSync)) return nil } // SyncData 通用数据同步(异步) func (s *Service) SyncData(ctx context.Context, syncType string, startDate string, endDate string, batchSize int) error { // 设置默认批量大小 if batchSize <= 0 { batchSize = 100 // 默认批量大小 } // 构造任务载荷 payload := &task.DataSyncPayload{ RequestID: fmt.Sprintf("data-sync-%s-%d", syncType, getCurrentTimestamp()), SyncType: syncType, StartDate: startDate, EndDate: endDate, BatchSize: batchSize, } payloadBytes, err := sonic.Marshal(payload) if err != nil { s.logger.Error("序列化数据同步任务载荷失败", zap.String("sync_type", syncType), zap.String("start_date", startDate), zap.String("end_date", endDate), zap.Error(err)) return fmt.Errorf("序列化数据同步任务载荷失败: %w", err) } // 提交任务到队列(默认优先级) err = s.queueClient.EnqueueTask( ctx, constants.TaskTypeDataSync, payloadBytes, asynq.Queue(constants.QueueDefault), asynq.MaxRetry(constants.DefaultRetryMax), ) if err != nil { s.logger.Error("提交数据同步任务失败", zap.String("sync_type", syncType), zap.Error(err)) return fmt.Errorf("提交数据同步任务失败: %w", err) } s.logger.Info("数据同步任务已提交", zap.String("sync_type", syncType), zap.String("start_date", startDate), zap.String("end_date", endDate), zap.Int("batch_size", batchSize)) return nil } // SyncFlowUsage 同步流量使用数据(异步) func (s *Service) SyncFlowUsage(ctx context.Context, startDate string, endDate string) error { return s.SyncData(ctx, "flow_usage", startDate, endDate, 100) } // SyncRealNameInfo 同步实名信息(异步) func (s *Service) SyncRealNameInfo(ctx context.Context, startDate string, endDate string) error { return s.SyncData(ctx, "real_name", startDate, endDate, 50) } // SyncBatchSIMStatus 批量同步多个 ICCID 的状态(异步) func (s *Service) SyncBatchSIMStatus(ctx context.Context, iccids []string) error { // 如果 ICCID 列表为空,直接返回 if len(iccids) == 0 { s.logger.Warn("批量同步 SIM 状态时 ICCID 列表为空") return nil } // 分批处理(每批最多 100 个) batchSize := 100 for i := 0; i < len(iccids); i += batchSize { end := i + batchSize if end > len(iccids) { end = len(iccids) } batch := iccids[i:end] if err := s.SyncSIMStatus(ctx, batch, false); err != nil { s.logger.Error("批量同步 SIM 状态失败", zap.Int("batch_start", i), zap.Int("batch_end", end), zap.Error(err)) return err } } s.logger.Info("批量 SIM 状态同步任务已全部提交", zap.Int("total_iccids", len(iccids)), zap.Int("batch_size", batchSize)) return nil } // getCurrentTimestamp 获取当前时间戳(毫秒) func getCurrentTimestamp() int64 { return 0 // 实际实现应返回真实时间戳 }