Files
junhong_cmp_fiber/pkg/queue/handler.go
huang 353621d923
All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 6m33s
移除所有测试代码和测试要求
**变更说明**:
- 删除所有 *_test.go 文件(单元测试、集成测试、验收测试、流程测试)
- 删除整个 tests/ 目录
- 更新 CLAUDE.md:用"测试禁令"章节替换所有测试要求
- 删除测试生成 Skill (openspec-generate-acceptance-tests)
- 删除测试生成命令 (opsx:gen-tests)
- 更新 tasks.md:删除所有测试相关任务

**新规范**:
-  禁止编写任何形式的自动化测试
-  禁止创建 *_test.go 文件
-  禁止在任务中包含测试相关工作
-  仅当用户明确要求时才编写测试

**原因**:
业务系统的正确性通过人工验证和生产环境监控保证,测试代码维护成本高于价值。

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-11 17:13:42 +08:00

216 lines
9.2 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package queue
import (
"github.com/hibiken/asynq"
"github.com/redis/go-redis/v9"
"go.uber.org/zap"
"gorm.io/gorm"
"github.com/break/junhong_cmp_fiber/internal/gateway"
"github.com/break/junhong_cmp_fiber/internal/polling"
"github.com/break/junhong_cmp_fiber/internal/service/commission_calculation"
"github.com/break/junhong_cmp_fiber/internal/service/commission_stats"
packagepkg "github.com/break/junhong_cmp_fiber/internal/service/package"
"github.com/break/junhong_cmp_fiber/internal/store/postgres"
"github.com/break/junhong_cmp_fiber/internal/task"
"github.com/break/junhong_cmp_fiber/pkg/constants"
"github.com/break/junhong_cmp_fiber/pkg/storage"
)
type Handler struct {
mux *asynq.ServeMux
logger *zap.Logger
db *gorm.DB
redis *redis.Client
storage *storage.Service
gatewayClient *gateway.Client
pollingCallback task.PollingCallback
}
func NewHandler(db *gorm.DB, redis *redis.Client, storageSvc *storage.Service, gatewayClient *gateway.Client, pollingCallback task.PollingCallback, logger *zap.Logger) *Handler {
return &Handler{
mux: asynq.NewServeMux(),
logger: logger,
db: db,
redis: redis,
storage: storageSvc,
gatewayClient: gatewayClient,
pollingCallback: pollingCallback,
}
}
func (h *Handler) RegisterHandlers() *asynq.ServeMux {
emailHandler := task.NewEmailHandler(h.redis, h.logger)
syncHandler := task.NewSyncHandler(h.db, h.logger)
simHandler := task.NewSIMHandler(h.db, h.redis, h.logger)
h.mux.HandleFunc(constants.TaskTypeEmailSend, emailHandler.HandleEmailSend)
h.logger.Info("注册邮件发送任务处理器", zap.String("task_type", constants.TaskTypeEmailSend))
h.mux.HandleFunc(constants.TaskTypeDataSync, syncHandler.HandleDataSync)
h.logger.Info("注册数据同步任务处理器", zap.String("task_type", constants.TaskTypeDataSync))
h.mux.HandleFunc(constants.TaskTypeSIMStatusSync, simHandler.HandleSIMStatusSync)
h.logger.Info("注册 SIM 状态同步任务处理器", zap.String("task_type", constants.TaskTypeSIMStatusSync))
h.registerIotCardImportHandler()
h.registerDeviceImportHandler()
h.registerCommissionStatsHandlers()
h.registerCommissionCalculationHandler()
h.registerPollingHandlers()
h.registerPackageActivationHandlers()
h.logger.Info("所有任务处理器注册完成")
return h.mux
}
func (h *Handler) registerIotCardImportHandler() {
importTaskStore := postgres.NewIotCardImportTaskStore(h.db, h.redis)
iotCardStore := postgres.NewIotCardStore(h.db, h.redis)
iotCardImportHandler := task.NewIotCardImportHandler(h.db, h.redis, importTaskStore, iotCardStore, h.storage, h.pollingCallback, h.logger)
h.mux.HandleFunc(constants.TaskTypeIotCardImport, iotCardImportHandler.HandleIotCardImport)
h.logger.Info("注册 IoT 卡导入任务处理器", zap.String("task_type", constants.TaskTypeIotCardImport))
}
func (h *Handler) registerDeviceImportHandler() {
importTaskStore := postgres.NewDeviceImportTaskStore(h.db, h.redis)
deviceStore := postgres.NewDeviceStore(h.db, h.redis)
bindingStore := postgres.NewDeviceSimBindingStore(h.db, h.redis)
iotCardStore := postgres.NewIotCardStore(h.db, h.redis)
deviceImportHandler := task.NewDeviceImportHandler(h.db, h.redis, importTaskStore, deviceStore, bindingStore, iotCardStore, h.storage, h.logger)
h.mux.HandleFunc(constants.TaskTypeDeviceImport, deviceImportHandler.HandleDeviceImport)
h.logger.Info("注册设备导入任务处理器", zap.String("task_type", constants.TaskTypeDeviceImport))
}
func (h *Handler) registerCommissionStatsHandlers() {
statsStore := postgres.NewShopSeriesCommissionStatsStore(h.db)
allocationStore := postgres.NewShopPackageAllocationStore(h.db)
updateHandler := task.NewCommissionStatsUpdateHandler(h.redis, statsStore, allocationStore, h.logger)
syncHandler := task.NewCommissionStatsSyncHandler(h.db, h.redis, statsStore, h.logger)
archiveHandler := task.NewCommissionStatsArchiveHandler(h.db, h.redis, statsStore, h.logger)
h.mux.HandleFunc(constants.TaskTypeCommissionStatsUpdate, updateHandler.HandleCommissionStatsUpdate)
h.logger.Info("注册佣金统计更新任务处理器", zap.String("task_type", constants.TaskTypeCommissionStatsUpdate))
h.mux.HandleFunc(constants.TaskTypeCommissionStatsSync, syncHandler.HandleCommissionStatsSync)
h.logger.Info("注册佣金统计同步任务处理器", zap.String("task_type", constants.TaskTypeCommissionStatsSync))
h.mux.HandleFunc(constants.TaskTypeCommissionStatsArchive, archiveHandler.HandleCommissionStatsArchive)
h.logger.Info("注册佣金统计归档任务处理器", zap.String("task_type", constants.TaskTypeCommissionStatsArchive))
}
func (h *Handler) registerCommissionCalculationHandler() {
// 创建所有需要的 Store 实例
commissionRecordStore := postgres.NewCommissionRecordStore(h.db, h.redis)
shopStore := postgres.NewShopStore(h.db, h.redis)
shopPackageAllocationStore := postgres.NewShopPackageAllocationStore(h.db)
shopSeriesAllocationStore := postgres.NewShopSeriesAllocationStore(h.db)
packageSeriesStore := postgres.NewPackageSeriesStore(h.db)
iotCardStore := postgres.NewIotCardStore(h.db, h.redis)
deviceStore := postgres.NewDeviceStore(h.db, h.redis)
walletStore := postgres.NewWalletStore(h.db, h.redis)
walletTransactionStore := postgres.NewWalletTransactionStore(h.db, h.redis)
orderStore := postgres.NewOrderStore(h.db, h.redis)
orderItemStore := postgres.NewOrderItemStore(h.db, h.redis)
packageStore := postgres.NewPackageStore(h.db)
commissionStatsStore := postgres.NewShopSeriesCommissionStatsStore(h.db)
// 创建 commission_stats.Service
commissionStatsService := commission_stats.New(commissionStatsStore)
// 创建 commission_calculation.Service
commissionCalculationService := commission_calculation.New(
h.db,
commissionRecordStore,
shopStore,
shopPackageAllocationStore,
shopSeriesAllocationStore,
packageSeriesStore,
iotCardStore,
deviceStore,
walletStore,
walletTransactionStore,
orderStore,
orderItemStore,
packageStore,
commissionStatsStore,
commissionStatsService,
h.logger,
)
// 创建并注册 Handler
commissionCalculationHandler := task.NewCommissionCalculationHandler(h.db, commissionCalculationService, h.logger)
h.mux.HandleFunc(constants.TaskTypeCommission, commissionCalculationHandler.HandleCommissionCalculation)
h.logger.Info("注册佣金计算任务处理器", zap.String("task_type", constants.TaskTypeCommission))
}
// registerPollingHandlers 注册轮询任务处理器
func (h *Handler) registerPollingHandlers() {
// 创建套餐相关 Store 和 Service用于流量扣减
packageUsageStore := postgres.NewPackageUsageStore(h.db, h.redis)
packageUsageDailyRecordStore := postgres.NewPackageUsageDailyRecordStore(h.db, h.redis)
usageService := packagepkg.NewUsageService(h.db, h.redis, packageUsageStore, packageUsageDailyRecordStore, h.logger)
pollingHandler := task.NewPollingHandler(h.db, h.redis, h.gatewayClient, usageService, h.logger)
h.mux.HandleFunc(constants.TaskTypePollingRealname, pollingHandler.HandleRealnameCheck)
h.logger.Info("注册实名检查任务处理器", zap.String("task_type", constants.TaskTypePollingRealname))
h.mux.HandleFunc(constants.TaskTypePollingCarddata, pollingHandler.HandleCarddataCheck)
h.logger.Info("注册流量检查任务处理器", zap.String("task_type", constants.TaskTypePollingCarddata))
h.mux.HandleFunc(constants.TaskTypePollingPackage, pollingHandler.HandlePackageCheck)
h.logger.Info("注册套餐检查任务处理器", zap.String("task_type", constants.TaskTypePollingPackage))
}
// registerPackageActivationHandlers 注册套餐激活任务处理器
// 任务 22.6 和 23.6: 注册首次实名激活和排队激活任务 Handler
func (h *Handler) registerPackageActivationHandlers() {
// 创建套餐相关 Store 和 Service
packageUsageStore := postgres.NewPackageUsageStore(h.db, h.redis)
packageStore := postgres.NewPackageStore(h.db)
packageUsageDailyRecordStore := postgres.NewPackageUsageDailyRecordStore(h.db, h.redis)
activationService := packagepkg.NewActivationService(
h.db,
h.redis,
packageUsageStore,
packageStore,
packageUsageDailyRecordStore,
h.logger,
)
// 创建 Asynq 客户端用于任务提交
redisOpt := asynq.RedisClientOpt{
Addr: h.redis.Options().Addr,
Password: h.redis.Options().Password,
DB: h.redis.Options().DB,
}
queueClient := asynq.NewClient(redisOpt)
// 创建套餐激活处理器
packageActivationHandler := polling.NewPackageActivationHandler(
h.db,
h.redis,
queueClient,
activationService,
h.logger,
)
// 任务 22.6: 注册首次实名激活任务 Handler
h.mux.HandleFunc(constants.TaskTypePackageFirstActivation, packageActivationHandler.HandlePackageFirstActivation)
h.logger.Info("注册首次实名激活任务处理器", zap.String("task_type", constants.TaskTypePackageFirstActivation))
// 任务 23.6: 注册排队激活任务 Handler
h.mux.HandleFunc(constants.TaskTypePackageQueueActivation, packageActivationHandler.HandlePackageQueueActivation)
h.logger.Info("注册排队激活任务处理器", zap.String("task_type", constants.TaskTypePackageQueueActivation))
}
// GetMux 获取 ServeMux用于启动 Worker 服务器)
func (h *Handler) GetMux() *asynq.ServeMux {
return h.mux
}