package integration

import (
	"context"
	"strconv"
	"testing"
	"time"

	"github.com/bytedance/sonic"
	"github.com/hibiken/asynq"
	"github.com/redis/go-redis/v9"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"github.com/break/junhong_cmp_fiber/pkg/constants"
)

// EmailPayload is the JSON body these tests attach to email-send tasks.
// CC is omitted from the encoded form when it is empty.
type EmailPayload struct {
	RequestID string   `json:"request_id"`
	To        string   `json:"to"`
	Subject   string   `json:"subject"`
	Body      string   `json:"body"`
	CC        []string `json:"cc,omitempty"`
}

// TestTaskSubmit enqueues a single email task on the default queue and checks
// that the returned task info carries an ID, the requested queue and the
// requested retry limit.
func TestTaskSubmit(t *testing.T) {
	rdb := redis.NewClient(&redis.Options{
		Addr:     testRedisAddr,
		Password: testRedisPasswd,
		DB:       testRedisDB,
	})
	// Close errors cannot affect the test outcome, so they are ignored.
	t.Cleanup(func() { _ = rdb.Close() })

	ctx := context.Background()
	cleanTestKeys(t, rdb, ctx)

	client := asynq.NewClient(asynq.RedisClientOpt{
		Addr:     testRedisAddr,
		Password: testRedisPasswd,
		DB:       testRedisDB,
	})
	t.Cleanup(func() { _ = client.Close() })

	// Build the task payload.
	payload := &EmailPayload{
		RequestID: "test-request-001",
		To:        "test@example.com",
		Subject:   "Test Email",
		Body:      "This is a test email",
	}
	payloadBytes, err := sonic.Marshal(payload)
	require.NoError(t, err)

	// Submit the task.
	task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
	info, err := client.Enqueue(task,
		asynq.Queue(constants.QueueDefault),
		asynq.MaxRetry(constants.DefaultRetryMax),
	)

	// Verify the enqueue result.
	require.NoError(t, err)
	assert.NotEmpty(t, info.ID)
	assert.Equal(t, constants.QueueDefault, info.Queue)
	assert.Equal(t, constants.DefaultRetryMax, info.MaxRetry)
}

// TestTaskPriority enqueues one task per named queue (critical, default, low)
// and checks that each task is reported on the queue it was submitted to.
func TestTaskPriority(t *testing.T) {
	rdb := redis.NewClient(&redis.Options{
		Addr:     testRedisAddr,
		Password: testRedisPasswd,
		DB:       testRedisDB,
	})
	t.Cleanup(func() { _ = rdb.Close() })

	ctx := context.Background()
	cleanTestKeys(t, rdb, ctx)

	client := asynq.NewClient(asynq.RedisClientOpt{
		Addr:     testRedisAddr,
		Password: testRedisPasswd,
		DB:       testRedisDB,
	})
	t.Cleanup(func() { _ = client.Close() })

	tests := []struct {
		name  string
		queue string
	}{
		{"Critical Priority", constants.QueueCritical},
		{"Default Priority", constants.QueueDefault},
		{"Low Priority", constants.QueueLow},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			payload := &EmailPayload{
				RequestID: "test-request-" + tt.queue,
				To:        "test@example.com",
				Subject:   "Test",
				Body:      "Test",
			}
			payloadBytes, err := sonic.Marshal(payload)
			require.NoError(t, err)

			task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
			info, err := client.Enqueue(task, asynq.Queue(tt.queue))
			require.NoError(t, err)
			assert.Equal(t, tt.queue, info.Queue)
		})
	}
}

// TestTaskRetry enqueues a task with an explicit retry limit and timeout and
// checks that both options are reflected in the returned task info.
func TestTaskRetry(t *testing.T) {
	rdb := redis.NewClient(&redis.Options{
		Addr:     testRedisAddr,
		Password: testRedisPasswd,
		DB:       testRedisDB,
	})
	t.Cleanup(func() { _ = rdb.Close() })

	ctx := context.Background()
	cleanTestKeys(t, rdb, ctx)

	client := asynq.NewClient(asynq.RedisClientOpt{
		Addr:     testRedisAddr,
		Password: testRedisPasswd,
		DB:       testRedisDB,
	})
	t.Cleanup(func() { _ = client.Close() })

	payload := &EmailPayload{
		RequestID: "retry-test-001",
		To:        "test@example.com",
		Subject:   "Retry Test",
		Body:      "Test retry mechanism",
	}
	payloadBytes, err := sonic.Marshal(payload)
	require.NoError(t, err)

	// Submit the task with a retry limit and a timeout.
	task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
	info, err := client.Enqueue(task,
		asynq.MaxRetry(3),
		asynq.Timeout(30*time.Second),
	)
	require.NoError(t, err)
	assert.Equal(t, 3, info.MaxRetry)
	assert.Equal(t, 30*time.Second, info.Timeout)
}

// TestTaskIdempotency checks the SETNX-based task lock: the first SetNX on a
// fresh lock key succeeds, a second one fails, and the key exists with a TTL
// between 23 and 24 hours.
func TestTaskIdempotency(t *testing.T) {
	rdb := redis.NewClient(&redis.Options{
		Addr:     testRedisAddr,
		Password: testRedisPasswd,
		DB:       testRedisDB,
	})
	// Registered before the lock-key cleanup below: cleanups run in
	// last-registered-first order, so the key is deleted while the client is
	// still open. A plain defer would close the client before any cleanup ran.
	t.Cleanup(func() { _ = rdb.Close() })

	ctx := context.Background()
	cleanTestKeys(t, rdb, ctx)

	requestID := "idempotent-test-" + time.Now().Format("20060102150405.000")
	lockKey := constants.RedisTaskLockKey(requestID)

	// Best-effort removal of a stale lock before the test starts.
	rdb.Del(ctx, lockKey)
	t.Cleanup(func() {
		if err := rdb.Del(ctx, lockKey).Err(); err != nil {
			t.Logf("removing lock key %q: %v", lockKey, err)
		}
	})

	// First attempt to take the lock.
	result, err := rdb.SetNX(ctx, lockKey, "1", 24*time.Hour).Result()
	require.NoError(t, err)
	assert.True(t, result, "第一次设置锁应该成功")

	// Second attempt (simulates a duplicate task).
	result, err = rdb.SetNX(ctx, lockKey, "1", 24*time.Hour).Result()
	require.NoError(t, err)
	assert.False(t, result, "第二次设置锁应该失败(幂等性)")

	// Verify the lock exists.
	exists, err := rdb.Exists(ctx, lockKey).Result()
	require.NoError(t, err)
	assert.Equal(t, int64(1), exists)

	// Verify the TTL.
	ttl, err := rdb.TTL(ctx, lockKey).Result()
	require.NoError(t, err)
	assert.Greater(t, ttl.Hours(), 23.0)
	assert.LessOrEqual(t, ttl.Hours(), 24.0)
}

// TestTaskStatusTracking writes a sequence of status values to a task status
// key, reads each one back, and checks that the key's TTL exceeds six days.
func TestTaskStatusTracking(t *testing.T) {
	rdb := redis.NewClient(&redis.Options{
		Addr:     testRedisAddr,
		Password: testRedisPasswd,
		DB:       testRedisDB,
	})
	// Registered first so it runs last, after the status key is deleted.
	t.Cleanup(func() { _ = rdb.Close() })

	ctx := context.Background()
	cleanTestKeys(t, rdb, ctx)

	taskID := "task-123456"
	statusKey := constants.RedisTaskStatusKey(taskID)

	// The key is written with a 7-day TTL; remove it so it does not linger
	// in the test database after the run.
	t.Cleanup(func() {
		if err := rdb.Del(ctx, statusKey).Err(); err != nil {
			t.Logf("removing status key %q: %v", statusKey, err)
		}
	})

	// Set each status and read it back.
	statuses := []string{"pending", "processing", "completed"}
	for _, status := range statuses {
		err := rdb.Set(ctx, statusKey, status, 7*24*time.Hour).Err()
		require.NoError(t, err)

		result, err := rdb.Get(ctx, statusKey).Result()
		require.NoError(t, err)
		assert.Equal(t, status, result)
	}

	// Verify the TTL.
	ttl, err := rdb.TTL(ctx, statusKey).Result()
	require.NoError(t, err)
	assert.Greater(t, ttl.Hours(), 24.0*6)
}

// TestQueueInspection enqueues five tasks on the default queue and checks via
// the asynq inspector that five are pending and none are active.
func TestQueueInspection(t *testing.T) {
	rdb := redis.NewClient(&redis.Options{
		Addr:     testRedisAddr,
		Password: testRedisPasswd,
		DB:       testRedisDB,
	})
	t.Cleanup(func() { _ = rdb.Close() })

	ctx := context.Background()
	cleanTestKeys(t, rdb, ctx)

	client := asynq.NewClient(asynq.RedisClientOpt{
		Addr:     testRedisAddr,
		Password: testRedisPasswd,
		DB:       testRedisDB,
	})
	t.Cleanup(func() { _ = client.Close() })

	// Submit several tasks.
	for i := 0; i < 5; i++ {
		payload := &EmailPayload{
			// strconv.Itoa yields "0".."4"; string(rune(i)) would yield
			// the control characters U+0000..U+0004 instead.
			RequestID: "test-" + strconv.Itoa(i),
			To:        "test@example.com",
			Subject:   "Test",
			Body:      "Test",
		}
		payloadBytes, err := sonic.Marshal(payload)
		require.NoError(t, err)

		task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
		_, err = client.Enqueue(task, asynq.Queue(constants.QueueDefault))
		require.NoError(t, err)
	}

	inspector := asynq.NewInspector(asynq.RedisClientOpt{
		Addr:     testRedisAddr,
		Password: testRedisPasswd,
		DB:       testRedisDB,
	})
	t.Cleanup(func() { _ = inspector.Close() })

	// Fetch the queue info.
	info, err := inspector.GetQueueInfo(constants.QueueDefault)
	require.NoError(t, err)
	assert.Equal(t, 5, info.Pending)
	assert.Equal(t, 0, info.Active)
}

// TestTaskSerialization round-trips EmailPayload values through sonic
// Marshal/Unmarshal and checks that every field survives unchanged.
func TestTaskSerialization(t *testing.T) {
	tests := []struct {
		name    string
		payload EmailPayload
	}{
		{
			name: "Simple Email",
			payload: EmailPayload{
				RequestID: "req-001",
				To:        "user@example.com",
				Subject:   "Hello",
				Body:      "Hello World",
			},
		},
		{
			name: "Email with CC",
			payload: EmailPayload{
				RequestID: "req-002",
				To:        "user@example.com",
				Subject:   "Hello",
				Body:      "Hello World",
				CC:        []string{"cc1@example.com", "cc2@example.com"},
			},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			payloadBytes, err := sonic.Marshal(tt.payload)
			require.NoError(t, err)
			assert.NotEmpty(t, payloadBytes)

			var decoded EmailPayload
			err = sonic.Unmarshal(payloadBytes, &decoded)
			require.NoError(t, err)

			assert.Equal(t, tt.payload.RequestID, decoded.RequestID)
			assert.Equal(t, tt.payload.To, decoded.To)
			assert.Equal(t, tt.payload.Subject, decoded.Subject)
			assert.Equal(t, tt.payload.Body, decoded.Body)
			assert.Equal(t, tt.payload.CC, decoded.CC)
		})
	}
}

// cleanTestKeys deletes, on a best-effort basis, every key matching
// "test:task:<test name>:*" and every key matching "asynq:*" in the database
// rdb is connected to.
//
// Failures are logged through t and never fail the test. A failure on one
// pattern does not stop cleanup of the other.
func cleanTestKeys(t *testing.T, rdb *redis.Client, ctx context.Context) {
	t.Helper()

	patterns := []string{
		"test:task:" + t.Name() + ":*",
		"asynq:*",
	}
	for _, pattern := range patterns {
		keys, err := rdb.Keys(ctx, pattern).Result()
		if err != nil {
			t.Logf("cleanTestKeys: listing keys matching %q: %v", pattern, err)
			continue
		}
		if len(keys) == 0 {
			continue
		}
		if err := rdb.Del(ctx, keys...).Err(); err != nil {
			t.Logf("cleanTestKeys: deleting %d keys matching %q: %v", len(keys), pattern, err)
		}
	}
}