package integration import ( "context" "os" "testing" "time" "github.com/bytedance/sonic" "github.com/hibiken/asynq" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/break/junhong_cmp_fiber/pkg/constants" "github.com/break/junhong_cmp_fiber/tests/testutils" ) type EmailPayload struct { RequestID string `json:"request_id"` To string `json:"to"` Subject string `json:"subject"` Body string `json:"body"` CC []string `json:"cc,omitempty"` } func getRedisOpt() asynq.RedisClientOpt { host := os.Getenv("JUNHONG_REDIS_ADDRESS") if host == "" { host = "localhost" } port := os.Getenv("JUNHONG_REDIS_PORT") if port == "" { port = "6379" } password := os.Getenv("JUNHONG_REDIS_PASSWORD") return asynq.RedisClientOpt{ Addr: host + ":" + port, Password: password, DB: 0, } } func TestTaskSubmit(t *testing.T) { rdb := testutils.GetTestRedis(t) testutils.CleanTestRedisKeys(t, rdb) _ = rdb client := asynq.NewClient(getRedisOpt()) defer func() { _ = client.Close() }() // 构造任务载荷 payload := &EmailPayload{ RequestID: "test-request-001", To: "test@example.com", Subject: "Test Email", Body: "This is a test email", } payloadBytes, err := sonic.Marshal(payload) require.NoError(t, err) // 提交任务 task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes) info, err := client.Enqueue(task, asynq.Queue(constants.QueueDefault), asynq.MaxRetry(constants.DefaultRetryMax), ) // 验证 require.NoError(t, err) assert.NotEmpty(t, info.ID) assert.Equal(t, constants.QueueDefault, info.Queue) assert.Equal(t, constants.DefaultRetryMax, info.MaxRetry) } func TestTaskPriority(t *testing.T) { rdb := testutils.GetTestRedis(t) testutils.CleanTestRedisKeys(t, rdb) client := asynq.NewClient(getRedisOpt()) defer func() { _ = client.Close() }() tests := []struct { name string queue string }{ {"Critical Priority", constants.QueueCritical}, {"Default Priority", constants.QueueDefault}, {"Low Priority", constants.QueueLow}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { payload := &EmailPayload{ RequestID: "test-request-" + tt.queue, To: "test@example.com", Subject: "Test", Body: "Test", } payloadBytes, err := sonic.Marshal(payload) require.NoError(t, err) task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes) info, err := client.Enqueue(task, asynq.Queue(tt.queue)) require.NoError(t, err) assert.Equal(t, tt.queue, info.Queue) }) } } func TestTaskRetry(t *testing.T) { rdb := testutils.GetTestRedis(t) testutils.CleanTestRedisKeys(t, rdb) client := asynq.NewClient(getRedisOpt()) defer func() { _ = client.Close() }() payload := &EmailPayload{ RequestID: "retry-test-001", To: "test@example.com", Subject: "Retry Test", Body: "Test retry mechanism", } payloadBytes, err := sonic.Marshal(payload) require.NoError(t, err) // 提交任务并设置重试次数 task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes) info, err := client.Enqueue(task, asynq.MaxRetry(3), asynq.Timeout(30*time.Second), ) require.NoError(t, err) assert.Equal(t, 3, info.MaxRetry) assert.Equal(t, 30*time.Second, info.Timeout) } func TestTaskIdempotency(t *testing.T) { rdb := testutils.GetTestRedis(t) testutils.CleanTestRedisKeys(t, rdb) ctx := context.Background() requestID := "idempotent-test-" + time.Now().Format("20060102150405.000") lockKey := constants.RedisTaskLockKey(requestID) rdb.Del(ctx, lockKey) t.Cleanup(func() { rdb.Del(ctx, lockKey) }) result, err := rdb.SetNX(ctx, lockKey, "1", 24*time.Hour).Result() require.NoError(t, err) assert.True(t, result, "第一次设置锁应该成功") // 第二次设置锁(模拟重复任务) result, err = rdb.SetNX(ctx, lockKey, "1", 24*time.Hour).Result() require.NoError(t, err) assert.False(t, result, "第二次设置锁应该失败(幂等性)") // 验证锁存在 exists, err := rdb.Exists(ctx, lockKey).Result() require.NoError(t, err) assert.Equal(t, int64(1), exists) // 验证 TTL ttl, err := rdb.TTL(ctx, lockKey).Result() require.NoError(t, err) assert.Greater(t, ttl.Hours(), 23.0) assert.LessOrEqual(t, ttl.Hours(), 24.0) } func TestTaskStatusTracking(t *testing.T) { rdb := testutils.GetTestRedis(t) testutils.CleanTestRedisKeys(t, rdb) ctx := context.Background() taskID := "task-123456" statusKey := constants.RedisTaskStatusKey(taskID) // 设置任务状态 statuses := []string{"pending", "processing", "completed"} for _, status := range statuses { err := rdb.Set(ctx, statusKey, status, 7*24*time.Hour).Err() require.NoError(t, err) // 读取状态 result, err := rdb.Get(ctx, statusKey).Result() require.NoError(t, err) assert.Equal(t, status, result) } // 验证 TTL ttl, err := rdb.TTL(ctx, statusKey).Result() require.NoError(t, err) assert.Greater(t, ttl.Hours(), 24.0*6) } func TestQueueInspection(t *testing.T) { rdb := testutils.GetTestRedis(t) testutils.CleanTestRedisKeys(t, rdb) inspector := asynq.NewInspector(getRedisOpt()) defer func() { _ = inspector.Close() }() _, _ = inspector.DeleteAllPendingTasks(constants.QueueDefault) _, _ = inspector.DeleteAllScheduledTasks(constants.QueueDefault) _, _ = inspector.DeleteAllRetryTasks(constants.QueueDefault) _, _ = inspector.DeleteAllArchivedTasks(constants.QueueDefault) client := asynq.NewClient(getRedisOpt()) defer func() { _ = client.Close() }() for i := 0; i < 5; i++ { payload := &EmailPayload{ RequestID: "test-" + string(rune(i)), To: "test@example.com", Subject: "Test", Body: "Test", } payloadBytes, err := sonic.Marshal(payload) require.NoError(t, err) task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes) _, err = client.Enqueue(task, asynq.Queue(constants.QueueDefault)) require.NoError(t, err) } info, err := inspector.GetQueueInfo(constants.QueueDefault) require.NoError(t, err) assert.Equal(t, 5, info.Pending) assert.Equal(t, 0, info.Active) } func TestTaskSerialization(t *testing.T) { tests := []struct { name string payload EmailPayload }{ { name: "Simple Email", payload: EmailPayload{ RequestID: "req-001", To: "user@example.com", Subject: "Hello", Body: "Hello World", }, }, { name: "Email with CC", payload: EmailPayload{ RequestID: "req-002", To: "user@example.com", Subject: "Hello", Body: "Hello World", CC: []string{"cc1@example.com", "cc2@example.com"}, }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { payloadBytes, err := sonic.Marshal(tt.payload) require.NoError(t, err) assert.NotEmpty(t, payloadBytes) var decoded EmailPayload err = sonic.Unmarshal(payloadBytes, &decoded) require.NoError(t, err) assert.Equal(t, tt.payload.RequestID, decoded.RequestID) assert.Equal(t, tt.payload.To, decoded.To) assert.Equal(t, tt.payload.Subject, decoded.Subject) assert.Equal(t, tt.payload.Body, decoded.Body) assert.Equal(t, tt.payload.CC, decoded.CC) }) } }