Files
junhong_cmp_fiber/pkg/queue/handler.go
huang ce0783f96e
All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 5m30s
feat: 实现设备管理和设备导入功能,修复测试问题
主要变更:
- 实现设备管理模块(创建、查询、列表、更新状态、删除)
- 实现设备批量导入功能(CSV 解析、ICCID 绑定、异步任务处理)
- 添加设备-SIM 卡绑定约束(部分唯一索引防止并发问题)
- 修复 fee_rate 数据库字段类型(numeric -> bigint)
- 修复测试数据隔离问题(基于增量断言)
- 修复集成测试中间件顺序问题
- 清理无用测试文件(PersonalCustomer、Email 相关)
- 归档 enterprise-card-authorization 变更
2026-01-26 18:05:12 +08:00

78 lines
2.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package queue
import (
"github.com/hibiken/asynq"
"github.com/redis/go-redis/v9"
"go.uber.org/zap"
"gorm.io/gorm"
"github.com/break/junhong_cmp_fiber/internal/store/postgres"
"github.com/break/junhong_cmp_fiber/internal/task"
"github.com/break/junhong_cmp_fiber/pkg/constants"
"github.com/break/junhong_cmp_fiber/pkg/storage"
)
type Handler struct {
mux *asynq.ServeMux
logger *zap.Logger
db *gorm.DB
redis *redis.Client
storage *storage.Service
}
func NewHandler(db *gorm.DB, redis *redis.Client, storageSvc *storage.Service, logger *zap.Logger) *Handler {
return &Handler{
mux: asynq.NewServeMux(),
logger: logger,
db: db,
redis: redis,
storage: storageSvc,
}
}
func (h *Handler) RegisterHandlers() *asynq.ServeMux {
emailHandler := task.NewEmailHandler(h.redis, h.logger)
syncHandler := task.NewSyncHandler(h.db, h.logger)
simHandler := task.NewSIMHandler(h.db, h.redis, h.logger)
h.mux.HandleFunc(constants.TaskTypeEmailSend, emailHandler.HandleEmailSend)
h.logger.Info("注册邮件发送任务处理器", zap.String("task_type", constants.TaskTypeEmailSend))
h.mux.HandleFunc(constants.TaskTypeDataSync, syncHandler.HandleDataSync)
h.logger.Info("注册数据同步任务处理器", zap.String("task_type", constants.TaskTypeDataSync))
h.mux.HandleFunc(constants.TaskTypeSIMStatusSync, simHandler.HandleSIMStatusSync)
h.logger.Info("注册 SIM 状态同步任务处理器", zap.String("task_type", constants.TaskTypeSIMStatusSync))
h.registerIotCardImportHandler()
h.registerDeviceImportHandler()
h.logger.Info("所有任务处理器注册完成")
return h.mux
}
func (h *Handler) registerIotCardImportHandler() {
importTaskStore := postgres.NewIotCardImportTaskStore(h.db, h.redis)
iotCardStore := postgres.NewIotCardStore(h.db, h.redis)
iotCardImportHandler := task.NewIotCardImportHandler(h.db, h.redis, importTaskStore, iotCardStore, h.storage, h.logger)
h.mux.HandleFunc(constants.TaskTypeIotCardImport, iotCardImportHandler.HandleIotCardImport)
h.logger.Info("注册 IoT 卡导入任务处理器", zap.String("task_type", constants.TaskTypeIotCardImport))
}
func (h *Handler) registerDeviceImportHandler() {
importTaskStore := postgres.NewDeviceImportTaskStore(h.db, h.redis)
deviceStore := postgres.NewDeviceStore(h.db, h.redis)
bindingStore := postgres.NewDeviceSimBindingStore(h.db, h.redis)
iotCardStore := postgres.NewIotCardStore(h.db, h.redis)
deviceImportHandler := task.NewDeviceImportHandler(h.db, h.redis, importTaskStore, deviceStore, bindingStore, iotCardStore, h.storage, h.logger)
h.mux.HandleFunc(constants.TaskTypeDeviceImport, deviceImportHandler.HandleDeviceImport)
h.logger.Info("注册设备导入任务处理器", zap.String("task_type", constants.TaskTypeDeviceImport))
}
// GetMux 获取 ServeMux用于启动 Worker 服务器)
func (h *Handler) GetMux() *asynq.ServeMux {
return h.mux
}