feat: 实现设备管理和设备导入功能,修复测试问题
All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 5m30s
All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 5m30s
主要变更: - 实现设备管理模块(创建、查询、列表、更新状态、删除) - 实现设备批量导入功能(CSV 解析、ICCID 绑定、异步任务处理) - 添加设备-SIM 卡绑定约束(部分唯一索引防止并发问题) - 修复 fee_rate 数据库字段类型(numeric -> bigint) - 修复测试数据隔离问题(基于增量断言) - 修复集成测试中间件顺序问题 - 清理无用测试文件(PersonalCustomer、Email 相关) - 归档 enterprise-card-authorization 变更
This commit is contained in:
439
internal/task/device_import.go
Normal file
439
internal/task/device_import.go
Normal file
@@ -0,0 +1,439 @@
|
||||
package task
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/csv"
|
||||
stderrors "errors"
|
||||
"io"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/bytedance/sonic"
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"go.uber.org/zap"
|
||||
"gorm.io/gorm"
|
||||
|
||||
"github.com/break/junhong_cmp_fiber/internal/model"
|
||||
"github.com/break/junhong_cmp_fiber/internal/store/postgres"
|
||||
"github.com/break/junhong_cmp_fiber/pkg/constants"
|
||||
pkggorm "github.com/break/junhong_cmp_fiber/pkg/gorm"
|
||||
"github.com/break/junhong_cmp_fiber/pkg/storage"
|
||||
)
|
||||
|
||||
const deviceBatchSize = 100
|
||||
|
||||
type DeviceImportPayload struct {
|
||||
TaskID uint `json:"task_id"`
|
||||
}
|
||||
|
||||
type DeviceImportHandler struct {
|
||||
db *gorm.DB
|
||||
redis *redis.Client
|
||||
importTaskStore *postgres.DeviceImportTaskStore
|
||||
deviceStore *postgres.DeviceStore
|
||||
deviceSimBindingStore *postgres.DeviceSimBindingStore
|
||||
iotCardStore *postgres.IotCardStore
|
||||
storageService *storage.Service
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
func NewDeviceImportHandler(
|
||||
db *gorm.DB,
|
||||
redis *redis.Client,
|
||||
importTaskStore *postgres.DeviceImportTaskStore,
|
||||
deviceStore *postgres.DeviceStore,
|
||||
deviceSimBindingStore *postgres.DeviceSimBindingStore,
|
||||
iotCardStore *postgres.IotCardStore,
|
||||
storageSvc *storage.Service,
|
||||
logger *zap.Logger,
|
||||
) *DeviceImportHandler {
|
||||
return &DeviceImportHandler{
|
||||
db: db,
|
||||
redis: redis,
|
||||
importTaskStore: importTaskStore,
|
||||
deviceStore: deviceStore,
|
||||
deviceSimBindingStore: deviceSimBindingStore,
|
||||
iotCardStore: iotCardStore,
|
||||
storageService: storageSvc,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *DeviceImportHandler) HandleDeviceImport(ctx context.Context, task *asynq.Task) error {
|
||||
ctx = pkggorm.SkipDataPermission(ctx)
|
||||
|
||||
var payload DeviceImportPayload
|
||||
if err := sonic.Unmarshal(task.Payload(), &payload); err != nil {
|
||||
h.logger.Error("解析设备导入任务载荷失败",
|
||||
zap.Error(err),
|
||||
zap.String("task_id", task.ResultWriter().TaskID()),
|
||||
)
|
||||
return asynq.SkipRetry
|
||||
}
|
||||
|
||||
importTask, err := h.importTaskStore.GetByID(ctx, payload.TaskID)
|
||||
if err != nil {
|
||||
h.logger.Error("获取导入任务失败",
|
||||
zap.Uint("task_id", payload.TaskID),
|
||||
zap.Error(err),
|
||||
)
|
||||
return asynq.SkipRetry
|
||||
}
|
||||
|
||||
if importTask.Status != model.ImportTaskStatusPending {
|
||||
h.logger.Info("导入任务已处理,跳过",
|
||||
zap.Uint("task_id", payload.TaskID),
|
||||
zap.Int("status", importTask.Status),
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
h.importTaskStore.UpdateStatus(ctx, importTask.ID, model.ImportTaskStatusProcessing, "")
|
||||
|
||||
h.logger.Info("开始处理设备导入任务",
|
||||
zap.Uint("task_id", importTask.ID),
|
||||
zap.String("task_no", importTask.TaskNo),
|
||||
zap.String("storage_key", importTask.StorageKey),
|
||||
)
|
||||
|
||||
rows, totalCount, err := h.downloadAndParseCSV(ctx, importTask)
|
||||
if err != nil {
|
||||
h.logger.Error("下载或解析 CSV 失败",
|
||||
zap.Uint("task_id", importTask.ID),
|
||||
zap.Error(err),
|
||||
)
|
||||
h.importTaskStore.UpdateStatus(ctx, importTask.ID, model.ImportTaskStatusFailed, err.Error())
|
||||
return asynq.SkipRetry
|
||||
}
|
||||
|
||||
result := h.processImport(ctx, importTask, rows, totalCount)
|
||||
|
||||
h.importTaskStore.UpdateResult(ctx, importTask.ID, totalCount, result.successCount, result.skipCount, result.failCount, 0, result.skippedItems, result.failedItems, nil)
|
||||
|
||||
if result.failCount > 0 && result.successCount == 0 {
|
||||
h.importTaskStore.UpdateStatus(ctx, importTask.ID, model.ImportTaskStatusFailed, "所有导入均失败")
|
||||
} else {
|
||||
h.importTaskStore.UpdateStatus(ctx, importTask.ID, model.ImportTaskStatusCompleted, "")
|
||||
}
|
||||
|
||||
h.logger.Info("设备导入任务完成",
|
||||
zap.Uint("task_id", importTask.ID),
|
||||
zap.Int("success_count", result.successCount),
|
||||
zap.Int("skip_count", result.skipCount),
|
||||
zap.Int("fail_count", result.failCount),
|
||||
)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type deviceRow struct {
|
||||
Line int
|
||||
DeviceNo string
|
||||
DeviceName string
|
||||
DeviceModel string
|
||||
DeviceType string
|
||||
MaxSimSlots int
|
||||
Manufacturer string
|
||||
ICCIDs []string
|
||||
}
|
||||
|
||||
func (h *DeviceImportHandler) downloadAndParseCSV(ctx context.Context, task *model.DeviceImportTask) ([]deviceRow, int, error) {
|
||||
if h.storageService == nil {
|
||||
return nil, 0, ErrStorageNotConfigured
|
||||
}
|
||||
|
||||
if task.StorageKey == "" {
|
||||
return nil, 0, ErrStorageKeyEmpty
|
||||
}
|
||||
|
||||
localPath, cleanup, err := h.storageService.DownloadToTemp(ctx, task.StorageKey)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
defer cleanup()
|
||||
|
||||
f, err := os.Open(localPath)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
return h.parseDeviceCSV(f)
|
||||
}
|
||||
|
||||
func (h *DeviceImportHandler) parseDeviceCSV(r io.Reader) ([]deviceRow, int, error) {
|
||||
reader := csv.NewReader(r)
|
||||
reader.FieldsPerRecord = -1
|
||||
reader.TrimLeadingSpace = true
|
||||
|
||||
header, err := reader.Read()
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
colIndex := h.buildColumnIndex(header)
|
||||
if colIndex["device_no"] == -1 {
|
||||
return nil, 0, ErrMissingDeviceNoColumn
|
||||
}
|
||||
|
||||
var rows []deviceRow
|
||||
lineNum := 1
|
||||
|
||||
for {
|
||||
record, err := reader.Read()
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
lineNum++
|
||||
|
||||
row := deviceRow{Line: lineNum}
|
||||
|
||||
if idx := colIndex["device_no"]; idx >= 0 && idx < len(record) {
|
||||
row.DeviceNo = strings.TrimSpace(record[idx])
|
||||
}
|
||||
if idx := colIndex["device_name"]; idx >= 0 && idx < len(record) {
|
||||
row.DeviceName = strings.TrimSpace(record[idx])
|
||||
}
|
||||
if idx := colIndex["device_model"]; idx >= 0 && idx < len(record) {
|
||||
row.DeviceModel = strings.TrimSpace(record[idx])
|
||||
}
|
||||
if idx := colIndex["device_type"]; idx >= 0 && idx < len(record) {
|
||||
row.DeviceType = strings.TrimSpace(record[idx])
|
||||
}
|
||||
if idx := colIndex["max_sim_slots"]; idx >= 0 && idx < len(record) {
|
||||
if n, err := strconv.Atoi(strings.TrimSpace(record[idx])); err == nil {
|
||||
row.MaxSimSlots = n
|
||||
}
|
||||
}
|
||||
if idx := colIndex["manufacturer"]; idx >= 0 && idx < len(record) {
|
||||
row.Manufacturer = strings.TrimSpace(record[idx])
|
||||
}
|
||||
|
||||
row.ICCIDs = make([]string, 0, 4)
|
||||
for i := 1; i <= 4; i++ {
|
||||
colName := "iccid_" + strconv.Itoa(i)
|
||||
if idx := colIndex[colName]; idx >= 0 && idx < len(record) {
|
||||
iccid := strings.TrimSpace(record[idx])
|
||||
if iccid != "" {
|
||||
row.ICCIDs = append(row.ICCIDs, iccid)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if row.DeviceNo == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
if row.MaxSimSlots == 0 {
|
||||
row.MaxSimSlots = 4
|
||||
}
|
||||
|
||||
rows = append(rows, row)
|
||||
}
|
||||
|
||||
return rows, len(rows), nil
|
||||
}
|
||||
|
||||
func (h *DeviceImportHandler) buildColumnIndex(header []string) map[string]int {
|
||||
index := map[string]int{
|
||||
"device_no": -1,
|
||||
"device_name": -1,
|
||||
"device_model": -1,
|
||||
"device_type": -1,
|
||||
"max_sim_slots": -1,
|
||||
"manufacturer": -1,
|
||||
"iccid_1": -1,
|
||||
"iccid_2": -1,
|
||||
"iccid_3": -1,
|
||||
"iccid_4": -1,
|
||||
}
|
||||
|
||||
for i, col := range header {
|
||||
col = strings.ToLower(strings.TrimSpace(col))
|
||||
if _, exists := index[col]; exists {
|
||||
index[col] = i
|
||||
}
|
||||
}
|
||||
|
||||
return index
|
||||
}
|
||||
|
||||
type deviceImportResult struct {
|
||||
successCount int
|
||||
skipCount int
|
||||
failCount int
|
||||
skippedItems model.ImportResultItems
|
||||
failedItems model.ImportResultItems
|
||||
}
|
||||
|
||||
func (h *DeviceImportHandler) processImport(ctx context.Context, task *model.DeviceImportTask, rows []deviceRow, totalCount int) *deviceImportResult {
|
||||
result := &deviceImportResult{
|
||||
skippedItems: make(model.ImportResultItems, 0),
|
||||
failedItems: make(model.ImportResultItems, 0),
|
||||
}
|
||||
|
||||
if len(rows) == 0 {
|
||||
return result
|
||||
}
|
||||
|
||||
for i := 0; i < len(rows); i += deviceBatchSize {
|
||||
end := min(i+deviceBatchSize, len(rows))
|
||||
batch := rows[i:end]
|
||||
h.processBatch(ctx, task, batch, result)
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func (h *DeviceImportHandler) processBatch(ctx context.Context, task *model.DeviceImportTask, batch []deviceRow, result *deviceImportResult) {
|
||||
deviceNos := make([]string, 0, len(batch))
|
||||
allICCIDs := make([]string, 0)
|
||||
|
||||
for _, row := range batch {
|
||||
deviceNos = append(deviceNos, row.DeviceNo)
|
||||
allICCIDs = append(allICCIDs, row.ICCIDs...)
|
||||
}
|
||||
|
||||
existingDevices, err := h.deviceStore.ExistsByDeviceNoBatch(ctx, deviceNos)
|
||||
if err != nil {
|
||||
h.logger.Error("检查设备是否存在失败", zap.Error(err))
|
||||
for _, row := range batch {
|
||||
result.failedItems = append(result.failedItems, model.ImportResultItem{
|
||||
Line: row.Line,
|
||||
ICCID: row.DeviceNo,
|
||||
Reason: "数据库查询失败",
|
||||
})
|
||||
result.failCount++
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
var existingCards map[string]*model.IotCard
|
||||
var boundCards map[string]bool
|
||||
if len(allICCIDs) > 0 {
|
||||
cards, err := h.iotCardStore.GetByICCIDs(ctx, allICCIDs)
|
||||
if err != nil {
|
||||
h.logger.Error("查询卡信息失败", zap.Error(err))
|
||||
} else {
|
||||
existingCards = make(map[string]*model.IotCard)
|
||||
for _, card := range cards {
|
||||
existingCards[card.ICCID] = card
|
||||
}
|
||||
}
|
||||
|
||||
boundCards, err = h.deviceSimBindingStore.GetBoundICCIDs(ctx, allICCIDs)
|
||||
if err != nil {
|
||||
h.logger.Error("查询卡绑定状态失败", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
for _, row := range batch {
|
||||
if existingDevices[row.DeviceNo] {
|
||||
result.skippedItems = append(result.skippedItems, model.ImportResultItem{
|
||||
Line: row.Line,
|
||||
ICCID: row.DeviceNo,
|
||||
Reason: "设备号已存在",
|
||||
})
|
||||
result.skipCount++
|
||||
continue
|
||||
}
|
||||
|
||||
var validCardIDs []uint
|
||||
var cardIssues []string
|
||||
|
||||
for _, iccid := range row.ICCIDs {
|
||||
card, exists := existingCards[iccid]
|
||||
if !exists {
|
||||
cardIssues = append(cardIssues, iccid+"不存在")
|
||||
continue
|
||||
}
|
||||
if boundCards[iccid] {
|
||||
cardIssues = append(cardIssues, iccid+"已绑定其他设备")
|
||||
continue
|
||||
}
|
||||
if card.ShopID != nil {
|
||||
cardIssues = append(cardIssues, iccid+"已分配给店铺,不能绑定到平台库存设备")
|
||||
continue
|
||||
}
|
||||
validCardIDs = append(validCardIDs, card.ID)
|
||||
}
|
||||
|
||||
if len(row.ICCIDs) > 0 && len(cardIssues) > 0 {
|
||||
result.failedItems = append(result.failedItems, model.ImportResultItem{
|
||||
Line: row.Line,
|
||||
ICCID: row.DeviceNo,
|
||||
Reason: "卡验证失败: " + strings.Join(cardIssues, ", "),
|
||||
})
|
||||
result.failCount++
|
||||
continue
|
||||
}
|
||||
|
||||
err := h.db.Transaction(func(tx *gorm.DB) error {
|
||||
txDeviceStore := postgres.NewDeviceStore(tx, nil)
|
||||
txBindingStore := postgres.NewDeviceSimBindingStore(tx, nil)
|
||||
|
||||
device := &model.Device{
|
||||
DeviceNo: row.DeviceNo,
|
||||
DeviceName: row.DeviceName,
|
||||
DeviceModel: row.DeviceModel,
|
||||
DeviceType: row.DeviceType,
|
||||
MaxSimSlots: row.MaxSimSlots,
|
||||
Manufacturer: row.Manufacturer,
|
||||
BatchNo: task.BatchNo,
|
||||
Status: constants.DeviceStatusInStock,
|
||||
}
|
||||
device.Creator = task.Creator
|
||||
device.Updater = task.Creator
|
||||
|
||||
if err := txDeviceStore.Create(ctx, device); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
for i, cardID := range validCardIDs {
|
||||
binding := &model.DeviceSimBinding{
|
||||
DeviceID: device.ID,
|
||||
IotCardID: cardID,
|
||||
SlotPosition: i + 1,
|
||||
BindStatus: 1,
|
||||
BindTime: &now,
|
||||
}
|
||||
if err := txBindingStore.Create(ctx, binding); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
h.logger.Error("创建设备失败",
|
||||
zap.String("device_no", row.DeviceNo),
|
||||
zap.Error(err),
|
||||
)
|
||||
result.failedItems = append(result.failedItems, model.ImportResultItem{
|
||||
Line: row.Line,
|
||||
ICCID: row.DeviceNo,
|
||||
Reason: "数据库写入失败: " + err.Error(),
|
||||
})
|
||||
result.failCount++
|
||||
continue
|
||||
}
|
||||
|
||||
for _, iccid := range row.ICCIDs {
|
||||
if card, exists := existingCards[iccid]; exists && !boundCards[iccid] && card.ShopID == nil {
|
||||
boundCards[iccid] = true
|
||||
}
|
||||
}
|
||||
|
||||
result.successCount++
|
||||
}
|
||||
}
|
||||
|
||||
var ErrMissingDeviceNoColumn = stderrors.New("CSV 缺少 device_no 列")
|
||||
Reference in New Issue
Block a user