Files
junhong_cmp_fiber/pkg/queue/handler.go
huang b9c3875c08
All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 7m3s
feat: 新增数据库迁移,重命名 device_no 为 virtual_no,新增 iot_card.virtual_no 和 package.virtual_ratio 字段
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
2026-03-14 18:27:28 +08:00

210 lines
7.7 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package queue
import (
"github.com/hibiken/asynq"
"github.com/redis/go-redis/v9"
"go.uber.org/zap"
"gorm.io/gorm"
"github.com/break/junhong_cmp_fiber/internal/gateway"
"github.com/break/junhong_cmp_fiber/internal/polling"
"github.com/break/junhong_cmp_fiber/internal/task"
"github.com/break/junhong_cmp_fiber/pkg/constants"
"github.com/break/junhong_cmp_fiber/pkg/storage"
)
type Handler struct {
mux *asynq.ServeMux
logger *zap.Logger
db *gorm.DB
redis *redis.Client
storage *storage.Service
gatewayClient *gateway.Client
pollingCallback task.PollingCallback
workerResult *WorkerBootstrapResult
asynqClient *asynq.Client
}
func NewHandler(
db *gorm.DB,
redis *redis.Client,
storageSvc *storage.Service,
gatewayClient *gateway.Client,
pollingCallback task.PollingCallback,
workerResult *WorkerBootstrapResult,
asynqClient *asynq.Client,
logger *zap.Logger,
) *Handler {
return &Handler{
mux: asynq.NewServeMux(),
logger: logger,
db: db,
redis: redis,
storage: storageSvc,
gatewayClient: gatewayClient,
pollingCallback: pollingCallback,
workerResult: workerResult,
asynqClient: asynqClient,
}
}
func (h *Handler) RegisterHandlers() *asynq.ServeMux {
emailHandler := task.NewEmailHandler(h.redis, h.logger)
syncHandler := task.NewSyncHandler(h.db, h.logger)
simHandler := task.NewSIMHandler(h.db, h.redis, h.logger)
h.mux.HandleFunc(constants.TaskTypeEmailSend, emailHandler.HandleEmailSend)
h.logger.Info("注册邮件发送任务处理器", zap.String("task_type", constants.TaskTypeEmailSend))
h.mux.HandleFunc(constants.TaskTypeDataSync, syncHandler.HandleDataSync)
h.logger.Info("注册数据同步任务处理器", zap.String("task_type", constants.TaskTypeDataSync))
h.mux.HandleFunc(constants.TaskTypeSIMStatusSync, simHandler.HandleSIMStatusSync)
h.logger.Info("注册 SIM 状态同步任务处理器", zap.String("task_type", constants.TaskTypeSIMStatusSync))
h.registerIotCardImportHandler()
h.registerDeviceImportHandler()
h.registerCommissionStatsHandlers()
h.registerCommissionCalculationHandler()
h.registerPollingHandlers()
h.registerPackageActivationHandlers()
h.registerOrderExpireHandler()
h.registerAlertCheckHandler()
h.registerDataCleanupHandler()
h.logger.Info("所有任务处理器注册完成")
return h.mux
}
func (h *Handler) registerIotCardImportHandler() {
iotCardImportHandler := task.NewIotCardImportHandler(
h.db,
h.redis,
h.workerResult.Stores.IotCardImportTask,
h.workerResult.Stores.IotCard,
h.storage,
h.pollingCallback,
h.logger,
)
h.mux.HandleFunc(constants.TaskTypeIotCardImport, iotCardImportHandler.HandleIotCardImport)
h.logger.Info("注册 IoT 卡导入任务处理器", zap.String("task_type", constants.TaskTypeIotCardImport))
}
func (h *Handler) registerDeviceImportHandler() {
deviceImportHandler := task.NewDeviceImportHandler(
h.db,
h.redis,
h.workerResult.Stores.DeviceImportTask,
h.workerResult.Stores.Device,
h.workerResult.Stores.DeviceSimBinding,
h.workerResult.Stores.IotCard,
h.storage,
h.logger,
)
h.mux.HandleFunc(constants.TaskTypeDeviceImport, deviceImportHandler.HandleDeviceImport)
h.logger.Info("注册设备导入任务处理器", zap.String("task_type", constants.TaskTypeDeviceImport))
}
func (h *Handler) registerCommissionStatsHandlers() {
updateHandler := task.NewCommissionStatsUpdateHandler(
h.redis,
h.workerResult.Stores.ShopSeriesCommissionStats,
h.workerResult.Stores.ShopPackageAllocation,
h.logger,
)
syncHandler := task.NewCommissionStatsSyncHandler(
h.db,
h.redis,
h.workerResult.Stores.ShopSeriesCommissionStats,
h.logger,
)
archiveHandler := task.NewCommissionStatsArchiveHandler(
h.db,
h.redis,
h.workerResult.Stores.ShopSeriesCommissionStats,
h.logger,
)
h.mux.HandleFunc(constants.TaskTypeCommissionStatsUpdate, updateHandler.HandleCommissionStatsUpdate)
h.logger.Info("注册佣金统计更新任务处理器", zap.String("task_type", constants.TaskTypeCommissionStatsUpdate))
h.mux.HandleFunc(constants.TaskTypeCommissionStatsSync, syncHandler.HandleCommissionStatsSync)
h.logger.Info("注册佣金统计同步任务处理器", zap.String("task_type", constants.TaskTypeCommissionStatsSync))
h.mux.HandleFunc(constants.TaskTypeCommissionStatsArchive, archiveHandler.HandleCommissionStatsArchive)
h.logger.Info("注册佣金统计归档任务处理器", zap.String("task_type", constants.TaskTypeCommissionStatsArchive))
}
func (h *Handler) registerCommissionCalculationHandler() {
commissionCalculationHandler := task.NewCommissionCalculationHandler(
h.db,
h.workerResult.Services.CommissionCalculation,
h.logger,
)
h.mux.HandleFunc(constants.TaskTypeCommission, commissionCalculationHandler.HandleCommissionCalculation)
h.logger.Info("注册佣金计算任务处理器", zap.String("task_type", constants.TaskTypeCommission))
}
func (h *Handler) registerPollingHandlers() {
pollingHandler := task.NewPollingHandler(
h.db,
h.redis,
h.gatewayClient,
h.workerResult.Services.UsageService,
h.logger,
)
h.mux.HandleFunc(constants.TaskTypePollingRealname, pollingHandler.HandleRealnameCheck)
h.logger.Info("注册实名检查任务处理器", zap.String("task_type", constants.TaskTypePollingRealname))
h.mux.HandleFunc(constants.TaskTypePollingCarddata, pollingHandler.HandleCarddataCheck)
h.logger.Info("注册流量检查任务处理器", zap.String("task_type", constants.TaskTypePollingCarddata))
h.mux.HandleFunc(constants.TaskTypePollingPackage, pollingHandler.HandlePackageCheck)
h.logger.Info("注册套餐检查任务处理器", zap.String("task_type", constants.TaskTypePollingPackage))
h.mux.HandleFunc(constants.TaskTypePollingProtect, pollingHandler.HandleProtectConsistencyCheck)
h.logger.Info("注册保护期一致性检查任务处理器", zap.String("task_type", constants.TaskTypePollingProtect))
}
func (h *Handler) registerPackageActivationHandlers() {
packageActivationHandler := polling.NewPackageActivationHandler(
h.db,
h.redis,
h.asynqClient,
h.workerResult.Services.ActivationService,
h.logger,
)
h.mux.HandleFunc(constants.TaskTypePackageFirstActivation, packageActivationHandler.HandlePackageFirstActivation)
h.logger.Info("注册首次实名激活任务处理器", zap.String("task_type", constants.TaskTypePackageFirstActivation))
h.mux.HandleFunc(constants.TaskTypePackageQueueActivation, packageActivationHandler.HandlePackageQueueActivation)
h.logger.Info("注册排队激活任务处理器", zap.String("task_type", constants.TaskTypePackageQueueActivation))
}
func (h *Handler) registerOrderExpireHandler() {
orderExpireHandler := task.NewOrderExpireHandler(h.workerResult.Services.OrderExpirer, h.logger)
h.mux.HandleFunc(constants.TaskTypeOrderExpire, orderExpireHandler.HandleOrderExpire)
h.logger.Info("注册订单超时取消任务处理器", zap.String("task_type", constants.TaskTypeOrderExpire))
}
func (h *Handler) registerAlertCheckHandler() {
alertCheckHandler := task.NewAlertCheckHandler(h.workerResult.Services.AlertService, h.logger)
h.mux.HandleFunc(constants.TaskTypeAlertCheck, alertCheckHandler.HandleAlertCheck)
h.logger.Info("注册告警检查任务处理器", zap.String("task_type", constants.TaskTypeAlertCheck))
}
func (h *Handler) registerDataCleanupHandler() {
dataCleanupHandler := task.NewDataCleanupHandler(h.workerResult.Services.CleanupService, h.logger)
h.mux.HandleFunc(constants.TaskTypeDataCleanup, dataCleanupHandler.HandleDataCleanup)
h.logger.Info("注册数据清理任务处理器", zap.String("task_type", constants.TaskTypeDataCleanup))
}
// GetMux 获取 ServeMux用于启动 Worker 服务器)
func (h *Handler) GetMux() *asynq.ServeMux {
return h.mux
}