Files
junhong_cmp_fiber/internal/task/iot_card_import.go
huang 03a0960c4d
All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 7m2s
refactor: 数据权限过滤从 GORM Callback 改为 Store 层显式调用
- 移除 RegisterDataPermissionCallback 和 SkipDataPermission 机制
- 在 Auth 中间件预计算 SubordinateShopIDs 并注入 Context
- 新增 ApplyShopFilter/ApplyEnterpriseFilter/ApplyOwnerShopFilter 等 Helper 函数
- 所有 Store 层查询方法显式调用数据权限过滤函数
- 权限检查函数 CanManageShop/CanManageEnterprise 改为从 Context 获取数据

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-26 16:38:52 +08:00

331 lines
8.9 KiB
Go

package task
import (
"context"
"errors"
"fmt"
"path/filepath"
"strings"
"time"
"github.com/bytedance/sonic"
"github.com/hibiken/asynq"
"github.com/redis/go-redis/v9"
"go.uber.org/zap"
"gorm.io/gorm"
"github.com/break/junhong_cmp_fiber/internal/model"
"github.com/break/junhong_cmp_fiber/internal/store/postgres"
"github.com/break/junhong_cmp_fiber/pkg/constants"
"github.com/break/junhong_cmp_fiber/pkg/storage"
"github.com/break/junhong_cmp_fiber/pkg/utils"
"github.com/break/junhong_cmp_fiber/pkg/validator"
)
// Sentinel errors returned by downloadAndParse before any file is fetched.
var (
	// ErrStorageNotConfigured is returned when the handler has no storage service (nil).
	ErrStorageNotConfigured = errors.New("对象存储服务未配置")
	// ErrStorageKeyEmpty is returned when the import task carries an empty StorageKey.
	ErrStorageKeyEmpty = errors.New("文件存储路径为空")
)

// batchSize is the maximum number of cards handed to a single processBatch call.
const batchSize = 1000
// IotCardImportPayload is the JSON payload decoded from the queued task.
type IotCardImportPayload struct {
	// TaskID is the ID used to load the import task record from the import task store.
	TaskID uint `json:"task_id"`
}
// PollingCallback is the polling callback interface.
// It is used to notify the polling system when cards are created, deleted,
// or change status. Only the batch-creation hook is declared in this section.
type PollingCallback interface {
	// OnBatchCardsCreated is the callback invoked when a batch of cards has been created.
	OnBatchCardsCreated(ctx context.Context, cards []*model.IotCard)
}
// IotCardImportHandler processes queued IoT card import tasks: it loads the
// task record, downloads and parses the uploaded file, and creates the cards
// in batches.
type IotCardImportHandler struct {
	// db and redis are stored by the constructor; the methods visible in this
	// section do not reference them directly.
	db    *gorm.DB
	redis *redis.Client
	// importTaskStore reads the import task and persists its status, card list and result.
	importTaskStore *postgres.IotCardImportTaskStore
	// iotCardStore checks for existing ICCIDs and batch-creates cards.
	iotCardStore *postgres.IotCardStore
	// storageService downloads the uploaded file; may be nil, in which case
	// downloadAndParse returns ErrStorageNotConfigured.
	storageService *storage.Service
	// pollingCallback is optional; when non-nil it is notified after each
	// successfully created batch.
	pollingCallback PollingCallback
	logger          *zap.Logger
}
// NewIotCardImportHandler builds an IotCardImportHandler from its dependencies.
// Every argument is stored as-is; storageSvc and pollingCallback may be nil
// (the handler checks both before use).
func NewIotCardImportHandler(
	db *gorm.DB,
	redis *redis.Client,
	importTaskStore *postgres.IotCardImportTaskStore,
	iotCardStore *postgres.IotCardStore,
	storageSvc *storage.Service,
	pollingCallback PollingCallback,
	logger *zap.Logger,
) *IotCardImportHandler {
	handler := new(IotCardImportHandler)

	// Infrastructure handles.
	handler.db = db
	handler.redis = redis
	handler.logger = logger

	// Stores and external services.
	handler.importTaskStore = importTaskStore
	handler.iotCardStore = iotCardStore
	handler.storageService = storageSvc
	handler.pollingCallback = pollingCallback

	return handler
}
// HandleIotCardImport is the queue handler for an IoT card import task.
//
// Flow:
//  1. Decode the payload and load the import task record.
//  2. Skip the task unless its status is still pending.
//  3. Mark it as processing, then download and parse the uploaded file.
//  4. Persist the parsed card list, import the cards in batches, and persist
//     the counters and per-item results.
//  5. Mark the task failed when there were failures and no successes,
//     otherwise completed.
//
// It returns asynq.SkipRetry when the payload cannot be decoded, the task
// record cannot be loaded, or the file cannot be downloaded/parsed; in every
// other case it returns nil.
//
// NOTE(review): the results of the importTaskStore update calls are discarded
// here. If those methods return an error (their signatures are not visible in
// this file), a failed status/result write currently goes unnoticed — confirm
// and consider logging it.
func (h *IotCardImportHandler) HandleIotCardImport(ctx context.Context, task *asynq.Task) error {
	var payload IotCardImportPayload
	if err := sonic.Unmarshal(task.Payload(), &payload); err != nil {
		// ResultWriter is nil for tasks that were not handed over by the asynq
		// processor (for example tasks built with asynq.NewTask in tests), so
		// guard the dereference instead of panicking on the error path.
		var queueTaskID string
		if w := task.ResultWriter(); w != nil {
			queueTaskID = w.TaskID()
		}
		h.logger.Error("解析 IoT 卡导入任务载荷失败",
			zap.Error(err),
			zap.String("task_id", queueTaskID),
		)
		return asynq.SkipRetry
	}

	importTask, err := h.importTaskStore.GetByID(ctx, payload.TaskID)
	if err != nil {
		h.logger.Error("获取导入任务失败",
			zap.Uint("task_id", payload.TaskID),
			zap.Error(err),
		)
		return asynq.SkipRetry
	}

	// Only pending tasks are processed; anything else is treated as already handled.
	if importTask.Status != model.ImportTaskStatusPending {
		h.logger.Info("导入任务已处理,跳过",
			zap.Uint("task_id", payload.TaskID),
			zap.Int("status", importTask.Status),
		)
		return nil
	}

	h.importTaskStore.UpdateStatus(ctx, importTask.ID, model.ImportTaskStatusProcessing, "")
	h.logger.Info("开始处理 IoT 卡导入任务",
		zap.Uint("task_id", importTask.ID),
		zap.String("task_no", importTask.TaskNo),
		zap.String("storage_key", importTask.StorageKey),
	)

	cards, totalCount, err := h.downloadAndParse(ctx, importTask)
	if err != nil {
		h.logger.Error("下载或解析 Excel 失败",
			zap.Uint("task_id", importTask.ID),
			zap.Error(err),
		)
		// The error text is stored on the task record as its failure message.
		h.importTaskStore.UpdateStatus(ctx, importTask.ID, model.ImportTaskStatusFailed, err.Error())
		return asynq.SkipRetry
	}

	// Keep the in-memory task in sync with what is persisted; processImport
	// reads the card list from the task.
	importTask.CardList = cards
	importTask.TotalCount = totalCount
	h.importTaskStore.UpdateCardList(ctx, importTask.ID, cards, totalCount)

	result := h.processImport(ctx, importTask)
	h.importTaskStore.UpdateResult(ctx, importTask.ID, result.successCount, result.skipCount, result.failCount, result.skippedItems, result.failedItems)

	// Failed only when something failed and nothing succeeded; skipped items
	// alone do not make the task fail.
	if result.failCount > 0 && result.successCount == 0 {
		h.importTaskStore.UpdateStatus(ctx, importTask.ID, model.ImportTaskStatusFailed, "所有导入均失败")
	} else {
		h.importTaskStore.UpdateStatus(ctx, importTask.ID, model.ImportTaskStatusCompleted, "")
	}

	h.logger.Info("IoT 卡导入任务完成",
		zap.Uint("task_id", importTask.ID),
		zap.Int("success_count", result.successCount),
		zap.Int("skip_count", result.skipCount),
		zap.Int("fail_count", result.failCount),
	)
	return nil
}
// downloadAndParse fetches the uploaded file for the import task and parses
// it into the card list stored on the task.
//
// Returns the parsed cards, the total count reported by the parser, and an
// error. Errors:
//   - ErrStorageNotConfigured when the handler has no storage service;
//   - ErrStorageKeyEmpty when the task has no storage key;
//   - a format error when the file name does not end in ".xlsx"
//     (case-insensitive);
//   - any error from the download or from the Excel parser, returned unchanged
//     because the caller stores the error text on the task record.
func (h *IotCardImportHandler) downloadAndParse(ctx context.Context, task *model.IotCardImportTask) (model.CardListJSON, int, error) {
	if h.storageService == nil {
		return nil, 0, ErrStorageNotConfigured
	}
	if task.StorageKey == "" {
		return nil, 0, ErrStorageKeyEmpty
	}

	// Reject unsupported file types before touching storage: the check only
	// needs the file name, so there is no reason to download a file that is
	// going to be refused anyway.
	if !strings.HasSuffix(strings.ToLower(task.FileName), ".xlsx") {
		ext := filepath.Ext(task.FileName)
		return nil, 0, fmt.Errorf("不支持的文件格式 %s,请上传Excel文件(.xlsx)", ext)
	}

	localPath, cleanup, err := h.storageService.DownloadToTemp(ctx, task.StorageKey)
	if err != nil {
		return nil, 0, err
	}
	// cleanup is supplied by the storage service together with the temp path;
	// run it once parsing is finished.
	defer cleanup()

	parseResult, err := utils.ParseCardExcel(localPath)
	if err != nil {
		return nil, 0, err
	}

	// Copy only the fields the import needs into the persisted list type.
	cards := make(model.CardListJSON, 0, len(parseResult.Cards))
	for _, card := range parseResult.Cards {
		cards = append(cards, model.CardItem{
			ICCID:  card.ICCID,
			MSISDN: card.MSISDN,
		})
	}
	return cards, parseResult.TotalCount, nil
}
// importResult accumulates the outcome of an import across all batches.
type importResult struct {
	// successCount is the number of cards created.
	successCount int
	// skipCount is the number of cards skipped; it is incremented together
	// with each append to skippedItems.
	skipCount int
	// failCount is the number of cards that failed; it is incremented together
	// with each append to failedItems.
	failCount int
	// skippedItems holds one entry (line, ICCID, MSISDN, reason) per skipped card.
	skippedItems model.ImportResultItems
	// failedItems holds one entry (line, ICCID, MSISDN, reason) per failed card.
	failedItems model.ImportResultItems
}
// processImport walks the task's card list in chunks of at most batchSize
// cards and feeds each chunk to processBatch, collecting everything into a
// single importResult. An empty card list yields a result with zero counters
// and empty (non-nil) item lists.
func (h *IotCardImportHandler) processImport(ctx context.Context, task *model.IotCardImportTask) *importResult {
	var summary importResult
	summary.skippedItems = make(model.ImportResultItems, 0)
	summary.failedItems = make(model.ImportResultItems, 0)

	pending := h.getCardsFromTask(task)
	total := len(pending)

	for offset := 0; offset < total; offset += batchSize {
		upper := offset + batchSize
		if upper > total {
			upper = total
		}
		// Line numbers reported in the result are 1-based positions in the card list.
		h.processBatch(ctx, task, pending[offset:upper], offset+1, &summary)
	}

	return &summary
}
// getCardsFromTask returns the list of cards to import, taken from the task's
// CardList and converted to a plain slice of card items.
func (h *IotCardImportHandler) getCardsFromTask(task *model.IotCardImportTask) []model.CardItem {
	list := task.CardList
	return []model.CardItem(list)
}
// processBatch imports one chunk of cards and records the outcome in result.
//
// Parameters:
//   - task: the import task; supplies carrier type/ID, batch number and creator.
//   - batch: the cards of this chunk.
//   - startLine: the line number reported for batch[0]; later cards get
//     startLine+index.
//   - result: accumulator updated in place.
//
// Steps:
//  1. Validate every ICCID; invalid ones are recorded as failed with the
//     validator's message.
//  2. Look up which valid ICCIDs already exist; if the lookup fails, every
//     valid card of the chunk is recorded as failed.
//  3. Cards that already exist are recorded as skipped. A card whose ICCID
//     already appeared earlier in the same chunk is also recorded as skipped,
//     so a repeated row is never sent to the batch insert.
//  4. The remaining cards are created in one batch; if that fails, all of
//     them are recorded as failed, otherwise they count as successes and the
//     polling callback (if any) is notified.
//
// Each card carries its own line number, so results for repeated ICCIDs
// report the line of the row they actually came from.
func (h *IotCardImportHandler) processBatch(ctx context.Context, task *model.IotCardImportTask, batch []model.CardItem, startLine int, result *importResult) {
	// pendingCard pairs a card with the line it is reported under.
	type pendingCard struct {
		item model.CardItem
		line int
	}

	// Step 1: format validation.
	validCards := make([]pendingCard, 0, len(batch))
	for i, card := range batch {
		line := startLine + i
		validationResult := validator.ValidateICCID(card.ICCID, task.CarrierType)
		if !validationResult.Valid {
			result.failedItems = append(result.failedItems, model.ImportResultItem{
				Line:   line,
				ICCID:  card.ICCID,
				MSISDN: card.MSISDN,
				Reason: validationResult.Message,
			})
			result.failCount++
			continue
		}
		validCards = append(validCards, pendingCard{item: card, line: line})
	}
	if len(validCards) == 0 {
		return
	}

	// Step 2: one existence lookup for the whole chunk.
	validICCIDs := make([]string, len(validCards))
	for i, entry := range validCards {
		validICCIDs[i] = entry.item.ICCID
	}
	existingMap, err := h.iotCardStore.ExistsByICCIDBatch(ctx, validICCIDs)
	if err != nil {
		h.logger.Error("批量检查 ICCID 是否存在失败",
			zap.Error(err),
			zap.Int("batch_size", len(validICCIDs)),
		)
		for _, entry := range validCards {
			result.failedItems = append(result.failedItems, model.ImportResultItem{
				Line:   entry.line,
				ICCID:  entry.item.ICCID,
				MSISDN: entry.item.MSISDN,
				Reason: "数据库查询失败",
			})
			result.failCount++
		}
		return
	}

	// Step 3: split into already-existing, repeated-in-chunk, and new cards.
	newCards := make([]pendingCard, 0, len(validCards))
	seen := make(map[string]struct{}, len(validCards))
	for _, entry := range validCards {
		iccid := entry.item.ICCID
		if existingMap[iccid] {
			result.skippedItems = append(result.skippedItems, model.ImportResultItem{
				Line:   entry.line,
				ICCID:  iccid,
				MSISDN: entry.item.MSISDN,
				Reason: "ICCID 已存在",
			})
			result.skipCount++
			continue
		}
		if _, dup := seen[iccid]; dup {
			// Same ICCID appeared earlier in this chunk; only the first
			// occurrence is imported.
			result.skippedItems = append(result.skippedItems, model.ImportResultItem{
				Line:   entry.line,
				ICCID:  iccid,
				MSISDN: entry.item.MSISDN,
				Reason: "ICCID 在导入文件中重复",
			})
			result.skipCount++
			continue
		}
		seen[iccid] = struct{}{}
		newCards = append(newCards, entry)
	}
	if len(newCards) == 0 {
		return
	}

	// Step 4: build the records and insert them in one batch. All records of
	// the chunk share the same creation timestamp.
	iotCards := make([]*model.IotCard, 0, len(newCards))
	now := time.Now()
	for _, entry := range newCards {
		iotCard := &model.IotCard{
			ICCID:            entry.item.ICCID,
			MSISDN:           entry.item.MSISDN,
			CarrierID:        task.CarrierID,
			BatchNo:          task.BatchNo,
			Status:           constants.IotCardStatusInStock,
			CardCategory:     constants.CardCategoryNormal,
			ActivationStatus: constants.ActivationStatusInactive,
			RealNameStatus:   constants.RealNameStatusNotVerified,
			NetworkStatus:    constants.NetworkStatusOffline,
		}
		iotCard.BaseModel.Creator = task.Creator
		iotCard.BaseModel.Updater = task.Creator
		iotCard.CreatedAt = now
		iotCard.UpdatedAt = now
		iotCards = append(iotCards, iotCard)
	}

	if err := h.iotCardStore.CreateBatch(ctx, iotCards); err != nil {
		h.logger.Error("批量创建 IoT 卡失败",
			zap.Error(err),
			zap.Int("batch_size", len(iotCards)),
		)
		for _, entry := range newCards {
			result.failedItems = append(result.failedItems, model.ImportResultItem{
				Line:   entry.line,
				ICCID:  entry.item.ICCID,
				MSISDN: entry.item.MSISDN,
				Reason: "数据库写入失败",
			})
			result.failCount++
		}
		return
	}
	result.successCount += len(newCards)

	// Notify the polling system that a batch of cards has been created.
	if h.pollingCallback != nil {
		h.pollingCallback.OnBatchCardsCreated(ctx, iotCards)
	}
}