package task import ( "context" "fmt" "time" "github.com/bytedance/sonic" "github.com/hibiken/asynq" "go.uber.org/zap" "gorm.io/gorm" ) // DataSyncPayload 数据同步任务载荷 type DataSyncPayload struct { RequestID string `json:"request_id"` SyncType string `json:"sync_type"` // sim_status, flow_usage, real_name StartDate string `json:"start_date"` // YYYY-MM-DD EndDate string `json:"end_date"` // YYYY-MM-DD BatchSize int `json:"batch_size"` // 批量大小 } // SyncHandler 数据同步任务处理器 type SyncHandler struct { db *gorm.DB logger *zap.Logger } // NewSyncHandler 创建数据同步任务处理器 func NewSyncHandler(db *gorm.DB, logger *zap.Logger) *SyncHandler { return &SyncHandler{ db: db, logger: logger, } } // HandleDataSync 处理数据同步任务 func (h *SyncHandler) HandleDataSync(ctx context.Context, task *asynq.Task) error { // 解析任务载荷 var payload DataSyncPayload if err := sonic.Unmarshal(task.Payload(), &payload); err != nil { h.logger.Error("解析数据同步任务载荷失败", zap.Error(err), zap.String("task_id", task.ResultWriter().TaskID()), ) return asynq.SkipRetry } // 验证载荷 if err := h.validatePayload(&payload); err != nil { h.logger.Error("数据同步任务载荷验证失败", zap.Error(err), zap.String("request_id", payload.RequestID), ) return asynq.SkipRetry } // 设置默认批量大小 if payload.BatchSize <= 0 { payload.BatchSize = 100 } // 记录任务开始 h.logger.Info("开始处理数据同步任务", zap.String("request_id", payload.RequestID), zap.String("sync_type", payload.SyncType), zap.String("start_date", payload.StartDate), zap.String("end_date", payload.EndDate), zap.Int("batch_size", payload.BatchSize), ) // 执行数据同步 if err := h.syncData(ctx, &payload); err != nil { h.logger.Error("数据同步失败", zap.Error(err), zap.String("request_id", payload.RequestID), zap.String("sync_type", payload.SyncType), ) return err // 同步失败,可以重试 } // 记录任务完成 h.logger.Info("数据同步成功", zap.String("request_id", payload.RequestID), zap.String("sync_type", payload.SyncType), ) return nil } // validatePayload 验证数据同步载荷 func (h *SyncHandler) validatePayload(payload *DataSyncPayload) error { if payload.RequestID == "" { return fmt.Errorf("request_id 不能为空") } if payload.SyncType == "" { return fmt.Errorf("sync_type 不能为空") } validTypes := []string{"sim_status", "flow_usage", "real_name"} valid := false for _, t := range validTypes { if payload.SyncType == t { valid = true break } } if !valid { return fmt.Errorf("sync_type 无效,必须为 sim_status, flow_usage, real_name 之一") } if payload.StartDate == "" { return fmt.Errorf("start_date 不能为空") } if payload.EndDate == "" { return fmt.Errorf("end_date 不能为空") } return nil } // syncData 执行数据同步 func (h *SyncHandler) syncData(ctx context.Context, payload *DataSyncPayload) error { // TODO: 实际实现中需要调用外部 API 或数据源进行同步 // 模拟批量同步 totalRecords := 500 // 假设有 500 条记录需要同步 batches := (totalRecords + payload.BatchSize - 1) / payload.BatchSize for i := 0; i < batches; i++ { // 检查上下文是否已取消 select { case <-ctx.Done(): return ctx.Err() default: } // 模拟批量处理 offset := i * payload.BatchSize limit := payload.BatchSize if offset+limit > totalRecords { limit = totalRecords - offset } h.logger.Debug("同步批次", zap.String("sync_type", payload.SyncType), zap.Int("batch", i+1), zap.Int("total_batches", batches), zap.Int("offset", offset), zap.Int("limit", limit), ) // 模拟处理延迟 time.Sleep(200 * time.Millisecond) // TODO: 实际实现中需要: // 1. 从外部 API 获取数据 // 2. 使用事务批量更新数据库 // 3. 记录同步状态 } h.logger.Info("批量同步完成", zap.String("sync_type", payload.SyncType), zap.Int("total_records", totalRecords), zap.Int("batches", batches), ) return nil }