All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 7m3s
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
210 lines
7.7 KiB
Go
210 lines
7.7 KiB
Go
package queue
|
||
|
||
import (
|
||
"github.com/hibiken/asynq"
|
||
"github.com/redis/go-redis/v9"
|
||
"go.uber.org/zap"
|
||
"gorm.io/gorm"
|
||
|
||
"github.com/break/junhong_cmp_fiber/internal/gateway"
|
||
"github.com/break/junhong_cmp_fiber/internal/polling"
|
||
"github.com/break/junhong_cmp_fiber/internal/task"
|
||
"github.com/break/junhong_cmp_fiber/pkg/constants"
|
||
"github.com/break/junhong_cmp_fiber/pkg/storage"
|
||
)
|
||
|
||
type Handler struct {
|
||
mux *asynq.ServeMux
|
||
logger *zap.Logger
|
||
db *gorm.DB
|
||
redis *redis.Client
|
||
storage *storage.Service
|
||
gatewayClient *gateway.Client
|
||
pollingCallback task.PollingCallback
|
||
workerResult *WorkerBootstrapResult
|
||
asynqClient *asynq.Client
|
||
}
|
||
|
||
func NewHandler(
|
||
db *gorm.DB,
|
||
redis *redis.Client,
|
||
storageSvc *storage.Service,
|
||
gatewayClient *gateway.Client,
|
||
pollingCallback task.PollingCallback,
|
||
workerResult *WorkerBootstrapResult,
|
||
asynqClient *asynq.Client,
|
||
logger *zap.Logger,
|
||
) *Handler {
|
||
return &Handler{
|
||
mux: asynq.NewServeMux(),
|
||
logger: logger,
|
||
db: db,
|
||
redis: redis,
|
||
storage: storageSvc,
|
||
gatewayClient: gatewayClient,
|
||
pollingCallback: pollingCallback,
|
||
workerResult: workerResult,
|
||
asynqClient: asynqClient,
|
||
}
|
||
}
|
||
|
||
func (h *Handler) RegisterHandlers() *asynq.ServeMux {
|
||
emailHandler := task.NewEmailHandler(h.redis, h.logger)
|
||
syncHandler := task.NewSyncHandler(h.db, h.logger)
|
||
simHandler := task.NewSIMHandler(h.db, h.redis, h.logger)
|
||
|
||
h.mux.HandleFunc(constants.TaskTypeEmailSend, emailHandler.HandleEmailSend)
|
||
h.logger.Info("注册邮件发送任务处理器", zap.String("task_type", constants.TaskTypeEmailSend))
|
||
|
||
h.mux.HandleFunc(constants.TaskTypeDataSync, syncHandler.HandleDataSync)
|
||
h.logger.Info("注册数据同步任务处理器", zap.String("task_type", constants.TaskTypeDataSync))
|
||
|
||
h.mux.HandleFunc(constants.TaskTypeSIMStatusSync, simHandler.HandleSIMStatusSync)
|
||
h.logger.Info("注册 SIM 状态同步任务处理器", zap.String("task_type", constants.TaskTypeSIMStatusSync))
|
||
|
||
h.registerIotCardImportHandler()
|
||
h.registerDeviceImportHandler()
|
||
h.registerCommissionStatsHandlers()
|
||
h.registerCommissionCalculationHandler()
|
||
h.registerPollingHandlers()
|
||
h.registerPackageActivationHandlers()
|
||
h.registerOrderExpireHandler()
|
||
h.registerAlertCheckHandler()
|
||
h.registerDataCleanupHandler()
|
||
|
||
h.logger.Info("所有任务处理器注册完成")
|
||
return h.mux
|
||
}
|
||
|
||
func (h *Handler) registerIotCardImportHandler() {
|
||
iotCardImportHandler := task.NewIotCardImportHandler(
|
||
h.db,
|
||
h.redis,
|
||
h.workerResult.Stores.IotCardImportTask,
|
||
h.workerResult.Stores.IotCard,
|
||
h.storage,
|
||
h.pollingCallback,
|
||
h.logger,
|
||
)
|
||
|
||
h.mux.HandleFunc(constants.TaskTypeIotCardImport, iotCardImportHandler.HandleIotCardImport)
|
||
h.logger.Info("注册 IoT 卡导入任务处理器", zap.String("task_type", constants.TaskTypeIotCardImport))
|
||
}
|
||
|
||
func (h *Handler) registerDeviceImportHandler() {
|
||
deviceImportHandler := task.NewDeviceImportHandler(
|
||
h.db,
|
||
h.redis,
|
||
h.workerResult.Stores.DeviceImportTask,
|
||
h.workerResult.Stores.Device,
|
||
h.workerResult.Stores.DeviceSimBinding,
|
||
h.workerResult.Stores.IotCard,
|
||
h.storage,
|
||
h.logger,
|
||
)
|
||
|
||
h.mux.HandleFunc(constants.TaskTypeDeviceImport, deviceImportHandler.HandleDeviceImport)
|
||
h.logger.Info("注册设备导入任务处理器", zap.String("task_type", constants.TaskTypeDeviceImport))
|
||
}
|
||
|
||
func (h *Handler) registerCommissionStatsHandlers() {
|
||
updateHandler := task.NewCommissionStatsUpdateHandler(
|
||
h.redis,
|
||
h.workerResult.Stores.ShopSeriesCommissionStats,
|
||
h.workerResult.Stores.ShopPackageAllocation,
|
||
h.logger,
|
||
)
|
||
syncHandler := task.NewCommissionStatsSyncHandler(
|
||
h.db,
|
||
h.redis,
|
||
h.workerResult.Stores.ShopSeriesCommissionStats,
|
||
h.logger,
|
||
)
|
||
archiveHandler := task.NewCommissionStatsArchiveHandler(
|
||
h.db,
|
||
h.redis,
|
||
h.workerResult.Stores.ShopSeriesCommissionStats,
|
||
h.logger,
|
||
)
|
||
|
||
h.mux.HandleFunc(constants.TaskTypeCommissionStatsUpdate, updateHandler.HandleCommissionStatsUpdate)
|
||
h.logger.Info("注册佣金统计更新任务处理器", zap.String("task_type", constants.TaskTypeCommissionStatsUpdate))
|
||
|
||
h.mux.HandleFunc(constants.TaskTypeCommissionStatsSync, syncHandler.HandleCommissionStatsSync)
|
||
h.logger.Info("注册佣金统计同步任务处理器", zap.String("task_type", constants.TaskTypeCommissionStatsSync))
|
||
|
||
h.mux.HandleFunc(constants.TaskTypeCommissionStatsArchive, archiveHandler.HandleCommissionStatsArchive)
|
||
h.logger.Info("注册佣金统计归档任务处理器", zap.String("task_type", constants.TaskTypeCommissionStatsArchive))
|
||
}
|
||
|
||
func (h *Handler) registerCommissionCalculationHandler() {
|
||
commissionCalculationHandler := task.NewCommissionCalculationHandler(
|
||
h.db,
|
||
h.workerResult.Services.CommissionCalculation,
|
||
h.logger,
|
||
)
|
||
h.mux.HandleFunc(constants.TaskTypeCommission, commissionCalculationHandler.HandleCommissionCalculation)
|
||
h.logger.Info("注册佣金计算任务处理器", zap.String("task_type", constants.TaskTypeCommission))
|
||
}
|
||
|
||
func (h *Handler) registerPollingHandlers() {
|
||
pollingHandler := task.NewPollingHandler(
|
||
h.db,
|
||
h.redis,
|
||
h.gatewayClient,
|
||
h.workerResult.Services.UsageService,
|
||
h.logger,
|
||
)
|
||
|
||
h.mux.HandleFunc(constants.TaskTypePollingRealname, pollingHandler.HandleRealnameCheck)
|
||
h.logger.Info("注册实名检查任务处理器", zap.String("task_type", constants.TaskTypePollingRealname))
|
||
|
||
h.mux.HandleFunc(constants.TaskTypePollingCarddata, pollingHandler.HandleCarddataCheck)
|
||
h.logger.Info("注册流量检查任务处理器", zap.String("task_type", constants.TaskTypePollingCarddata))
|
||
|
||
h.mux.HandleFunc(constants.TaskTypePollingPackage, pollingHandler.HandlePackageCheck)
|
||
h.logger.Info("注册套餐检查任务处理器", zap.String("task_type", constants.TaskTypePollingPackage))
|
||
|
||
h.mux.HandleFunc(constants.TaskTypePollingProtect, pollingHandler.HandleProtectConsistencyCheck)
|
||
h.logger.Info("注册保护期一致性检查任务处理器", zap.String("task_type", constants.TaskTypePollingProtect))
|
||
}
|
||
|
||
func (h *Handler) registerPackageActivationHandlers() {
|
||
packageActivationHandler := polling.NewPackageActivationHandler(
|
||
h.db,
|
||
h.redis,
|
||
h.asynqClient,
|
||
h.workerResult.Services.ActivationService,
|
||
h.logger,
|
||
)
|
||
|
||
h.mux.HandleFunc(constants.TaskTypePackageFirstActivation, packageActivationHandler.HandlePackageFirstActivation)
|
||
h.logger.Info("注册首次实名激活任务处理器", zap.String("task_type", constants.TaskTypePackageFirstActivation))
|
||
|
||
h.mux.HandleFunc(constants.TaskTypePackageQueueActivation, packageActivationHandler.HandlePackageQueueActivation)
|
||
h.logger.Info("注册排队激活任务处理器", zap.String("task_type", constants.TaskTypePackageQueueActivation))
|
||
}
|
||
|
||
func (h *Handler) registerOrderExpireHandler() {
|
||
orderExpireHandler := task.NewOrderExpireHandler(h.workerResult.Services.OrderExpirer, h.logger)
|
||
h.mux.HandleFunc(constants.TaskTypeOrderExpire, orderExpireHandler.HandleOrderExpire)
|
||
h.logger.Info("注册订单超时取消任务处理器", zap.String("task_type", constants.TaskTypeOrderExpire))
|
||
}
|
||
|
||
func (h *Handler) registerAlertCheckHandler() {
|
||
alertCheckHandler := task.NewAlertCheckHandler(h.workerResult.Services.AlertService, h.logger)
|
||
h.mux.HandleFunc(constants.TaskTypeAlertCheck, alertCheckHandler.HandleAlertCheck)
|
||
h.logger.Info("注册告警检查任务处理器", zap.String("task_type", constants.TaskTypeAlertCheck))
|
||
}
|
||
|
||
func (h *Handler) registerDataCleanupHandler() {
|
||
dataCleanupHandler := task.NewDataCleanupHandler(h.workerResult.Services.CleanupService, h.logger)
|
||
h.mux.HandleFunc(constants.TaskTypeDataCleanup, dataCleanupHandler.HandleDataCleanup)
|
||
h.logger.Info("注册数据清理任务处理器", zap.String("task_type", constants.TaskTypeDataCleanup))
|
||
}
|
||
|
||
// GetMux 获取 ServeMux(用于启动 Worker 服务器)
|
||
func (h *Handler) GetMux() *asynq.ServeMux {
|
||
return h.mux
|
||
}
|