package queue import ( "github.com/hibiken/asynq" "github.com/redis/go-redis/v9" "go.uber.org/zap" "gorm.io/gorm" "github.com/break/junhong_cmp_fiber/internal/gateway" "github.com/break/junhong_cmp_fiber/internal/polling" "github.com/break/junhong_cmp_fiber/internal/service/commission_calculation" "github.com/break/junhong_cmp_fiber/internal/service/commission_stats" packagepkg "github.com/break/junhong_cmp_fiber/internal/service/package" "github.com/break/junhong_cmp_fiber/internal/store/postgres" "github.com/break/junhong_cmp_fiber/internal/task" "github.com/break/junhong_cmp_fiber/pkg/constants" "github.com/break/junhong_cmp_fiber/pkg/storage" ) type Handler struct { mux *asynq.ServeMux logger *zap.Logger db *gorm.DB redis *redis.Client storage *storage.Service gatewayClient *gateway.Client pollingCallback task.PollingCallback } func NewHandler(db *gorm.DB, redis *redis.Client, storageSvc *storage.Service, gatewayClient *gateway.Client, pollingCallback task.PollingCallback, logger *zap.Logger) *Handler { return &Handler{ mux: asynq.NewServeMux(), logger: logger, db: db, redis: redis, storage: storageSvc, gatewayClient: gatewayClient, pollingCallback: pollingCallback, } } func (h *Handler) RegisterHandlers() *asynq.ServeMux { emailHandler := task.NewEmailHandler(h.redis, h.logger) syncHandler := task.NewSyncHandler(h.db, h.logger) simHandler := task.NewSIMHandler(h.db, h.redis, h.logger) h.mux.HandleFunc(constants.TaskTypeEmailSend, emailHandler.HandleEmailSend) h.logger.Info("注册邮件发送任务处理器", zap.String("task_type", constants.TaskTypeEmailSend)) h.mux.HandleFunc(constants.TaskTypeDataSync, syncHandler.HandleDataSync) h.logger.Info("注册数据同步任务处理器", zap.String("task_type", constants.TaskTypeDataSync)) h.mux.HandleFunc(constants.TaskTypeSIMStatusSync, simHandler.HandleSIMStatusSync) h.logger.Info("注册 SIM 状态同步任务处理器", zap.String("task_type", constants.TaskTypeSIMStatusSync)) h.registerIotCardImportHandler() h.registerDeviceImportHandler() h.registerCommissionStatsHandlers() h.registerCommissionCalculationHandler() h.registerPollingHandlers() h.registerPackageActivationHandlers() h.logger.Info("所有任务处理器注册完成") return h.mux } func (h *Handler) registerIotCardImportHandler() { importTaskStore := postgres.NewIotCardImportTaskStore(h.db, h.redis) iotCardStore := postgres.NewIotCardStore(h.db, h.redis) iotCardImportHandler := task.NewIotCardImportHandler(h.db, h.redis, importTaskStore, iotCardStore, h.storage, h.pollingCallback, h.logger) h.mux.HandleFunc(constants.TaskTypeIotCardImport, iotCardImportHandler.HandleIotCardImport) h.logger.Info("注册 IoT 卡导入任务处理器", zap.String("task_type", constants.TaskTypeIotCardImport)) } func (h *Handler) registerDeviceImportHandler() { importTaskStore := postgres.NewDeviceImportTaskStore(h.db, h.redis) deviceStore := postgres.NewDeviceStore(h.db, h.redis) bindingStore := postgres.NewDeviceSimBindingStore(h.db, h.redis) iotCardStore := postgres.NewIotCardStore(h.db, h.redis) deviceImportHandler := task.NewDeviceImportHandler(h.db, h.redis, importTaskStore, deviceStore, bindingStore, iotCardStore, h.storage, h.logger) h.mux.HandleFunc(constants.TaskTypeDeviceImport, deviceImportHandler.HandleDeviceImport) h.logger.Info("注册设备导入任务处理器", zap.String("task_type", constants.TaskTypeDeviceImport)) } func (h *Handler) registerCommissionStatsHandlers() { statsStore := postgres.NewShopSeriesCommissionStatsStore(h.db) allocationStore := postgres.NewShopPackageAllocationStore(h.db) updateHandler := task.NewCommissionStatsUpdateHandler(h.redis, statsStore, allocationStore, h.logger) syncHandler := task.NewCommissionStatsSyncHandler(h.db, h.redis, statsStore, h.logger) archiveHandler := task.NewCommissionStatsArchiveHandler(h.db, h.redis, statsStore, h.logger) h.mux.HandleFunc(constants.TaskTypeCommissionStatsUpdate, updateHandler.HandleCommissionStatsUpdate) h.logger.Info("注册佣金统计更新任务处理器", zap.String("task_type", constants.TaskTypeCommissionStatsUpdate)) h.mux.HandleFunc(constants.TaskTypeCommissionStatsSync, syncHandler.HandleCommissionStatsSync) h.logger.Info("注册佣金统计同步任务处理器", zap.String("task_type", constants.TaskTypeCommissionStatsSync)) h.mux.HandleFunc(constants.TaskTypeCommissionStatsArchive, archiveHandler.HandleCommissionStatsArchive) h.logger.Info("注册佣金统计归档任务处理器", zap.String("task_type", constants.TaskTypeCommissionStatsArchive)) } func (h *Handler) registerCommissionCalculationHandler() { // 创建所有需要的 Store 实例 commissionRecordStore := postgres.NewCommissionRecordStore(h.db, h.redis) shopStore := postgres.NewShopStore(h.db, h.redis) shopPackageAllocationStore := postgres.NewShopPackageAllocationStore(h.db) shopSeriesAllocationStore := postgres.NewShopSeriesAllocationStore(h.db) packageSeriesStore := postgres.NewPackageSeriesStore(h.db) iotCardStore := postgres.NewIotCardStore(h.db, h.redis) deviceStore := postgres.NewDeviceStore(h.db, h.redis) walletStore := postgres.NewWalletStore(h.db, h.redis) walletTransactionStore := postgres.NewWalletTransactionStore(h.db, h.redis) orderStore := postgres.NewOrderStore(h.db, h.redis) orderItemStore := postgres.NewOrderItemStore(h.db, h.redis) packageStore := postgres.NewPackageStore(h.db) commissionStatsStore := postgres.NewShopSeriesCommissionStatsStore(h.db) // 创建 commission_stats.Service commissionStatsService := commission_stats.New(commissionStatsStore) // 创建 commission_calculation.Service commissionCalculationService := commission_calculation.New( h.db, commissionRecordStore, shopStore, shopPackageAllocationStore, shopSeriesAllocationStore, packageSeriesStore, iotCardStore, deviceStore, walletStore, walletTransactionStore, orderStore, orderItemStore, packageStore, commissionStatsStore, commissionStatsService, h.logger, ) // 创建并注册 Handler commissionCalculationHandler := task.NewCommissionCalculationHandler(h.db, commissionCalculationService, h.logger) h.mux.HandleFunc(constants.TaskTypeCommission, commissionCalculationHandler.HandleCommissionCalculation) h.logger.Info("注册佣金计算任务处理器", zap.String("task_type", constants.TaskTypeCommission)) } // registerPollingHandlers 注册轮询任务处理器 func (h *Handler) registerPollingHandlers() { // 创建套餐相关 Store 和 Service(用于流量扣减) packageUsageStore := postgres.NewPackageUsageStore(h.db, h.redis) packageUsageDailyRecordStore := postgres.NewPackageUsageDailyRecordStore(h.db, h.redis) usageService := packagepkg.NewUsageService(h.db, h.redis, packageUsageStore, packageUsageDailyRecordStore, h.logger) pollingHandler := task.NewPollingHandler(h.db, h.redis, h.gatewayClient, usageService, h.logger) h.mux.HandleFunc(constants.TaskTypePollingRealname, pollingHandler.HandleRealnameCheck) h.logger.Info("注册实名检查任务处理器", zap.String("task_type", constants.TaskTypePollingRealname)) h.mux.HandleFunc(constants.TaskTypePollingCarddata, pollingHandler.HandleCarddataCheck) h.logger.Info("注册流量检查任务处理器", zap.String("task_type", constants.TaskTypePollingCarddata)) h.mux.HandleFunc(constants.TaskTypePollingPackage, pollingHandler.HandlePackageCheck) h.logger.Info("注册套餐检查任务处理器", zap.String("task_type", constants.TaskTypePollingPackage)) } // registerPackageActivationHandlers 注册套餐激活任务处理器 // 任务 22.6 和 23.6: 注册首次实名激活和排队激活任务 Handler func (h *Handler) registerPackageActivationHandlers() { // 创建套餐相关 Store 和 Service packageUsageStore := postgres.NewPackageUsageStore(h.db, h.redis) packageStore := postgres.NewPackageStore(h.db) packageUsageDailyRecordStore := postgres.NewPackageUsageDailyRecordStore(h.db, h.redis) activationService := packagepkg.NewActivationService( h.db, h.redis, packageUsageStore, packageStore, packageUsageDailyRecordStore, h.logger, ) // 创建 Asynq 客户端用于任务提交 redisOpt := asynq.RedisClientOpt{ Addr: h.redis.Options().Addr, Password: h.redis.Options().Password, DB: h.redis.Options().DB, } queueClient := asynq.NewClient(redisOpt) // 创建套餐激活处理器 packageActivationHandler := polling.NewPackageActivationHandler( h.db, h.redis, queueClient, activationService, h.logger, ) // 任务 22.6: 注册首次实名激活任务 Handler h.mux.HandleFunc(constants.TaskTypePackageFirstActivation, packageActivationHandler.HandlePackageFirstActivation) h.logger.Info("注册首次实名激活任务处理器", zap.String("task_type", constants.TaskTypePackageFirstActivation)) // 任务 23.6: 注册排队激活任务 Handler h.mux.HandleFunc(constants.TaskTypePackageQueueActivation, packageActivationHandler.HandlePackageQueueActivation) h.logger.Info("注册排队激活任务处理器", zap.String("task_type", constants.TaskTypePackageQueueActivation)) } // GetMux 获取 ServeMux(用于启动 Worker 服务器) func (h *Handler) GetMux() *asynq.ServeMux { return h.mux }