package task import ( "context" "time" "github.com/bytedance/sonic" "github.com/hibiken/asynq" "github.com/redis/go-redis/v9" "go.uber.org/zap" "gorm.io/gorm" "github.com/break/junhong_cmp_fiber/internal/model" "github.com/break/junhong_cmp_fiber/internal/store/postgres" "github.com/break/junhong_cmp_fiber/pkg/constants" pkggorm "github.com/break/junhong_cmp_fiber/pkg/gorm" "github.com/break/junhong_cmp_fiber/pkg/validator" ) const batchSize = 1000 type IotCardImportPayload struct { TaskID uint `json:"task_id"` } type IotCardImportHandler struct { db *gorm.DB redis *redis.Client importTaskStore *postgres.IotCardImportTaskStore iotCardStore *postgres.IotCardStore logger *zap.Logger } func NewIotCardImportHandler(db *gorm.DB, redis *redis.Client, importTaskStore *postgres.IotCardImportTaskStore, iotCardStore *postgres.IotCardStore, logger *zap.Logger) *IotCardImportHandler { return &IotCardImportHandler{ db: db, redis: redis, importTaskStore: importTaskStore, iotCardStore: iotCardStore, logger: logger, } } func (h *IotCardImportHandler) HandleIotCardImport(ctx context.Context, task *asynq.Task) error { ctx = pkggorm.SkipDataPermission(ctx) var payload IotCardImportPayload if err := sonic.Unmarshal(task.Payload(), &payload); err != nil { h.logger.Error("解析 IoT 卡导入任务载荷失败", zap.Error(err), zap.String("task_id", task.ResultWriter().TaskID()), ) return asynq.SkipRetry } importTask, err := h.importTaskStore.GetByID(ctx, payload.TaskID) if err != nil { h.logger.Error("获取导入任务失败", zap.Uint("task_id", payload.TaskID), zap.Error(err), ) return asynq.SkipRetry } if importTask.Status != model.ImportTaskStatusPending { h.logger.Info("导入任务已处理,跳过", zap.Uint("task_id", payload.TaskID), zap.Int("status", importTask.Status), ) return nil } h.importTaskStore.UpdateStatus(ctx, importTask.ID, model.ImportTaskStatusProcessing, "") h.logger.Info("开始处理 IoT 卡导入任务", zap.Uint("task_id", importTask.ID), zap.String("task_no", importTask.TaskNo), zap.Int("total_count", importTask.TotalCount), ) result := h.processImport(ctx, importTask) h.importTaskStore.UpdateResult(ctx, importTask.ID, result.successCount, result.skipCount, result.failCount, result.skippedItems, result.failedItems) if result.failCount > 0 && result.successCount == 0 { h.importTaskStore.UpdateStatus(ctx, importTask.ID, model.ImportTaskStatusFailed, "所有导入均失败") } else { h.importTaskStore.UpdateStatus(ctx, importTask.ID, model.ImportTaskStatusCompleted, "") } h.logger.Info("IoT 卡导入任务完成", zap.Uint("task_id", importTask.ID), zap.Int("success_count", result.successCount), zap.Int("skip_count", result.skipCount), zap.Int("fail_count", result.failCount), ) return nil } type importResult struct { successCount int skipCount int failCount int skippedItems model.ImportResultItems failedItems model.ImportResultItems } func (h *IotCardImportHandler) processImport(ctx context.Context, task *model.IotCardImportTask) *importResult { result := &importResult{ skippedItems: make(model.ImportResultItems, 0), failedItems: make(model.ImportResultItems, 0), } iccids := h.getICCIDsFromTask(task) if len(iccids) == 0 { return result } for i := 0; i < len(iccids); i += batchSize { end := min(i+batchSize, len(iccids)) batch := iccids[i:end] h.processBatch(ctx, task, batch, i+1, result) } return result } func (h *IotCardImportHandler) getICCIDsFromTask(task *model.IotCardImportTask) []string { return []string(task.ICCIDList) } func (h *IotCardImportHandler) processBatch(ctx context.Context, task *model.IotCardImportTask, batch []string, startLine int, result *importResult) { validICCIDs := make([]string, 0) lineMap := make(map[string]int) for i, iccid := range batch { line := startLine + i lineMap[iccid] = line validationResult := validator.ValidateICCID(iccid, task.CarrierType) if !validationResult.Valid { result.failedItems = append(result.failedItems, model.ImportResultItem{ Line: line, ICCID: iccid, Reason: validationResult.Message, }) result.failCount++ continue } validICCIDs = append(validICCIDs, iccid) } if len(validICCIDs) == 0 { return } existingMap, err := h.iotCardStore.ExistsByICCIDBatch(ctx, validICCIDs) if err != nil { h.logger.Error("批量检查 ICCID 是否存在失败", zap.Error(err), zap.Int("batch_size", len(validICCIDs)), ) for _, iccid := range validICCIDs { result.failedItems = append(result.failedItems, model.ImportResultItem{ Line: lineMap[iccid], ICCID: iccid, Reason: "数据库查询失败", }) result.failCount++ } return } newICCIDs := make([]string, 0) for _, iccid := range validICCIDs { if existingMap[iccid] { result.skippedItems = append(result.skippedItems, model.ImportResultItem{ Line: lineMap[iccid], ICCID: iccid, Reason: "ICCID 已存在", }) result.skipCount++ } else { newICCIDs = append(newICCIDs, iccid) } } if len(newICCIDs) == 0 { return } cards := make([]*model.IotCard, 0, len(newICCIDs)) now := time.Now() for _, iccid := range newICCIDs { card := &model.IotCard{ ICCID: iccid, CarrierID: task.CarrierID, BatchNo: task.BatchNo, Status: constants.IotCardStatusInStock, CardCategory: constants.CardCategoryNormal, ActivationStatus: constants.ActivationStatusInactive, RealNameStatus: constants.RealNameStatusNotVerified, NetworkStatus: constants.NetworkStatusOffline, } card.BaseModel.Creator = task.Creator card.BaseModel.Updater = task.Creator card.CreatedAt = now card.UpdatedAt = now cards = append(cards, card) } if err := h.iotCardStore.CreateBatch(ctx, cards); err != nil { h.logger.Error("批量创建 IoT 卡失败", zap.Error(err), zap.Int("batch_size", len(cards)), ) for _, iccid := range newICCIDs { result.failedItems = append(result.failedItems, model.ImportResultItem{ Line: lineMap[iccid], ICCID: iccid, Reason: "数据库写入失败", }) result.failCount++ } return } result.successCount += len(newICCIDs) }