Files
junhong_cmp_fiber/pkg/queue/handler.go
huang c665f32976
All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 6m54s
feat: 套餐系统升级 - Worker 重构、流量重置、文档与规范更新
- 重构 Worker 启动流程,引入 bootstrap 模块统一管理依赖注入
- 实现套餐流量重置服务(日/月/年周期重置)
- 新增套餐激活排队、加油包绑定、囤货待实名激活逻辑
- 新增订单创建幂等性防重(Redis 业务键 + 分布式锁)
- 更新 AGENTS.md/CLAUDE.md:新增注释规范、幂等性规范,移除测试要求
- 添加套餐系统升级完整文档(API文档、使用指南、功能总结、运维指南)
- 归档 OpenSpec package-system-upgrade 变更,同步 specs 到主目录
- 新增 queue types 抽象和 Redis 常量定义
2026-02-12 14:24:15 +08:00

186 lines
6.4 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package queue
import (
"github.com/hibiken/asynq"
"github.com/redis/go-redis/v9"
"go.uber.org/zap"
"gorm.io/gorm"
"github.com/break/junhong_cmp_fiber/internal/gateway"
"github.com/break/junhong_cmp_fiber/internal/polling"
"github.com/break/junhong_cmp_fiber/internal/task"
"github.com/break/junhong_cmp_fiber/pkg/constants"
"github.com/break/junhong_cmp_fiber/pkg/storage"
)
type Handler struct {
mux *asynq.ServeMux
logger *zap.Logger
db *gorm.DB
redis *redis.Client
storage *storage.Service
gatewayClient *gateway.Client
pollingCallback task.PollingCallback
workerResult *WorkerBootstrapResult
asynqClient *asynq.Client
}
func NewHandler(
db *gorm.DB,
redis *redis.Client,
storageSvc *storage.Service,
gatewayClient *gateway.Client,
pollingCallback task.PollingCallback,
workerResult *WorkerBootstrapResult,
asynqClient *asynq.Client,
logger *zap.Logger,
) *Handler {
return &Handler{
mux: asynq.NewServeMux(),
logger: logger,
db: db,
redis: redis,
storage: storageSvc,
gatewayClient: gatewayClient,
pollingCallback: pollingCallback,
workerResult: workerResult,
asynqClient: asynqClient,
}
}
func (h *Handler) RegisterHandlers() *asynq.ServeMux {
emailHandler := task.NewEmailHandler(h.redis, h.logger)
syncHandler := task.NewSyncHandler(h.db, h.logger)
simHandler := task.NewSIMHandler(h.db, h.redis, h.logger)
h.mux.HandleFunc(constants.TaskTypeEmailSend, emailHandler.HandleEmailSend)
h.logger.Info("注册邮件发送任务处理器", zap.String("task_type", constants.TaskTypeEmailSend))
h.mux.HandleFunc(constants.TaskTypeDataSync, syncHandler.HandleDataSync)
h.logger.Info("注册数据同步任务处理器", zap.String("task_type", constants.TaskTypeDataSync))
h.mux.HandleFunc(constants.TaskTypeSIMStatusSync, simHandler.HandleSIMStatusSync)
h.logger.Info("注册 SIM 状态同步任务处理器", zap.String("task_type", constants.TaskTypeSIMStatusSync))
h.registerIotCardImportHandler()
h.registerDeviceImportHandler()
h.registerCommissionStatsHandlers()
h.registerCommissionCalculationHandler()
h.registerPollingHandlers()
h.registerPackageActivationHandlers()
h.logger.Info("所有任务处理器注册完成")
return h.mux
}
func (h *Handler) registerIotCardImportHandler() {
iotCardImportHandler := task.NewIotCardImportHandler(
h.db,
h.redis,
h.workerResult.Stores.IotCardImportTask,
h.workerResult.Stores.IotCard,
h.storage,
h.pollingCallback,
h.logger,
)
h.mux.HandleFunc(constants.TaskTypeIotCardImport, iotCardImportHandler.HandleIotCardImport)
h.logger.Info("注册 IoT 卡导入任务处理器", zap.String("task_type", constants.TaskTypeIotCardImport))
}
func (h *Handler) registerDeviceImportHandler() {
deviceImportHandler := task.NewDeviceImportHandler(
h.db,
h.redis,
h.workerResult.Stores.DeviceImportTask,
h.workerResult.Stores.Device,
h.workerResult.Stores.DeviceSimBinding,
h.workerResult.Stores.IotCard,
h.storage,
h.logger,
)
h.mux.HandleFunc(constants.TaskTypeDeviceImport, deviceImportHandler.HandleDeviceImport)
h.logger.Info("注册设备导入任务处理器", zap.String("task_type", constants.TaskTypeDeviceImport))
}
func (h *Handler) registerCommissionStatsHandlers() {
updateHandler := task.NewCommissionStatsUpdateHandler(
h.redis,
h.workerResult.Stores.ShopSeriesCommissionStats,
h.workerResult.Stores.ShopPackageAllocation,
h.logger,
)
syncHandler := task.NewCommissionStatsSyncHandler(
h.db,
h.redis,
h.workerResult.Stores.ShopSeriesCommissionStats,
h.logger,
)
archiveHandler := task.NewCommissionStatsArchiveHandler(
h.db,
h.redis,
h.workerResult.Stores.ShopSeriesCommissionStats,
h.logger,
)
h.mux.HandleFunc(constants.TaskTypeCommissionStatsUpdate, updateHandler.HandleCommissionStatsUpdate)
h.logger.Info("注册佣金统计更新任务处理器", zap.String("task_type", constants.TaskTypeCommissionStatsUpdate))
h.mux.HandleFunc(constants.TaskTypeCommissionStatsSync, syncHandler.HandleCommissionStatsSync)
h.logger.Info("注册佣金统计同步任务处理器", zap.String("task_type", constants.TaskTypeCommissionStatsSync))
h.mux.HandleFunc(constants.TaskTypeCommissionStatsArchive, archiveHandler.HandleCommissionStatsArchive)
h.logger.Info("注册佣金统计归档任务处理器", zap.String("task_type", constants.TaskTypeCommissionStatsArchive))
}
func (h *Handler) registerCommissionCalculationHandler() {
commissionCalculationHandler := task.NewCommissionCalculationHandler(
h.db,
h.workerResult.Services.CommissionCalculation,
h.logger,
)
h.mux.HandleFunc(constants.TaskTypeCommission, commissionCalculationHandler.HandleCommissionCalculation)
h.logger.Info("注册佣金计算任务处理器", zap.String("task_type", constants.TaskTypeCommission))
}
func (h *Handler) registerPollingHandlers() {
pollingHandler := task.NewPollingHandler(
h.db,
h.redis,
h.gatewayClient,
h.workerResult.Services.UsageService,
h.logger,
)
h.mux.HandleFunc(constants.TaskTypePollingRealname, pollingHandler.HandleRealnameCheck)
h.logger.Info("注册实名检查任务处理器", zap.String("task_type", constants.TaskTypePollingRealname))
h.mux.HandleFunc(constants.TaskTypePollingCarddata, pollingHandler.HandleCarddataCheck)
h.logger.Info("注册流量检查任务处理器", zap.String("task_type", constants.TaskTypePollingCarddata))
h.mux.HandleFunc(constants.TaskTypePollingPackage, pollingHandler.HandlePackageCheck)
h.logger.Info("注册套餐检查任务处理器", zap.String("task_type", constants.TaskTypePollingPackage))
}
func (h *Handler) registerPackageActivationHandlers() {
packageActivationHandler := polling.NewPackageActivationHandler(
h.db,
h.redis,
h.asynqClient,
h.workerResult.Services.ActivationService,
h.logger,
)
h.mux.HandleFunc(constants.TaskTypePackageFirstActivation, packageActivationHandler.HandlePackageFirstActivation)
h.logger.Info("注册首次实名激活任务处理器", zap.String("task_type", constants.TaskTypePackageFirstActivation))
h.mux.HandleFunc(constants.TaskTypePackageQueueActivation, packageActivationHandler.HandlePackageQueueActivation)
h.logger.Info("注册排队激活任务处理器", zap.String("task_type", constants.TaskTypePackageQueueActivation))
}
// GetMux 获取 ServeMux用于启动 Worker 服务器)
func (h *Handler) GetMux() *asynq.ServeMux {
return h.mux
}