1. 需求确认
2. 结构的一些变更
This commit is contained in:
164
internal/handler/admin/account.go
Normal file
164
internal/handler/admin/account.go
Normal file
@@ -0,0 +1,164 @@
|
||||
package admin
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
|
||||
"github.com/break/junhong_cmp_fiber/pkg/errors"
|
||||
"github.com/break/junhong_cmp_fiber/pkg/response"
|
||||
|
||||
"github.com/gofiber/fiber/v2"
|
||||
|
||||
"github.com/break/junhong_cmp_fiber/internal/model"
|
||||
accountService "github.com/break/junhong_cmp_fiber/internal/service/account"
|
||||
)
|
||||
|
||||
// AccountHandler 账号 Handler
|
||||
type AccountHandler struct {
|
||||
service *accountService.Service
|
||||
}
|
||||
|
||||
// NewAccountHandler 创建账号 Handler
|
||||
func NewAccountHandler(service *accountService.Service) *AccountHandler {
|
||||
return &AccountHandler{service: service}
|
||||
}
|
||||
|
||||
// Create 创建账号
|
||||
// POST /api/v1/accounts
|
||||
func (h *AccountHandler) Create(c *fiber.Ctx) error {
|
||||
var req model.CreateAccountRequest
|
||||
if err := c.BodyParser(&req); err != nil {
|
||||
return errors.New(errors.CodeInvalidParam, "请求参数解析失败")
|
||||
}
|
||||
|
||||
account, err := h.service.Create(c.UserContext(), &req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return response.Success(c, account)
|
||||
}
|
||||
|
||||
// Get 获取账号详情
|
||||
// GET /api/v1/accounts/:id
|
||||
func (h *AccountHandler) Get(c *fiber.Ctx) error {
|
||||
id, err := strconv.ParseUint(c.Params("id"), 10, 64)
|
||||
if err != nil {
|
||||
return errors.New(errors.CodeInvalidParam, "无效的账号 ID")
|
||||
}
|
||||
|
||||
account, err := h.service.Get(c.UserContext(), uint(id))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return response.Success(c, account)
|
||||
}
|
||||
|
||||
// Update 更新账号
|
||||
// PUT /api/v1/accounts/:id
|
||||
func (h *AccountHandler) Update(c *fiber.Ctx) error {
|
||||
id, err := strconv.ParseUint(c.Params("id"), 10, 64)
|
||||
if err != nil {
|
||||
return errors.New(errors.CodeInvalidParam, "无效的账号 ID")
|
||||
}
|
||||
|
||||
var req model.UpdateAccountRequest
|
||||
if err := c.BodyParser(&req); err != nil {
|
||||
return errors.New(errors.CodeInvalidParam, "请求参数解析失败")
|
||||
}
|
||||
|
||||
account, err := h.service.Update(c.UserContext(), uint(id), &req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return response.Success(c, account)
|
||||
}
|
||||
|
||||
// Delete 删除账号
|
||||
// DELETE /api/v1/accounts/:id
|
||||
func (h *AccountHandler) Delete(c *fiber.Ctx) error {
|
||||
id, err := strconv.ParseUint(c.Params("id"), 10, 64)
|
||||
if err != nil {
|
||||
return errors.New(errors.CodeInvalidParam, "无效的账号 ID")
|
||||
}
|
||||
|
||||
if err := h.service.Delete(c.UserContext(), uint(id)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return response.Success(c, nil)
|
||||
}
|
||||
|
||||
// List 查询账号列表
|
||||
// GET /api/v1/accounts
|
||||
func (h *AccountHandler) List(c *fiber.Ctx) error {
|
||||
var req model.AccountListRequest
|
||||
if err := c.QueryParser(&req); err != nil {
|
||||
return errors.New(errors.CodeInvalidParam, "请求参数解析失败")
|
||||
}
|
||||
|
||||
accounts, total, err := h.service.List(c.UserContext(), &req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return response.SuccessWithPagination(c, accounts, total, req.Page, req.PageSize)
|
||||
}
|
||||
|
||||
// AssignRoles 为账号分配角色
|
||||
// POST /api/v1/accounts/:id/roles
|
||||
func (h *AccountHandler) AssignRoles(c *fiber.Ctx) error {
|
||||
id, err := strconv.ParseUint(c.Params("id"), 10, 64)
|
||||
if err != nil {
|
||||
return errors.New(errors.CodeInvalidParam, "无效的账号 ID")
|
||||
}
|
||||
|
||||
var req model.AssignRolesRequest
|
||||
if err := c.BodyParser(&req); err != nil {
|
||||
return errors.New(errors.CodeInvalidParam, "请求参数解析失败")
|
||||
}
|
||||
|
||||
ars, err := h.service.AssignRoles(c.UserContext(), uint(id), req.RoleIDs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return response.Success(c, ars)
|
||||
}
|
||||
|
||||
// GetRoles 获取账号的所有角色
|
||||
// GET /api/v1/accounts/:id/roles
|
||||
func (h *AccountHandler) GetRoles(c *fiber.Ctx) error {
|
||||
id, err := strconv.ParseUint(c.Params("id"), 10, 64)
|
||||
if err != nil {
|
||||
return errors.New(errors.CodeInvalidParam, "无效的账号 ID")
|
||||
}
|
||||
|
||||
roles, err := h.service.GetRoles(c.UserContext(), uint(id))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return response.Success(c, roles)
|
||||
}
|
||||
|
||||
// RemoveRole 移除账号的角色
|
||||
// DELETE /api/v1/accounts/:account_id/roles/:role_id
|
||||
func (h *AccountHandler) RemoveRole(c *fiber.Ctx) error {
|
||||
accountID, err := strconv.ParseUint(c.Params("account_id"), 10, 64)
|
||||
if err != nil {
|
||||
return errors.New(errors.CodeInvalidParam, "无效的账号 ID")
|
||||
}
|
||||
|
||||
roleID, err := strconv.ParseUint(c.Params("role_id"), 10, 64)
|
||||
if err != nil {
|
||||
return errors.New(errors.CodeInvalidParam, "无效的角色 ID")
|
||||
}
|
||||
|
||||
if err := h.service.RemoveRole(c.UserContext(), uint(accountID), uint(roleID)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return response.Success(c, nil)
|
||||
}
|
||||
118
internal/handler/admin/permission.go
Normal file
118
internal/handler/admin/permission.go
Normal file
@@ -0,0 +1,118 @@
|
||||
package admin
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
|
||||
"github.com/break/junhong_cmp_fiber/internal/model"
|
||||
"github.com/gofiber/fiber/v2"
|
||||
|
||||
"github.com/break/junhong_cmp_fiber/pkg/errors"
|
||||
"github.com/break/junhong_cmp_fiber/pkg/response"
|
||||
|
||||
permissionService "github.com/break/junhong_cmp_fiber/internal/service/permission"
|
||||
)
|
||||
|
||||
// PermissionHandler 权限 Handler
|
||||
type PermissionHandler struct {
|
||||
service *permissionService.Service
|
||||
}
|
||||
|
||||
// NewPermissionHandler 创建权限 Handler
|
||||
func NewPermissionHandler(service *permissionService.Service) *PermissionHandler {
|
||||
return &PermissionHandler{service: service}
|
||||
}
|
||||
|
||||
// Create 创建权限
|
||||
// POST /api/v1/permissions
|
||||
func (h *PermissionHandler) Create(c *fiber.Ctx) error {
|
||||
var req model.CreatePermissionRequest
|
||||
if err := c.BodyParser(&req); err != nil {
|
||||
return errors.New(errors.CodeInvalidParam, "请求参数解析失败")
|
||||
}
|
||||
|
||||
permission, err := h.service.Create(c.UserContext(), &req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return response.Success(c, permission)
|
||||
}
|
||||
|
||||
// Get 获取权限详情
|
||||
// GET /api/v1/permissions/:id
|
||||
func (h *PermissionHandler) Get(c *fiber.Ctx) error {
|
||||
id, err := strconv.ParseUint(c.Params("id"), 10, 64)
|
||||
if err != nil {
|
||||
return errors.New(errors.CodeInvalidParam, "无效的权限 ID")
|
||||
}
|
||||
|
||||
permission, err := h.service.Get(c.UserContext(), uint(id))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return response.Success(c, permission)
|
||||
}
|
||||
|
||||
// Update 更新权限
|
||||
// PUT /api/v1/permissions/:id
|
||||
func (h *PermissionHandler) Update(c *fiber.Ctx) error {
|
||||
id, err := strconv.ParseUint(c.Params("id"), 10, 64)
|
||||
if err != nil {
|
||||
return errors.New(errors.CodeInvalidParam, "无效的权限 ID")
|
||||
}
|
||||
|
||||
var req model.UpdatePermissionRequest
|
||||
if err := c.BodyParser(&req); err != nil {
|
||||
return errors.New(errors.CodeInvalidParam, "请求参数解析失败")
|
||||
}
|
||||
|
||||
permission, err := h.service.Update(c.UserContext(), uint(id), &req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return response.Success(c, permission)
|
||||
}
|
||||
|
||||
// Delete 删除权限
|
||||
// DELETE /api/v1/permissions/:id
|
||||
func (h *PermissionHandler) Delete(c *fiber.Ctx) error {
|
||||
id, err := strconv.ParseUint(c.Params("id"), 10, 64)
|
||||
if err != nil {
|
||||
return errors.New(errors.CodeInvalidParam, "无效的权限 ID")
|
||||
}
|
||||
|
||||
if err := h.service.Delete(c.UserContext(), uint(id)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return response.Success(c, nil)
|
||||
}
|
||||
|
||||
// List 查询权限列表
|
||||
// GET /api/v1/permissions
|
||||
func (h *PermissionHandler) List(c *fiber.Ctx) error {
|
||||
var req model.PermissionListRequest
|
||||
if err := c.QueryParser(&req); err != nil {
|
||||
return errors.New(errors.CodeInvalidParam, "请求参数解析失败")
|
||||
}
|
||||
|
||||
permissions, total, err := h.service.List(c.UserContext(), &req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return response.SuccessWithPagination(c, permissions, total, req.Page, req.PageSize)
|
||||
}
|
||||
|
||||
// GetTree 获取权限树
|
||||
// GET /api/v1/permissions/tree
|
||||
func (h *PermissionHandler) GetTree(c *fiber.Ctx) error {
|
||||
tree, err := h.service.GetTree(c.UserContext())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return response.Success(c, tree)
|
||||
}
|
||||
164
internal/handler/admin/role.go
Normal file
164
internal/handler/admin/role.go
Normal file
@@ -0,0 +1,164 @@
|
||||
package admin
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
|
||||
"github.com/gofiber/fiber/v2"
|
||||
|
||||
"github.com/break/junhong_cmp_fiber/pkg/errors"
|
||||
"github.com/break/junhong_cmp_fiber/pkg/response"
|
||||
|
||||
"github.com/break/junhong_cmp_fiber/internal/model"
|
||||
roleService "github.com/break/junhong_cmp_fiber/internal/service/role"
|
||||
)
|
||||
|
||||
// RoleHandler 角色 Handler
|
||||
type RoleHandler struct {
|
||||
service *roleService.Service
|
||||
}
|
||||
|
||||
// NewRoleHandler 创建角色 Handler
|
||||
func NewRoleHandler(service *roleService.Service) *RoleHandler {
|
||||
return &RoleHandler{service: service}
|
||||
}
|
||||
|
||||
// Create 创建角色
|
||||
// POST /api/v1/roles
|
||||
func (h *RoleHandler) Create(c *fiber.Ctx) error {
|
||||
var req model.CreateRoleRequest
|
||||
if err := c.BodyParser(&req); err != nil {
|
||||
return errors.New(errors.CodeInvalidParam, "请求参数解析失败")
|
||||
}
|
||||
|
||||
role, err := h.service.Create(c.UserContext(), &req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return response.Success(c, role)
|
||||
}
|
||||
|
||||
// Get 获取角色详情
|
||||
// GET /api/v1/roles/:id
|
||||
func (h *RoleHandler) Get(c *fiber.Ctx) error {
|
||||
id, err := strconv.ParseUint(c.Params("id"), 10, 64)
|
||||
if err != nil {
|
||||
return errors.New(errors.CodeInvalidParam, "无效的角色 ID")
|
||||
}
|
||||
|
||||
role, err := h.service.Get(c.UserContext(), uint(id))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return response.Success(c, role)
|
||||
}
|
||||
|
||||
// Update 更新角色
|
||||
// PUT /api/v1/roles/:id
|
||||
func (h *RoleHandler) Update(c *fiber.Ctx) error {
|
||||
id, err := strconv.ParseUint(c.Params("id"), 10, 64)
|
||||
if err != nil {
|
||||
return errors.New(errors.CodeInvalidParam, "无效的角色 ID")
|
||||
}
|
||||
|
||||
var req model.UpdateRoleRequest
|
||||
if err := c.BodyParser(&req); err != nil {
|
||||
return errors.New(errors.CodeInvalidParam, "请求参数解析失败")
|
||||
}
|
||||
|
||||
role, err := h.service.Update(c.UserContext(), uint(id), &req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return response.Success(c, role)
|
||||
}
|
||||
|
||||
// Delete 删除角色
|
||||
// DELETE /api/v1/roles/:id
|
||||
func (h *RoleHandler) Delete(c *fiber.Ctx) error {
|
||||
id, err := strconv.ParseUint(c.Params("id"), 10, 64)
|
||||
if err != nil {
|
||||
return errors.New(errors.CodeInvalidParam, "无效的角色 ID")
|
||||
}
|
||||
|
||||
if err := h.service.Delete(c.UserContext(), uint(id)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return response.Success(c, nil)
|
||||
}
|
||||
|
||||
// List 查询角色列表
|
||||
// GET /api/v1/roles
|
||||
func (h *RoleHandler) List(c *fiber.Ctx) error {
|
||||
var req model.RoleListRequest
|
||||
if err := c.QueryParser(&req); err != nil {
|
||||
return errors.New(errors.CodeInvalidParam, "请求参数解析失败")
|
||||
}
|
||||
|
||||
roles, total, err := h.service.List(c.UserContext(), &req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return response.SuccessWithPagination(c, roles, total, req.Page, req.PageSize)
|
||||
}
|
||||
|
||||
// AssignPermissions 为角色分配权限
|
||||
// POST /api/v1/roles/:id/permissions
|
||||
func (h *RoleHandler) AssignPermissions(c *fiber.Ctx) error {
|
||||
id, err := strconv.ParseUint(c.Params("id"), 10, 64)
|
||||
if err != nil {
|
||||
return errors.New(errors.CodeInvalidParam, "无效的角色 ID")
|
||||
}
|
||||
|
||||
var req model.AssignPermissionsRequest
|
||||
if err := c.BodyParser(&req); err != nil {
|
||||
return errors.New(errors.CodeInvalidParam, "请求参数解析失败")
|
||||
}
|
||||
|
||||
rps, err := h.service.AssignPermissions(c.UserContext(), uint(id), req.PermIDs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return response.Success(c, rps)
|
||||
}
|
||||
|
||||
// GetPermissions 获取角色的所有权限
|
||||
// GET /api/v1/roles/:id/permissions
|
||||
func (h *RoleHandler) GetPermissions(c *fiber.Ctx) error {
|
||||
id, err := strconv.ParseUint(c.Params("id"), 10, 64)
|
||||
if err != nil {
|
||||
return errors.New(errors.CodeInvalidParam, "无效的角色 ID")
|
||||
}
|
||||
|
||||
permissions, err := h.service.GetPermissions(c.UserContext(), uint(id))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return response.Success(c, permissions)
|
||||
}
|
||||
|
||||
// RemovePermission 移除角色的权限
|
||||
// DELETE /api/v1/roles/:role_id/permissions/:perm_id
|
||||
func (h *RoleHandler) RemovePermission(c *fiber.Ctx) error {
|
||||
roleID, err := strconv.ParseUint(c.Params("role_id"), 10, 64)
|
||||
if err != nil {
|
||||
return errors.New(errors.CodeInvalidParam, "无效的角色 ID")
|
||||
}
|
||||
|
||||
permID, err := strconv.ParseUint(c.Params("perm_id"), 10, 64)
|
||||
if err != nil {
|
||||
return errors.New(errors.CodeInvalidParam, "无效的权限 ID")
|
||||
}
|
||||
|
||||
if err := h.service.RemovePermission(c.UserContext(), uint(roleID), uint(permID)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return response.Success(c, nil)
|
||||
}
|
||||
216
internal/handler/admin/task.go
Normal file
216
internal/handler/admin/task.go
Normal file
@@ -0,0 +1,216 @@
|
||||
package admin
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/go-playground/validator/v10"
|
||||
"github.com/gofiber/fiber/v2"
|
||||
"github.com/google/uuid"
|
||||
"github.com/hibiken/asynq"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/break/junhong_cmp_fiber/internal/task"
|
||||
"github.com/break/junhong_cmp_fiber/pkg/constants"
|
||||
"github.com/break/junhong_cmp_fiber/pkg/errors"
|
||||
"github.com/break/junhong_cmp_fiber/pkg/queue"
|
||||
"github.com/break/junhong_cmp_fiber/pkg/response"
|
||||
)
|
||||
|
||||
// TaskHandler 任务处理器
|
||||
type TaskHandler struct {
|
||||
queueClient *queue.Client
|
||||
logger *zap.Logger
|
||||
validator *validator.Validate
|
||||
}
|
||||
|
||||
// NewTaskHandler 创建任务处理器实例
|
||||
func NewTaskHandler(queueClient *queue.Client, logger *zap.Logger) *TaskHandler {
|
||||
return &TaskHandler{
|
||||
queueClient: queueClient,
|
||||
logger: logger,
|
||||
validator: validator.New(),
|
||||
}
|
||||
}
|
||||
|
||||
// SubmitEmailTaskRequest 提交邮件任务请求
|
||||
type SubmitEmailTaskRequest struct {
|
||||
To string `json:"to" validate:"required,email"`
|
||||
Subject string `json:"subject" validate:"required,min=1,max=200"`
|
||||
Body string `json:"body" validate:"required,min=1"`
|
||||
CC []string `json:"cc,omitempty" validate:"omitempty,dive,email"`
|
||||
Attachments []string `json:"attachments,omitempty"`
|
||||
RequestID string `json:"request_id,omitempty"`
|
||||
}
|
||||
|
||||
// SubmitSyncTaskRequest 提交数据同步任务请求
|
||||
type SubmitSyncTaskRequest struct {
|
||||
SyncType string `json:"sync_type" validate:"required,oneof=sim_status flow_usage real_name"`
|
||||
StartDate string `json:"start_date" validate:"required"`
|
||||
EndDate string `json:"end_date" validate:"required"`
|
||||
BatchSize int `json:"batch_size,omitempty" validate:"omitempty,min=1,max=1000"`
|
||||
RequestID string `json:"request_id,omitempty"`
|
||||
Priority string `json:"priority,omitempty" validate:"omitempty,oneof=critical default low"`
|
||||
}
|
||||
|
||||
// TaskResponse 任务响应
|
||||
type TaskResponse struct {
|
||||
TaskID string `json:"task_id"`
|
||||
Queue string `json:"queue"`
|
||||
Status string `json:"status"`
|
||||
}
|
||||
|
||||
// SubmitEmailTask 提交邮件发送任务
|
||||
// @Summary 提交邮件发送任务
|
||||
// @Description 异步发送邮件
|
||||
// @Tags 任务
|
||||
// @Accept json
|
||||
// @Produce json
|
||||
// @Param request body SubmitEmailTaskRequest true "邮件任务参数"
|
||||
// @Success 200 {object} response.Response{data=TaskResponse}
|
||||
// @Failure 400 {object} response.Response
|
||||
// @Router /api/v1/tasks/email [post]
|
||||
func (h *TaskHandler) SubmitEmailTask(c *fiber.Ctx) error {
|
||||
var req SubmitEmailTaskRequest
|
||||
if err := c.BodyParser(&req); err != nil {
|
||||
h.logger.Warn("解析邮件任务请求失败",
|
||||
zap.Error(err))
|
||||
return errors.New(errors.CodeInvalidParam, "请求参数格式错误")
|
||||
}
|
||||
|
||||
// 验证参数
|
||||
if err := h.validator.Struct(&req); err != nil {
|
||||
h.logger.Warn("邮件任务参数验证失败",
|
||||
zap.Error(err))
|
||||
return errors.New(errors.CodeInvalidParam, err.Error())
|
||||
}
|
||||
|
||||
// 生成 RequestID(如果未提供)
|
||||
if req.RequestID == "" {
|
||||
req.RequestID = generateRequestID("email")
|
||||
}
|
||||
|
||||
// 构造任务载荷
|
||||
payload := &task.EmailPayload{
|
||||
RequestID: req.RequestID,
|
||||
To: req.To,
|
||||
Subject: req.Subject,
|
||||
Body: req.Body,
|
||||
CC: req.CC,
|
||||
Attachments: req.Attachments,
|
||||
}
|
||||
|
||||
// 提交任务到队列
|
||||
err := h.queueClient.EnqueueTask(
|
||||
c.Context(),
|
||||
constants.TaskTypeEmailSend,
|
||||
payload,
|
||||
asynq.Queue(constants.QueueDefault),
|
||||
asynq.MaxRetry(constants.DefaultRetryMax),
|
||||
asynq.Timeout(constants.DefaultTimeout),
|
||||
)
|
||||
if err != nil {
|
||||
h.logger.Error("提交邮件任务失败",
|
||||
zap.String("to", req.To),
|
||||
zap.String("request_id", req.RequestID),
|
||||
zap.Error(err))
|
||||
return errors.New(errors.CodeInternalError, "任务提交失败")
|
||||
}
|
||||
|
||||
h.logger.Info("邮件任务提交成功",
|
||||
zap.String("queue", constants.QueueDefault),
|
||||
zap.String("to", req.To),
|
||||
zap.String("request_id", req.RequestID))
|
||||
|
||||
return response.SuccessWithMessage(c, TaskResponse{
|
||||
TaskID: req.RequestID,
|
||||
Queue: constants.QueueDefault,
|
||||
Status: "queued",
|
||||
}, "邮件任务已提交")
|
||||
}
|
||||
|
||||
// SubmitSyncTask 提交数据同步任务
|
||||
// @Summary 提交数据同步任务
|
||||
// @Description 异步执行数据同步
|
||||
// @Tags 任务
|
||||
// @Accept json
|
||||
// @Produce json
|
||||
// @Param request body SubmitSyncTaskRequest true "同步任务参数"
|
||||
// @Success 200 {object} response.Response{data=TaskResponse}
|
||||
// @Failure 400 {object} response.Response
|
||||
// @Router /api/v1/tasks/sync [post]
|
||||
func (h *TaskHandler) SubmitSyncTask(c *fiber.Ctx) error {
|
||||
var req SubmitSyncTaskRequest
|
||||
if err := c.BodyParser(&req); err != nil {
|
||||
h.logger.Warn("解析同步任务请求失败",
|
||||
zap.Error(err))
|
||||
return errors.New(errors.CodeInvalidParam, "请求参数格式错误")
|
||||
}
|
||||
|
||||
// 验证参数
|
||||
if err := h.validator.Struct(&req); err != nil {
|
||||
h.logger.Warn("同步任务参数验证失败",
|
||||
zap.Error(err))
|
||||
return errors.New(errors.CodeInvalidParam, err.Error())
|
||||
}
|
||||
|
||||
// 生成 RequestID(如果未提供)
|
||||
if req.RequestID == "" {
|
||||
req.RequestID = generateRequestID("sync")
|
||||
}
|
||||
|
||||
// 设置默认批量大小
|
||||
if req.BatchSize == 0 {
|
||||
req.BatchSize = 100
|
||||
}
|
||||
|
||||
// 确定队列优先级
|
||||
queueName := constants.QueueDefault
|
||||
if req.Priority == "critical" {
|
||||
queueName = constants.QueueCritical
|
||||
} else if req.Priority == "low" {
|
||||
queueName = constants.QueueLow
|
||||
}
|
||||
|
||||
// 构造任务载荷
|
||||
payload := &task.DataSyncPayload{
|
||||
RequestID: req.RequestID,
|
||||
SyncType: req.SyncType,
|
||||
StartDate: req.StartDate,
|
||||
EndDate: req.EndDate,
|
||||
BatchSize: req.BatchSize,
|
||||
}
|
||||
|
||||
// 提交任务到队列
|
||||
err := h.queueClient.EnqueueTask(
|
||||
c.Context(),
|
||||
constants.TaskTypeDataSync,
|
||||
payload,
|
||||
asynq.Queue(queueName),
|
||||
asynq.MaxRetry(constants.DefaultRetryMax),
|
||||
asynq.Timeout(constants.DefaultTimeout),
|
||||
)
|
||||
if err != nil {
|
||||
h.logger.Error("提交同步任务失败",
|
||||
zap.String("sync_type", req.SyncType),
|
||||
zap.String("request_id", req.RequestID),
|
||||
zap.Error(err))
|
||||
return errors.New(errors.CodeInternalError, "任务提交失败")
|
||||
}
|
||||
|
||||
h.logger.Info("同步任务提交成功",
|
||||
zap.String("queue", queueName),
|
||||
zap.String("sync_type", req.SyncType),
|
||||
zap.String("request_id", req.RequestID))
|
||||
|
||||
return response.SuccessWithMessage(c, TaskResponse{
|
||||
TaskID: req.RequestID,
|
||||
Queue: queueName,
|
||||
Status: "queued",
|
||||
}, "同步任务已提交")
|
||||
}
|
||||
|
||||
// generateRequestID 生成请求 ID
|
||||
func generateRequestID(prefix string) string {
|
||||
return fmt.Sprintf("%s-%s-%d", prefix, uuid.New().String(), time.Now().UnixNano())
|
||||
}
|
||||
Reference in New Issue
Block a user