package unit

import (
	"context"
	"strconv"
	"testing"
	"time"

	"github.com/bytedance/sonic"
	"github.com/hibiken/asynq"
	"github.com/redis/go-redis/v9"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"github.com/break/junhong_cmp_fiber/pkg/constants"
)

// queueTestRedisAddr is the Redis address every test in this file connects to.
// The tests talk to a real server, so one must be listening here.
const queueTestRedisAddr = "localhost:6379"

// queueTestRedisOpt returns the asynq connection options for the test Redis.
func queueTestRedisOpt() asynq.RedisClientOpt {
	return asynq.RedisClientOpt{Addr: queueTestRedisAddr}
}

// resetQueueTestRedis flushes the selected Redis database so each test starts
// from an empty state. It fails the test immediately when the flush fails
// (for example because Redis is not reachable) instead of silently continuing
// against stale data.
func resetQueueTestRedis(t *testing.T) {
	t.Helper()

	rdb := redis.NewClient(&redis.Options{Addr: queueTestRedisAddr})
	defer func() { _ = rdb.Close() }() // best-effort close of a test-only connection

	require.NoError(t, rdb.FlushDB(context.Background()).Err())
}

// newQueueTestClient returns an asynq client that is closed when the test ends.
func newQueueTestClient(t *testing.T) *asynq.Client {
	t.Helper()

	client := asynq.NewClient(queueTestRedisOpt())
	t.Cleanup(func() { _ = client.Close() }) // best-effort close
	return client
}

// newQueueTestInspector returns an asynq inspector that is closed when the test ends.
func newQueueTestInspector(t *testing.T) *asynq.Inspector {
	t.Helper()

	inspector := asynq.NewInspector(queueTestRedisOpt())
	t.Cleanup(func() { _ = inspector.Close() }) // best-effort close
	return inspector
}

// marshalQueueTestPayload encodes v with sonic and fails the test on error.
func marshalQueueTestPayload(t *testing.T, v interface{}) []byte {
	t.Helper()

	data, err := sonic.Marshal(v)
	require.NoError(t, err)
	return data
}

// TestQueueClientEnqueue verifies that a task enqueued without options gets an
// ID and lands in the default queue.
func TestQueueClientEnqueue(t *testing.T) {
	resetQueueTestRedis(t)
	client := newQueueTestClient(t)

	payloadBytes := marshalQueueTestPayload(t, map[string]string{
		"request_id": "test-001",
		"to":         "test@example.com",
	})

	task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
	info, err := client.Enqueue(task)
	require.NoError(t, err)
	assert.NotEmpty(t, info.ID)
	assert.Equal(t, constants.QueueDefault, info.Queue)
}

// TestQueueClientEnqueueWithOptions verifies that enqueue options (queue,
// retry count, timeout, delay) are reflected in the returned TaskInfo.
func TestQueueClientEnqueueWithOptions(t *testing.T) {
	resetQueueTestRedis(t)
	client := newQueueTestClient(t)

	tests := []struct {
		name   string
		opts   []asynq.Option
		verify func(*testing.T, *asynq.TaskInfo)
	}{
		{
			name: "Custom Queue",
			opts: []asynq.Option{
				asynq.Queue(constants.QueueCritical),
			},
			verify: func(t *testing.T, info *asynq.TaskInfo) {
				assert.Equal(t, constants.QueueCritical, info.Queue)
			},
		},
		{
			name: "Custom Retry",
			opts: []asynq.Option{
				asynq.MaxRetry(3),
			},
			verify: func(t *testing.T, info *asynq.TaskInfo) {
				assert.Equal(t, 3, info.MaxRetry)
			},
		},
		{
			name: "Custom Timeout",
			opts: []asynq.Option{
				asynq.Timeout(5 * time.Minute),
			},
			verify: func(t *testing.T, info *asynq.TaskInfo) {
				assert.Equal(t, 5*time.Minute, info.Timeout)
			},
		},
		{
			name: "Delayed Task",
			opts: []asynq.Option{
				asynq.ProcessIn(10 * time.Second),
			},
			verify: func(t *testing.T, info *asynq.TaskInfo) {
				assert.True(t, info.NextProcessAt.After(time.Now()))
			},
		},
		{
			name: "Combined Options",
			opts: []asynq.Option{
				asynq.Queue(constants.QueueCritical),
				asynq.MaxRetry(5),
				asynq.Timeout(10 * time.Minute),
			},
			verify: func(t *testing.T, info *asynq.TaskInfo) {
				assert.Equal(t, constants.QueueCritical, info.Queue)
				assert.Equal(t, 5, info.MaxRetry)
				assert.Equal(t, 10*time.Minute, info.Timeout)
			},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			payloadBytes := marshalQueueTestPayload(t, map[string]string{
				"request_id": "test-" + tt.name,
			})

			task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
			info, err := client.Enqueue(task, tt.opts...)
			require.NoError(t, err)

			tt.verify(t, info)
		})
	}
}

// TestQueueClientTaskUniqueness verifies that enqueuing the same task twice
// with an identical task ID and uniqueness window is rejected.
func TestQueueClientTaskUniqueness(t *testing.T) {
	resetQueueTestRedis(t)
	client := newQueueTestClient(t)

	payloadBytes := marshalQueueTestPayload(t, map[string]string{
		"request_id": "unique-001",
		"to":         "test@example.com",
	})

	// First submission.
	task1 := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
	info1, err := client.Enqueue(task1,
		asynq.TaskID("unique-task-001"),
		asynq.Unique(1*time.Hour),
	)
	require.NoError(t, err)
	assert.NotNil(t, info1)

	// Second submission (duplicate).
	task2 := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
	info2, err := client.Enqueue(task2,
		asynq.TaskID("unique-task-001"),
		asynq.Unique(1*time.Hour),
	)

	// The duplicate must be rejected because the task already exists.
	assert.Error(t, err)
	assert.Nil(t, info2)
}

// TestQueuePriorityWeights verifies the queue priority weights sum to 10 and
// split 60% / 30% / 10% across the critical, default and low queues.
func TestQueuePriorityWeights(t *testing.T) {
	queues := map[string]int{
		constants.QueueCritical: 6,
		constants.QueueDefault:  3,
		constants.QueueLow:      1,
	}

	// Verify the total weight.
	totalWeight := 0
	for _, weight := range queues {
		totalWeight += weight
	}
	assert.Equal(t, 10, totalWeight)

	// Verify the weight ratios. Floats are compared with a tolerance rather
	// than exact equality so the check does not depend on rounding.
	const epsilon = 1e-9
	ratio := func(queue string) float64 {
		return float64(queues[queue]) / float64(totalWeight)
	}
	assert.InDelta(t, 0.6, ratio(constants.QueueCritical), epsilon)
	assert.InDelta(t, 0.3, ratio(constants.QueueDefault), epsilon)
	assert.InDelta(t, 0.1, ratio(constants.QueueLow), epsilon)
}

// TestTaskPayloadSizeLimit verifies that payloads of 1KB, 100KB and 1MB can
// all be enqueued.
func TestTaskPayloadSizeLimit(t *testing.T) {
	tests := []struct {
		name        string
		payloadSize int
		shouldError bool
	}{
		{
			name:        "Small Payload (1KB)",
			payloadSize: 1024,
			shouldError: false,
		},
		{
			name:        "Medium Payload (100KB)",
			payloadSize: 100 * 1024,
			shouldError: false,
		},
		{
			name:        "Large Payload (1MB)",
			payloadSize: 1024 * 1024,
			shouldError: false,
		},
		// Redis supports values up to 512MB by default, but payloads above
		// 1MB are not recommended in practice.
	}

	resetQueueTestRedis(t)
	client := newQueueTestClient(t)

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Build a payload of the requested size.
			largeData := make([]byte, tt.payloadSize)
			for i := range largeData {
				largeData[i] = byte(i % 256)
			}

			payloadBytes := marshalQueueTestPayload(t, map[string]interface{}{
				"request_id": "size-test-001",
				"data":       largeData,
			})

			task := asynq.NewTask(constants.TaskTypeDataSync, payloadBytes)
			info, err := client.Enqueue(task)

			if tt.shouldError {
				assert.Error(t, err)
				return
			}
			require.NoError(t, err)
			assert.NotNil(t, info)
		})
	}
}

// TestTaskScheduling verifies that ProcessIn and ProcessAt schedule a task at
// the expected future time.
func TestTaskScheduling(t *testing.T) {
	resetQueueTestRedis(t)
	client := newQueueTestClient(t)

	// A single reference instant keeps the option and its expectation in sync.
	now := time.Now()

	tests := []struct {
		name         string
		scheduleOpt  asynq.Option
		expectedTime time.Time
	}{
		{
			name:         "Process In 5 Seconds",
			scheduleOpt:  asynq.ProcessIn(5 * time.Second),
			expectedTime: now.Add(5 * time.Second),
		},
		{
			name:         "Process At Specific Time",
			scheduleOpt:  asynq.ProcessAt(now.Add(10 * time.Second)),
			expectedTime: now.Add(10 * time.Second),
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			payloadBytes := marshalQueueTestPayload(t, map[string]string{
				"request_id": "schedule-test-" + tt.name,
			})

			task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
			info, err := client.Enqueue(task, tt.scheduleOpt)
			require.NoError(t, err)

			assert.True(t, info.NextProcessAt.After(time.Now()))
			// Allow one second of skew.
			assert.WithinDuration(t, tt.expectedTime, info.NextProcessAt, 1*time.Second)
		})
	}
}

// TestQueueInspectorStats verifies that the inspector reports five pending
// tasks and no active or completed ones after five tasks are enqueued.
func TestQueueInspectorStats(t *testing.T) {
	resetQueueTestRedis(t)
	client := newQueueTestClient(t)

	// Submit a few tasks.
	for i := 0; i < 5; i++ {
		payloadBytes := marshalQueueTestPayload(t, map[string]string{
			// strconv.Itoa gives "0".."4"; string(rune(i)) would give control characters.
			"request_id": "stats-test-" + strconv.Itoa(i),
		})

		task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
		_, err := client.Enqueue(task)
		require.NoError(t, err)
	}

	// Query the statistics through the inspector.
	inspector := newQueueTestInspector(t)

	info, err := inspector.GetQueueInfo(constants.QueueDefault)
	require.NoError(t, err)

	assert.Equal(t, 5, info.Pending)
	assert.Equal(t, 0, info.Active)
	assert.Equal(t, 0, info.Completed)
}

// TestTaskRetention verifies that a task can be enqueued with a retention
// period and that the period is reflected in the returned TaskInfo.
func TestTaskRetention(t *testing.T) {
	resetQueueTestRedis(t)
	client := newQueueTestClient(t)

	payloadBytes := marshalQueueTestPayload(t, map[string]string{
		"request_id": "retention-test-001",
	})

	// Submit the task with a 24 hour retention period.
	task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
	info, err := client.Enqueue(task,
		asynq.Retention(24*time.Hour),
	)
	require.NoError(t, err)
	require.NotNil(t, info)
	assert.Equal(t, 24*time.Hour, info.Retention)
}

// TestQueueDraining verifies that a queue can be paused and then resumed.
func TestQueueDraining(t *testing.T) {
	resetQueueTestRedis(t)

	// NOTE(review): after a flush the default queue has never been enqueued
	// to, and asynq's inspector is expected to report such a queue as not
	// found. Seed one task so GetQueueInfo has a queue to look up; confirm
	// against the asynq version in use.
	client := newQueueTestClient(t)
	seed := asynq.NewTask(constants.TaskTypeEmailSend, marshalQueueTestPayload(t, map[string]string{
		"request_id": "drain-test-001",
	}))
	_, err := client.Enqueue(seed)
	require.NoError(t, err)

	inspector := newQueueTestInspector(t)

	// Pause the queue.
	err = inspector.PauseQueue(constants.QueueDefault)
	require.NoError(t, err)

	// The queue must now be reported as paused.
	info, err := inspector.GetQueueInfo(constants.QueueDefault)
	require.NoError(t, err)
	assert.True(t, info.Paused)

	// Resume the queue.
	err = inspector.UnpauseQueue(constants.QueueDefault)
	require.NoError(t, err)

	// The queue must now be reported as running again.
	info, err = inspector.GetQueueInfo(constants.QueueDefault)
	require.NoError(t, err)
	assert.False(t, info.Paused)
}

// TestTaskCancellation verifies that deleting a pending task removes it from
// the queue.
func TestTaskCancellation(t *testing.T) {
	resetQueueTestRedis(t)
	client := newQueueTestClient(t)

	payloadBytes := marshalQueueTestPayload(t, map[string]string{
		"request_id": "cancel-test-001",
	})

	// Submit the task.
	task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
	info, err := client.Enqueue(task)
	require.NoError(t, err)

	// Cancel (delete) the task.
	inspector := newQueueTestInspector(t)

	err = inspector.DeleteTask(constants.QueueDefault, info.ID)
	require.NoError(t, err)

	// The queue must no longer contain the task.
	queueInfo, err := inspector.GetQueueInfo(constants.QueueDefault)
	require.NoError(t, err)
	assert.Equal(t, 0, queueInfo.Pending)
}

// TestBatchTaskEnqueue verifies that 100 tasks enqueued one after another are
// all reported as pending.
func TestBatchTaskEnqueue(t *testing.T) {
	resetQueueTestRedis(t)
	client := newQueueTestClient(t)

	// Create the batch of tasks.
	batchSize := 100
	for i := 0; i < batchSize; i++ {
		payloadBytes := marshalQueueTestPayload(t, map[string]string{
			// strconv.Itoa gives readable decimal IDs; string(rune(i)) would
			// give control characters and punctuation.
			"request_id": "batch-" + strconv.Itoa(i),
		})

		task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
		_, err := client.Enqueue(task)
		require.NoError(t, err)
	}

	// Verify the number of tasks.
	inspector := newQueueTestInspector(t)

	info, err := inspector.GetQueueInfo(constants.QueueDefault)
	require.NoError(t, err)
	assert.Equal(t, batchSize, info.Pending)
}

// TestTaskGrouping verifies that tasks enqueued with a group key are all
// accounted for in the queue statistics.
func TestTaskGrouping(t *testing.T) {
	resetQueueTestRedis(t)
	client := newQueueTestClient(t)

	// Submit the grouped tasks.
	const groupSize = 5
	groupKey := "email-batch-001"
	for i := 0; i < groupSize; i++ {
		payloadBytes := marshalQueueTestPayload(t, map[string]string{
			"request_id": "group-" + strconv.Itoa(i),
			"group":      groupKey,
		})

		task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
		_, err := client.Enqueue(task,
			asynq.Group(groupKey),
		)
		require.NoError(t, err)
	}

	// Verify the tasks were submitted into the group.
	inspector := newQueueTestInspector(t)

	info, err := inspector.GetQueueInfo(constants.QueueDefault)
	require.NoError(t, err)
	// NOTE(review): tasks enqueued with asynq.Group are expected to wait in
	// the aggregating state rather than pending, so that is the counter
	// checked here; confirm against the asynq version in use.
	assert.Equal(t, groupSize, info.Aggregating)
}