Files
junhong_cmp_fiber/tests/unit/queue_test.go
huang b68e7ec013
All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 15s
优化测试数据库连接管理
- 创建全局单例连接池,性能提升 6-7 倍
- 实现 NewTestTransaction/GetTestRedis/CleanTestRedisKeys
- 移除旧的 SetupTestDB/TeardownTestDB API
- 迁移所有测试文件到新方案(47 个文件)
- 添加测试连接管理规范文档
- 更新 AGENTS.md 和 README.md

性能对比:
- 旧方案:~71 秒(204 测试)
- 新方案:~10.5 秒(首次初始化 + 后续复用)
- 内存占用降低约 80%
- 网络连接数从 204 降至 1
2026-01-22 14:38:43 +08:00

556 lines
13 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package unit
import (
"context"
"testing"
"time"
"github.com/bytedance/sonic"
"github.com/hibiken/asynq"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/break/junhong_cmp_fiber/pkg/constants"
)
// TestQueueClientEnqueue 测试任务入队
func TestQueueClientEnqueue(t *testing.T) {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer func() { _ = rdb.Close() }()
ctx := context.Background()
rdb.FlushDB(ctx)
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: "localhost:6379",
})
defer func() { _ = client.Close() }()
payload := map[string]string{
"request_id": "test-001",
"to": "test@example.com",
}
payloadBytes, err := sonic.Marshal(payload)
require.NoError(t, err)
task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
info, err := client.Enqueue(task)
require.NoError(t, err)
assert.NotEmpty(t, info.ID)
assert.Equal(t, constants.QueueDefault, info.Queue)
}
// TestQueueClientEnqueueWithOptions 测试带选项的任务入队
func TestQueueClientEnqueueWithOptions(t *testing.T) {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer func() { _ = rdb.Close() }()
ctx := context.Background()
rdb.FlushDB(ctx)
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: "localhost:6379",
})
defer func() { _ = client.Close() }()
tests := []struct {
name string
opts []asynq.Option
verify func(*testing.T, *asynq.TaskInfo)
}{
{
name: "Custom Queue",
opts: []asynq.Option{
asynq.Queue(constants.QueueCritical),
},
verify: func(t *testing.T, info *asynq.TaskInfo) {
assert.Equal(t, constants.QueueCritical, info.Queue)
},
},
{
name: "Custom Retry",
opts: []asynq.Option{
asynq.MaxRetry(3),
},
verify: func(t *testing.T, info *asynq.TaskInfo) {
assert.Equal(t, 3, info.MaxRetry)
},
},
{
name: "Custom Timeout",
opts: []asynq.Option{
asynq.Timeout(5 * time.Minute),
},
verify: func(t *testing.T, info *asynq.TaskInfo) {
assert.Equal(t, 5*time.Minute, info.Timeout)
},
},
{
name: "Delayed Task",
opts: []asynq.Option{
asynq.ProcessIn(10 * time.Second),
},
verify: func(t *testing.T, info *asynq.TaskInfo) {
assert.True(t, info.NextProcessAt.After(time.Now()))
},
},
{
name: "Combined Options",
opts: []asynq.Option{
asynq.Queue(constants.QueueCritical),
asynq.MaxRetry(5),
asynq.Timeout(10 * time.Minute),
},
verify: func(t *testing.T, info *asynq.TaskInfo) {
assert.Equal(t, constants.QueueCritical, info.Queue)
assert.Equal(t, 5, info.MaxRetry)
assert.Equal(t, 10*time.Minute, info.Timeout)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
payload := map[string]string{
"request_id": "test-" + tt.name,
}
payloadBytes, err := sonic.Marshal(payload)
require.NoError(t, err)
task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
info, err := client.Enqueue(task, tt.opts...)
require.NoError(t, err)
tt.verify(t, info)
})
}
}
// TestQueueClientTaskUniqueness 测试任务唯一性
func TestQueueClientTaskUniqueness(t *testing.T) {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer func() { _ = rdb.Close() }()
ctx := context.Background()
rdb.FlushDB(ctx)
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: "localhost:6379",
})
defer func() { _ = client.Close() }()
payload := map[string]string{
"request_id": "unique-001",
"to": "test@example.com",
}
payloadBytes, err := sonic.Marshal(payload)
require.NoError(t, err)
// 第一次提交
task1 := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
info1, err := client.Enqueue(task1,
asynq.TaskID("unique-task-001"),
asynq.Unique(1*time.Hour),
)
require.NoError(t, err)
assert.NotNil(t, info1)
// 第二次提交(重复)
task2 := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
info2, err := client.Enqueue(task2,
asynq.TaskID("unique-task-001"),
asynq.Unique(1*time.Hour),
)
// 应该返回错误(任务已存在)
assert.Error(t, err)
assert.Nil(t, info2)
}
// TestQueuePriorityWeights 测试队列优先级权重
func TestQueuePriorityWeights(t *testing.T) {
queues := map[string]int{
constants.QueueCritical: 6,
constants.QueueDefault: 3,
constants.QueueLow: 1,
}
// 验证权重总和
totalWeight := 0
for _, weight := range queues {
totalWeight += weight
}
assert.Equal(t, 10, totalWeight)
// 验证权重比例
assert.Equal(t, 0.6, float64(queues[constants.QueueCritical])/float64(totalWeight))
assert.Equal(t, 0.3, float64(queues[constants.QueueDefault])/float64(totalWeight))
assert.Equal(t, 0.1, float64(queues[constants.QueueLow])/float64(totalWeight))
}
// TestTaskPayloadSizeLimit 测试任务载荷大小限制
func TestTaskPayloadSizeLimit(t *testing.T) {
tests := []struct {
name string
payloadSize int
shouldError bool
}{
{
name: "Small Payload (1KB)",
payloadSize: 1024,
shouldError: false,
},
{
name: "Medium Payload (100KB)",
payloadSize: 100 * 1024,
shouldError: false,
},
{
name: "Large Payload (1MB)",
payloadSize: 1024 * 1024,
shouldError: false,
},
// Redis 默认支持最大 512MB但实际应用中不建议超过 1MB
}
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer func() { _ = rdb.Close() }()
ctx := context.Background()
rdb.FlushDB(ctx)
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: "localhost:6379",
})
defer func() { _ = client.Close() }()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// 创建指定大小的载荷
largeData := make([]byte, tt.payloadSize)
for i := range largeData {
largeData[i] = byte(i % 256)
}
payload := map[string]interface{}{
"request_id": "size-test-001",
"data": largeData,
}
payloadBytes, err := sonic.Marshal(payload)
require.NoError(t, err)
task := asynq.NewTask(constants.TaskTypeDataSync, payloadBytes)
info, err := client.Enqueue(task)
if tt.shouldError {
assert.Error(t, err)
} else {
require.NoError(t, err)
assert.NotNil(t, info)
}
})
}
}
// TestTaskScheduling 测试任务调度
func TestTaskScheduling(t *testing.T) {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer func() { _ = rdb.Close() }()
ctx := context.Background()
rdb.FlushDB(ctx)
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: "localhost:6379",
})
defer func() { _ = client.Close() }()
tests := []struct {
name string
scheduleOpt asynq.Option
expectedTime time.Time
}{
{
name: "Process In 5 Seconds",
scheduleOpt: asynq.ProcessIn(5 * time.Second),
expectedTime: time.Now().Add(5 * time.Second),
},
{
name: "Process At Specific Time",
scheduleOpt: asynq.ProcessAt(time.Now().Add(10 * time.Second)),
expectedTime: time.Now().Add(10 * time.Second),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
payload := map[string]string{
"request_id": "schedule-test-" + tt.name,
}
payloadBytes, err := sonic.Marshal(payload)
require.NoError(t, err)
task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
info, err := client.Enqueue(task, tt.scheduleOpt)
require.NoError(t, err)
assert.True(t, info.NextProcessAt.After(time.Now()))
// 允许 1 秒的误差
assert.WithinDuration(t, tt.expectedTime, info.NextProcessAt, 1*time.Second)
})
}
}
// TestQueueInspectorStats 测试队列统计
func TestQueueInspectorStats(t *testing.T) {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer func() { _ = rdb.Close() }()
ctx := context.Background()
rdb.FlushDB(ctx)
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: "localhost:6379",
})
defer func() { _ = client.Close() }()
// 提交一些任务
for i := 0; i < 5; i++ {
payload := map[string]string{
"request_id": "stats-test-" + string(rune(i)),
}
payloadBytes, err := sonic.Marshal(payload)
require.NoError(t, err)
task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
_, err = client.Enqueue(task)
require.NoError(t, err)
}
// 使用 Inspector 查询统计
inspector := asynq.NewInspector(asynq.RedisClientOpt{
Addr: "localhost:6379",
})
defer func() { _ = inspector.Close() }()
info, err := inspector.GetQueueInfo(constants.QueueDefault)
require.NoError(t, err)
assert.Equal(t, 5, info.Pending)
assert.Equal(t, 0, info.Active)
assert.Equal(t, 0, info.Completed)
}
// TestTaskRetention 测试任务保留策略
func TestTaskRetention(t *testing.T) {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer func() { _ = rdb.Close() }()
ctx := context.Background()
rdb.FlushDB(ctx)
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: "localhost:6379",
})
defer func() { _ = client.Close() }()
payload := map[string]string{
"request_id": "retention-test-001",
}
payloadBytes, err := sonic.Marshal(payload)
require.NoError(t, err)
// 提交任务并设置保留时间
task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
info, err := client.Enqueue(task,
asynq.Retention(24*time.Hour), // 保留 24 小时
)
require.NoError(t, err)
assert.NotNil(t, info)
}
// TestQueueDraining 测试队列暂停和恢复
func TestQueueDraining(t *testing.T) {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer func() { _ = rdb.Close() }()
ctx := context.Background()
rdb.FlushDB(ctx)
inspector := asynq.NewInspector(asynq.RedisClientOpt{
Addr: "localhost:6379",
})
defer func() { _ = inspector.Close() }()
// 暂停队列
err := inspector.PauseQueue(constants.QueueDefault)
require.NoError(t, err)
// 检查队列是否已暂停
info, err := inspector.GetQueueInfo(constants.QueueDefault)
require.NoError(t, err)
assert.True(t, info.Paused)
// 恢复队列
err = inspector.UnpauseQueue(constants.QueueDefault)
require.NoError(t, err)
// 检查队列是否已恢复
info, err = inspector.GetQueueInfo(constants.QueueDefault)
require.NoError(t, err)
assert.False(t, info.Paused)
}
// TestTaskCancellation 测试任务取消
func TestTaskCancellation(t *testing.T) {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer func() { _ = rdb.Close() }()
ctx := context.Background()
rdb.FlushDB(ctx)
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: "localhost:6379",
})
defer func() { _ = client.Close() }()
payload := map[string]string{
"request_id": "cancel-test-001",
}
payloadBytes, err := sonic.Marshal(payload)
require.NoError(t, err)
// 提交任务
task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
info, err := client.Enqueue(task)
require.NoError(t, err)
// 取消任务
inspector := asynq.NewInspector(asynq.RedisClientOpt{
Addr: "localhost:6379",
})
defer func() { _ = inspector.Close() }()
err = inspector.DeleteTask(constants.QueueDefault, info.ID)
require.NoError(t, err)
// 验证任务已删除
queueInfo, err := inspector.GetQueueInfo(constants.QueueDefault)
require.NoError(t, err)
assert.Equal(t, 0, queueInfo.Pending)
}
// TestBatchTaskEnqueue 测试批量任务入队
func TestBatchTaskEnqueue(t *testing.T) {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer func() { _ = rdb.Close() }()
ctx := context.Background()
rdb.FlushDB(ctx)
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: "localhost:6379",
})
defer func() { _ = client.Close() }()
// 批量创建任务
batchSize := 100
for i := 0; i < batchSize; i++ {
payload := map[string]string{
"request_id": "batch-" + string(rune(i)),
}
payloadBytes, err := sonic.Marshal(payload)
require.NoError(t, err)
task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
_, err = client.Enqueue(task)
require.NoError(t, err)
}
// 验证任务数量
inspector := asynq.NewInspector(asynq.RedisClientOpt{
Addr: "localhost:6379",
})
defer func() { _ = inspector.Close() }()
info, err := inspector.GetQueueInfo(constants.QueueDefault)
require.NoError(t, err)
assert.Equal(t, batchSize, info.Pending)
}
// TestTaskGrouping 测试任务分组
func TestTaskGrouping(t *testing.T) {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer func() { _ = rdb.Close() }()
ctx := context.Background()
rdb.FlushDB(ctx)
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: "localhost:6379",
})
defer func() { _ = client.Close() }()
// 提交分组任务
groupKey := "email-batch-001"
for i := 0; i < 5; i++ {
payload := map[string]string{
"request_id": "group-" + string(rune(i)),
"group": groupKey,
}
payloadBytes, err := sonic.Marshal(payload)
require.NoError(t, err)
task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
_, err = client.Enqueue(task,
asynq.Group(groupKey),
)
require.NoError(t, err)
}
// 验证任务已按组提交
inspector := asynq.NewInspector(asynq.RedisClientOpt{
Addr: "localhost:6379",
})
defer func() { _ = inspector.Close() }()
info, err := inspector.GetQueueInfo(constants.QueueDefault)
require.NoError(t, err)
assert.GreaterOrEqual(t, info.Pending, 5)
}