All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 7m3s
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
209 lines
5.5 KiB
Go
209 lines
5.5 KiB
Go
package device_import
|
||
|
||
import (
|
||
"context"
|
||
"path/filepath"
|
||
"time"
|
||
|
||
"github.com/break/junhong_cmp_fiber/internal/model"
|
||
"github.com/break/junhong_cmp_fiber/internal/model/dto"
|
||
"github.com/break/junhong_cmp_fiber/internal/store"
|
||
"github.com/break/junhong_cmp_fiber/internal/store/postgres"
|
||
"github.com/break/junhong_cmp_fiber/pkg/constants"
|
||
"github.com/break/junhong_cmp_fiber/pkg/errors"
|
||
"github.com/break/junhong_cmp_fiber/pkg/middleware"
|
||
"github.com/break/junhong_cmp_fiber/pkg/queue"
|
||
"gorm.io/gorm"
|
||
)
|
||
|
||
type Service struct {
|
||
db *gorm.DB
|
||
importTaskStore *postgres.DeviceImportTaskStore
|
||
queueClient *queue.Client
|
||
}
|
||
|
||
type DeviceImportPayload struct {
|
||
TaskID uint `json:"task_id"`
|
||
}
|
||
|
||
func New(db *gorm.DB, importTaskStore *postgres.DeviceImportTaskStore, queueClient *queue.Client) *Service {
|
||
return &Service{
|
||
db: db,
|
||
importTaskStore: importTaskStore,
|
||
queueClient: queueClient,
|
||
}
|
||
}
|
||
|
||
func (s *Service) CreateImportTask(ctx context.Context, req *dto.ImportDeviceRequest) (*dto.ImportDeviceResponse, error) {
|
||
userID := middleware.GetUserIDFromContext(ctx)
|
||
if userID == 0 {
|
||
return nil, errors.New(errors.CodeUnauthorized, "未授权访问")
|
||
}
|
||
|
||
taskNo := s.importTaskStore.GenerateTaskNo(ctx)
|
||
fileName := filepath.Base(req.FileKey)
|
||
|
||
task := &model.DeviceImportTask{
|
||
TaskNo: taskNo,
|
||
Status: model.ImportTaskStatusPending,
|
||
BatchNo: req.BatchNo,
|
||
FileName: fileName,
|
||
StorageKey: req.FileKey,
|
||
}
|
||
task.Creator = userID
|
||
task.Updater = userID
|
||
|
||
if err := s.importTaskStore.Create(ctx, task); err != nil {
|
||
return nil, errors.Wrap(errors.CodeInternalError, err, "创建导入任务失败")
|
||
}
|
||
|
||
payload := DeviceImportPayload{TaskID: task.ID}
|
||
err := s.queueClient.EnqueueTask(ctx, constants.TaskTypeDeviceImport, payload)
|
||
if err != nil {
|
||
s.importTaskStore.UpdateStatus(ctx, task.ID, model.ImportTaskStatusFailed, "任务入队失败: "+err.Error())
|
||
return nil, errors.Wrap(errors.CodeInternalError, err, "任务入队失败")
|
||
}
|
||
|
||
return &dto.ImportDeviceResponse{
|
||
TaskID: task.ID,
|
||
TaskNo: taskNo,
|
||
Message: "导入任务已创建,Worker 将异步处理文件",
|
||
}, nil
|
||
}
|
||
|
||
func (s *Service) List(ctx context.Context, req *dto.ListDeviceImportTaskRequest) (*dto.ListDeviceImportTaskResponse, error) {
|
||
page := req.Page
|
||
pageSize := req.PageSize
|
||
if page == 0 {
|
||
page = 1
|
||
}
|
||
if pageSize == 0 {
|
||
pageSize = constants.DefaultPageSize
|
||
}
|
||
|
||
opts := &store.QueryOptions{
|
||
Page: page,
|
||
PageSize: pageSize,
|
||
}
|
||
|
||
filters := make(map[string]interface{})
|
||
if req.Status != nil {
|
||
filters["status"] = *req.Status
|
||
}
|
||
if req.BatchNo != "" {
|
||
filters["batch_no"] = req.BatchNo
|
||
}
|
||
if req.StartTime != nil {
|
||
filters["start_time"] = *req.StartTime
|
||
}
|
||
if req.EndTime != nil {
|
||
filters["end_time"] = *req.EndTime
|
||
}
|
||
|
||
tasks, total, err := s.importTaskStore.List(ctx, opts, filters)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
list := make([]*dto.DeviceImportTaskResponse, 0, len(tasks))
|
||
for _, task := range tasks {
|
||
list = append(list, s.toTaskResponse(task))
|
||
}
|
||
|
||
totalPages := int(total) / pageSize
|
||
if int(total)%pageSize > 0 {
|
||
totalPages++
|
||
}
|
||
|
||
return &dto.ListDeviceImportTaskResponse{
|
||
List: list,
|
||
Total: total,
|
||
Page: page,
|
||
PageSize: pageSize,
|
||
TotalPages: totalPages,
|
||
}, nil
|
||
}
|
||
|
||
func (s *Service) GetByID(ctx context.Context, id uint) (*dto.DeviceImportTaskDetailResponse, error) {
|
||
task, err := s.importTaskStore.GetByID(ctx, id)
|
||
if err != nil {
|
||
return nil, errors.New(errors.CodeNotFound, "导入任务不存在")
|
||
}
|
||
|
||
resp := &dto.DeviceImportTaskDetailResponse{
|
||
DeviceImportTaskResponse: *s.toTaskResponse(task),
|
||
SkippedItems: make([]*dto.DeviceImportResultItemDTO, 0),
|
||
FailedItems: make([]*dto.DeviceImportResultItemDTO, 0),
|
||
WarningItems: make([]*dto.DeviceImportResultItemDTO, 0),
|
||
}
|
||
|
||
for _, item := range task.SkippedItems {
|
||
resp.SkippedItems = append(resp.SkippedItems, &dto.DeviceImportResultItemDTO{
|
||
Line: item.Line,
|
||
VirtualNo: item.ICCID,
|
||
Reason: item.Reason,
|
||
})
|
||
}
|
||
|
||
for _, item := range task.FailedItems {
|
||
resp.FailedItems = append(resp.FailedItems, &dto.DeviceImportResultItemDTO{
|
||
Line: item.Line,
|
||
VirtualNo: item.ICCID,
|
||
Reason: item.Reason,
|
||
})
|
||
}
|
||
|
||
for _, item := range task.WarningItems {
|
||
resp.WarningItems = append(resp.WarningItems, &dto.DeviceImportResultItemDTO{
|
||
Line: item.Line,
|
||
VirtualNo: item.ICCID,
|
||
Reason: item.Reason,
|
||
})
|
||
}
|
||
|
||
return resp, nil
|
||
}
|
||
|
||
func (s *Service) toTaskResponse(task *model.DeviceImportTask) *dto.DeviceImportTaskResponse {
|
||
var startedAt, completedAt *time.Time
|
||
if task.StartedAt != nil {
|
||
startedAt = task.StartedAt
|
||
}
|
||
if task.CompletedAt != nil {
|
||
completedAt = task.CompletedAt
|
||
}
|
||
|
||
return &dto.DeviceImportTaskResponse{
|
||
ID: task.ID,
|
||
TaskNo: task.TaskNo,
|
||
Status: task.Status,
|
||
StatusText: getStatusText(task.Status),
|
||
BatchNo: task.BatchNo,
|
||
FileName: task.FileName,
|
||
TotalCount: task.TotalCount,
|
||
SuccessCount: task.SuccessCount,
|
||
SkipCount: task.SkipCount,
|
||
FailCount: task.FailCount,
|
||
WarningCount: task.WarningCount,
|
||
StartedAt: startedAt,
|
||
CompletedAt: completedAt,
|
||
ErrorMessage: task.ErrorMessage,
|
||
CreatedAt: task.CreatedAt,
|
||
}
|
||
}
|
||
|
||
func getStatusText(status int) string {
|
||
switch status {
|
||
case model.ImportTaskStatusPending:
|
||
return "待处理"
|
||
case model.ImportTaskStatusProcessing:
|
||
return "处理中"
|
||
case model.ImportTaskStatusCompleted:
|
||
return "已完成"
|
||
case model.ImportTaskStatusFailed:
|
||
return "失败"
|
||
default:
|
||
return "未知"
|
||
}
|
||
}
|