package task import ( "context" "errors" "fmt" "path/filepath" "strings" "time" "github.com/bytedance/sonic" "github.com/hibiken/asynq" "github.com/redis/go-redis/v9" "go.uber.org/zap" "gorm.io/gorm" "github.com/break/junhong_cmp_fiber/internal/model" "github.com/break/junhong_cmp_fiber/internal/store/postgres" "github.com/break/junhong_cmp_fiber/pkg/constants" "github.com/break/junhong_cmp_fiber/pkg/storage" "github.com/break/junhong_cmp_fiber/pkg/utils" "github.com/break/junhong_cmp_fiber/pkg/validator" ) var ( ErrStorageNotConfigured = errors.New("对象存储服务未配置") ErrStorageKeyEmpty = errors.New("文件存储路径为空") ) const batchSize = 1000 type IotCardImportPayload struct { TaskID uint `json:"task_id"` } // PollingCallback 轮询回调接口 // 用于在卡创建/删除/状态变化时通知轮询系统 type PollingCallback interface { // OnBatchCardsCreated 批量卡创建时的回调 OnBatchCardsCreated(ctx context.Context, cards []*model.IotCard) } type IotCardImportHandler struct { db *gorm.DB redis *redis.Client importTaskStore *postgres.IotCardImportTaskStore iotCardStore *postgres.IotCardStore storageService *storage.Service pollingCallback PollingCallback logger *zap.Logger } func NewIotCardImportHandler( db *gorm.DB, redis *redis.Client, importTaskStore *postgres.IotCardImportTaskStore, iotCardStore *postgres.IotCardStore, storageSvc *storage.Service, pollingCallback PollingCallback, logger *zap.Logger, ) *IotCardImportHandler { return &IotCardImportHandler{ db: db, redis: redis, importTaskStore: importTaskStore, iotCardStore: iotCardStore, storageService: storageSvc, pollingCallback: pollingCallback, logger: logger, } } func (h *IotCardImportHandler) HandleIotCardImport(ctx context.Context, task *asynq.Task) error { var payload IotCardImportPayload if err := sonic.Unmarshal(task.Payload(), &payload); err != nil { h.logger.Error("解析 IoT 卡导入任务载荷失败", zap.Error(err), zap.String("task_id", task.ResultWriter().TaskID()), ) return asynq.SkipRetry } importTask, err := h.importTaskStore.GetByID(ctx, payload.TaskID) if err != nil { h.logger.Error("获取导入任务失败", zap.Uint("task_id", payload.TaskID), zap.Error(err), ) return asynq.SkipRetry } if importTask.Status != model.ImportTaskStatusPending { h.logger.Info("导入任务已处理,跳过", zap.Uint("task_id", payload.TaskID), zap.Int("status", importTask.Status), ) return nil } h.importTaskStore.UpdateStatus(ctx, importTask.ID, model.ImportTaskStatusProcessing, "") h.logger.Info("开始处理 IoT 卡导入任务", zap.Uint("task_id", importTask.ID), zap.String("task_no", importTask.TaskNo), zap.String("storage_key", importTask.StorageKey), ) cards, totalCount, err := h.downloadAndParse(ctx, importTask) if err != nil { h.logger.Error("下载或解析 Excel 失败", zap.Uint("task_id", importTask.ID), zap.Error(err), ) h.importTaskStore.UpdateStatus(ctx, importTask.ID, model.ImportTaskStatusFailed, err.Error()) return asynq.SkipRetry } importTask.CardList = cards importTask.TotalCount = totalCount h.importTaskStore.UpdateCardList(ctx, importTask.ID, cards, totalCount) result := h.processImport(ctx, importTask) h.importTaskStore.UpdateResult(ctx, importTask.ID, result.successCount, result.skipCount, result.failCount, result.skippedItems, result.failedItems) if result.failCount > 0 && result.successCount == 0 { h.importTaskStore.UpdateStatus(ctx, importTask.ID, model.ImportTaskStatusFailed, "所有导入均失败") } else { h.importTaskStore.UpdateStatus(ctx, importTask.ID, model.ImportTaskStatusCompleted, "") } h.logger.Info("IoT 卡导入任务完成", zap.Uint("task_id", importTask.ID), zap.Int("success_count", result.successCount), zap.Int("skip_count", result.skipCount), zap.Int("fail_count", result.failCount), ) return nil } func (h *IotCardImportHandler) downloadAndParse(ctx context.Context, task *model.IotCardImportTask) (model.CardListJSON, int, error) { if h.storageService == nil { return nil, 0, ErrStorageNotConfigured } if task.StorageKey == "" { return nil, 0, ErrStorageKeyEmpty } localPath, cleanup, err := h.storageService.DownloadToTemp(ctx, task.StorageKey) if err != nil { return nil, 0, err } defer cleanup() if !strings.HasSuffix(strings.ToLower(task.FileName), ".xlsx") { ext := filepath.Ext(task.FileName) return nil, 0, fmt.Errorf("不支持的文件格式 %s,请上传Excel文件(.xlsx)", ext) } parseResult, err := utils.ParseCardExcel(localPath) if err != nil { return nil, 0, err } cards := make(model.CardListJSON, 0, len(parseResult.Cards)) for _, card := range parseResult.Cards { cards = append(cards, model.CardItem{ ICCID: card.ICCID, MSISDN: card.MSISDN, }) } return cards, parseResult.TotalCount, nil } type importResult struct { successCount int skipCount int failCount int skippedItems model.ImportResultItems failedItems model.ImportResultItems } func (h *IotCardImportHandler) processImport(ctx context.Context, task *model.IotCardImportTask) *importResult { result := &importResult{ skippedItems: make(model.ImportResultItems, 0), failedItems: make(model.ImportResultItems, 0), } cards := h.getCardsFromTask(task) if len(cards) == 0 { return result } for i := 0; i < len(cards); i += batchSize { end := min(i+batchSize, len(cards)) batch := cards[i:end] h.processBatch(ctx, task, batch, i+1, result) } return result } // getCardsFromTask 从任务中获取待导入的卡列表 func (h *IotCardImportHandler) getCardsFromTask(task *model.IotCardImportTask) []model.CardItem { return []model.CardItem(task.CardList) } func (h *IotCardImportHandler) processBatch(ctx context.Context, task *model.IotCardImportTask, batch []model.CardItem, startLine int, result *importResult) { type cardMeta struct { line int msisdn string } validCards := make([]model.CardItem, 0) cardMetaMap := make(map[string]cardMeta) for i, card := range batch { line := startLine + i cardMetaMap[card.ICCID] = cardMeta{line: line, msisdn: card.MSISDN} validationResult := validator.ValidateICCID(card.ICCID, task.CarrierType) if !validationResult.Valid { result.failedItems = append(result.failedItems, model.ImportResultItem{ Line: line, ICCID: card.ICCID, MSISDN: card.MSISDN, Reason: validationResult.Message, }) result.failCount++ continue } validCards = append(validCards, card) } if len(validCards) == 0 { return } validICCIDs := make([]string, len(validCards)) for i, card := range validCards { validICCIDs[i] = card.ICCID } existingMap, err := h.iotCardStore.ExistsByICCIDBatch(ctx, validICCIDs) if err != nil { h.logger.Error("批量检查 ICCID 是否存在失败", zap.Error(err), zap.Int("batch_size", len(validICCIDs)), ) for _, card := range validCards { meta := cardMetaMap[card.ICCID] result.failedItems = append(result.failedItems, model.ImportResultItem{ Line: meta.line, ICCID: card.ICCID, MSISDN: meta.msisdn, Reason: "数据库查询失败", }) result.failCount++ } return } newCards := make([]model.CardItem, 0) for _, card := range validCards { meta := cardMetaMap[card.ICCID] if existingMap[card.ICCID] { result.skippedItems = append(result.skippedItems, model.ImportResultItem{ Line: meta.line, ICCID: card.ICCID, MSISDN: meta.msisdn, Reason: "ICCID 已存在", }) result.skipCount++ } else { newCards = append(newCards, card) } } if len(newCards) == 0 { return } iotCards := make([]*model.IotCard, 0, len(newCards)) now := time.Now() for _, card := range newCards { iotCard := &model.IotCard{ ICCID: card.ICCID, MSISDN: card.MSISDN, CarrierID: task.CarrierID, BatchNo: task.BatchNo, Status: constants.IotCardStatusInStock, CardCategory: constants.CardCategoryNormal, ActivationStatus: constants.ActivationStatusInactive, RealNameStatus: constants.RealNameStatusNotVerified, NetworkStatus: constants.NetworkStatusOffline, } iotCard.BaseModel.Creator = task.Creator iotCard.BaseModel.Updater = task.Creator iotCard.CreatedAt = now iotCard.UpdatedAt = now iotCards = append(iotCards, iotCard) } if err := h.iotCardStore.CreateBatch(ctx, iotCards); err != nil { h.logger.Error("批量创建 IoT 卡失败", zap.Error(err), zap.Int("batch_size", len(iotCards)), ) for _, card := range newCards { meta := cardMetaMap[card.ICCID] result.failedItems = append(result.failedItems, model.ImportResultItem{ Line: meta.line, ICCID: card.ICCID, MSISDN: meta.msisdn, Reason: "数据库写入失败", }) result.failCount++ } return } result.successCount += len(newCards) // 通知轮询系统:批量卡已创建 if h.pollingCallback != nil { h.pollingCallback.OnBatchCardsCreated(ctx, iotCards) } }