Files
junhong_cmp_fiber/internal/handler/admin/polling_manual_trigger.go
huang 931e140e8e
All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 6m35s
feat: 实现 IoT 卡轮询系统(支持千万级卡规模)
实现功能:
- 实名状态检查轮询(可配置间隔)
- 卡流量检查轮询(支持跨月流量追踪)
- 套餐检查与超额自动停机
- 分布式并发控制(Redis 信号量)
- 手动触发轮询(单卡/批量/条件筛选)
- 数据清理配置与执行
- 告警规则与历史记录
- 实时监控统计(队列/性能/并发)

性能优化:
- Redis 缓存卡信息,减少 DB 查询
- Pipeline 批量写入 Redis
- 异步流量记录写入
- 渐进式初始化(10万卡/批)

压测工具(scripts/benchmark/):
- Mock Gateway 模拟上游服务
- 测试卡生成器
- 配置初始化脚本
- 实时监控脚本

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-05 17:32:44 +08:00

312 lines
8.6 KiB
Go

package admin
import (
"github.com/gofiber/fiber/v2"
"github.com/break/junhong_cmp_fiber/internal/model"
"github.com/break/junhong_cmp_fiber/internal/model/dto"
"github.com/break/junhong_cmp_fiber/internal/service/polling"
"github.com/break/junhong_cmp_fiber/pkg/constants"
"github.com/break/junhong_cmp_fiber/pkg/errors"
"github.com/break/junhong_cmp_fiber/pkg/middleware"
"github.com/break/junhong_cmp_fiber/pkg/response"
)
// PollingManualTriggerHandler is the admin HTTP handler for manually
// triggered polling tasks. Every endpoint delegates the actual work to the
// wrapped polling.ManualTriggerService.
type PollingManualTriggerHandler struct {
service *polling.ManualTriggerService
}
// NewPollingManualTriggerHandler builds a PollingManualTriggerHandler that
// forwards its requests to the given manual-trigger service.
func NewPollingManualTriggerHandler(service *polling.ManualTriggerService) *PollingManualTriggerHandler {
	handler := new(PollingManualTriggerHandler)
	handler.service = service
	return handler
}
// TriggerSingle handles a manual polling trigger for a single card.
//
// It decodes the request body into dto.TriggerSingleReq, resolves the calling
// user from the request context and hands the card ID and task type to the
// service. It returns CodeInvalidParam when the body cannot be parsed,
// CodeUnauthorized when no user ID is present (ID is 0), and otherwise
// whatever error the service reports. On success an empty success response
// is written.
//
// @Summary 单卡手动触发
// @Description 触发单张卡的轮询任务
// @Tags 轮询管理-手动触发
// @Accept json
// @Produce json
// @Security BearerAuth
// @Param request body dto.TriggerSingleReq true "触发请求"
// @Success 200 {object} response.Response
// @Router /api/admin/polling-manual-trigger/single [post]
func (h *PollingManualTriggerHandler) TriggerSingle(c *fiber.Ctx) error {
	reqCtx := c.UserContext()

	var payload dto.TriggerSingleReq
	if parseErr := c.BodyParser(&payload); parseErr != nil {
		return errors.New(errors.CodeInvalidParam)
	}

	operatorID := middleware.GetUserIDFromContext(reqCtx)
	if operatorID == 0 {
		return errors.New(errors.CodeUnauthorized)
	}

	triggerErr := h.service.TriggerSingle(reqCtx, payload.CardID, payload.TaskType, operatorID)
	if triggerErr != nil {
		return triggerErr
	}
	return response.Success(c, nil)
}
// TriggerBatch handles a manual polling trigger for a list of cards.
//
// It decodes the request body into dto.TriggerBatchReq, resolves the calling
// user from the request context and passes the card IDs and task type to the
// service. It returns CodeInvalidParam when the body cannot be parsed,
// CodeUnauthorized when no user ID is present (ID is 0), and otherwise
// whatever error the service reports. On success the trigger log returned by
// the service is converted with toLogResp and written as the response data.
//
// @Summary 批量手动触发
// @Description 批量触发多张卡的轮询任务
// @Tags 轮询管理-手动触发
// @Accept json
// @Produce json
// @Security BearerAuth
// @Param request body dto.TriggerBatchReq true "触发请求"
// @Success 200 {object} response.Response{data=dto.ManualTriggerLogResp}
// @Router /api/admin/polling-manual-trigger/batch [post]
func (h *PollingManualTriggerHandler) TriggerBatch(c *fiber.Ctx) error {
	reqCtx := c.UserContext()

	var payload dto.TriggerBatchReq
	if parseErr := c.BodyParser(&payload); parseErr != nil {
		return errors.New(errors.CodeInvalidParam)
	}

	operatorID := middleware.GetUserIDFromContext(reqCtx)
	if operatorID == 0 {
		return errors.New(errors.CodeUnauthorized)
	}

	triggerLog, triggerErr := h.service.TriggerBatch(reqCtx, payload.CardIDs, payload.TaskType, operatorID)
	if triggerErr != nil {
		return triggerErr
	}

	body := h.toLogResp(triggerLog)
	return response.Success(c, body)
}
// TriggerByCondition handles a manual polling trigger for all cards matching
// a set of filter conditions.
//
// It decodes the request body into dto.TriggerByConditionReq, resolves the
// calling user from the request context, copies the filter fields of the
// request into a polling.ConditionFilter and passes it to the service together
// with the task type. It returns CodeInvalidParam when the body cannot be
// parsed, CodeUnauthorized when no user ID is present (ID is 0), and otherwise
// whatever error the service reports. On success the trigger log returned by
// the service is converted with toLogResp and written as the response data.
//
// @Summary 条件筛选触发
// @Description 根据条件筛选卡并触发轮询任务
// @Tags 轮询管理-手动触发
// @Accept json
// @Produce json
// @Security BearerAuth
// @Param request body dto.TriggerByConditionReq true "触发请求"
// @Success 200 {object} response.Response{data=dto.ManualTriggerLogResp}
// @Router /api/admin/polling-manual-trigger/by-condition [post]
func (h *PollingManualTriggerHandler) TriggerByCondition(c *fiber.Ctx) error {
	reqCtx := c.UserContext()

	var payload dto.TriggerByConditionReq
	if parseErr := c.BodyParser(&payload); parseErr != nil {
		return errors.New(errors.CodeInvalidParam)
	}

	operatorID := middleware.GetUserIDFromContext(reqCtx)
	if operatorID == 0 {
		return errors.New(errors.CodeUnauthorized)
	}

	// The request's filter fields map one-to-one onto the service filter.
	condition := polling.ConditionFilter{
		CardStatus:    payload.CardStatus,
		CarrierCode:   payload.CarrierCode,
		CardType:      payload.CardType,
		ShopID:        payload.ShopID,
		PackageIDs:    payload.PackageIDs,
		EnablePolling: payload.EnablePolling,
		Limit:         payload.Limit,
	}

	triggerLog, triggerErr := h.service.TriggerByCondition(reqCtx, &condition, payload.TaskType, operatorID)
	if triggerErr != nil {
		return triggerErr
	}

	body := h.toLogResp(triggerLog)
	return response.Success(c, body)
}
// GetStatus reports the manual-trigger status for the calling user.
//
// It resolves the user from the request context (CodeUnauthorized when the ID
// is 0), asks the service for that user's running tasks and converts each one
// with toLogResp, then looks up the queue size for the realname, carddata and
// package polling task types. The running tasks and the per-task-type queue
// sizes are returned together as dto.ManualTriggerStatusResp.
//
// @Summary 获取手动触发状态
// @Description 获取当前用户的手动触发状态
// @Tags 轮询管理-手动触发
// @Accept json
// @Produce json
// @Security BearerAuth
// @Success 200 {object} response.Response{data=dto.ManualTriggerStatusResp}
// @Router /api/admin/polling-manual-trigger/status [get]
func (h *PollingManualTriggerHandler) GetStatus(c *fiber.Ctx) error {
	reqCtx := c.UserContext()

	operatorID := middleware.GetUserIDFromContext(reqCtx)
	if operatorID == 0 {
		return errors.New(errors.CodeUnauthorized)
	}

	// Tasks currently running for this user.
	running, err := h.service.GetRunningTasks(reqCtx, operatorID)
	if err != nil {
		return err
	}
	runningResp := make([]*dto.ManualTriggerLogResp, len(running))
	for i, entry := range running {
		runningResp[i] = h.toLogResp(entry)
	}

	// Current size of each polling queue, keyed by task type.
	polledTypes := []string{
		constants.TaskTypePollingRealname,
		constants.TaskTypePollingCarddata,
		constants.TaskTypePollingPackage,
	}
	sizeByType := make(map[string]int64)
	for _, taskType := range polledTypes {
		// NOTE(review): the error from GetQueueSize is discarded, so a failed
		// lookup stores whatever size value was returned alongside it. This
		// looks like a deliberate best-effort read — confirm.
		sizeByType[taskType], _ = h.service.GetQueueSize(reqCtx, taskType)
	}

	status := &dto.ManualTriggerStatusResp{
		RunningTasks: runningResp,
		QueueSizes:   sizeByType,
	}
	return response.Success(c, status)
}
// ListHistory returns a page of manual-trigger history records.
//
// Query parameters are decoded into dto.ListManualTriggerLogReq
// (CodeInvalidParam on failure). A page below 1 becomes 1; a page size below 1
// or above 100 falls back to 20 (an oversized value is not clamped to 100).
// The records returned by the service are converted with toLogResp, and the
// total page count is the record total divided by the page size, rounded up.
//
// @Summary 获取手动触发历史
// @Description 获取手动触发历史记录
// @Tags 轮询管理-手动触发
// @Accept json
// @Produce json
// @Security BearerAuth
// @Param task_type query string false "任务类型筛选"
// @Param page query int false "页码"
// @Param page_size query int false "每页数量"
// @Success 200 {object} response.Response{data=dto.ManualTriggerLogListResp}
// @Router /api/admin/polling-manual-trigger/history [get]
func (h *PollingManualTriggerHandler) ListHistory(c *fiber.Ctx) error {
	reqCtx := c.UserContext()

	var query dto.ListManualTriggerLogReq
	if parseErr := c.QueryParser(&query); parseErr != nil {
		return errors.New(errors.CodeInvalidParam)
	}

	// Normalize paging before it reaches the service.
	page, pageSize := query.Page, query.PageSize
	if page < 1 {
		page = 1
	}
	if pageSize < 1 || pageSize > 100 {
		pageSize = 20
	}

	// NOTE(review): the trailing nil is an unused optional argument of the
	// service call; its meaning is not visible from this file — confirm.
	records, total, err := h.service.ListHistory(reqCtx, page, pageSize, query.TaskType, nil)
	if err != nil {
		return err
	}

	history := make([]*dto.ManualTriggerLogResp, len(records))
	for i, record := range records {
		history[i] = h.toLogResp(record)
	}

	// Round the page count up when the last page is only partially filled.
	recordCount := int(total)
	pageCount := recordCount / pageSize
	if remainder := recordCount % pageSize; remainder > 0 {
		pageCount++
	}

	listing := &dto.ManualTriggerLogListResp{
		Items:      history,
		Total:      total,
		Page:       page,
		PageSize:   pageSize,
		TotalPages: pageCount,
	}
	return response.Success(c, listing)
}
// CancelTrigger handles a request to cancel a manual trigger task.
//
// It decodes the request body into dto.CancelTriggerReq, resolves the calling
// user from the request context and asks the service to cancel the trigger
// identified by TriggerID on behalf of that user. It returns CodeInvalidParam
// when the body cannot be parsed, CodeUnauthorized when no user ID is present
// (ID is 0), and otherwise whatever error the service reports. On success an
// empty success response is written.
//
// @Summary 取消手动触发任务
// @Description 取消正在执行的手动触发任务
// @Tags 轮询管理-手动触发
// @Accept json
// @Produce json
// @Security BearerAuth
// @Param request body dto.CancelTriggerReq true "取消请求"
// @Success 200 {object} response.Response
// @Router /api/admin/polling-manual-trigger/cancel [post]
func (h *PollingManualTriggerHandler) CancelTrigger(c *fiber.Ctx) error {
	reqCtx := c.UserContext()

	var payload dto.CancelTriggerReq
	if parseErr := c.BodyParser(&payload); parseErr != nil {
		return errors.New(errors.CodeInvalidParam)
	}

	operatorID := middleware.GetUserIDFromContext(reqCtx)
	if operatorID == 0 {
		return errors.New(errors.CodeUnauthorized)
	}

	cancelErr := h.service.CancelTrigger(reqCtx, payload.TriggerID, operatorID)
	if cancelErr != nil {
		return cancelErr
	}
	return response.Success(c, nil)
}
// toLogResp converts a manual-trigger log record into its API response DTO.
// Stored fields are copied as-is, and display names for the task type, trigger
// type and status are added via the handler's name-lookup helpers.
//
// log is dereferenced unconditionally, so it must not be nil.
func (h *PollingManualTriggerHandler) toLogResp(log *model.PollingManualTriggerLog) *dto.ManualTriggerLogResp {
	// Resolve the human-readable labels first so the literal below is a plain copy.
	taskTypeLabel := h.getTaskTypeName(log.TaskType)
	triggerTypeLabel := h.getTriggerTypeName(log.TriggerType)
	statusLabel := h.getStatusName(log.Status)

	resp := &dto.ManualTriggerLogResp{
		ID: log.ID,

		TaskType:        log.TaskType,
		TaskTypeName:    taskTypeLabel,
		TriggerType:     log.TriggerType,
		TriggerTypeName: triggerTypeLabel,
		Status:          log.Status,
		StatusName:      statusLabel,

		TotalCount:     log.TotalCount,
		ProcessedCount: log.ProcessedCount,
		SuccessCount:   log.SuccessCount,
		FailedCount:    log.FailedCount,

		TriggeredBy: log.TriggeredBy,
		TriggeredAt: log.TriggeredAt,
		CompletedAt: log.CompletedAt,
	}
	return resp
}
// getTaskTypeName maps a polling task type constant to its display name.
// Task types other than the realname, carddata and package polling constants
// are returned unchanged.
func (h *PollingManualTriggerHandler) getTaskTypeName(taskType string) string {
	// Start from the raw value so unrecognized task types pass through as-is.
	name := taskType
	switch taskType {
	case constants.TaskTypePollingRealname:
		name = "实名检查"
	case constants.TaskTypePollingCarddata:
		name = "流量检查"
	case constants.TaskTypePollingPackage:
		name = "套餐检查"
	}
	return name
}
// getTriggerTypeName maps a trigger type ("single", "batch", "by_condition")
// to its display name. Any other value is returned unchanged.
func (h *PollingManualTriggerHandler) getTriggerTypeName(triggerType string) string {
	// Start from the raw value so unrecognized trigger types pass through as-is.
	name := triggerType
	switch triggerType {
	case "single":
		name = "单卡触发"
	case "batch":
		name = "批量触发"
	case "by_condition":
		name = "条件筛选"
	}
	return name
}
// getStatusName maps a trigger status ("pending", "processing", "completed",
// "cancelled") to its display name. Any other value is returned unchanged.
func (h *PollingManualTriggerHandler) getStatusName(status string) string {
	// Start from the raw value so unrecognized statuses pass through as-is.
	name := status
	switch status {
	case "pending":
		name = "待处理"
	case "processing":
		name = "处理中"
	case "completed":
		name = "已完成"
	case "cancelled":
		name = "已取消"
	}
	return name
}