package integration import ( "bytes" "context" "encoding/json" "fmt" "mime/multipart" "net/http/httptest" "testing" "time" "github.com/break/junhong_cmp_fiber/internal/bootstrap" internalMiddleware "github.com/break/junhong_cmp_fiber/internal/middleware" "github.com/break/junhong_cmp_fiber/internal/model" "github.com/break/junhong_cmp_fiber/internal/routes" "github.com/break/junhong_cmp_fiber/pkg/auth" "github.com/break/junhong_cmp_fiber/pkg/config" pkggorm "github.com/break/junhong_cmp_fiber/pkg/gorm" "github.com/break/junhong_cmp_fiber/pkg/queue" "github.com/break/junhong_cmp_fiber/pkg/response" "github.com/break/junhong_cmp_fiber/tests/testutil" "github.com/gofiber/fiber/v2" "github.com/redis/go-redis/v9" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" "gorm.io/driver/postgres" "gorm.io/gorm" "gorm.io/gorm/logger" ) type iotCardTestEnv struct { db *gorm.DB rdb *redis.Client tokenManager *auth.TokenManager app *fiber.App adminToken string t *testing.T } func setupIotCardTestEnv(t *testing.T) *iotCardTestEnv { t.Helper() // 设置测试环境变量 t.Setenv("JUNHONG_DATABASE_HOST", "cxd.whcxd.cn") t.Setenv("JUNHONG_DATABASE_PORT", "16159") t.Setenv("JUNHONG_DATABASE_USER", "erp_pgsql") t.Setenv("JUNHONG_DATABASE_PASSWORD", "erp_2025") t.Setenv("JUNHONG_DATABASE_DBNAME", "junhong_cmp_test") t.Setenv("JUNHONG_REDIS_ADDRESS", "cxd.whcxd.cn") t.Setenv("JUNHONG_REDIS_PORT", "16299") t.Setenv("JUNHONG_REDIS_PASSWORD", "cpNbWtAaqgo1YJmbMp3h") t.Setenv("JUNHONG_JWT_SECRET_KEY", "test_secret_key_for_integration_tests") cfg, err := config.Load() require.NoError(t, err) err = config.Set(cfg) require.NoError(t, err) zapLogger, _ := zap.NewDevelopment() dsn := "host=cxd.whcxd.cn port=16159 user=erp_pgsql password=erp_2025 dbname=junhong_cmp_test sslmode=disable TimeZone=Asia/Shanghai" db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{ Logger: logger.Default.LogMode(logger.Silent), }) require.NoError(t, err) rdb := redis.NewClient(&redis.Options{ Addr: "cxd.whcxd.cn:16299", Password: "cpNbWtAaqgo1YJmbMp3h", DB: 15, }) ctx := context.Background() err = rdb.Ping(ctx).Err() require.NoError(t, err) testPrefix := fmt.Sprintf("test:%s:", t.Name()) keys, _ := rdb.Keys(ctx, testPrefix+"*").Result() if len(keys) > 0 { rdb.Del(ctx, keys...) } tokenManager := auth.NewTokenManager(rdb, 24*time.Hour, 7*24*time.Hour) superAdmin := testutil.CreateSuperAdmin(t, db) adminToken, _ := testutil.GenerateTestToken(t, rdb, superAdmin, "web") queueClient := queue.NewClient(rdb, zapLogger) deps := &bootstrap.Dependencies{ DB: db, Redis: rdb, Logger: zapLogger, TokenManager: tokenManager, QueueClient: queueClient, } result, err := bootstrap.Bootstrap(deps) require.NoError(t, err) app := fiber.New(fiber.Config{ ErrorHandler: internalMiddleware.ErrorHandler(zapLogger), }) routes.RegisterRoutes(app, result.Handlers, result.Middlewares) return &iotCardTestEnv{ db: db, rdb: rdb, tokenManager: tokenManager, app: app, adminToken: adminToken, t: t, } } func (e *iotCardTestEnv) teardown() { e.db.Exec("DELETE FROM tb_iot_card WHERE iccid LIKE 'TEST%'") e.db.Exec("DELETE FROM tb_iot_card_import_task WHERE task_no LIKE 'TEST%'") ctx := context.Background() testPrefix := fmt.Sprintf("test:%s:", e.t.Name()) keys, _ := e.rdb.Keys(ctx, testPrefix+"*").Result() if len(keys) > 0 { e.rdb.Del(ctx, keys...) } e.rdb.Close() } func TestIotCard_ListStandalone(t *testing.T) { env := setupIotCardTestEnv(t) defer env.teardown() cards := []*model.IotCard{ {ICCID: "TEST0012345678901001", CardType: "data_card", CarrierID: 1, Status: 1}, {ICCID: "TEST0012345678901002", CardType: "data_card", CarrierID: 1, Status: 1}, {ICCID: "TEST0012345678901003", CardType: "data_card", CarrierID: 2, Status: 2}, } for _, card := range cards { require.NoError(t, env.db.Create(card).Error) } t.Run("获取单卡列表-无过滤", func(t *testing.T) { req := httptest.NewRequest("GET", "/api/admin/iot-cards/standalone?page=1&page_size=20", nil) req.Header.Set("Authorization", "Bearer "+env.adminToken) resp, err := env.app.Test(req, -1) require.NoError(t, err) defer resp.Body.Close() assert.Equal(t, 200, resp.StatusCode) var result response.Response err = json.NewDecoder(resp.Body).Decode(&result) require.NoError(t, err) assert.Equal(t, 0, result.Code) }) t.Run("获取单卡列表-按运营商过滤", func(t *testing.T) { req := httptest.NewRequest("GET", "/api/admin/iot-cards/standalone?carrier_id=1", nil) req.Header.Set("Authorization", "Bearer "+env.adminToken) resp, err := env.app.Test(req, -1) require.NoError(t, err) defer resp.Body.Close() assert.Equal(t, 200, resp.StatusCode) var result response.Response err = json.NewDecoder(resp.Body).Decode(&result) require.NoError(t, err) assert.Equal(t, 0, result.Code) }) t.Run("获取单卡列表-按ICCID模糊查询", func(t *testing.T) { req := httptest.NewRequest("GET", "/api/admin/iot-cards/standalone?iccid=901001", nil) req.Header.Set("Authorization", "Bearer "+env.adminToken) resp, err := env.app.Test(req, -1) require.NoError(t, err) defer resp.Body.Close() assert.Equal(t, 200, resp.StatusCode) var result response.Response err = json.NewDecoder(resp.Body).Decode(&result) require.NoError(t, err) assert.Equal(t, 0, result.Code) }) t.Run("未认证请求应返回错误", func(t *testing.T) { req := httptest.NewRequest("GET", "/api/admin/iot-cards/standalone", nil) resp, err := env.app.Test(req, -1) require.NoError(t, err) defer resp.Body.Close() var result response.Response err = json.NewDecoder(resp.Body).Decode(&result) require.NoError(t, err) assert.NotEqual(t, 0, result.Code, "未认证请求应返回错误码") }) } func TestIotCard_Import(t *testing.T) { env := setupIotCardTestEnv(t) defer env.teardown() t.Run("导入CSV文件", func(t *testing.T) { body := &bytes.Buffer{} writer := multipart.NewWriter(body) part, err := writer.CreateFormFile("file", "test.csv") require.NoError(t, err) csvContent := "iccid\nTEST0012345678902001\nTEST0012345678902002\nTEST0012345678902003" _, err = part.Write([]byte(csvContent)) require.NoError(t, err) _ = writer.WriteField("carrier_id", "1") _ = writer.WriteField("carrier_type", "CMCC") _ = writer.WriteField("batch_no", "TEST_BATCH_001") writer.Close() req := httptest.NewRequest("POST", "/api/admin/iot-cards/import", body) req.Header.Set("Content-Type", writer.FormDataContentType()) req.Header.Set("Authorization", "Bearer "+env.adminToken) resp, err := env.app.Test(req, -1) require.NoError(t, err) defer resp.Body.Close() var result response.Response err = json.NewDecoder(resp.Body).Decode(&result) require.NoError(t, err) t.Logf("Import response: code=%d, message=%s", result.Code, result.Message) assert.Equal(t, 200, resp.StatusCode) assert.Equal(t, 0, result.Code) }) t.Run("导入无文件应返回错误", func(t *testing.T) { body := &bytes.Buffer{} writer := multipart.NewWriter(body) _ = writer.WriteField("carrier_id", "1") _ = writer.WriteField("carrier_type", "CMCC") writer.Close() req := httptest.NewRequest("POST", "/api/admin/iot-cards/import", body) req.Header.Set("Content-Type", writer.FormDataContentType()) req.Header.Set("Authorization", "Bearer "+env.adminToken) resp, err := env.app.Test(req, -1) require.NoError(t, err) defer resp.Body.Close() var result response.Response err = json.NewDecoder(resp.Body).Decode(&result) require.NoError(t, err) t.Logf("No file response: code=%d, message=%s, data=%v", result.Code, result.Message, result.Data) assert.NotEqual(t, 0, result.Code, "无文件时应返回错误码") }) } func TestIotCard_ImportTaskList(t *testing.T) { env := setupIotCardTestEnv(t) defer env.teardown() task := &model.IotCardImportTask{ TaskNo: "TEST20260123001", Status: model.ImportTaskStatusCompleted, CarrierID: 1, CarrierType: "CMCC", TotalCount: 100, } require.NoError(t, env.db.Create(task).Error) t.Run("获取导入任务列表", func(t *testing.T) { req := httptest.NewRequest("GET", "/api/admin/iot-cards/import-tasks?page=1&page_size=20", nil) req.Header.Set("Authorization", "Bearer "+env.adminToken) resp, err := env.app.Test(req, -1) require.NoError(t, err) defer resp.Body.Close() assert.Equal(t, 200, resp.StatusCode) var result response.Response err = json.NewDecoder(resp.Body).Decode(&result) require.NoError(t, err) assert.Equal(t, 0, result.Code) }) t.Run("获取导入任务详情", func(t *testing.T) { url := fmt.Sprintf("/api/admin/iot-cards/import-tasks/%d", task.ID) req := httptest.NewRequest("GET", url, nil) req.Header.Set("Authorization", "Bearer "+env.adminToken) resp, err := env.app.Test(req, -1) require.NoError(t, err) defer resp.Body.Close() assert.Equal(t, 200, resp.StatusCode) var result response.Response err = json.NewDecoder(resp.Body).Decode(&result) require.NoError(t, err) assert.Equal(t, 0, result.Code) }) } // TestIotCard_ImportE2E 端到端测试:API提交 -> Worker处理 -> 数据验证 func TestIotCard_ImportE2E(t *testing.T) { t.Setenv("CONFIG_ENV", "dev") t.Setenv("CONFIG_PATH", "../../configs/config.dev.yaml") cfg, err := config.Load() require.NoError(t, err) err = config.Set(cfg) require.NoError(t, err) zapLogger, _ := zap.NewDevelopment() dsn := "host=cxd.whcxd.cn port=16159 user=erp_pgsql password=erp_2025 dbname=junhong_cmp_test sslmode=disable TimeZone=Asia/Shanghai" db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{ Logger: logger.Default.LogMode(logger.Silent), }) require.NoError(t, err) rdb := redis.NewClient(&redis.Options{ Addr: "cxd.whcxd.cn:16299", Password: "cpNbWtAaqgo1YJmbMp3h", DB: 15, }) defer rdb.Close() ctx := context.Background() err = rdb.Ping(ctx).Err() require.NoError(t, err) // 清理测试数据(包括之前运行遗留的数据) testICCIDPrefix := "E2ETEST" testBatchNo1 := fmt.Sprintf("E2E_BATCH_%d_001", time.Now().UnixNano()) testBatchNo2 := fmt.Sprintf("E2E_BATCH_%d_002", time.Now().UnixNano()) db.Exec("DELETE FROM tb_iot_card WHERE iccid LIKE ?", testICCIDPrefix+"%") db.Exec("DELETE FROM tb_iot_card_import_task WHERE batch_no LIKE ?", "E2E_BATCH%") cleanAsynqQueues(t, rdb) t.Cleanup(func() { db.Exec("DELETE FROM tb_iot_card WHERE iccid LIKE ?", testICCIDPrefix+"%") db.Exec("DELETE FROM tb_iot_card_import_task WHERE batch_no LIKE ?", "E2E_BATCH%") cleanAsynqQueues(t, rdb) }) // 启动 Worker 服务器 workerServer := startTestWorker(t, db, rdb, zapLogger) defer workerServer.Shutdown() // 等待 Worker 启动 time.Sleep(500 * time.Millisecond) // 设置 API 服务 tokenManager := auth.NewTokenManager(rdb, 24*time.Hour, 7*24*time.Hour) superAdmin := testutil.CreateSuperAdmin(t, db) adminToken, _ := testutil.GenerateTestToken(t, rdb, superAdmin, "web") queueClient := queue.NewClient(rdb, zapLogger) deps := &bootstrap.Dependencies{ DB: db, Redis: rdb, Logger: zapLogger, TokenManager: tokenManager, QueueClient: queueClient, } result, err := bootstrap.Bootstrap(deps) require.NoError(t, err) app := fiber.New(fiber.Config{ ErrorHandler: internalMiddleware.ErrorHandler(zapLogger), }) routes.RegisterRoutes(app, result.Handlers, result.Middlewares) // 准备测试用的 ICCID(20位,满足 CMCC 要求) testICCIDs := []string{ testICCIDPrefix + "1234567890123", testICCIDPrefix + "1234567890124", testICCIDPrefix + "1234567890125", } t.Run("完整导入流程验证", func(t *testing.T) { // Step 1: 通过 API 提交导入任务 body := &bytes.Buffer{} writer := multipart.NewWriter(body) part, err := writer.CreateFormFile("file", "e2e_test.csv") require.NoError(t, err) csvContent := "iccid\n" + testICCIDs[0] + "\n" + testICCIDs[1] + "\n" + testICCIDs[2] _, err = part.Write([]byte(csvContent)) require.NoError(t, err) _ = writer.WriteField("carrier_id", "1") _ = writer.WriteField("carrier_type", "CMCC") _ = writer.WriteField("batch_no", testBatchNo1) writer.Close() req := httptest.NewRequest("POST", "/api/admin/iot-cards/import", body) req.Header.Set("Content-Type", writer.FormDataContentType()) req.Header.Set("Authorization", "Bearer "+adminToken) resp, err := app.Test(req, -1) require.NoError(t, err) defer resp.Body.Close() var apiResult response.Response err = json.NewDecoder(resp.Body).Decode(&apiResult) require.NoError(t, err) require.Equal(t, 0, apiResult.Code, "API 应返回成功: %s", apiResult.Message) // 从响应中提取 task_id dataMap, ok := apiResult.Data.(map[string]interface{}) require.True(t, ok, "响应数据应为 map") taskIDFloat, ok := dataMap["task_id"].(float64) require.True(t, ok, "task_id 应存在") taskID := uint(taskIDFloat) t.Logf("创建的导入任务 ID: %d", taskID) // Step 2: 等待 Worker 处理完成(轮询检查任务状态) var importTask model.IotCardImportTask maxWaitTime := 30 * time.Second pollInterval := 500 * time.Millisecond startTime := time.Now() skipCtx := pkggorm.SkipDataPermission(ctx) for { if time.Since(startTime) > maxWaitTime { t.Fatalf("等待超时:任务 %d 未在 %v 内完成", taskID, maxWaitTime) } err = db.WithContext(skipCtx).First(&importTask, taskID).Error require.NoError(t, err) t.Logf("任务状态: %d (1=pending, 2=processing, 3=completed, 4=failed)", importTask.Status) if importTask.Status == model.ImportTaskStatusCompleted || importTask.Status == model.ImportTaskStatusFailed { break } time.Sleep(pollInterval) } // Step 3: 验证任务完成状态 assert.Equal(t, model.ImportTaskStatusCompleted, importTask.Status, "任务应完成") assert.Equal(t, 3, importTask.TotalCount, "总数应为3") assert.Equal(t, 3, importTask.SuccessCount, "成功数应为3") assert.Equal(t, 0, importTask.SkipCount, "跳过数应为0") assert.Equal(t, 0, importTask.FailCount, "失败数应为0") t.Logf("任务完成: total=%d, success=%d, skip=%d, fail=%d", importTask.TotalCount, importTask.SuccessCount, importTask.SkipCount, importTask.FailCount) // Step 4: 验证 IoT 卡已入库 var cards []model.IotCard err = db.WithContext(skipCtx).Where("iccid IN ?", testICCIDs).Find(&cards).Error require.NoError(t, err) assert.Len(t, cards, 3, "应创建3张 IoT 卡") for _, card := range cards { assert.Equal(t, uint(1), card.CarrierID, "运营商ID应为1") assert.Equal(t, testBatchNo1, card.BatchNo, "批次号应匹配") assert.Equal(t, 1, card.Status, "状态应为在库(1)") t.Logf("已创建 IoT 卡: ICCID=%s, ID=%d", card.ICCID, card.ID) } }) t.Run("重复导入应跳过已存在的ICCID", func(t *testing.T) { // 再次导入相同的 ICCID,应该全部跳过 body := &bytes.Buffer{} writer := multipart.NewWriter(body) part, err := writer.CreateFormFile("file", "e2e_test_dup.csv") require.NoError(t, err) csvContent := "iccid\n" + testICCIDs[0] + "\n" + testICCIDs[1] _, err = part.Write([]byte(csvContent)) require.NoError(t, err) _ = writer.WriteField("carrier_id", "1") _ = writer.WriteField("carrier_type", "CMCC") _ = writer.WriteField("batch_no", testBatchNo2) writer.Close() req := httptest.NewRequest("POST", "/api/admin/iot-cards/import", body) req.Header.Set("Content-Type", writer.FormDataContentType()) req.Header.Set("Authorization", "Bearer "+adminToken) resp, err := app.Test(req, -1) require.NoError(t, err) defer resp.Body.Close() var apiResult response.Response err = json.NewDecoder(resp.Body).Decode(&apiResult) require.NoError(t, err) require.Equal(t, 0, apiResult.Code) dataMap := apiResult.Data.(map[string]interface{}) taskID := uint(dataMap["task_id"].(float64)) // 等待处理完成 var importTask model.IotCardImportTask maxWaitTime := 30 * time.Second startTime := time.Now() skipCtx := pkggorm.SkipDataPermission(ctx) for { if time.Since(startTime) > maxWaitTime { t.Fatalf("等待超时") } db.WithContext(skipCtx).First(&importTask, taskID) if importTask.Status == model.ImportTaskStatusCompleted || importTask.Status == model.ImportTaskStatusFailed { break } time.Sleep(500 * time.Millisecond) } // 验证:2条应该全部跳过 assert.Equal(t, model.ImportTaskStatusCompleted, importTask.Status) assert.Equal(t, 2, importTask.TotalCount) assert.Equal(t, 0, importTask.SuccessCount, "成功数应为0(全部跳过)") assert.Equal(t, 2, importTask.SkipCount, "跳过数应为2") t.Logf("重复导入结果: success=%d, skip=%d", importTask.SuccessCount, importTask.SkipCount) }) } func cleanAsynqQueues(t *testing.T, rdb *redis.Client) { t.Helper() ctx := context.Background() keys, err := rdb.Keys(ctx, "asynq:*").Result() if err != nil { t.Logf("获取 asynq 队列键失败: %v", err) return } if len(keys) > 0 { deleted, err := rdb.Del(ctx, keys...).Result() if err != nil { t.Logf("删除 asynq 队列键失败: %v", err) } else { t.Logf("清理了 %d 个 asynq 队列键", deleted) } } } func startTestWorker(t *testing.T, db *gorm.DB, rdb *redis.Client, logger *zap.Logger) *queue.Server { t.Helper() queueCfg := &config.QueueConfig{ Concurrency: 2, Queues: map[string]int{ "default": 1, }, } workerServer := queue.NewServer(rdb, queueCfg, logger) taskHandler := queue.NewHandler(db, rdb, nil, logger) taskHandler.RegisterHandlers() go func() { if err := workerServer.Start(taskHandler.GetMux()); err != nil { t.Logf("Worker 服务器启动错误: %v", err) } }() t.Logf("测试 Worker 服务器已启动") return workerServer } func TestIotCard_GetByICCID(t *testing.T) { env := setupIotCardTestEnv(t) defer env.teardown() // 创建测试运营商 carrier := &model.Carrier{ CarrierCode: "TEST001", CarrierName: "测试运营商", CarrierType: "CMCC", Status: 1, } require.NoError(t, env.db.Create(carrier).Error) // 创建测试 IoT 卡 card := &model.IotCard{ ICCID: "TEST_ICCID_001", CarrierID: carrier.ID, MSISDN: "13800000001", CardType: "physical", CardCategory: "normal", CostPrice: 1000, DistributePrice: 1500, Status: 1, } require.NoError(t, env.db.Create(card).Error) t.Run("通过ICCID查询单卡详情-成功", func(t *testing.T) { url := fmt.Sprintf("/api/admin/iot-cards/by-iccid/%s", card.ICCID) req := httptest.NewRequest("GET", url, nil) req.Header.Set("Authorization", "Bearer "+env.adminToken) resp, err := env.app.Test(req, -1) require.NoError(t, err) defer resp.Body.Close() assert.Equal(t, 200, resp.StatusCode) var result response.Response err = json.NewDecoder(resp.Body).Decode(&result) require.NoError(t, err) assert.Equal(t, 0, result.Code) // 验证返回数据 dataMap, ok := result.Data.(map[string]interface{}) require.True(t, ok) assert.Equal(t, "TEST_ICCID_001", dataMap["iccid"]) assert.Equal(t, "13800000001", dataMap["msisdn"]) }) t.Run("通过不存在的ICCID查询-应返回错误", func(t *testing.T) { req := httptest.NewRequest("GET", "/api/admin/iot-cards/by-iccid/NONEXISTENT_ICCID", nil) req.Header.Set("Authorization", "Bearer "+env.adminToken) resp, err := env.app.Test(req, -1) require.NoError(t, err) defer resp.Body.Close() var result response.Response err = json.NewDecoder(resp.Body).Decode(&result) require.NoError(t, err) assert.NotEqual(t, 0, result.Code, "不存在的ICCID应返回错误码") }) t.Run("未认证请求-应返回错误", func(t *testing.T) { url := fmt.Sprintf("/api/admin/iot-cards/by-iccid/%s", card.ICCID) req := httptest.NewRequest("GET", url, nil) resp, err := env.app.Test(req, -1) require.NoError(t, err) defer resp.Body.Close() var result response.Response err = json.NewDecoder(resp.Body).Decode(&result) require.NoError(t, err) assert.NotEqual(t, 0, result.Code, "未认证请求应返回错误码") }) }