package queue import ( "github.com/hibiken/asynq" "github.com/redis/go-redis/v9" "go.uber.org/zap" "gorm.io/gorm" "github.com/break/junhong_cmp_fiber/internal/store/postgres" "github.com/break/junhong_cmp_fiber/internal/task" "github.com/break/junhong_cmp_fiber/pkg/constants" "github.com/break/junhong_cmp_fiber/pkg/storage" ) type Handler struct { mux *asynq.ServeMux logger *zap.Logger db *gorm.DB redis *redis.Client storage *storage.Service } func NewHandler(db *gorm.DB, redis *redis.Client, storageSvc *storage.Service, logger *zap.Logger) *Handler { return &Handler{ mux: asynq.NewServeMux(), logger: logger, db: db, redis: redis, storage: storageSvc, } } func (h *Handler) RegisterHandlers() *asynq.ServeMux { emailHandler := task.NewEmailHandler(h.redis, h.logger) syncHandler := task.NewSyncHandler(h.db, h.logger) simHandler := task.NewSIMHandler(h.db, h.redis, h.logger) h.mux.HandleFunc(constants.TaskTypeEmailSend, emailHandler.HandleEmailSend) h.logger.Info("注册邮件发送任务处理器", zap.String("task_type", constants.TaskTypeEmailSend)) h.mux.HandleFunc(constants.TaskTypeDataSync, syncHandler.HandleDataSync) h.logger.Info("注册数据同步任务处理器", zap.String("task_type", constants.TaskTypeDataSync)) h.mux.HandleFunc(constants.TaskTypeSIMStatusSync, simHandler.HandleSIMStatusSync) h.logger.Info("注册 SIM 状态同步任务处理器", zap.String("task_type", constants.TaskTypeSIMStatusSync)) h.registerIotCardImportHandler() h.registerDeviceImportHandler() h.registerCommissionStatsHandlers() h.logger.Info("所有任务处理器注册完成") return h.mux } func (h *Handler) registerIotCardImportHandler() { importTaskStore := postgres.NewIotCardImportTaskStore(h.db, h.redis) iotCardStore := postgres.NewIotCardStore(h.db, h.redis) iotCardImportHandler := task.NewIotCardImportHandler(h.db, h.redis, importTaskStore, iotCardStore, h.storage, h.logger) h.mux.HandleFunc(constants.TaskTypeIotCardImport, iotCardImportHandler.HandleIotCardImport) h.logger.Info("注册 IoT 卡导入任务处理器", zap.String("task_type", constants.TaskTypeIotCardImport)) } func (h *Handler) registerDeviceImportHandler() { importTaskStore := postgres.NewDeviceImportTaskStore(h.db, h.redis) deviceStore := postgres.NewDeviceStore(h.db, h.redis) bindingStore := postgres.NewDeviceSimBindingStore(h.db, h.redis) iotCardStore := postgres.NewIotCardStore(h.db, h.redis) deviceImportHandler := task.NewDeviceImportHandler(h.db, h.redis, importTaskStore, deviceStore, bindingStore, iotCardStore, h.storage, h.logger) h.mux.HandleFunc(constants.TaskTypeDeviceImport, deviceImportHandler.HandleDeviceImport) h.logger.Info("注册设备导入任务处理器", zap.String("task_type", constants.TaskTypeDeviceImport)) } func (h *Handler) registerCommissionStatsHandlers() { statsStore := postgres.NewShopSeriesCommissionStatsStore(h.db) allocationStore := postgres.NewShopPackageAllocationStore(h.db) updateHandler := task.NewCommissionStatsUpdateHandler(h.redis, statsStore, allocationStore, h.logger) syncHandler := task.NewCommissionStatsSyncHandler(h.db, h.redis, statsStore, h.logger) archiveHandler := task.NewCommissionStatsArchiveHandler(h.db, h.redis, statsStore, h.logger) h.mux.HandleFunc(constants.TaskTypeCommissionStatsUpdate, updateHandler.HandleCommissionStatsUpdate) h.logger.Info("注册佣金统计更新任务处理器", zap.String("task_type", constants.TaskTypeCommissionStatsUpdate)) h.mux.HandleFunc(constants.TaskTypeCommissionStatsSync, syncHandler.HandleCommissionStatsSync) h.logger.Info("注册佣金统计同步任务处理器", zap.String("task_type", constants.TaskTypeCommissionStatsSync)) h.mux.HandleFunc(constants.TaskTypeCommissionStatsArchive, archiveHandler.HandleCommissionStatsArchive) h.logger.Info("注册佣金统计归档任务处理器", zap.String("task_type", constants.TaskTypeCommissionStatsArchive)) } // GetMux 获取 ServeMux(用于启动 Worker 服务器) func (h *Handler) GetMux() *asynq.ServeMux { return h.mux }