Files
junhong_cmp_fiber/internal/service/device_import/service.go
huang ce0783f96e
All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 5m30s
feat: 实现设备管理和设备导入功能,修复测试问题
主要变更:
- 实现设备管理模块(创建、查询、列表、更新状态、删除)
- 实现设备批量导入功能(CSV 解析、ICCID 绑定、异步任务处理)
- 添加设备-SIM 卡绑定约束(部分唯一索引防止并发问题)
- 修复 fee_rate 数据库字段类型(numeric -> bigint)
- 修复测试数据隔离问题(基于增量断言)
- 修复集成测试中间件顺序问题
- 清理无用测试文件(PersonalCustomer、Email 相关)
- 归档 enterprise-card-authorization 变更
2026-01-26 18:05:12 +08:00

210 lines
5.4 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package device_import
import (
"context"
"fmt"
"path/filepath"
"time"
"github.com/break/junhong_cmp_fiber/internal/model"
"github.com/break/junhong_cmp_fiber/internal/model/dto"
"github.com/break/junhong_cmp_fiber/internal/store"
"github.com/break/junhong_cmp_fiber/internal/store/postgres"
"github.com/break/junhong_cmp_fiber/pkg/constants"
"github.com/break/junhong_cmp_fiber/pkg/errors"
"github.com/break/junhong_cmp_fiber/pkg/middleware"
"github.com/break/junhong_cmp_fiber/pkg/queue"
"gorm.io/gorm"
)
type Service struct {
db *gorm.DB
importTaskStore *postgres.DeviceImportTaskStore
queueClient *queue.Client
}
type DeviceImportPayload struct {
TaskID uint `json:"task_id"`
}
func New(db *gorm.DB, importTaskStore *postgres.DeviceImportTaskStore, queueClient *queue.Client) *Service {
return &Service{
db: db,
importTaskStore: importTaskStore,
queueClient: queueClient,
}
}
func (s *Service) CreateImportTask(ctx context.Context, req *dto.ImportDeviceRequest) (*dto.ImportDeviceResponse, error) {
userID := middleware.GetUserIDFromContext(ctx)
if userID == 0 {
return nil, errors.New(errors.CodeUnauthorized, "未授权访问")
}
taskNo := s.importTaskStore.GenerateTaskNo(ctx)
fileName := filepath.Base(req.FileKey)
task := &model.DeviceImportTask{
TaskNo: taskNo,
Status: model.ImportTaskStatusPending,
BatchNo: req.BatchNo,
FileName: fileName,
StorageKey: req.FileKey,
}
task.Creator = userID
task.Updater = userID
if err := s.importTaskStore.Create(ctx, task); err != nil {
return nil, fmt.Errorf("创建导入任务失败: %w", err)
}
payload := DeviceImportPayload{TaskID: task.ID}
err := s.queueClient.EnqueueTask(ctx, constants.TaskTypeDeviceImport, payload)
if err != nil {
s.importTaskStore.UpdateStatus(ctx, task.ID, model.ImportTaskStatusFailed, "任务入队失败: "+err.Error())
return nil, fmt.Errorf("任务入队失败: %w", err)
}
return &dto.ImportDeviceResponse{
TaskID: task.ID,
TaskNo: taskNo,
Message: "导入任务已创建Worker 将异步处理文件",
}, nil
}
func (s *Service) List(ctx context.Context, req *dto.ListDeviceImportTaskRequest) (*dto.ListDeviceImportTaskResponse, error) {
page := req.Page
pageSize := req.PageSize
if page == 0 {
page = 1
}
if pageSize == 0 {
pageSize = constants.DefaultPageSize
}
opts := &store.QueryOptions{
Page: page,
PageSize: pageSize,
}
filters := make(map[string]interface{})
if req.Status != nil {
filters["status"] = *req.Status
}
if req.BatchNo != "" {
filters["batch_no"] = req.BatchNo
}
if req.StartTime != nil {
filters["start_time"] = *req.StartTime
}
if req.EndTime != nil {
filters["end_time"] = *req.EndTime
}
tasks, total, err := s.importTaskStore.List(ctx, opts, filters)
if err != nil {
return nil, err
}
list := make([]*dto.DeviceImportTaskResponse, 0, len(tasks))
for _, task := range tasks {
list = append(list, s.toTaskResponse(task))
}
totalPages := int(total) / pageSize
if int(total)%pageSize > 0 {
totalPages++
}
return &dto.ListDeviceImportTaskResponse{
List: list,
Total: total,
Page: page,
PageSize: pageSize,
TotalPages: totalPages,
}, nil
}
func (s *Service) GetByID(ctx context.Context, id uint) (*dto.DeviceImportTaskDetailResponse, error) {
task, err := s.importTaskStore.GetByID(ctx, id)
if err != nil {
return nil, errors.New(errors.CodeNotFound, "导入任务不存在")
}
resp := &dto.DeviceImportTaskDetailResponse{
DeviceImportTaskResponse: *s.toTaskResponse(task),
SkippedItems: make([]*dto.DeviceImportResultItemDTO, 0),
FailedItems: make([]*dto.DeviceImportResultItemDTO, 0),
WarningItems: make([]*dto.DeviceImportResultItemDTO, 0),
}
for _, item := range task.SkippedItems {
resp.SkippedItems = append(resp.SkippedItems, &dto.DeviceImportResultItemDTO{
Line: item.Line,
DeviceNo: item.ICCID,
Reason: item.Reason,
})
}
for _, item := range task.FailedItems {
resp.FailedItems = append(resp.FailedItems, &dto.DeviceImportResultItemDTO{
Line: item.Line,
DeviceNo: item.ICCID,
Reason: item.Reason,
})
}
for _, item := range task.WarningItems {
resp.WarningItems = append(resp.WarningItems, &dto.DeviceImportResultItemDTO{
Line: item.Line,
DeviceNo: item.ICCID,
Reason: item.Reason,
})
}
return resp, nil
}
func (s *Service) toTaskResponse(task *model.DeviceImportTask) *dto.DeviceImportTaskResponse {
var startedAt, completedAt *time.Time
if task.StartedAt != nil {
startedAt = task.StartedAt
}
if task.CompletedAt != nil {
completedAt = task.CompletedAt
}
return &dto.DeviceImportTaskResponse{
ID: task.ID,
TaskNo: task.TaskNo,
Status: task.Status,
StatusText: getStatusText(task.Status),
BatchNo: task.BatchNo,
FileName: task.FileName,
TotalCount: task.TotalCount,
SuccessCount: task.SuccessCount,
SkipCount: task.SkipCount,
FailCount: task.FailCount,
WarningCount: task.WarningCount,
StartedAt: startedAt,
CompletedAt: completedAt,
ErrorMessage: task.ErrorMessage,
CreatedAt: task.CreatedAt,
}
}
func getStatusText(status int) string {
switch status {
case model.ImportTaskStatusPending:
return "待处理"
case model.ImportTaskStatusProcessing:
return "处理中"
case model.ImportTaskStatusCompleted:
return "已完成"
case model.ImportTaskStatusFailed:
return "失败"
default:
return "未知"
}
}