All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 7m3s
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
376 lines
10 KiB
Go
376 lines
10 KiB
Go
package task
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"path/filepath"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/bytedance/sonic"
|
|
"github.com/hibiken/asynq"
|
|
"github.com/redis/go-redis/v9"
|
|
"go.uber.org/zap"
|
|
"gorm.io/gorm"
|
|
|
|
"github.com/break/junhong_cmp_fiber/internal/model"
|
|
"github.com/break/junhong_cmp_fiber/internal/store/postgres"
|
|
"github.com/break/junhong_cmp_fiber/pkg/constants"
|
|
"github.com/break/junhong_cmp_fiber/pkg/storage"
|
|
"github.com/break/junhong_cmp_fiber/pkg/utils"
|
|
"github.com/break/junhong_cmp_fiber/pkg/validator"
|
|
)
|
|
|
|
// Sentinel errors returned by downloadAndParse when its preconditions are not met.
var (
	// ErrStorageNotConfigured reports that the handler has no object storage service.
	ErrStorageNotConfigured = errors.New("对象存储服务未配置")

	// ErrStorageKeyEmpty reports that the import task carries an empty storage key.
	ErrStorageKeyEmpty = errors.New("文件存储路径为空")
)
|
|
|
|
// batchSize is the maximum number of cards processImport hands to processBatch per call.
const batchSize = 1000
|
|
|
|
// IotCardImportPayload is the JSON body of an IoT card import queue task.
// HandleIotCardImport decodes it from the asynq task payload.
type IotCardImportPayload struct {
	// TaskID is the ID of the import task record to load and process.
	TaskID uint `json:"task_id"`
}
|
|
|
|
// PollingCallback 轮询回调接口
|
|
// 用于在卡创建/删除/状态变化时通知轮询系统
|
|
type PollingCallback interface {
|
|
// OnBatchCardsCreated 批量卡创建时的回调
|
|
OnBatchCardsCreated(ctx context.Context, cards []*model.IotCard)
|
|
}
|
|
|
|
type IotCardImportHandler struct {
|
|
db *gorm.DB
|
|
redis *redis.Client
|
|
importTaskStore *postgres.IotCardImportTaskStore
|
|
iotCardStore *postgres.IotCardStore
|
|
storageService *storage.Service
|
|
pollingCallback PollingCallback
|
|
logger *zap.Logger
|
|
}
|
|
|
|
func NewIotCardImportHandler(
|
|
db *gorm.DB,
|
|
redis *redis.Client,
|
|
importTaskStore *postgres.IotCardImportTaskStore,
|
|
iotCardStore *postgres.IotCardStore,
|
|
storageSvc *storage.Service,
|
|
pollingCallback PollingCallback,
|
|
logger *zap.Logger,
|
|
) *IotCardImportHandler {
|
|
return &IotCardImportHandler{
|
|
db: db,
|
|
redis: redis,
|
|
importTaskStore: importTaskStore,
|
|
iotCardStore: iotCardStore,
|
|
storageService: storageSvc,
|
|
pollingCallback: pollingCallback,
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
func (h *IotCardImportHandler) HandleIotCardImport(ctx context.Context, task *asynq.Task) error {
|
|
var payload IotCardImportPayload
|
|
if err := sonic.Unmarshal(task.Payload(), &payload); err != nil {
|
|
h.logger.Error("解析 IoT 卡导入任务载荷失败",
|
|
zap.Error(err),
|
|
zap.String("task_id", task.ResultWriter().TaskID()),
|
|
)
|
|
return asynq.SkipRetry
|
|
}
|
|
|
|
importTask, err := h.importTaskStore.GetByID(ctx, payload.TaskID)
|
|
if err != nil {
|
|
h.logger.Error("获取导入任务失败",
|
|
zap.Uint("task_id", payload.TaskID),
|
|
zap.Error(err),
|
|
)
|
|
return asynq.SkipRetry
|
|
}
|
|
|
|
if importTask.Status != model.ImportTaskStatusPending {
|
|
h.logger.Info("导入任务已处理,跳过",
|
|
zap.Uint("task_id", payload.TaskID),
|
|
zap.Int("status", importTask.Status),
|
|
)
|
|
return nil
|
|
}
|
|
|
|
h.importTaskStore.UpdateStatus(ctx, importTask.ID, model.ImportTaskStatusProcessing, "")
|
|
|
|
h.logger.Info("开始处理 IoT 卡导入任务",
|
|
zap.Uint("task_id", importTask.ID),
|
|
zap.String("task_no", importTask.TaskNo),
|
|
zap.String("storage_key", importTask.StorageKey),
|
|
)
|
|
|
|
cards, totalCount, err := h.downloadAndParse(ctx, importTask)
|
|
if err != nil {
|
|
h.logger.Error("下载或解析 Excel 失败",
|
|
zap.Uint("task_id", importTask.ID),
|
|
zap.Error(err),
|
|
)
|
|
h.importTaskStore.UpdateStatus(ctx, importTask.ID, model.ImportTaskStatusFailed, err.Error())
|
|
return asynq.SkipRetry
|
|
}
|
|
|
|
importTask.CardList = cards
|
|
importTask.TotalCount = totalCount
|
|
h.importTaskStore.UpdateCardList(ctx, importTask.ID, cards, totalCount)
|
|
|
|
result := h.processImport(ctx, importTask)
|
|
|
|
h.importTaskStore.UpdateResult(ctx, importTask.ID, result.successCount, result.skipCount, result.failCount, result.skippedItems, result.failedItems)
|
|
|
|
if result.failCount > 0 && result.successCount == 0 {
|
|
h.importTaskStore.UpdateStatus(ctx, importTask.ID, model.ImportTaskStatusFailed, "所有导入均失败")
|
|
} else {
|
|
h.importTaskStore.UpdateStatus(ctx, importTask.ID, model.ImportTaskStatusCompleted, "")
|
|
}
|
|
|
|
h.logger.Info("IoT 卡导入任务完成",
|
|
zap.Uint("task_id", importTask.ID),
|
|
zap.Int("success_count", result.successCount),
|
|
zap.Int("skip_count", result.skipCount),
|
|
zap.Int("fail_count", result.failCount),
|
|
)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (h *IotCardImportHandler) downloadAndParse(ctx context.Context, task *model.IotCardImportTask) (model.CardListJSON, int, error) {
|
|
if h.storageService == nil {
|
|
return nil, 0, ErrStorageNotConfigured
|
|
}
|
|
|
|
if task.StorageKey == "" {
|
|
return nil, 0, ErrStorageKeyEmpty
|
|
}
|
|
|
|
localPath, cleanup, err := h.storageService.DownloadToTemp(ctx, task.StorageKey)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
defer cleanup()
|
|
|
|
if !strings.HasSuffix(strings.ToLower(task.FileName), ".xlsx") {
|
|
ext := filepath.Ext(task.FileName)
|
|
return nil, 0, fmt.Errorf("不支持的文件格式 %s,请上传Excel文件(.xlsx)", ext)
|
|
}
|
|
|
|
parseResult, err := utils.ParseCardExcel(localPath)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
|
|
cards := make(model.CardListJSON, 0, len(parseResult.Cards))
|
|
for _, card := range parseResult.Cards {
|
|
cards = append(cards, model.CardItem{
|
|
ICCID: card.ICCID,
|
|
MSISDN: card.MSISDN,
|
|
VirtualNo: card.VirtualNo,
|
|
})
|
|
}
|
|
|
|
return cards, parseResult.TotalCount, nil
|
|
}
|
|
|
|
type importResult struct {
|
|
successCount int
|
|
skipCount int
|
|
failCount int
|
|
skippedItems model.ImportResultItems
|
|
failedItems model.ImportResultItems
|
|
}
|
|
|
|
func (h *IotCardImportHandler) processImport(ctx context.Context, task *model.IotCardImportTask) *importResult {
|
|
result := &importResult{
|
|
skippedItems: make(model.ImportResultItems, 0),
|
|
failedItems: make(model.ImportResultItems, 0),
|
|
}
|
|
|
|
cards := h.getCardsFromTask(task)
|
|
if len(cards) == 0 {
|
|
return result
|
|
}
|
|
|
|
for i := 0; i < len(cards); i += batchSize {
|
|
end := min(i+batchSize, len(cards))
|
|
batch := cards[i:end]
|
|
h.processBatch(ctx, task, batch, i+1, result)
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// getCardsFromTask 从任务中获取待导入的卡列表
|
|
func (h *IotCardImportHandler) getCardsFromTask(task *model.IotCardImportTask) []model.CardItem {
|
|
return []model.CardItem(task.CardList)
|
|
}
|
|
|
|
func (h *IotCardImportHandler) processBatch(ctx context.Context, task *model.IotCardImportTask, batch []model.CardItem, startLine int, result *importResult) {
|
|
type cardMeta struct {
|
|
line int
|
|
msisdn string
|
|
virtualNo string
|
|
}
|
|
validCards := make([]model.CardItem, 0)
|
|
cardMetaMap := make(map[string]cardMeta)
|
|
|
|
for i, card := range batch {
|
|
line := startLine + i
|
|
cardMetaMap[card.ICCID] = cardMeta{line: line, msisdn: card.MSISDN, virtualNo: card.VirtualNo}
|
|
|
|
validationResult := validator.ValidateICCID(card.ICCID, task.CarrierType)
|
|
if !validationResult.Valid {
|
|
result.failedItems = append(result.failedItems, model.ImportResultItem{
|
|
Line: line,
|
|
ICCID: card.ICCID,
|
|
MSISDN: card.MSISDN,
|
|
Reason: validationResult.Message,
|
|
})
|
|
result.failCount++
|
|
continue
|
|
}
|
|
validCards = append(validCards, card)
|
|
}
|
|
|
|
if len(validCards) == 0 {
|
|
return
|
|
}
|
|
|
|
validICCIDs := make([]string, len(validCards))
|
|
for i, card := range validCards {
|
|
validICCIDs[i] = card.ICCID
|
|
}
|
|
|
|
existingMap, err := h.iotCardStore.ExistsByICCIDBatch(ctx, validICCIDs)
|
|
if err != nil {
|
|
h.logger.Error("批量检查 ICCID 是否存在失败",
|
|
zap.Error(err),
|
|
zap.Int("batch_size", len(validICCIDs)),
|
|
)
|
|
for _, card := range validCards {
|
|
meta := cardMetaMap[card.ICCID]
|
|
result.failedItems = append(result.failedItems, model.ImportResultItem{
|
|
Line: meta.line,
|
|
ICCID: card.ICCID,
|
|
MSISDN: meta.msisdn,
|
|
Reason: "数据库查询失败",
|
|
})
|
|
result.failCount++
|
|
}
|
|
return
|
|
}
|
|
|
|
newCards := make([]model.CardItem, 0)
|
|
for _, card := range validCards {
|
|
meta := cardMetaMap[card.ICCID]
|
|
if existingMap[card.ICCID] {
|
|
result.skippedItems = append(result.skippedItems, model.ImportResultItem{
|
|
Line: meta.line,
|
|
ICCID: card.ICCID,
|
|
MSISDN: meta.msisdn,
|
|
Reason: "ICCID 已存在",
|
|
})
|
|
result.skipCount++
|
|
} else {
|
|
newCards = append(newCards, card)
|
|
}
|
|
}
|
|
|
|
if len(newCards) == 0 {
|
|
return
|
|
}
|
|
|
|
// 批量检查 virtual_no 唯一性
|
|
virtualNos := make([]string, 0)
|
|
for _, card := range newCards {
|
|
if card.VirtualNo != "" {
|
|
virtualNos = append(virtualNos, card.VirtualNo)
|
|
}
|
|
}
|
|
existingVirtualNos := make(map[string]bool)
|
|
if len(virtualNos) > 0 {
|
|
existingVirtualNos, err = h.iotCardStore.ExistsByVirtualNoBatch(ctx, virtualNos)
|
|
if err != nil {
|
|
h.logger.Error("批量检查 virtual_no 是否存在失败",
|
|
zap.Error(err),
|
|
zap.Int("batch_size", len(virtualNos)),
|
|
)
|
|
}
|
|
}
|
|
// 批内去重:记录本批次已分配的 virtual_no
|
|
batchUsedVirtualNos := make(map[string]bool)
|
|
|
|
finalCards := make([]model.CardItem, 0, len(newCards))
|
|
for _, card := range newCards {
|
|
meta := cardMetaMap[card.ICCID]
|
|
if card.VirtualNo != "" {
|
|
if existingVirtualNos[card.VirtualNo] || batchUsedVirtualNos[card.VirtualNo] {
|
|
result.failedItems = append(result.failedItems, model.ImportResultItem{
|
|
Line: meta.line,
|
|
ICCID: card.ICCID,
|
|
MSISDN: meta.msisdn,
|
|
Reason: "virtual_no 已被占用: " + card.VirtualNo,
|
|
})
|
|
result.failCount++
|
|
continue
|
|
}
|
|
batchUsedVirtualNos[card.VirtualNo] = true
|
|
}
|
|
finalCards = append(finalCards, card)
|
|
}
|
|
|
|
if len(finalCards) == 0 {
|
|
return
|
|
}
|
|
|
|
iotCards := make([]*model.IotCard, 0, len(finalCards))
|
|
now := time.Now()
|
|
for _, card := range finalCards {
|
|
iotCard := &model.IotCard{
|
|
ICCID: card.ICCID,
|
|
MSISDN: card.MSISDN,
|
|
VirtualNo: card.VirtualNo,
|
|
CarrierID: task.CarrierID,
|
|
BatchNo: task.BatchNo,
|
|
Status: constants.IotCardStatusInStock,
|
|
CardCategory: constants.CardCategoryNormal,
|
|
ActivationStatus: constants.ActivationStatusInactive,
|
|
RealNameStatus: constants.RealNameStatusNotVerified,
|
|
NetworkStatus: constants.NetworkStatusOffline,
|
|
}
|
|
iotCard.BaseModel.Creator = task.Creator
|
|
iotCard.BaseModel.Updater = task.Creator
|
|
iotCard.CreatedAt = now
|
|
iotCard.UpdatedAt = now
|
|
iotCards = append(iotCards, iotCard)
|
|
}
|
|
|
|
if err := h.iotCardStore.CreateBatch(ctx, iotCards); err != nil {
|
|
h.logger.Error("批量创建 IoT 卡失败",
|
|
zap.Error(err),
|
|
zap.Int("batch_size", len(iotCards)),
|
|
)
|
|
for _, card := range finalCards {
|
|
meta := cardMetaMap[card.ICCID]
|
|
result.failedItems = append(result.failedItems, model.ImportResultItem{
|
|
Line: meta.line,
|
|
ICCID: card.ICCID,
|
|
MSISDN: meta.msisdn,
|
|
Reason: "数据库写入失败",
|
|
})
|
|
result.failCount++
|
|
}
|
|
return
|
|
}
|
|
|
|
result.successCount += len(finalCards)
|
|
|
|
if h.pollingCallback != nil {
|
|
h.pollingCallback.OnBatchCardsCreated(ctx, iotCards)
|
|
}
|
|
}
|