package device_import import ( "context" "path/filepath" "time" "github.com/break/junhong_cmp_fiber/internal/model" "github.com/break/junhong_cmp_fiber/internal/model/dto" "github.com/break/junhong_cmp_fiber/internal/store" "github.com/break/junhong_cmp_fiber/internal/store/postgres" "github.com/break/junhong_cmp_fiber/pkg/constants" "github.com/break/junhong_cmp_fiber/pkg/errors" "github.com/break/junhong_cmp_fiber/pkg/middleware" "github.com/break/junhong_cmp_fiber/pkg/queue" "gorm.io/gorm" ) type Service struct { db *gorm.DB importTaskStore *postgres.DeviceImportTaskStore queueClient *queue.Client } type DeviceImportPayload struct { TaskID uint `json:"task_id"` } func New(db *gorm.DB, importTaskStore *postgres.DeviceImportTaskStore, queueClient *queue.Client) *Service { return &Service{ db: db, importTaskStore: importTaskStore, queueClient: queueClient, } } func (s *Service) CreateImportTask(ctx context.Context, req *dto.ImportDeviceRequest) (*dto.ImportDeviceResponse, error) { userID := middleware.GetUserIDFromContext(ctx) if userID == 0 { return nil, errors.New(errors.CodeUnauthorized, "未授权访问") } taskNo := s.importTaskStore.GenerateTaskNo(ctx) fileName := filepath.Base(req.FileKey) task := &model.DeviceImportTask{ TaskNo: taskNo, Status: model.ImportTaskStatusPending, BatchNo: req.BatchNo, FileName: fileName, StorageKey: req.FileKey, } task.Creator = userID task.Updater = userID if err := s.importTaskStore.Create(ctx, task); err != nil { return nil, errors.Wrap(errors.CodeInternalError, err, "创建导入任务失败") } payload := DeviceImportPayload{TaskID: task.ID} err := s.queueClient.EnqueueTask(ctx, constants.TaskTypeDeviceImport, payload) if err != nil { s.importTaskStore.UpdateStatus(ctx, task.ID, model.ImportTaskStatusFailed, "任务入队失败: "+err.Error()) return nil, errors.Wrap(errors.CodeInternalError, err, "任务入队失败") } return &dto.ImportDeviceResponse{ TaskID: task.ID, TaskNo: taskNo, Message: "导入任务已创建,Worker 将异步处理文件", }, nil } func (s *Service) List(ctx context.Context, req *dto.ListDeviceImportTaskRequest) (*dto.ListDeviceImportTaskResponse, error) { page := req.Page pageSize := req.PageSize if page == 0 { page = 1 } if pageSize == 0 { pageSize = constants.DefaultPageSize } opts := &store.QueryOptions{ Page: page, PageSize: pageSize, } filters := make(map[string]interface{}) if req.Status != nil { filters["status"] = *req.Status } if req.BatchNo != "" { filters["batch_no"] = req.BatchNo } if req.StartTime != nil { filters["start_time"] = *req.StartTime } if req.EndTime != nil { filters["end_time"] = *req.EndTime } tasks, total, err := s.importTaskStore.List(ctx, opts, filters) if err != nil { return nil, err } list := make([]*dto.DeviceImportTaskResponse, 0, len(tasks)) for _, task := range tasks { list = append(list, s.toTaskResponse(task)) } totalPages := int(total) / pageSize if int(total)%pageSize > 0 { totalPages++ } return &dto.ListDeviceImportTaskResponse{ List: list, Total: total, Page: page, PageSize: pageSize, TotalPages: totalPages, }, nil } func (s *Service) GetByID(ctx context.Context, id uint) (*dto.DeviceImportTaskDetailResponse, error) { task, err := s.importTaskStore.GetByID(ctx, id) if err != nil { return nil, errors.New(errors.CodeNotFound, "导入任务不存在") } resp := &dto.DeviceImportTaskDetailResponse{ DeviceImportTaskResponse: *s.toTaskResponse(task), SkippedItems: make([]*dto.DeviceImportResultItemDTO, 0), FailedItems: make([]*dto.DeviceImportResultItemDTO, 0), WarningItems: make([]*dto.DeviceImportResultItemDTO, 0), } for _, item := range task.SkippedItems { resp.SkippedItems = append(resp.SkippedItems, &dto.DeviceImportResultItemDTO{ Line: item.Line, DeviceNo: item.ICCID, Reason: item.Reason, }) } for _, item := range task.FailedItems { resp.FailedItems = append(resp.FailedItems, &dto.DeviceImportResultItemDTO{ Line: item.Line, DeviceNo: item.ICCID, Reason: item.Reason, }) } for _, item := range task.WarningItems { resp.WarningItems = append(resp.WarningItems, &dto.DeviceImportResultItemDTO{ Line: item.Line, DeviceNo: item.ICCID, Reason: item.Reason, }) } return resp, nil } func (s *Service) toTaskResponse(task *model.DeviceImportTask) *dto.DeviceImportTaskResponse { var startedAt, completedAt *time.Time if task.StartedAt != nil { startedAt = task.StartedAt } if task.CompletedAt != nil { completedAt = task.CompletedAt } return &dto.DeviceImportTaskResponse{ ID: task.ID, TaskNo: task.TaskNo, Status: task.Status, StatusText: getStatusText(task.Status), BatchNo: task.BatchNo, FileName: task.FileName, TotalCount: task.TotalCount, SuccessCount: task.SuccessCount, SkipCount: task.SkipCount, FailCount: task.FailCount, WarningCount: task.WarningCount, StartedAt: startedAt, CompletedAt: completedAt, ErrorMessage: task.ErrorMessage, CreatedAt: task.CreatedAt, } } func getStatusText(status int) string { switch status { case model.ImportTaskStatusPending: return "待处理" case model.ImportTaskStatusProcessing: return "处理中" case model.ImportTaskStatusCompleted: return "已完成" case model.ImportTaskStatusFailed: return "失败" default: return "未知" } }