package task import ( "context" stderrors "errors" "fmt" "path/filepath" "strings" "time" "github.com/bytedance/sonic" "github.com/hibiken/asynq" "github.com/redis/go-redis/v9" "go.uber.org/zap" "gorm.io/gorm" "github.com/break/junhong_cmp_fiber/internal/model" "github.com/break/junhong_cmp_fiber/internal/store/postgres" "github.com/break/junhong_cmp_fiber/pkg/constants" "github.com/break/junhong_cmp_fiber/pkg/storage" "github.com/break/junhong_cmp_fiber/pkg/utils" ) const deviceBatchSize = 100 type DeviceImportPayload struct { TaskID uint `json:"task_id"` } type DeviceImportHandler struct { db *gorm.DB redis *redis.Client importTaskStore *postgres.DeviceImportTaskStore deviceStore *postgres.DeviceStore deviceSimBindingStore *postgres.DeviceSimBindingStore iotCardStore *postgres.IotCardStore storageService *storage.Service logger *zap.Logger } func NewDeviceImportHandler( db *gorm.DB, redis *redis.Client, importTaskStore *postgres.DeviceImportTaskStore, deviceStore *postgres.DeviceStore, deviceSimBindingStore *postgres.DeviceSimBindingStore, iotCardStore *postgres.IotCardStore, storageSvc *storage.Service, logger *zap.Logger, ) *DeviceImportHandler { return &DeviceImportHandler{ db: db, redis: redis, importTaskStore: importTaskStore, deviceStore: deviceStore, deviceSimBindingStore: deviceSimBindingStore, iotCardStore: iotCardStore, storageService: storageSvc, logger: logger, } } func (h *DeviceImportHandler) HandleDeviceImport(ctx context.Context, task *asynq.Task) error { var payload DeviceImportPayload if err := sonic.Unmarshal(task.Payload(), &payload); err != nil { h.logger.Error("解析设备导入任务载荷失败", zap.Error(err), zap.String("task_id", task.ResultWriter().TaskID()), ) return asynq.SkipRetry } importTask, err := h.importTaskStore.GetByID(ctx, payload.TaskID) if err != nil { h.logger.Error("获取导入任务失败", zap.Uint("task_id", payload.TaskID), zap.Error(err), ) return asynq.SkipRetry } if importTask.Status != model.ImportTaskStatusPending { h.logger.Info("导入任务已处理,跳过", zap.Uint("task_id", payload.TaskID), zap.Int("status", importTask.Status), ) return nil } h.importTaskStore.UpdateStatus(ctx, importTask.ID, model.ImportTaskStatusProcessing, "") h.logger.Info("开始处理设备导入任务", zap.Uint("task_id", importTask.ID), zap.String("task_no", importTask.TaskNo), zap.String("storage_key", importTask.StorageKey), ) rows, totalCount, err := h.downloadAndParse(ctx, importTask) if err != nil { h.logger.Error("下载或解析 Excel 失败", zap.Uint("task_id", importTask.ID), zap.Error(err), ) h.importTaskStore.UpdateStatus(ctx, importTask.ID, model.ImportTaskStatusFailed, err.Error()) return asynq.SkipRetry } result := h.processImport(ctx, importTask, rows, totalCount) h.importTaskStore.UpdateResult(ctx, importTask.ID, totalCount, result.successCount, result.skipCount, result.failCount, 0, result.skippedItems, result.failedItems, nil) if result.failCount > 0 && result.successCount == 0 { h.importTaskStore.UpdateStatus(ctx, importTask.ID, model.ImportTaskStatusFailed, "所有导入均失败") } else { h.importTaskStore.UpdateStatus(ctx, importTask.ID, model.ImportTaskStatusCompleted, "") } h.logger.Info("设备导入任务完成", zap.Uint("task_id", importTask.ID), zap.Int("success_count", result.successCount), zap.Int("skip_count", result.skipCount), zap.Int("fail_count", result.failCount), ) return nil } func (h *DeviceImportHandler) downloadAndParse(ctx context.Context, task *model.DeviceImportTask) ([]utils.DeviceRow, int, error) { if h.storageService == nil { return nil, 0, ErrStorageNotConfigured } if task.StorageKey == "" { return nil, 0, ErrStorageKeyEmpty } localPath, cleanup, err := h.storageService.DownloadToTemp(ctx, task.StorageKey) if err != nil { return nil, 0, err } defer cleanup() if !strings.HasSuffix(strings.ToLower(task.FileName), ".xlsx") { ext := filepath.Ext(task.FileName) return nil, 0, fmt.Errorf("不支持的文件格式 %s,请上传Excel文件(.xlsx)", ext) } return utils.ParseDeviceExcel(localPath) } type deviceImportResult struct { successCount int skipCount int failCount int skippedItems model.ImportResultItems failedItems model.ImportResultItems } func (h *DeviceImportHandler) processImport(ctx context.Context, task *model.DeviceImportTask, rows []utils.DeviceRow, totalCount int) *deviceImportResult { result := &deviceImportResult{ skippedItems: make(model.ImportResultItems, 0), failedItems: make(model.ImportResultItems, 0), } if len(rows) == 0 { return result } for i := 0; i < len(rows); i += deviceBatchSize { end := min(i+deviceBatchSize, len(rows)) batch := rows[i:end] h.processBatch(ctx, task, batch, result) } return result } func (h *DeviceImportHandler) processBatch(ctx context.Context, task *model.DeviceImportTask, batch []utils.DeviceRow, result *deviceImportResult) { deviceNos := make([]string, 0, len(batch)) allICCIDs := make([]string, 0) for _, row := range batch { deviceNos = append(deviceNos, row.VirtualNo) allICCIDs = append(allICCIDs, row.ICCIDs...) } existingDevices, err := h.deviceStore.ExistsByDeviceNoBatch(ctx, deviceNos) if err != nil { h.logger.Error("检查设备是否存在失败", zap.Error(err)) for _, row := range batch { result.failedItems = append(result.failedItems, model.ImportResultItem{ Line: row.Line, ICCID: row.VirtualNo, Reason: "数据库查询失败", }) result.failCount++ } return } var existingCards map[string]*model.IotCard var boundCards map[string]bool if len(allICCIDs) > 0 { cards, err := h.iotCardStore.GetByICCIDs(ctx, allICCIDs) if err != nil { h.logger.Error("查询卡信息失败", zap.Error(err)) } else { existingCards = make(map[string]*model.IotCard) for _, card := range cards { existingCards[card.ICCID] = card } } boundCards, err = h.deviceSimBindingStore.GetBoundICCIDs(ctx, allICCIDs) if err != nil { h.logger.Error("查询卡绑定状态失败", zap.Error(err)) } } for _, row := range batch { if existingDevices[row.VirtualNo] { result.skippedItems = append(result.skippedItems, model.ImportResultItem{ Line: row.Line, ICCID: row.VirtualNo, Reason: "设备号已存在", }) result.skipCount++ continue } var validCardIDs []uint var cardIssues []string for _, iccid := range row.ICCIDs { card, exists := existingCards[iccid] if !exists { cardIssues = append(cardIssues, iccid+"不存在") continue } if boundCards[iccid] { cardIssues = append(cardIssues, iccid+"已绑定其他设备") continue } if card.ShopID != nil { cardIssues = append(cardIssues, iccid+"已分配给店铺,不能绑定到平台库存设备") continue } validCardIDs = append(validCardIDs, card.ID) } if len(row.ICCIDs) > 0 && len(cardIssues) > 0 { result.failedItems = append(result.failedItems, model.ImportResultItem{ Line: row.Line, ICCID: row.VirtualNo, Reason: "卡验证失败: " + strings.Join(cardIssues, ", "), }) result.failCount++ continue } err := h.db.Transaction(func(tx *gorm.DB) error { txDeviceStore := postgres.NewDeviceStore(tx, nil) txBindingStore := postgres.NewDeviceSimBindingStore(tx, nil) device := &model.Device{ VirtualNo: row.VirtualNo, DeviceName: row.DeviceName, DeviceModel: row.DeviceModel, DeviceType: row.DeviceType, MaxSimSlots: row.MaxSimSlots, Manufacturer: row.Manufacturer, BatchNo: task.BatchNo, Status: constants.DeviceStatusInStock, } device.Creator = task.Creator device.Updater = task.Creator if err := txDeviceStore.Create(ctx, device); err != nil { return err } now := time.Now() for i, cardID := range validCardIDs { binding := &model.DeviceSimBinding{ DeviceID: device.ID, IotCardID: cardID, SlotPosition: i + 1, BindStatus: 1, BindTime: &now, } if err := txBindingStore.Create(ctx, binding); err != nil { return err } } return nil }) if err != nil { h.logger.Error("创建设备失败", zap.String("virtual_no", row.VirtualNo), zap.Error(err), ) result.failedItems = append(result.failedItems, model.ImportResultItem{ Line: row.Line, ICCID: row.VirtualNo, Reason: "数据库写入失败: " + err.Error(), }) result.failCount++ continue } for _, iccid := range row.ICCIDs { if card, exists := existingCards[iccid]; exists && !boundCards[iccid] && card.ShopID == nil { boundCards[iccid] = true } } result.successCount++ } } var ErrMissingDeviceNoColumn = stderrors.New("CSV 缺少 virtual_no 列")