package task import ( "context" "encoding/csv" stderrors "errors" "io" "os" "strconv" "strings" "time" "github.com/bytedance/sonic" "github.com/hibiken/asynq" "github.com/redis/go-redis/v9" "go.uber.org/zap" "gorm.io/gorm" "github.com/break/junhong_cmp_fiber/internal/model" "github.com/break/junhong_cmp_fiber/internal/store/postgres" "github.com/break/junhong_cmp_fiber/pkg/constants" pkggorm "github.com/break/junhong_cmp_fiber/pkg/gorm" "github.com/break/junhong_cmp_fiber/pkg/storage" ) const deviceBatchSize = 100 type DeviceImportPayload struct { TaskID uint `json:"task_id"` } type DeviceImportHandler struct { db *gorm.DB redis *redis.Client importTaskStore *postgres.DeviceImportTaskStore deviceStore *postgres.DeviceStore deviceSimBindingStore *postgres.DeviceSimBindingStore iotCardStore *postgres.IotCardStore storageService *storage.Service logger *zap.Logger } func NewDeviceImportHandler( db *gorm.DB, redis *redis.Client, importTaskStore *postgres.DeviceImportTaskStore, deviceStore *postgres.DeviceStore, deviceSimBindingStore *postgres.DeviceSimBindingStore, iotCardStore *postgres.IotCardStore, storageSvc *storage.Service, logger *zap.Logger, ) *DeviceImportHandler { return &DeviceImportHandler{ db: db, redis: redis, importTaskStore: importTaskStore, deviceStore: deviceStore, deviceSimBindingStore: deviceSimBindingStore, iotCardStore: iotCardStore, storageService: storageSvc, logger: logger, } } func (h *DeviceImportHandler) HandleDeviceImport(ctx context.Context, task *asynq.Task) error { ctx = pkggorm.SkipDataPermission(ctx) var payload DeviceImportPayload if err := sonic.Unmarshal(task.Payload(), &payload); err != nil { h.logger.Error("解析设备导入任务载荷失败", zap.Error(err), zap.String("task_id", task.ResultWriter().TaskID()), ) return asynq.SkipRetry } importTask, err := h.importTaskStore.GetByID(ctx, payload.TaskID) if err != nil { h.logger.Error("获取导入任务失败", zap.Uint("task_id", payload.TaskID), zap.Error(err), ) return asynq.SkipRetry } if importTask.Status != model.ImportTaskStatusPending { h.logger.Info("导入任务已处理,跳过", zap.Uint("task_id", payload.TaskID), zap.Int("status", importTask.Status), ) return nil } h.importTaskStore.UpdateStatus(ctx, importTask.ID, model.ImportTaskStatusProcessing, "") h.logger.Info("开始处理设备导入任务", zap.Uint("task_id", importTask.ID), zap.String("task_no", importTask.TaskNo), zap.String("storage_key", importTask.StorageKey), ) rows, totalCount, err := h.downloadAndParseCSV(ctx, importTask) if err != nil { h.logger.Error("下载或解析 CSV 失败", zap.Uint("task_id", importTask.ID), zap.Error(err), ) h.importTaskStore.UpdateStatus(ctx, importTask.ID, model.ImportTaskStatusFailed, err.Error()) return asynq.SkipRetry } result := h.processImport(ctx, importTask, rows, totalCount) h.importTaskStore.UpdateResult(ctx, importTask.ID, totalCount, result.successCount, result.skipCount, result.failCount, 0, result.skippedItems, result.failedItems, nil) if result.failCount > 0 && result.successCount == 0 { h.importTaskStore.UpdateStatus(ctx, importTask.ID, model.ImportTaskStatusFailed, "所有导入均失败") } else { h.importTaskStore.UpdateStatus(ctx, importTask.ID, model.ImportTaskStatusCompleted, "") } h.logger.Info("设备导入任务完成", zap.Uint("task_id", importTask.ID), zap.Int("success_count", result.successCount), zap.Int("skip_count", result.skipCount), zap.Int("fail_count", result.failCount), ) return nil } type deviceRow struct { Line int DeviceNo string DeviceName string DeviceModel string DeviceType string MaxSimSlots int Manufacturer string ICCIDs []string } func (h *DeviceImportHandler) downloadAndParseCSV(ctx context.Context, task *model.DeviceImportTask) ([]deviceRow, int, error) { if h.storageService == nil { return nil, 0, ErrStorageNotConfigured } if task.StorageKey == "" { return nil, 0, ErrStorageKeyEmpty } localPath, cleanup, err := h.storageService.DownloadToTemp(ctx, task.StorageKey) if err != nil { return nil, 0, err } defer cleanup() f, err := os.Open(localPath) if err != nil { return nil, 0, err } defer f.Close() return h.parseDeviceCSV(f) } func (h *DeviceImportHandler) parseDeviceCSV(r io.Reader) ([]deviceRow, int, error) { reader := csv.NewReader(r) reader.FieldsPerRecord = -1 reader.TrimLeadingSpace = true header, err := reader.Read() if err != nil { return nil, 0, err } colIndex := h.buildColumnIndex(header) if colIndex["device_no"] == -1 { return nil, 0, ErrMissingDeviceNoColumn } var rows []deviceRow lineNum := 1 for { record, err := reader.Read() if err == io.EOF { break } if err != nil { continue } lineNum++ row := deviceRow{Line: lineNum} if idx := colIndex["device_no"]; idx >= 0 && idx < len(record) { row.DeviceNo = strings.TrimSpace(record[idx]) } if idx := colIndex["device_name"]; idx >= 0 && idx < len(record) { row.DeviceName = strings.TrimSpace(record[idx]) } if idx := colIndex["device_model"]; idx >= 0 && idx < len(record) { row.DeviceModel = strings.TrimSpace(record[idx]) } if idx := colIndex["device_type"]; idx >= 0 && idx < len(record) { row.DeviceType = strings.TrimSpace(record[idx]) } if idx := colIndex["max_sim_slots"]; idx >= 0 && idx < len(record) { if n, err := strconv.Atoi(strings.TrimSpace(record[idx])); err == nil { row.MaxSimSlots = n } } if idx := colIndex["manufacturer"]; idx >= 0 && idx < len(record) { row.Manufacturer = strings.TrimSpace(record[idx]) } row.ICCIDs = make([]string, 0, 4) for i := 1; i <= 4; i++ { colName := "iccid_" + strconv.Itoa(i) if idx := colIndex[colName]; idx >= 0 && idx < len(record) { iccid := strings.TrimSpace(record[idx]) if iccid != "" { row.ICCIDs = append(row.ICCIDs, iccid) } } } if row.DeviceNo == "" { continue } if row.MaxSimSlots == 0 { row.MaxSimSlots = 4 } rows = append(rows, row) } return rows, len(rows), nil } func (h *DeviceImportHandler) buildColumnIndex(header []string) map[string]int { index := map[string]int{ "device_no": -1, "device_name": -1, "device_model": -1, "device_type": -1, "max_sim_slots": -1, "manufacturer": -1, "iccid_1": -1, "iccid_2": -1, "iccid_3": -1, "iccid_4": -1, } for i, col := range header { col = strings.ToLower(strings.TrimSpace(col)) if _, exists := index[col]; exists { index[col] = i } } return index } type deviceImportResult struct { successCount int skipCount int failCount int skippedItems model.ImportResultItems failedItems model.ImportResultItems } func (h *DeviceImportHandler) processImport(ctx context.Context, task *model.DeviceImportTask, rows []deviceRow, totalCount int) *deviceImportResult { result := &deviceImportResult{ skippedItems: make(model.ImportResultItems, 0), failedItems: make(model.ImportResultItems, 0), } if len(rows) == 0 { return result } for i := 0; i < len(rows); i += deviceBatchSize { end := min(i+deviceBatchSize, len(rows)) batch := rows[i:end] h.processBatch(ctx, task, batch, result) } return result } func (h *DeviceImportHandler) processBatch(ctx context.Context, task *model.DeviceImportTask, batch []deviceRow, result *deviceImportResult) { deviceNos := make([]string, 0, len(batch)) allICCIDs := make([]string, 0) for _, row := range batch { deviceNos = append(deviceNos, row.DeviceNo) allICCIDs = append(allICCIDs, row.ICCIDs...) } existingDevices, err := h.deviceStore.ExistsByDeviceNoBatch(ctx, deviceNos) if err != nil { h.logger.Error("检查设备是否存在失败", zap.Error(err)) for _, row := range batch { result.failedItems = append(result.failedItems, model.ImportResultItem{ Line: row.Line, ICCID: row.DeviceNo, Reason: "数据库查询失败", }) result.failCount++ } return } var existingCards map[string]*model.IotCard var boundCards map[string]bool if len(allICCIDs) > 0 { cards, err := h.iotCardStore.GetByICCIDs(ctx, allICCIDs) if err != nil { h.logger.Error("查询卡信息失败", zap.Error(err)) } else { existingCards = make(map[string]*model.IotCard) for _, card := range cards { existingCards[card.ICCID] = card } } boundCards, err = h.deviceSimBindingStore.GetBoundICCIDs(ctx, allICCIDs) if err != nil { h.logger.Error("查询卡绑定状态失败", zap.Error(err)) } } for _, row := range batch { if existingDevices[row.DeviceNo] { result.skippedItems = append(result.skippedItems, model.ImportResultItem{ Line: row.Line, ICCID: row.DeviceNo, Reason: "设备号已存在", }) result.skipCount++ continue } var validCardIDs []uint var cardIssues []string for _, iccid := range row.ICCIDs { card, exists := existingCards[iccid] if !exists { cardIssues = append(cardIssues, iccid+"不存在") continue } if boundCards[iccid] { cardIssues = append(cardIssues, iccid+"已绑定其他设备") continue } if card.ShopID != nil { cardIssues = append(cardIssues, iccid+"已分配给店铺,不能绑定到平台库存设备") continue } validCardIDs = append(validCardIDs, card.ID) } if len(row.ICCIDs) > 0 && len(cardIssues) > 0 { result.failedItems = append(result.failedItems, model.ImportResultItem{ Line: row.Line, ICCID: row.DeviceNo, Reason: "卡验证失败: " + strings.Join(cardIssues, ", "), }) result.failCount++ continue } err := h.db.Transaction(func(tx *gorm.DB) error { txDeviceStore := postgres.NewDeviceStore(tx, nil) txBindingStore := postgres.NewDeviceSimBindingStore(tx, nil) device := &model.Device{ DeviceNo: row.DeviceNo, DeviceName: row.DeviceName, DeviceModel: row.DeviceModel, DeviceType: row.DeviceType, MaxSimSlots: row.MaxSimSlots, Manufacturer: row.Manufacturer, BatchNo: task.BatchNo, Status: constants.DeviceStatusInStock, } device.Creator = task.Creator device.Updater = task.Creator if err := txDeviceStore.Create(ctx, device); err != nil { return err } now := time.Now() for i, cardID := range validCardIDs { binding := &model.DeviceSimBinding{ DeviceID: device.ID, IotCardID: cardID, SlotPosition: i + 1, BindStatus: 1, BindTime: &now, } if err := txBindingStore.Create(ctx, binding); err != nil { return err } } return nil }) if err != nil { h.logger.Error("创建设备失败", zap.String("device_no", row.DeviceNo), zap.Error(err), ) result.failedItems = append(result.failedItems, model.ImportResultItem{ Line: row.Line, ICCID: row.DeviceNo, Reason: "数据库写入失败: " + err.Error(), }) result.failCount++ continue } for _, iccid := range row.ICCIDs { if card, exists := existingCards[iccid]; exists && !boundCards[iccid] && card.ShopID == nil { boundCards[iccid] = true } } result.successCount++ } } var ErrMissingDeviceNoColumn = stderrors.New("CSV 缺少 device_no 列")