Co-authored-by: factory-droid[bot] <138933559+factory-droid[bot]@users.noreply.github.com>
217 lines
6.2 KiB
Go
217 lines
6.2 KiB
Go
package handler
|
||
|
||
import (
|
||
"fmt"
|
||
"time"
|
||
|
||
"github.com/go-playground/validator/v10"
|
||
"github.com/gofiber/fiber/v2"
|
||
"github.com/google/uuid"
|
||
"github.com/hibiken/asynq"
|
||
"go.uber.org/zap"
|
||
|
||
"github.com/break/junhong_cmp_fiber/internal/task"
|
||
"github.com/break/junhong_cmp_fiber/pkg/constants"
|
||
"github.com/break/junhong_cmp_fiber/pkg/errors"
|
||
"github.com/break/junhong_cmp_fiber/pkg/queue"
|
||
"github.com/break/junhong_cmp_fiber/pkg/response"
|
||
)
|
||
|
||
// TaskHandler 任务处理器
|
||
type TaskHandler struct {
|
||
queueClient *queue.Client
|
||
logger *zap.Logger
|
||
validator *validator.Validate
|
||
}
|
||
|
||
// NewTaskHandler 创建任务处理器实例
|
||
func NewTaskHandler(queueClient *queue.Client, logger *zap.Logger) *TaskHandler {
|
||
return &TaskHandler{
|
||
queueClient: queueClient,
|
||
logger: logger,
|
||
validator: validator.New(),
|
||
}
|
||
}
|
||
|
||
// SubmitEmailTaskRequest is the JSON body accepted by the email task
// endpoint. Its validate tags are enforced by SubmitEmailTask before the task
// is enqueued.
type SubmitEmailTaskRequest struct {
	// To is the recipient address; required and must be a valid email.
	To string `json:"to" validate:"required,email"`
	// Subject is required and limited to 1-200 characters.
	Subject string `json:"subject" validate:"required,min=1,max=200"`
	// Body is the required, non-empty message content.
	Body string `json:"body" validate:"required,min=1"`
	// CC optionally lists extra recipients; each entry must be a valid email.
	CC []string `json:"cc,omitempty" validate:"omitempty,dive,email"`
	// Attachments is optional and not validated here; it is copied unchanged
	// into the task payload.
	// NOTE(review): presumably file paths or identifiers — confirm with the worker.
	Attachments []string `json:"attachments,omitempty"`
	// RequestID is an optional caller-supplied identifier; when empty the
	// handler generates one.
	RequestID string `json:"request_id,omitempty"`
}
|
||
|
||
// SubmitSyncTaskRequest is the JSON body accepted by the data sync task
// endpoint. Its validate tags are enforced by SubmitSyncTask before the task
// is enqueued.
type SubmitSyncTaskRequest struct {
	// SyncType is required and must be one of sim_status, flow_usage or real_name.
	SyncType string `json:"sync_type" validate:"required,oneof=sim_status flow_usage real_name"`
	// StartDate is required. Only presence is checked here.
	// NOTE(review): the expected date layout is not enforced in this file — confirm.
	StartDate string `json:"start_date" validate:"required"`
	// EndDate is required. Only presence is checked here; it is not compared
	// against StartDate.
	EndDate string `json:"end_date" validate:"required"`
	// BatchSize is optional (1-1000 when present); the handler substitutes 100
	// when it is zero.
	BatchSize int `json:"batch_size,omitempty" validate:"omitempty,min=1,max=1000"`
	// RequestID is an optional caller-supplied identifier; when empty the
	// handler generates one.
	RequestID string `json:"request_id,omitempty"`
	// Priority is optional and must be one of critical, default or low; the
	// handler uses it to pick the target queue.
	Priority string `json:"priority,omitempty" validate:"omitempty,oneof=critical default low"`
}
|
||
|
||
// TaskResponse is the data payload returned after a task has been accepted.
type TaskResponse struct {
	// TaskID identifies the submitted task; the handlers in this file set it
	// to the request ID.
	TaskID string `json:"task_id"`
	// Queue is the name of the queue the task was placed on.
	Queue string `json:"queue"`
	// Status is the submission state; the handlers in this file report "queued".
	Status string `json:"status"`
}
|
||
|
||
// SubmitEmailTask 提交邮件发送任务
|
||
// @Summary 提交邮件发送任务
|
||
// @Description 异步发送邮件
|
||
// @Tags 任务
|
||
// @Accept json
|
||
// @Produce json
|
||
// @Param request body SubmitEmailTaskRequest true "邮件任务参数"
|
||
// @Success 200 {object} response.Response{data=TaskResponse}
|
||
// @Failure 400 {object} response.Response
|
||
// @Router /api/v1/tasks/email [post]
|
||
func (h *TaskHandler) SubmitEmailTask(c *fiber.Ctx) error {
|
||
var req SubmitEmailTaskRequest
|
||
if err := c.BodyParser(&req); err != nil {
|
||
h.logger.Warn("解析邮件任务请求失败",
|
||
zap.Error(err))
|
||
return errors.New(errors.CodeInvalidParam, "请求参数格式错误")
|
||
}
|
||
|
||
// 验证参数
|
||
if err := h.validator.Struct(&req); err != nil {
|
||
h.logger.Warn("邮件任务参数验证失败",
|
||
zap.Error(err))
|
||
return errors.New(errors.CodeInvalidParam, err.Error())
|
||
}
|
||
|
||
// 生成 RequestID(如果未提供)
|
||
if req.RequestID == "" {
|
||
req.RequestID = generateRequestID("email")
|
||
}
|
||
|
||
// 构造任务载荷
|
||
payload := &task.EmailPayload{
|
||
RequestID: req.RequestID,
|
||
To: req.To,
|
||
Subject: req.Subject,
|
||
Body: req.Body,
|
||
CC: req.CC,
|
||
Attachments: req.Attachments,
|
||
}
|
||
|
||
// 提交任务到队列
|
||
err := h.queueClient.EnqueueTask(
|
||
c.Context(),
|
||
constants.TaskTypeEmailSend,
|
||
payload,
|
||
asynq.Queue(constants.QueueDefault),
|
||
asynq.MaxRetry(constants.DefaultRetryMax),
|
||
asynq.Timeout(constants.DefaultTimeout),
|
||
)
|
||
if err != nil {
|
||
h.logger.Error("提交邮件任务失败",
|
||
zap.String("to", req.To),
|
||
zap.String("request_id", req.RequestID),
|
||
zap.Error(err))
|
||
return errors.New(errors.CodeInternalError, "任务提交失败")
|
||
}
|
||
|
||
h.logger.Info("邮件任务提交成功",
|
||
zap.String("queue", constants.QueueDefault),
|
||
zap.String("to", req.To),
|
||
zap.String("request_id", req.RequestID))
|
||
|
||
return response.SuccessWithMessage(c, TaskResponse{
|
||
TaskID: req.RequestID,
|
||
Queue: constants.QueueDefault,
|
||
Status: "queued",
|
||
}, "邮件任务已提交")
|
||
}
|
||
|
||
// SubmitSyncTask 提交数据同步任务
|
||
// @Summary 提交数据同步任务
|
||
// @Description 异步执行数据同步
|
||
// @Tags 任务
|
||
// @Accept json
|
||
// @Produce json
|
||
// @Param request body SubmitSyncTaskRequest true "同步任务参数"
|
||
// @Success 200 {object} response.Response{data=TaskResponse}
|
||
// @Failure 400 {object} response.Response
|
||
// @Router /api/v1/tasks/sync [post]
|
||
func (h *TaskHandler) SubmitSyncTask(c *fiber.Ctx) error {
|
||
var req SubmitSyncTaskRequest
|
||
if err := c.BodyParser(&req); err != nil {
|
||
h.logger.Warn("解析同步任务请求失败",
|
||
zap.Error(err))
|
||
return errors.New(errors.CodeInvalidParam, "请求参数格式错误")
|
||
}
|
||
|
||
// 验证参数
|
||
if err := h.validator.Struct(&req); err != nil {
|
||
h.logger.Warn("同步任务参数验证失败",
|
||
zap.Error(err))
|
||
return errors.New(errors.CodeInvalidParam, err.Error())
|
||
}
|
||
|
||
// 生成 RequestID(如果未提供)
|
||
if req.RequestID == "" {
|
||
req.RequestID = generateRequestID("sync")
|
||
}
|
||
|
||
// 设置默认批量大小
|
||
if req.BatchSize == 0 {
|
||
req.BatchSize = 100
|
||
}
|
||
|
||
// 确定队列优先级
|
||
queueName := constants.QueueDefault
|
||
if req.Priority == "critical" {
|
||
queueName = constants.QueueCritical
|
||
} else if req.Priority == "low" {
|
||
queueName = constants.QueueLow
|
||
}
|
||
|
||
// 构造任务载荷
|
||
payload := &task.DataSyncPayload{
|
||
RequestID: req.RequestID,
|
||
SyncType: req.SyncType,
|
||
StartDate: req.StartDate,
|
||
EndDate: req.EndDate,
|
||
BatchSize: req.BatchSize,
|
||
}
|
||
|
||
// 提交任务到队列
|
||
err := h.queueClient.EnqueueTask(
|
||
c.Context(),
|
||
constants.TaskTypeDataSync,
|
||
payload,
|
||
asynq.Queue(queueName),
|
||
asynq.MaxRetry(constants.DefaultRetryMax),
|
||
asynq.Timeout(constants.DefaultTimeout),
|
||
)
|
||
if err != nil {
|
||
h.logger.Error("提交同步任务失败",
|
||
zap.String("sync_type", req.SyncType),
|
||
zap.String("request_id", req.RequestID),
|
||
zap.Error(err))
|
||
return errors.New(errors.CodeInternalError, "任务提交失败")
|
||
}
|
||
|
||
h.logger.Info("同步任务提交成功",
|
||
zap.String("queue", queueName),
|
||
zap.String("sync_type", req.SyncType),
|
||
zap.String("request_id", req.RequestID))
|
||
|
||
return response.SuccessWithMessage(c, TaskResponse{
|
||
TaskID: req.RequestID,
|
||
Queue: queueName,
|
||
Status: "queued",
|
||
}, "同步任务已提交")
|
||
}
|
||
|
||
// generateRequestID 生成请求 ID
|
||
func generateRequestID(prefix string) string {
|
||
return fmt.Sprintf("%s-%s-%d", prefix, uuid.New().String(), time.Now().UnixNano())
|
||
}
|