feat: 添加环境变量管理工具和部署配置改版
All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 5m33s

主要改动:
- 新增交互式环境配置脚本 (scripts/setup-env.sh)
- 新增本地启动快捷脚本 (scripts/run-local.sh)
- 新增环境变量模板文件 (.env.example)
- 部署模式改版:使用嵌入式配置 + 环境变量覆盖
- 添加对象存储功能支持
- 改进 IoT 卡片导入任务
- 优化 OpenAPI 文档生成
- 删除旧的配置文件,改用嵌入式默认配置
This commit is contained in:
2026-01-26 10:28:29 +08:00
parent 194078674a
commit 45aa7deb87
94 changed files with 6532 additions and 1967 deletions

View File

@@ -4,6 +4,7 @@ import (
"github.com/break/junhong_cmp_fiber/internal/service/verification"
"github.com/break/junhong_cmp_fiber/pkg/auth"
"github.com/break/junhong_cmp_fiber/pkg/queue"
"github.com/break/junhong_cmp_fiber/pkg/storage"
"github.com/redis/go-redis/v9"
"go.uber.org/zap"
"gorm.io/gorm"
@@ -19,4 +20,5 @@ type Dependencies struct {
TokenManager *auth.TokenManager // Token 管理器后台和H5认证
VerificationService *verification.Service // 验证码服务
QueueClient *queue.Client // Asynq 任务队列客户端
StorageService *storage.Service // 对象存储服务(可选,配置缺失时为 nil
}

View File

@@ -29,5 +29,6 @@ func initHandlers(svc *services, deps *Dependencies) *Handlers {
IotCard: admin.NewIotCardHandler(svc.IotCard),
IotCardImport: admin.NewIotCardImportHandler(svc.IotCardImport),
AssetAllocationRecord: admin.NewAssetAllocationRecordHandler(svc.AssetAllocationRecord),
Storage: admin.NewStorageHandler(deps.StorageService),
}
}

View File

@@ -27,6 +27,7 @@ type Handlers struct {
IotCard *admin.IotCardHandler
IotCardImport *admin.IotCardImportHandler
AssetAllocationRecord *admin.AssetAllocationRecordHandler
Storage *admin.StorageHandler
}
// Middlewares 封装所有中间件

View File

@@ -16,7 +16,9 @@ type IotCardImportHandler struct {
}
func NewIotCardImportHandler(service *iotCardImportService.Service) *IotCardImportHandler {
return &IotCardImportHandler{service: service}
return &IotCardImportHandler{
service: service,
}
}
func (h *IotCardImportHandler) Import(c *fiber.Ctx) error {
@@ -25,18 +27,11 @@ func (h *IotCardImportHandler) Import(c *fiber.Ctx) error {
return errors.New(errors.CodeInvalidParam, "请求参数解析失败")
}
file, err := c.FormFile("file")
if err != nil {
return errors.New(errors.CodeInvalidParam, "请上传 CSV 文件")
if req.FileKey == "" {
return errors.New(errors.CodeInvalidParam, "文件路径不能为空")
}
f, err := file.Open()
if err != nil {
return errors.New(errors.CodeInvalidParam, "无法读取上传文件")
}
defer f.Close()
result, err := h.service.CreateImportTask(c.UserContext(), &req, f, file.Filename)
result, err := h.service.CreateImportTask(c.UserContext(), &req)
if err != nil {
return err
}

View File

@@ -0,0 +1,40 @@
package admin
import (
"github.com/gofiber/fiber/v2"
"github.com/break/junhong_cmp_fiber/internal/model/dto"
"github.com/break/junhong_cmp_fiber/pkg/errors"
"github.com/break/junhong_cmp_fiber/pkg/response"
"github.com/break/junhong_cmp_fiber/pkg/storage"
)
type StorageHandler struct {
service *storage.Service
}
func NewStorageHandler(service *storage.Service) *StorageHandler {
return &StorageHandler{service: service}
}
func (h *StorageHandler) GetUploadURL(c *fiber.Ctx) error {
if h.service == nil {
return errors.New(errors.CodeInternalError, "对象存储服务未配置")
}
var req dto.GetUploadURLRequest
if err := c.BodyParser(&req); err != nil {
return errors.New(errors.CodeInvalidParam, "请求参数解析失败")
}
result, err := h.service.GetUploadURL(c.UserContext(), req.Purpose, req.FileName, req.ContentType)
if err != nil {
return errors.New(errors.CodeInternalError, err.Error())
}
return response.Success(c, dto.GetUploadURLResponse{
UploadURL: result.URL,
FileKey: result.FileKey,
ExpiresIn: result.ExpiresIn,
})
}

View File

@@ -52,8 +52,9 @@ type ListStandaloneIotCardResponse struct {
}
type ImportIotCardRequest struct {
CarrierID uint `json:"carrier_id" form:"carrier_id" validate:"required,min=1" required:"true" minimum:"1" description:"运营商ID"`
BatchNo string `json:"batch_no" form:"batch_no" validate:"omitempty,max=100" maxLength:"100" description:"批次号"`
CarrierID uint `json:"carrier_id" validate:"required,min=1" required:"true" minimum:"1" description:"运营商ID"`
BatchNo string `json:"batch_no" validate:"omitempty,max=100" maxLength:"100" description:"批次号"`
FileKey string `json:"file_key" validate:"required,min=1,max=500" required:"true" minLength:"1" maxLength:"500" description:"对象存储文件路径(通过 /storage/upload-url 获取)"`
}
type ImportIotCardResponse struct {
@@ -102,6 +103,7 @@ type ListImportTaskResponse struct {
type ImportResultItemDTO struct {
Line int `json:"line" description:"行号"`
ICCID string `json:"iccid" description:"ICCID"`
MSISDN string `json:"msisdn,omitempty" description:"接入号"`
Reason string `json:"reason" description:"原因"`
}

View File

@@ -0,0 +1,13 @@
package dto
type GetUploadURLRequest struct {
FileName string `json:"file_name" validate:"required,min=1,max=255" required:"true" minLength:"1" maxLength:"255" description:"文件名cards.csv"`
ContentType string `json:"content_type" validate:"omitempty,max=100" maxLength:"100" description:"文件 MIME 类型text/csv留空则自动推断"`
Purpose string `json:"purpose" validate:"required,oneof=iot_import export attachment" required:"true" description:"文件用途 (iot_import:ICCID导入, export:数据导出, attachment:附件)"`
}
type GetUploadURLResponse struct {
UploadURL string `json:"upload_url" description:"预签名上传 URL使用 PUT 方法上传文件"`
FileKey string `json:"file_key" description:"文件路径标识,上传成功后用于调用业务接口"`
ExpiresIn int `json:"expires_in" description:"URL 有效期(秒)"`
}

View File

@@ -10,38 +10,46 @@ import (
type IotCardImportTask struct {
gorm.Model
BaseModel `gorm:"embedded"`
TaskNo string `gorm:"column:task_no;type:varchar(50);uniqueIndex:idx_import_task_no,where:deleted_at IS NULL;not null;comment:任务编号(IMP-YYYYMMDD-XXXXXX)" json:"task_no"`
Status int `gorm:"column:status;type:int;default:1;not null;comment:任务状态 1-待处理 2-处理中 3-已完成 4-失败" json:"status"`
CarrierID uint `gorm:"column:carrier_id;index;not null;comment:运营商ID" json:"carrier_id"`
CarrierType string `gorm:"column:carrier_type;type:varchar(20);not null;comment:运营商类型(CMCC/CUCC/CTCC/CBN)" json:"carrier_type"`
BatchNo string `gorm:"column:batch_no;type:varchar(100);comment:批次号" json:"batch_no"`
FileName string `gorm:"column:file_name;type:varchar(255);comment:原始文件名" json:"file_name"`
TotalCount int `gorm:"column:total_count;type:int;default:0;not null;comment:总数" json:"total_count"`
SuccessCount int `gorm:"column:success_count;type:int;default:0;not null;comment:成功数" json:"success_count"`
SkipCount int `gorm:"column:skip_count;type:int;default:0;not null;comment:跳过数" json:"skip_count"`
FailCount int `gorm:"column:fail_count;type:int;default:0;not null;comment:失败数" json:"fail_count"`
SkippedItems ImportResultItems `gorm:"column:skipped_items;type:jsonb;comment:跳过记录详情" json:"skipped_items"`
FailedItems ImportResultItems `gorm:"column:failed_items;type:jsonb;comment:失败记录详情" json:"failed_items"`
StartedAt *time.Time `gorm:"column:started_at;comment:开始处理时间" json:"started_at"`
CompletedAt *time.Time `gorm:"column:completed_at;comment:完成时间" json:"completed_at"`
ErrorMessage string `gorm:"column:error_message;type:text;comment:任务级错误信息" json:"error_message"`
ShopID *uint `gorm:"column:shop_id;index;comment:店铺ID(发起导入的店铺)" json:"shop_id,omitempty"`
ICCIDList ICCIDListJSON `gorm:"column:iccid_list;type:jsonb;comment:待导入ICCID列表" json:"-"`
BaseModel `gorm:"embedded"`
TaskNo string `gorm:"column:task_no;type:varchar(50);uniqueIndex:idx_import_task_no,where:deleted_at IS NULL;not null;comment:任务编号(IMP-YYYYMMDD-XXXXXX)" json:"task_no"`
Status int `gorm:"column:status;type:int;default:1;not null;comment:任务状态 1-待处理 2-处理中 3-已完成 4-失败" json:"status"`
CarrierID uint `gorm:"column:carrier_id;index;not null;comment:运营商ID" json:"carrier_id"`
CarrierType string `gorm:"column:carrier_type;type:varchar(20);not null;comment:运营商类型(CMCC/CUCC/CTCC/CBN)" json:"carrier_type"`
BatchNo string `gorm:"column:batch_no;type:varchar(100);comment:批次号" json:"batch_no"`
FileName string `gorm:"column:file_name;type:varchar(255);comment:原始文件名" json:"file_name"`
TotalCount int `gorm:"column:total_count;type:int;default:0;not null;comment:总数" json:"total_count"`
SuccessCount int `gorm:"column:success_count;type:int;default:0;not null;comment:成功数" json:"success_count"`
SkipCount int `gorm:"column:skip_count;type:int;default:0;not null;comment:跳过数" json:"skip_count"`
FailCount int `gorm:"column:fail_count;type:int;default:0;not null;comment:失败数" json:"fail_count"`
SkippedItems ImportResultItems `gorm:"column:skipped_items;type:jsonb;comment:跳过记录详情" json:"skipped_items"`
FailedItems ImportResultItems `gorm:"column:failed_items;type:jsonb;comment:失败记录详情" json:"failed_items"`
StartedAt *time.Time `gorm:"column:started_at;comment:开始处理时间" json:"started_at"`
CompletedAt *time.Time `gorm:"column:completed_at;comment:完成时间" json:"completed_at"`
ErrorMessage string `gorm:"column:error_message;type:text;comment:任务级错误信息" json:"error_message"`
ShopID *uint `gorm:"column:shop_id;index;comment:店铺ID(发起导入的店铺)" json:"shop_id,omitempty"`
CardList CardListJSON `gorm:"column:card_list;type:jsonb;comment:待导入卡列表[{iccid,msisdn}]" json:"-"`
StorageBucket string `gorm:"column:storage_bucket;type:varchar(100);comment:对象存储桶名" json:"storage_bucket,omitempty"`
StorageKey string `gorm:"column:storage_key;type:varchar(500);comment:对象存储文件路径" json:"storage_key,omitempty"`
}
type ICCIDListJSON []string
// CardItem 卡信息ICCID + MSISDN
type CardItem struct {
ICCID string `json:"iccid"`
MSISDN string `json:"msisdn"`
}
func (list ICCIDListJSON) Value() (driver.Value, error) {
type CardListJSON []CardItem
func (list CardListJSON) Value() (driver.Value, error) {
if list == nil {
return "[]", nil
}
return json.Marshal(list)
}
func (list *ICCIDListJSON) Scan(value any) error {
func (list *CardListJSON) Scan(value any) error {
if value == nil {
*list = ICCIDListJSON{}
*list = CardListJSON{}
return nil
}
bytes, ok := value.([]byte)
@@ -58,6 +66,7 @@ func (IotCardImportTask) TableName() string {
type ImportResultItem struct {
Line int `json:"line"`
ICCID string `json:"iccid"`
MSISDN string `json:"msisdn,omitempty"`
Reason string `json:"reason"`
}

View File

@@ -58,6 +58,9 @@ func RegisterAdminRoutes(router fiber.Router, handlers *bootstrap.Handlers, midd
if handlers.AssetAllocationRecord != nil {
registerAssetAllocationRecordRoutes(authGroup, handlers.AssetAllocationRecord, doc, basePath)
}
if handlers.Storage != nil {
registerStorageRoutes(authGroup, handlers.Storage, doc, basePath)
}
}
func registerAdminAuthRoutes(router fiber.Router, handler interface{}, authMiddleware fiber.Handler, doc *openapi.Generator, basePath string) {

View File

@@ -21,11 +21,36 @@ func registerIotCardRoutes(router fiber.Router, handler *admin.IotCardHandler, i
})
Register(iotCards, doc, groupPath, "POST", "/import", importHandler.Import, RouteSpec{
Summary: "批量导入ICCID",
Tags: []string{"IoT卡管理"},
Input: new(dto.ImportIotCardRequest),
Output: new(dto.ImportIotCardResponse),
Auth: true,
Summary: "批量导入IoT卡ICCID+MSISDN",
Description: `## ⚠️ 接口变更说明BREAKING CHANGE
本接口已从 ` + "`multipart/form-data`" + ` 改为 ` + "`application/json`" + `
### 完整导入流程
1. **获取上传 URL**: 调用 ` + "`POST /api/admin/storage/upload-url`" + `
2. **上传 CSV 文件**: 使用预签名 URL 上传文件到对象存储
3. **调用本接口**: 使用返回的 ` + "`file_key`" + ` 提交导入任务
### 请求示例
` + "```" + `json
{
"carrier_id": 1,
"batch_no": "BATCH-2025-01",
"file_key": "imports/2025/01/24/abc123.csv"
}
` + "```" + `
### CSV 文件格式
- 必须包含两列:` + "`iccid`" + `, ` + "`msisdn`" + `
- 首行为表头
- 编码UTF-8`,
Tags: []string{"IoT卡管理"},
Input: new(dto.ImportIotCardRequest),
Output: new(dto.ImportIotCardResponse),
Auth: true,
})
Register(iotCards, doc, groupPath, "GET", "/import-tasks", importHandler.List, RouteSpec{

View File

@@ -8,13 +8,22 @@ import (
"github.com/break/junhong_cmp_fiber/pkg/openapi"
)
// FileUploadField 定义文件上传字段
type FileUploadField struct {
Name string // 字段名
Description string // 字段描述
Required bool // 是否必填
}
// RouteSpec 定义接口文档元数据
type RouteSpec struct {
Summary string
Input interface{} // 请求参数结构体 (Query/Path/Body)
Output interface{} // 响应参数结构体
Tags []string
Auth bool // 是否需要认证图标 (预留)
Summary string // 简短摘要(中文,一行)
Description string // 详细说明,支持 Markdown 语法(可选)
Input interface{} // 请求参数结构体 (Query/Path/Body)
Output interface{} // 响应参数结构体
Tags []string // 分类标签
Auth bool // 是否需要认证图标 (预留)
FileUploads []FileUploadField // 文件上传字段列表(设置此字段时请求类型为 multipart/form-data
}
// pathParamRegex 用于匹配 Fiber 的路径参数格式 /:param
@@ -33,6 +42,19 @@ func Register(router fiber.Router, doc *openapi.Generator, basePath, method, pat
if doc != nil {
fullPath := basePath + path
openapiPath := pathParamRegex.ReplaceAllString(fullPath, "/{$1}")
doc.AddOperation(method, openapiPath, spec.Summary, spec.Input, spec.Output, spec.Auth, spec.Tags...)
if len(spec.FileUploads) > 0 {
fileFields := make([]openapi.FileUploadField, len(spec.FileUploads))
for i, f := range spec.FileUploads {
fileFields[i] = openapi.FileUploadField{
Name: f.Name,
Description: f.Description,
Required: f.Required,
}
}
doc.AddMultipartOperation(method, openapiPath, spec.Summary, spec.Description, spec.Input, spec.Output, spec.Auth, fileFields, spec.Tags...)
} else {
doc.AddOperation(method, openapiPath, spec.Summary, spec.Description, spec.Input, spec.Output, spec.Auth, spec.Tags...)
}
}
}

View File

@@ -0,0 +1,71 @@
package routes
import (
"github.com/gofiber/fiber/v2"
"github.com/break/junhong_cmp_fiber/internal/handler/admin"
"github.com/break/junhong_cmp_fiber/internal/model/dto"
"github.com/break/junhong_cmp_fiber/pkg/openapi"
)
func registerStorageRoutes(router fiber.Router, handler *admin.StorageHandler, doc *openapi.Generator, basePath string) {
storage := router.Group("/storage")
groupPath := basePath + "/storage"
Register(storage, doc, groupPath, "POST", "/upload-url", handler.GetUploadURL, RouteSpec{
Summary: "获取文件上传预签名 URL",
Description: `## 文件上传流程
本接口用于获取对象存储的预签名上传 URL实现前端直传文件到对象存储。
### 完整流程
1. **调用本接口** 获取预签名 URL 和 file_key
2. **使用预签名 URL 上传文件** 发起 PUT 请求直接上传到对象存储
3. **调用业务接口** 使用 file_key 调用相关业务接口(如 ICCID 导入)
### 前端上传示例
` + "```" + `javascript
// 1. 获取预签名 URL
const { data } = await api.post('/storage/upload-url', {
file_name: 'cards.csv',
content_type: 'text/csv',
purpose: 'iot_import'
});
// 2. 上传文件到对象存储
await fetch(data.upload_url, {
method: 'PUT',
headers: { 'Content-Type': 'text/csv' },
body: file
});
// 3. 调用业务接口
await api.post('/iot-cards/import', {
carrier_id: 1,
batch_no: 'BATCH-2025-01',
file_key: data.file_key
});
` + "```" + `
### purpose 可选值
| 值 | 说明 | 生成路径格式 |
|---|------|-------------|
| iot_import | ICCID 导入 | imports/YYYY/MM/DD/uuid.csv |
| export | 数据导出 | exports/YYYY/MM/DD/uuid.xlsx |
| attachment | 附件上传 | attachments/YYYY/MM/DD/uuid.ext |
### 注意事项
- 预签名 URL 有效期 **15 分钟**,请及时使用
- 上传时 Content-Type 需与请求时一致
- file_key 在上传成功后永久有效,用于后续业务接口调用
- 上传失败时可重新调用本接口获取新的 URL`,
Tags: []string{"对象存储"},
Input: new(dto.GetUploadURLRequest),
Output: new(dto.GetUploadURLResponse),
Auth: true,
})
}

View File

@@ -3,7 +3,7 @@ package iot_card_import
import (
"context"
"fmt"
"io"
"path/filepath"
"time"
"github.com/break/junhong_cmp_fiber/internal/model"
@@ -14,7 +14,6 @@ import (
"github.com/break/junhong_cmp_fiber/pkg/errors"
"github.com/break/junhong_cmp_fiber/pkg/middleware"
"github.com/break/junhong_cmp_fiber/pkg/queue"
"github.com/break/junhong_cmp_fiber/pkg/utils"
"gorm.io/gorm"
)
@@ -58,7 +57,7 @@ type IotCardImportPayload struct {
TaskID uint `json:"task_id"`
}
func (s *Service) CreateImportTask(ctx context.Context, req *dto.ImportIotCardRequest, csvReader io.Reader, fileName string) (*dto.ImportIotCardResponse, error) {
func (s *Service) CreateImportTask(ctx context.Context, req *dto.ImportIotCardRequest) (*dto.ImportIotCardResponse, error) {
userID := middleware.GetUserIDFromContext(ctx)
if userID == 0 {
return nil, errors.New(errors.CodeUnauthorized, "未授权访问")
@@ -69,29 +68,17 @@ func (s *Service) CreateImportTask(ctx context.Context, req *dto.ImportIotCardRe
return nil, errors.New(errors.CodeInvalidParam, "运营商不存在")
}
parseResult, err := utils.ParseICCIDFromCSV(csvReader)
if err != nil {
return nil, errors.New(errors.CodeInvalidParam, "CSV 解析失败: "+err.Error())
}
if parseResult.TotalCount == 0 {
return nil, errors.New(errors.CodeInvalidParam, "CSV 文件中没有有效的 ICCID")
}
taskNo := s.importTaskStore.GenerateTaskNo(ctx)
fileName := filepath.Base(req.FileKey)
task := &model.IotCardImportTask{
TaskNo: taskNo,
Status: model.ImportTaskStatusPending,
CarrierID: req.CarrierID,
CarrierType: carrier.CarrierType,
BatchNo: req.BatchNo,
FileName: fileName,
TotalCount: parseResult.TotalCount,
SuccessCount: 0,
SkipCount: 0,
FailCount: 0,
ICCIDList: model.ICCIDListJSON(parseResult.ICCIDs),
TaskNo: taskNo,
Status: model.ImportTaskStatusPending,
CarrierID: req.CarrierID,
CarrierType: carrier.CarrierType,
BatchNo: req.BatchNo,
FileName: fileName,
StorageKey: req.FileKey,
}
task.Creator = userID
task.Updater = userID
@@ -110,7 +97,7 @@ func (s *Service) CreateImportTask(ctx context.Context, req *dto.ImportIotCardRe
return &dto.ImportIotCardResponse{
TaskID: task.ID,
TaskNo: taskNo,
Message: fmt.Sprintf("导入任务已创建,共 %d 条 ICCID 待处理", parseResult.TotalCount),
Message: "导入任务已创建Worker 将异步处理文件",
}, nil
}
@@ -194,6 +181,7 @@ func (s *Service) GetByID(ctx context.Context, id uint) (*dto.ImportTaskDetailRe
resp.SkippedItems = append(resp.SkippedItems, &dto.ImportResultItemDTO{
Line: item.Line,
ICCID: item.ICCID,
MSISDN: item.MSISDN,
Reason: item.Reason,
})
}
@@ -202,6 +190,7 @@ func (s *Service) GetByID(ctx context.Context, id uint) (*dto.ImportTaskDetailRe
resp.FailedItems = append(resp.FailedItems, &dto.ImportResultItemDTO{
Line: item.Line,
ICCID: item.ICCID,
MSISDN: item.MSISDN,
Reason: item.Reason,
})
}

View File

@@ -125,6 +125,15 @@ func (s *IotCardImportTaskStore) List(ctx context.Context, opts *store.QueryOpti
return tasks, total, nil
}
func (s *IotCardImportTaskStore) UpdateCardList(ctx context.Context, id uint, cards model.CardListJSON, totalCount int) error {
updates := map[string]interface{}{
"card_list": cards,
"total_count": totalCount,
"updated_at": time.Now(),
}
return s.db.WithContext(ctx).Model(&model.IotCardImportTask{}).Where("id = ?", id).Updates(updates).Error
}
func (s *IotCardImportTaskStore) GenerateTaskNo(ctx context.Context) string {
now := time.Now()
dateStr := now.Format("20060102")

View File

@@ -2,6 +2,8 @@ package task
import (
"context"
"errors"
"os"
"time"
"github.com/bytedance/sonic"
@@ -14,9 +16,16 @@ import (
"github.com/break/junhong_cmp_fiber/internal/store/postgres"
"github.com/break/junhong_cmp_fiber/pkg/constants"
pkggorm "github.com/break/junhong_cmp_fiber/pkg/gorm"
"github.com/break/junhong_cmp_fiber/pkg/storage"
"github.com/break/junhong_cmp_fiber/pkg/utils"
"github.com/break/junhong_cmp_fiber/pkg/validator"
)
var (
ErrStorageNotConfigured = errors.New("对象存储服务未配置")
ErrStorageKeyEmpty = errors.New("文件存储路径为空")
)
const batchSize = 1000
type IotCardImportPayload struct {
@@ -28,15 +37,24 @@ type IotCardImportHandler struct {
redis *redis.Client
importTaskStore *postgres.IotCardImportTaskStore
iotCardStore *postgres.IotCardStore
storageService *storage.Service
logger *zap.Logger
}
func NewIotCardImportHandler(db *gorm.DB, redis *redis.Client, importTaskStore *postgres.IotCardImportTaskStore, iotCardStore *postgres.IotCardStore, logger *zap.Logger) *IotCardImportHandler {
func NewIotCardImportHandler(
db *gorm.DB,
redis *redis.Client,
importTaskStore *postgres.IotCardImportTaskStore,
iotCardStore *postgres.IotCardStore,
storageSvc *storage.Service,
logger *zap.Logger,
) *IotCardImportHandler {
return &IotCardImportHandler{
db: db,
redis: redis,
importTaskStore: importTaskStore,
iotCardStore: iotCardStore,
storageService: storageSvc,
logger: logger,
}
}
@@ -75,9 +93,23 @@ func (h *IotCardImportHandler) HandleIotCardImport(ctx context.Context, task *as
h.logger.Info("开始处理 IoT 卡导入任务",
zap.Uint("task_id", importTask.ID),
zap.String("task_no", importTask.TaskNo),
zap.Int("total_count", importTask.TotalCount),
zap.String("storage_key", importTask.StorageKey),
)
cards, totalCount, err := h.downloadAndParseCSV(ctx, importTask)
if err != nil {
h.logger.Error("下载或解析 CSV 失败",
zap.Uint("task_id", importTask.ID),
zap.Error(err),
)
h.importTaskStore.UpdateStatus(ctx, importTask.ID, model.ImportTaskStatusFailed, err.Error())
return asynq.SkipRetry
}
importTask.CardList = cards
importTask.TotalCount = totalCount
h.importTaskStore.UpdateCardList(ctx, importTask.ID, cards, totalCount)
result := h.processImport(ctx, importTask)
h.importTaskStore.UpdateResult(ctx, importTask.ID, result.successCount, result.skipCount, result.failCount, result.skippedItems, result.failedItems)
@@ -98,6 +130,43 @@ func (h *IotCardImportHandler) HandleIotCardImport(ctx context.Context, task *as
return nil
}
func (h *IotCardImportHandler) downloadAndParseCSV(ctx context.Context, task *model.IotCardImportTask) (model.CardListJSON, int, error) {
if h.storageService == nil {
return nil, 0, ErrStorageNotConfigured
}
if task.StorageKey == "" {
return nil, 0, ErrStorageKeyEmpty
}
localPath, cleanup, err := h.storageService.DownloadToTemp(ctx, task.StorageKey)
if err != nil {
return nil, 0, err
}
defer cleanup()
f, err := os.Open(localPath)
if err != nil {
return nil, 0, err
}
defer f.Close()
parseResult, err := utils.ParseCardCSV(f)
if err != nil {
return nil, 0, err
}
cards := make(model.CardListJSON, 0, len(parseResult.Cards))
for _, card := range parseResult.Cards {
cards = append(cards, model.CardItem{
ICCID: card.ICCID,
MSISDN: card.MSISDN,
})
}
return cards, parseResult.TotalCount, nil
}
type importResult struct {
successCount int
skipCount int
@@ -112,59 +181,72 @@ func (h *IotCardImportHandler) processImport(ctx context.Context, task *model.Io
failedItems: make(model.ImportResultItems, 0),
}
iccids := h.getICCIDsFromTask(task)
if len(iccids) == 0 {
cards := h.getCardsFromTask(task)
if len(cards) == 0 {
return result
}
for i := 0; i < len(iccids); i += batchSize {
end := min(i+batchSize, len(iccids))
batch := iccids[i:end]
for i := 0; i < len(cards); i += batchSize {
end := min(i+batchSize, len(cards))
batch := cards[i:end]
h.processBatch(ctx, task, batch, i+1, result)
}
return result
}
func (h *IotCardImportHandler) getICCIDsFromTask(task *model.IotCardImportTask) []string {
return []string(task.ICCIDList)
// getCardsFromTask 从任务中获取待导入的卡列表
func (h *IotCardImportHandler) getCardsFromTask(task *model.IotCardImportTask) []model.CardItem {
return []model.CardItem(task.CardList)
}
func (h *IotCardImportHandler) processBatch(ctx context.Context, task *model.IotCardImportTask, batch []string, startLine int, result *importResult) {
validICCIDs := make([]string, 0)
lineMap := make(map[string]int)
func (h *IotCardImportHandler) processBatch(ctx context.Context, task *model.IotCardImportTask, batch []model.CardItem, startLine int, result *importResult) {
type cardMeta struct {
line int
msisdn string
}
validCards := make([]model.CardItem, 0)
cardMetaMap := make(map[string]cardMeta)
for i, iccid := range batch {
for i, card := range batch {
line := startLine + i
lineMap[iccid] = line
cardMetaMap[card.ICCID] = cardMeta{line: line, msisdn: card.MSISDN}
validationResult := validator.ValidateICCID(iccid, task.CarrierType)
validationResult := validator.ValidateICCID(card.ICCID, task.CarrierType)
if !validationResult.Valid {
result.failedItems = append(result.failedItems, model.ImportResultItem{
Line: line,
ICCID: iccid,
ICCID: card.ICCID,
MSISDN: card.MSISDN,
Reason: validationResult.Message,
})
result.failCount++
continue
}
validICCIDs = append(validICCIDs, iccid)
validCards = append(validCards, card)
}
if len(validICCIDs) == 0 {
if len(validCards) == 0 {
return
}
validICCIDs := make([]string, len(validCards))
for i, card := range validCards {
validICCIDs[i] = card.ICCID
}
existingMap, err := h.iotCardStore.ExistsByICCIDBatch(ctx, validICCIDs)
if err != nil {
h.logger.Error("批量检查 ICCID 是否存在失败",
zap.Error(err),
zap.Int("batch_size", len(validICCIDs)),
)
for _, iccid := range validICCIDs {
for _, card := range validCards {
meta := cardMetaMap[card.ICCID]
result.failedItems = append(result.failedItems, model.ImportResultItem{
Line: lineMap[iccid],
ICCID: iccid,
Line: meta.line,
ICCID: card.ICCID,
MSISDN: meta.msisdn,
Reason: "数据库查询失败",
})
result.failCount++
@@ -172,29 +254,32 @@ func (h *IotCardImportHandler) processBatch(ctx context.Context, task *model.Iot
return
}
newICCIDs := make([]string, 0)
for _, iccid := range validICCIDs {
if existingMap[iccid] {
newCards := make([]model.CardItem, 0)
for _, card := range validCards {
meta := cardMetaMap[card.ICCID]
if existingMap[card.ICCID] {
result.skippedItems = append(result.skippedItems, model.ImportResultItem{
Line: lineMap[iccid],
ICCID: iccid,
Line: meta.line,
ICCID: card.ICCID,
MSISDN: meta.msisdn,
Reason: "ICCID 已存在",
})
result.skipCount++
} else {
newICCIDs = append(newICCIDs, iccid)
newCards = append(newCards, card)
}
}
if len(newICCIDs) == 0 {
if len(newCards) == 0 {
return
}
cards := make([]*model.IotCard, 0, len(newICCIDs))
iotCards := make([]*model.IotCard, 0, len(newCards))
now := time.Now()
for _, iccid := range newICCIDs {
card := &model.IotCard{
ICCID: iccid,
for _, card := range newCards {
iotCard := &model.IotCard{
ICCID: card.ICCID,
MSISDN: card.MSISDN,
CarrierID: task.CarrierID,
BatchNo: task.BatchNo,
Status: constants.IotCardStatusInStock,
@@ -203,22 +288,24 @@ func (h *IotCardImportHandler) processBatch(ctx context.Context, task *model.Iot
RealNameStatus: constants.RealNameStatusNotVerified,
NetworkStatus: constants.NetworkStatusOffline,
}
card.BaseModel.Creator = task.Creator
card.BaseModel.Updater = task.Creator
card.CreatedAt = now
card.UpdatedAt = now
cards = append(cards, card)
iotCard.BaseModel.Creator = task.Creator
iotCard.BaseModel.Updater = task.Creator
iotCard.CreatedAt = now
iotCard.UpdatedAt = now
iotCards = append(iotCards, iotCard)
}
if err := h.iotCardStore.CreateBatch(ctx, cards); err != nil {
if err := h.iotCardStore.CreateBatch(ctx, iotCards); err != nil {
h.logger.Error("批量创建 IoT 卡失败",
zap.Error(err),
zap.Int("batch_size", len(cards)),
zap.Int("batch_size", len(iotCards)),
)
for _, iccid := range newICCIDs {
for _, card := range newCards {
meta := cardMetaMap[card.ICCID]
result.failedItems = append(result.failedItems, model.ImportResultItem{
Line: lineMap[iccid],
ICCID: iccid,
Line: meta.line,
ICCID: card.ICCID,
MSISDN: meta.msisdn,
Reason: "数据库写入失败",
})
result.failCount++
@@ -226,5 +313,5 @@ func (h *IotCardImportHandler) processBatch(ctx context.Context, task *model.Iot
return
}
result.successCount += len(newICCIDs)
result.successCount += len(newCards)
}

View File

@@ -22,7 +22,7 @@ func TestIotCardImportHandler_ProcessImport(t *testing.T) {
importTaskStore := postgres.NewIotCardImportTaskStore(tx, rdb)
iotCardStore := postgres.NewIotCardStore(tx, rdb)
handler := NewIotCardImportHandler(tx, rdb, importTaskStore, iotCardStore, logger)
handler := NewIotCardImportHandler(tx, rdb, importTaskStore, iotCardStore, nil, logger)
ctx := context.Background()
t.Run("成功导入新ICCID", func(t *testing.T) {
@@ -30,8 +30,12 @@ func TestIotCardImportHandler_ProcessImport(t *testing.T) {
CarrierID: 1,
CarrierType: constants.CarrierCodeCMCC,
BatchNo: "TEST_BATCH_001",
ICCIDList: model.ICCIDListJSON{"89860012345678905001", "89860012345678905002", "89860012345678905003"},
TotalCount: 3,
CardList: model.CardListJSON{
{ICCID: "89860012345678905001", MSISDN: "13800000001"},
{ICCID: "89860012345678905002", MSISDN: "13800000002"},
{ICCID: "89860012345678905003", MSISDN: "13800000003"},
},
TotalCount: 3,
}
task.Creator = 1
@@ -43,6 +47,9 @@ func TestIotCardImportHandler_ProcessImport(t *testing.T) {
exists, _ := iotCardStore.ExistsByICCID(ctx, "89860012345678905001")
assert.True(t, exists)
card, _ := iotCardStore.GetByICCID(ctx, "89860012345678905001")
assert.Equal(t, "13800000001", card.MSISDN)
})
t.Run("跳过已存在的ICCID", func(t *testing.T) {
@@ -58,8 +65,11 @@ func TestIotCardImportHandler_ProcessImport(t *testing.T) {
CarrierID: 1,
CarrierType: constants.CarrierCodeCMCC,
BatchNo: "TEST_BATCH_002",
ICCIDList: model.ICCIDListJSON{"89860012345678906001", "89860012345678906002"},
TotalCount: 2,
CardList: model.CardListJSON{
{ICCID: "89860012345678906001", MSISDN: "13800000011"},
{ICCID: "89860012345678906002", MSISDN: "13800000012"},
},
TotalCount: 2,
}
task.Creator = 1
@@ -70,6 +80,7 @@ func TestIotCardImportHandler_ProcessImport(t *testing.T) {
assert.Equal(t, 0, result.failCount)
assert.Len(t, result.skippedItems, 1)
assert.Equal(t, "89860012345678906001", result.skippedItems[0].ICCID)
assert.Equal(t, "13800000011", result.skippedItems[0].MSISDN)
assert.Equal(t, "ICCID 已存在", result.skippedItems[0].Reason)
})
@@ -78,8 +89,11 @@ func TestIotCardImportHandler_ProcessImport(t *testing.T) {
CarrierID: 1,
CarrierType: constants.CarrierCodeCTCC,
BatchNo: "TEST_BATCH_003",
ICCIDList: model.ICCIDListJSON{"89860312345678907001", "898603123456789070"},
TotalCount: 2,
CardList: model.CardListJSON{
{ICCID: "89860312345678907001", MSISDN: "13900000001"},
{ICCID: "898603123456789070", MSISDN: "13900000002"},
},
TotalCount: 2,
}
task.Creator = 1
@@ -89,6 +103,7 @@ func TestIotCardImportHandler_ProcessImport(t *testing.T) {
assert.Equal(t, 0, result.skipCount)
assert.Equal(t, 2, result.failCount)
assert.Len(t, result.failedItems, 2)
assert.Equal(t, "13900000001", result.failedItems[0].MSISDN)
})
t.Run("混合场景-成功跳过和失败", func(t *testing.T) {
@@ -104,10 +119,10 @@ func TestIotCardImportHandler_ProcessImport(t *testing.T) {
CarrierID: 1,
CarrierType: constants.CarrierCodeCMCC,
BatchNo: "TEST_BATCH_004",
ICCIDList: model.ICCIDListJSON{
"89860012345678908001",
"89860012345678908002",
"invalid!iccid",
CardList: model.CardListJSON{
{ICCID: "89860012345678908001", MSISDN: "13800000021"},
{ICCID: "89860012345678908002", MSISDN: "13800000022"},
{ICCID: "invalid!iccid", MSISDN: "13800000023"},
},
TotalCount: 3,
}
@@ -120,12 +135,12 @@ func TestIotCardImportHandler_ProcessImport(t *testing.T) {
assert.Equal(t, 1, result.failCount)
})
t.Run("空ICCID列表", func(t *testing.T) {
t.Run("空列表", func(t *testing.T) {
task := &model.IotCardImportTask{
CarrierID: 1,
CarrierType: constants.CarrierCodeCMCC,
BatchNo: "TEST_BATCH_005",
ICCIDList: model.ICCIDListJSON{},
CardList: model.CardListJSON{},
TotalCount: 0,
}
@@ -146,10 +161,10 @@ func TestIotCardImportHandler_ProcessBatch(t *testing.T) {
importTaskStore := postgres.NewIotCardImportTaskStore(tx, rdb)
iotCardStore := postgres.NewIotCardStore(tx, rdb)
handler := NewIotCardImportHandler(tx, rdb, importTaskStore, iotCardStore, logger)
handler := NewIotCardImportHandler(tx, rdb, importTaskStore, iotCardStore, nil, logger)
ctx := context.Background()
t.Run("验证行号正确记录", func(t *testing.T) {
t.Run("验证行号和MSISDN正确记录", func(t *testing.T) {
existingCard := &model.IotCard{
ICCID: "89860012345678909002",
CardType: "data_card",
@@ -165,10 +180,10 @@ func TestIotCardImportHandler_ProcessBatch(t *testing.T) {
}
task.Creator = 1
batch := []string{
"89860012345678909001",
"89860012345678909002",
"invalid",
batch := []model.CardItem{
{ICCID: "89860012345678909001", MSISDN: "13800000031"},
{ICCID: "89860012345678909002", MSISDN: "13800000032"},
{ICCID: "invalid", MSISDN: "13800000033"},
}
result := &importResult{
skippedItems: make(model.ImportResultItems, 0),
@@ -182,6 +197,8 @@ func TestIotCardImportHandler_ProcessBatch(t *testing.T) {
assert.Equal(t, 1, result.failCount)
assert.Equal(t, 101, result.skippedItems[0].Line)
assert.Equal(t, "13800000032", result.skippedItems[0].MSISDN)
assert.Equal(t, 102, result.failedItems[0].Line)
assert.Equal(t, "13800000033", result.failedItems[0].MSISDN)
})
}