package admin import ( "github.com/gofiber/fiber/v2" "github.com/break/junhong_cmp_fiber/internal/model/dto" "github.com/break/junhong_cmp_fiber/internal/service/polling" "github.com/break/junhong_cmp_fiber/pkg/errors" "github.com/break/junhong_cmp_fiber/pkg/middleware" "github.com/break/junhong_cmp_fiber/pkg/response" ) // PollingConcurrencyHandler 轮询并发控制处理器 type PollingConcurrencyHandler struct { service *polling.ConcurrencyService } // NewPollingConcurrencyHandler 创建轮询并发控制处理器 func NewPollingConcurrencyHandler(service *polling.ConcurrencyService) *PollingConcurrencyHandler { return &PollingConcurrencyHandler{service: service} } // List 获取所有并发配置 // @Summary 获取轮询并发配置列表 // @Description 获取所有轮询任务类型的并发配置及当前状态 // @Tags 轮询管理-并发控制 // @Accept json // @Produce json // @Security BearerAuth // @Success 200 {object} response.Response{data=dto.PollingConcurrencyListResp} // @Router /api/admin/polling-concurrency [get] func (h *PollingConcurrencyHandler) List(c *fiber.Ctx) error { ctx := c.UserContext() statuses, err := h.service.List(ctx) if err != nil { return err } items := make([]*dto.PollingConcurrencyResp, 0, len(statuses)) for _, s := range statuses { items = append(items, &dto.PollingConcurrencyResp{ TaskType: s.TaskType, TaskTypeName: s.TaskTypeName, MaxConcurrency: s.MaxConcurrency, Current: s.Current, Available: s.Available, Utilization: s.Utilization, }) } return response.Success(c, &dto.PollingConcurrencyListResp{Items: items}) } // Get 获取指定任务类型的并发配置 // @Summary 获取指定任务类型的并发配置 // @Description 获取指定轮询任务类型的并发配置及当前状态 // @Tags 轮询管理-并发控制 // @Accept json // @Produce json // @Security BearerAuth // @Param task_type path string true "任务类型" // @Success 200 {object} response.Response{data=dto.PollingConcurrencyResp} // @Router /api/admin/polling-concurrency/{task_type} [get] func (h *PollingConcurrencyHandler) Get(c *fiber.Ctx) error { ctx := c.UserContext() taskType := c.Params("task_type") if taskType == "" { return errors.New(errors.CodeInvalidParam, "任务类型不能为空") } status, err := h.service.GetByTaskType(ctx, taskType) if err != nil { return err } return response.Success(c, &dto.PollingConcurrencyResp{ TaskType: status.TaskType, TaskTypeName: status.TaskTypeName, MaxConcurrency: status.MaxConcurrency, Current: status.Current, Available: status.Available, Utilization: status.Utilization, }) } // Update 更新并发配置 // @Summary 更新轮询并发配置 // @Description 更新指定轮询任务类型的最大并发数 // @Tags 轮询管理-并发控制 // @Accept json // @Produce json // @Security BearerAuth // @Param task_type path string true "任务类型" // @Param request body dto.UpdatePollingConcurrencyReq true "更新请求" // @Success 200 {object} response.Response // @Router /api/admin/polling-concurrency/{task_type} [put] func (h *PollingConcurrencyHandler) Update(c *fiber.Ctx) error { ctx := c.UserContext() taskType := c.Params("task_type") if taskType == "" { return errors.New(errors.CodeInvalidParam, "任务类型不能为空") } var req dto.UpdatePollingConcurrencyReq if err := c.BodyParser(&req); err != nil { return errors.New(errors.CodeInvalidParam) } userID := middleware.GetUserIDFromContext(ctx) if err := h.service.UpdateMaxConcurrency(ctx, taskType, req.MaxConcurrency, userID); err != nil { return err } return response.Success(c, nil) } // Reset 重置并发计数 // @Summary 重置轮询并发计数 // @Description 重置指定轮询任务类型的当前并发计数为0(用于信号量修复) // @Tags 轮询管理-并发控制 // @Accept json // @Produce json // @Security BearerAuth // @Param request body dto.ResetPollingConcurrencyReq true "重置请求" // @Success 200 {object} response.Response // @Router /api/admin/polling-concurrency/reset [post] func (h *PollingConcurrencyHandler) Reset(c *fiber.Ctx) error { ctx := c.UserContext() var req dto.ResetPollingConcurrencyReq if err := c.BodyParser(&req); err != nil { return errors.New(errors.CodeInvalidParam) } if req.TaskType == "" { return errors.New(errors.CodeInvalidParam, "任务类型不能为空") } if err := h.service.ResetConcurrency(ctx, req.TaskType); err != nil { return err } return response.Success(c, nil) }