All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 5m45s
主要变更: 1. OpenAPI 文档契约对齐 - 统一错误响应字段名为 msg(非 message) - 规范 envelope 响应结构(code, msg, data, timestamp) - 个人客户路由纳入文档体系(使用 Register 机制) - 新增 BuildDocHandlers() 统一管理 handler 构造 - 确保文档生成的幂等性 2. Service 层错误处理统一 - 全面替换 fmt.Errorf 为 errors.New/Wrap - 统一错误码使用规范 - Handler 层参数校验不泄露底层细节 - 新增错误码验证集成测试 3. 代码质量提升 - 删除未使用的 Task handler 和路由 - 新增代码规范检查脚本(check-service-errors.sh) - 新增注释路径一致性检查(check-comment-paths.sh) - 更新 API 文档生成指南 4. OpenSpec 归档 - 归档 openapi-contract-alignment 变更(63 tasks) - 归档 service-error-unify-core 变更 - 归档 service-error-unify-support 变更 - 归档 code-cleanup-docs-update 变更 - 归档 handler-validation-security 变更 - 同步 delta specs 到主规范文件 影响范围: - pkg/openapi: 新增 handlers.go,优化 generator.go - internal/service/*: 48 个 service 文件错误处理统一 - internal/handler/admin: 优化参数校验错误提示 - internal/routes: 个人客户路由改造,删除 task 路由 - scripts: 新增 3 个代码检查脚本 - docs: 更新 OpenAPI 文档(15750+ 行) - openspec/specs: 同步 3 个主规范文件 破坏性变更:无 向后兼容:是
237 lines
6.1 KiB
Go
237 lines
6.1 KiB
Go
package iot_card_import
|
||
|
||
import (
|
||
"context"
|
||
"path/filepath"
|
||
"time"
|
||
|
||
"github.com/break/junhong_cmp_fiber/internal/model"
|
||
"github.com/break/junhong_cmp_fiber/internal/model/dto"
|
||
"github.com/break/junhong_cmp_fiber/internal/store"
|
||
"github.com/break/junhong_cmp_fiber/internal/store/postgres"
|
||
"github.com/break/junhong_cmp_fiber/pkg/constants"
|
||
"github.com/break/junhong_cmp_fiber/pkg/errors"
|
||
"github.com/break/junhong_cmp_fiber/pkg/middleware"
|
||
"github.com/break/junhong_cmp_fiber/pkg/queue"
|
||
"gorm.io/gorm"
|
||
)
|
||
|
||
type Service struct {
|
||
db *gorm.DB
|
||
importTaskStore *postgres.IotCardImportTaskStore
|
||
carrierStore carrierGetter
|
||
queueClient *queue.Client
|
||
}
|
||
|
||
type carrierGetter interface {
|
||
GetByID(ctx context.Context, id uint) (*model.Carrier, error)
|
||
}
|
||
|
||
type CarrierStore struct {
|
||
db *gorm.DB
|
||
}
|
||
|
||
func NewCarrierStore(db *gorm.DB) *CarrierStore {
|
||
return &CarrierStore{db: db}
|
||
}
|
||
|
||
func (s *CarrierStore) GetByID(ctx context.Context, id uint) (*model.Carrier, error) {
|
||
var carrier model.Carrier
|
||
if err := s.db.WithContext(ctx).First(&carrier, id).Error; err != nil {
|
||
return nil, err
|
||
}
|
||
return &carrier, nil
|
||
}
|
||
|
||
func New(db *gorm.DB, importTaskStore *postgres.IotCardImportTaskStore, queueClient *queue.Client) *Service {
|
||
return &Service{
|
||
db: db,
|
||
importTaskStore: importTaskStore,
|
||
carrierStore: NewCarrierStore(db),
|
||
queueClient: queueClient,
|
||
}
|
||
}
|
||
|
||
// IotCardImportPayload is the payload enqueued for an IoT card import job.
type IotCardImportPayload struct {
	// TaskID is the ID of the import task record the job refers to.
	TaskID uint `json:"task_id"`
}
|
||
|
||
func (s *Service) CreateImportTask(ctx context.Context, req *dto.ImportIotCardRequest) (*dto.ImportIotCardResponse, error) {
|
||
userID := middleware.GetUserIDFromContext(ctx)
|
||
if userID == 0 {
|
||
return nil, errors.New(errors.CodeUnauthorized, "未授权访问")
|
||
}
|
||
|
||
carrier, err := s.carrierStore.GetByID(ctx, req.CarrierID)
|
||
if err != nil {
|
||
return nil, errors.New(errors.CodeInvalidParam, "运营商不存在")
|
||
}
|
||
|
||
taskNo := s.importTaskStore.GenerateTaskNo(ctx)
|
||
fileName := filepath.Base(req.FileKey)
|
||
|
||
task := &model.IotCardImportTask{
|
||
TaskNo: taskNo,
|
||
Status: model.ImportTaskStatusPending,
|
||
CarrierID: req.CarrierID,
|
||
CarrierType: carrier.CarrierType,
|
||
CarrierName: carrier.CarrierName,
|
||
BatchNo: req.BatchNo,
|
||
FileName: fileName,
|
||
StorageKey: req.FileKey,
|
||
}
|
||
task.Creator = userID
|
||
task.Updater = userID
|
||
|
||
if err := s.importTaskStore.Create(ctx, task); err != nil {
|
||
return nil, errors.Wrap(errors.CodeInternalError, err, "创建导入任务失败")
|
||
}
|
||
|
||
payload := IotCardImportPayload{TaskID: task.ID}
|
||
err = s.queueClient.EnqueueTask(ctx, constants.TaskTypeIotCardImport, payload)
|
||
if err != nil {
|
||
s.importTaskStore.UpdateStatus(ctx, task.ID, model.ImportTaskStatusFailed, "任务入队失败: "+err.Error())
|
||
return nil, errors.Wrap(errors.CodeInternalError, err, "任务入队失败")
|
||
}
|
||
|
||
return &dto.ImportIotCardResponse{
|
||
TaskID: task.ID,
|
||
TaskNo: taskNo,
|
||
Message: "导入任务已创建,Worker 将异步处理文件",
|
||
}, nil
|
||
}
|
||
|
||
func (s *Service) List(ctx context.Context, req *dto.ListImportTaskRequest) (*dto.ListImportTaskResponse, error) {
|
||
page := req.Page
|
||
pageSize := req.PageSize
|
||
if page == 0 {
|
||
page = 1
|
||
}
|
||
if pageSize == 0 {
|
||
pageSize = constants.DefaultPageSize
|
||
}
|
||
|
||
opts := &store.QueryOptions{
|
||
Page: page,
|
||
PageSize: pageSize,
|
||
}
|
||
|
||
filters := make(map[string]interface{})
|
||
if req.Status != nil {
|
||
filters["status"] = *req.Status
|
||
}
|
||
if req.CarrierID != nil {
|
||
filters["carrier_id"] = *req.CarrierID
|
||
}
|
||
if req.BatchNo != "" {
|
||
filters["batch_no"] = req.BatchNo
|
||
}
|
||
if req.StartTime != nil {
|
||
filters["start_time"] = *req.StartTime
|
||
}
|
||
if req.EndTime != nil {
|
||
filters["end_time"] = *req.EndTime
|
||
}
|
||
|
||
tasks, total, err := s.importTaskStore.List(ctx, opts, filters)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
list := make([]*dto.ImportTaskResponse, 0, len(tasks))
|
||
for _, task := range tasks {
|
||
list = append(list, s.toTaskResponse(task))
|
||
}
|
||
|
||
totalPages := int(total) / pageSize
|
||
if int(total)%pageSize > 0 {
|
||
totalPages++
|
||
}
|
||
|
||
return &dto.ListImportTaskResponse{
|
||
List: list,
|
||
Total: total,
|
||
Page: page,
|
||
PageSize: pageSize,
|
||
TotalPages: totalPages,
|
||
}, nil
|
||
}
|
||
|
||
func (s *Service) GetByID(ctx context.Context, id uint) (*dto.ImportTaskDetailResponse, error) {
|
||
task, err := s.importTaskStore.GetByID(ctx, id)
|
||
if err != nil {
|
||
return nil, errors.New(errors.CodeNotFound, "导入任务不存在")
|
||
}
|
||
|
||
resp := &dto.ImportTaskDetailResponse{
|
||
ImportTaskResponse: *s.toTaskResponse(task),
|
||
SkippedItems: make([]*dto.ImportResultItemDTO, 0),
|
||
FailedItems: make([]*dto.ImportResultItemDTO, 0),
|
||
}
|
||
|
||
for _, item := range task.SkippedItems {
|
||
resp.SkippedItems = append(resp.SkippedItems, &dto.ImportResultItemDTO{
|
||
Line: item.Line,
|
||
ICCID: item.ICCID,
|
||
MSISDN: item.MSISDN,
|
||
Reason: item.Reason,
|
||
})
|
||
}
|
||
|
||
for _, item := range task.FailedItems {
|
||
resp.FailedItems = append(resp.FailedItems, &dto.ImportResultItemDTO{
|
||
Line: item.Line,
|
||
ICCID: item.ICCID,
|
||
MSISDN: item.MSISDN,
|
||
Reason: item.Reason,
|
||
})
|
||
}
|
||
|
||
return resp, nil
|
||
}
|
||
|
||
func (s *Service) toTaskResponse(task *model.IotCardImportTask) *dto.ImportTaskResponse {
|
||
var startedAt, completedAt *time.Time
|
||
if task.StartedAt != nil {
|
||
startedAt = task.StartedAt
|
||
}
|
||
if task.CompletedAt != nil {
|
||
completedAt = task.CompletedAt
|
||
}
|
||
|
||
return &dto.ImportTaskResponse{
|
||
ID: task.ID,
|
||
TaskNo: task.TaskNo,
|
||
Status: task.Status,
|
||
StatusText: getStatusText(task.Status),
|
||
CarrierID: task.CarrierID,
|
||
CarrierType: task.CarrierType,
|
||
CarrierName: task.CarrierName,
|
||
BatchNo: task.BatchNo,
|
||
FileName: task.FileName,
|
||
TotalCount: task.TotalCount,
|
||
SuccessCount: task.SuccessCount,
|
||
SkipCount: task.SkipCount,
|
||
FailCount: task.FailCount,
|
||
StartedAt: startedAt,
|
||
CompletedAt: completedAt,
|
||
ErrorMessage: task.ErrorMessage,
|
||
CreatedAt: task.CreatedAt,
|
||
}
|
||
}
|
||
|
||
func getStatusText(status int) string {
|
||
switch status {
|
||
case model.ImportTaskStatusPending:
|
||
return "待处理"
|
||
case model.ImportTaskStatusProcessing:
|
||
return "处理中"
|
||
case model.ImportTaskStatusCompleted:
|
||
return "已完成"
|
||
case model.ImportTaskStatusFailed:
|
||
return "失败"
|
||
default:
|
||
return "未知"
|
||
}
|
||
}
|