package iot_card_import import ( "context" "fmt" "path/filepath" "time" "github.com/break/junhong_cmp_fiber/internal/model" "github.com/break/junhong_cmp_fiber/internal/model/dto" "github.com/break/junhong_cmp_fiber/internal/store" "github.com/break/junhong_cmp_fiber/internal/store/postgres" "github.com/break/junhong_cmp_fiber/pkg/constants" "github.com/break/junhong_cmp_fiber/pkg/errors" "github.com/break/junhong_cmp_fiber/pkg/middleware" "github.com/break/junhong_cmp_fiber/pkg/queue" "gorm.io/gorm" ) type Service struct { db *gorm.DB importTaskStore *postgres.IotCardImportTaskStore carrierStore carrierGetter queueClient *queue.Client } type carrierGetter interface { GetByID(ctx context.Context, id uint) (*model.Carrier, error) } type CarrierStore struct { db *gorm.DB } func NewCarrierStore(db *gorm.DB) *CarrierStore { return &CarrierStore{db: db} } func (s *CarrierStore) GetByID(ctx context.Context, id uint) (*model.Carrier, error) { var carrier model.Carrier if err := s.db.WithContext(ctx).First(&carrier, id).Error; err != nil { return nil, err } return &carrier, nil } func New(db *gorm.DB, importTaskStore *postgres.IotCardImportTaskStore, queueClient *queue.Client) *Service { return &Service{ db: db, importTaskStore: importTaskStore, carrierStore: NewCarrierStore(db), queueClient: queueClient, } } type IotCardImportPayload struct { TaskID uint `json:"task_id"` } func (s *Service) CreateImportTask(ctx context.Context, req *dto.ImportIotCardRequest) (*dto.ImportIotCardResponse, error) { userID := middleware.GetUserIDFromContext(ctx) if userID == 0 { return nil, errors.New(errors.CodeUnauthorized, "未授权访问") } carrier, err := s.carrierStore.GetByID(ctx, req.CarrierID) if err != nil { return nil, errors.New(errors.CodeInvalidParam, "运营商不存在") } taskNo := s.importTaskStore.GenerateTaskNo(ctx) fileName := filepath.Base(req.FileKey) task := &model.IotCardImportTask{ TaskNo: taskNo, Status: model.ImportTaskStatusPending, CarrierID: req.CarrierID, CarrierType: carrier.CarrierType, CarrierName: carrier.CarrierName, BatchNo: req.BatchNo, FileName: fileName, StorageKey: req.FileKey, } task.Creator = userID task.Updater = userID if err := s.importTaskStore.Create(ctx, task); err != nil { return nil, fmt.Errorf("创建导入任务失败: %w", err) } payload := IotCardImportPayload{TaskID: task.ID} err = s.queueClient.EnqueueTask(ctx, constants.TaskTypeIotCardImport, payload) if err != nil { s.importTaskStore.UpdateStatus(ctx, task.ID, model.ImportTaskStatusFailed, "任务入队失败: "+err.Error()) return nil, fmt.Errorf("任务入队失败: %w", err) } return &dto.ImportIotCardResponse{ TaskID: task.ID, TaskNo: taskNo, Message: "导入任务已创建,Worker 将异步处理文件", }, nil } func (s *Service) List(ctx context.Context, req *dto.ListImportTaskRequest) (*dto.ListImportTaskResponse, error) { page := req.Page pageSize := req.PageSize if page == 0 { page = 1 } if pageSize == 0 { pageSize = constants.DefaultPageSize } opts := &store.QueryOptions{ Page: page, PageSize: pageSize, } filters := make(map[string]interface{}) if req.Status != nil { filters["status"] = *req.Status } if req.CarrierID != nil { filters["carrier_id"] = *req.CarrierID } if req.BatchNo != "" { filters["batch_no"] = req.BatchNo } if req.StartTime != nil { filters["start_time"] = *req.StartTime } if req.EndTime != nil { filters["end_time"] = *req.EndTime } tasks, total, err := s.importTaskStore.List(ctx, opts, filters) if err != nil { return nil, err } list := make([]*dto.ImportTaskResponse, 0, len(tasks)) for _, task := range tasks { list = append(list, s.toTaskResponse(task)) } totalPages := int(total) / pageSize if int(total)%pageSize > 0 { totalPages++ } return &dto.ListImportTaskResponse{ List: list, Total: total, Page: page, PageSize: pageSize, TotalPages: totalPages, }, nil } func (s *Service) GetByID(ctx context.Context, id uint) (*dto.ImportTaskDetailResponse, error) { task, err := s.importTaskStore.GetByID(ctx, id) if err != nil { return nil, errors.New(errors.CodeNotFound, "导入任务不存在") } resp := &dto.ImportTaskDetailResponse{ ImportTaskResponse: *s.toTaskResponse(task), SkippedItems: make([]*dto.ImportResultItemDTO, 0), FailedItems: make([]*dto.ImportResultItemDTO, 0), } for _, item := range task.SkippedItems { resp.SkippedItems = append(resp.SkippedItems, &dto.ImportResultItemDTO{ Line: item.Line, ICCID: item.ICCID, MSISDN: item.MSISDN, Reason: item.Reason, }) } for _, item := range task.FailedItems { resp.FailedItems = append(resp.FailedItems, &dto.ImportResultItemDTO{ Line: item.Line, ICCID: item.ICCID, MSISDN: item.MSISDN, Reason: item.Reason, }) } return resp, nil } func (s *Service) toTaskResponse(task *model.IotCardImportTask) *dto.ImportTaskResponse { var startedAt, completedAt *time.Time if task.StartedAt != nil { startedAt = task.StartedAt } if task.CompletedAt != nil { completedAt = task.CompletedAt } return &dto.ImportTaskResponse{ ID: task.ID, TaskNo: task.TaskNo, Status: task.Status, StatusText: getStatusText(task.Status), CarrierID: task.CarrierID, CarrierType: task.CarrierType, CarrierName: task.CarrierName, BatchNo: task.BatchNo, FileName: task.FileName, TotalCount: task.TotalCount, SuccessCount: task.SuccessCount, SkipCount: task.SkipCount, FailCount: task.FailCount, StartedAt: startedAt, CompletedAt: completedAt, ErrorMessage: task.ErrorMessage, CreatedAt: task.CreatedAt, } } func getStatusText(status int) string { switch status { case model.ImportTaskStatusPending: return "待处理" case model.ImportTaskStatusProcessing: return "处理中" case model.ImportTaskStatusCompleted: return "已完成" case model.ImportTaskStatusFailed: return "失败" default: return "未知" } }