package admin

import (
	"fmt"
	"time"

	"github.com/go-playground/validator/v10"
	"github.com/gofiber/fiber/v2"
	"github.com/google/uuid"
	"github.com/hibiken/asynq"
	"go.uber.org/zap"

	"github.com/break/junhong_cmp_fiber/internal/task"
	"github.com/break/junhong_cmp_fiber/pkg/constants"
	"github.com/break/junhong_cmp_fiber/pkg/errors"
	"github.com/break/junhong_cmp_fiber/pkg/queue"
	"github.com/break/junhong_cmp_fiber/pkg/response"
)

// TaskHandler serves the HTTP endpoints that submit background tasks to the
// queue. It holds the queue client used for enqueueing, a logger, and the
// validator applied to incoming request bodies.
type TaskHandler struct {
	queueClient *queue.Client
	logger      *zap.Logger
	validator   *validator.Validate
}

// NewTaskHandler builds a TaskHandler around the given queue client and
// logger. A fresh validator instance is created for the handler.
func NewTaskHandler(queueClient *queue.Client, logger *zap.Logger) *TaskHandler {
	h := &TaskHandler{validator: validator.New()}
	h.queueClient = queueClient
	h.logger = logger
	return h
}

// SubmitEmailTaskRequest is the request body for submitting an email task.
//
// To, Subject and Body are required; To and every entry of CC must be valid
// email addresses, and Subject is limited to 200 characters. RequestID is
// optional: when it is empty the handler generates one.
type SubmitEmailTaskRequest struct {
	To          string   `json:"to" validate:"required,email"`
	Subject     string   `json:"subject" validate:"required,min=1,max=200"`
	Body        string   `json:"body" validate:"required,min=1"`
	CC          []string `json:"cc,omitempty" validate:"omitempty,dive,email"`
	Attachments []string `json:"attachments,omitempty"`
	RequestID   string   `json:"request_id,omitempty"`
}

// SubmitSyncTaskRequest is the request body for submitting a data sync task.
//
// SyncType must be one of sim_status, flow_usage or real_name. StartDate and
// EndDate are required; only their presence is validated here. BatchSize,
// when given, must be between 1 and 1000. Priority, when given, must be one
// of critical, default or low. RequestID is optional: when it is empty the
// handler generates one.
type SubmitSyncTaskRequest struct {
	SyncType  string `json:"sync_type" validate:"required,oneof=sim_status flow_usage real_name"`
	StartDate string `json:"start_date" validate:"required"`
	EndDate   string `json:"end_date" validate:"required"`
	BatchSize int    `json:"batch_size,omitempty" validate:"omitempty,min=1,max=1000"`
	RequestID string `json:"request_id,omitempty"`
	Priority  string `json:"priority,omitempty" validate:"omitempty,oneof=critical default low"`
}

// TaskResponse is the payload returned once a task has been enqueued.
type TaskResponse struct {
	TaskID string `json:"task_id"`
	Queue  string `json:"queue"`
	Status string `json:"status"`
}

// SubmitEmailTask binds and validates an email task request, then enqueues
// it on the default queue.
//
// It returns a CodeInvalidParam error when the body cannot be parsed or fails
// validation, and a CodeInternalError error when enqueueing fails. On success
// the response carries the request ID as the task ID.
//
// The swag annotations below are consumed by the API doc generator and are
// kept verbatim.
//
// @Summary 提交邮件发送任务
// @Description 异步发送邮件
// @Tags 任务
// @Accept json
// @Produce json
// @Param request body SubmitEmailTaskRequest true "邮件任务参数"
// @Success 200 {object} response.Response{data=TaskResponse}
// @Failure 400 {object} response.Response
// @Router /api/v1/tasks/email [post]
func (h *TaskHandler) SubmitEmailTask(c *fiber.Ctx) error {
	var in SubmitEmailTaskRequest

	// Bind the request body.
	if parseErr := c.BodyParser(&in); parseErr != nil {
		h.logger.Warn("解析邮件任务请求失败", zap.Error(parseErr))
		return errors.New(errors.CodeInvalidParam, "请求参数格式错误")
	}

	// Enforce the struct-tag validation rules.
	if invalid := h.validator.Struct(&in); invalid != nil {
		h.logger.Warn("邮件任务参数验证失败", zap.Error(invalid))
		return errors.New(errors.CodeInvalidParam, invalid.Error())
	}

	// Fall back to a generated request ID when the caller did not send one.
	requestID := in.RequestID
	if requestID == "" {
		requestID = generateRequestID("email")
	}

	job := &task.EmailPayload{
		RequestID:   requestID,
		To:          in.To,
		Subject:     in.Subject,
		Body:        in.Body,
		CC:          in.CC,
		Attachments: in.Attachments,
	}

	// Hand the task over to the queue.
	enqueueErr := h.queueClient.EnqueueTask(
		c.Context(),
		constants.TaskTypeEmailSend,
		job,
		asynq.Queue(constants.QueueDefault),
		asynq.MaxRetry(constants.DefaultRetryMax),
		asynq.Timeout(constants.DefaultTimeout),
	)
	if enqueueErr != nil {
		h.logger.Error("提交邮件任务失败",
			zap.String("to", in.To),
			zap.String("request_id", requestID),
			zap.Error(enqueueErr),
		)
		return errors.New(errors.CodeInternalError, "任务提交失败")
	}

	h.logger.Info("邮件任务提交成功",
		zap.String("queue", constants.QueueDefault),
		zap.String("to", in.To),
		zap.String("request_id", requestID),
	)

	result := TaskResponse{
		TaskID: requestID,
		Queue:  constants.QueueDefault,
		Status: "queued",
	}
	return response.SuccessWithMessage(c, result, "邮件任务已提交")
}

// SubmitSyncTask binds and validates a data sync task request, then enqueues
// it on the queue selected by the requested priority.
//
// A missing batch size defaults to 100. Priority "critical" and "low" map to
// the critical and low queues; any other accepted value (empty or "default")
// uses the default queue. It returns a CodeInvalidParam error when the body
// cannot be parsed or fails validation, and a CodeInternalError error when
// enqueueing fails. On success the response carries the request ID as the
// task ID.
//
// The swag annotations below are consumed by the API doc generator and are
// kept verbatim.
//
// @Summary 提交数据同步任务
// @Description 异步执行数据同步
// @Tags 任务
// @Accept json
// @Produce json
// @Param request body SubmitSyncTaskRequest true "同步任务参数"
// @Success 200 {object} response.Response{data=TaskResponse}
// @Failure 400 {object} response.Response
// @Router /api/v1/tasks/sync [post]
func (h *TaskHandler) SubmitSyncTask(c *fiber.Ctx) error {
	var in SubmitSyncTaskRequest

	// Bind the request body.
	if parseErr := c.BodyParser(&in); parseErr != nil {
		h.logger.Warn("解析同步任务请求失败", zap.Error(parseErr))
		return errors.New(errors.CodeInvalidParam, "请求参数格式错误")
	}

	// Enforce the struct-tag validation rules.
	if invalid := h.validator.Struct(&in); invalid != nil {
		h.logger.Warn("同步任务参数验证失败", zap.Error(invalid))
		return errors.New(errors.CodeInvalidParam, invalid.Error())
	}

	// Fall back to a generated request ID when the caller did not send one.
	requestID := in.RequestID
	if requestID == "" {
		requestID = generateRequestID("sync")
	}

	// Zero means the batch size was omitted; use the default.
	batchSize := in.BatchSize
	if batchSize == 0 {
		batchSize = 100
	}

	// Map the requested priority onto a queue; anything else stays on the
	// default queue.
	queueName := constants.QueueDefault
	switch in.Priority {
	case "critical":
		queueName = constants.QueueCritical
	case "low":
		queueName = constants.QueueLow
	}

	job := &task.DataSyncPayload{
		RequestID: requestID,
		SyncType:  in.SyncType,
		StartDate: in.StartDate,
		EndDate:   in.EndDate,
		BatchSize: batchSize,
	}

	// Hand the task over to the queue.
	enqueueErr := h.queueClient.EnqueueTask(
		c.Context(),
		constants.TaskTypeDataSync,
		job,
		asynq.Queue(queueName),
		asynq.MaxRetry(constants.DefaultRetryMax),
		asynq.Timeout(constants.DefaultTimeout),
	)
	if enqueueErr != nil {
		h.logger.Error("提交同步任务失败",
			zap.String("sync_type", in.SyncType),
			zap.String("request_id", requestID),
			zap.Error(enqueueErr),
		)
		return errors.New(errors.CodeInternalError, "任务提交失败")
	}

	h.logger.Info("同步任务提交成功",
		zap.String("queue", queueName),
		zap.String("sync_type", in.SyncType),
		zap.String("request_id", requestID),
	)

	result := TaskResponse{
		TaskID: requestID,
		Queue:  queueName,
		Status: "queued",
	}
	return response.SuccessWithMessage(c, result, "同步任务已提交")
}

// generateRequestID returns a request ID of the form
// "<prefix>-<uuid>-<unix-nanoseconds>".
func generateRequestID(prefix string) string {
	id := uuid.New().String()
	stamp := time.Now().UnixNano()
	return fmt.Sprintf("%s-%s-%d", prefix, id, stamp)
}