Files
junhong_cmp_fiber/tests/unit/queue_test.go
huang 984ccccc63 docs(constitution): 新增数据库设计原则(v2.4.0)
在项目宪章中新增第九条原则"数据库设计原则",明确禁止使用数据库外键约束和ORM关联标签。

主要变更:
- 新增原则IX:数据库设计原则(Database Design Principles)
- 强制要求:数据库表不得使用外键约束
- 强制要求:GORM模型不得使用ORM关联标签(foreignKey、hasMany等)
- 强制要求:表关系必须通过ID字段手动维护
- 强制要求:关联数据查询必须显式编写,避免ORM魔法
- 强制要求:时间字段由GORM处理,不使用数据库触发器

设计理念:
- 提升业务逻辑灵活性(无数据库约束限制)
- 优化高并发性能(无外键检查开销)
- 增强代码可读性(显式查询,无隐式预加载)
- 简化数据库架构和迁移流程
- 支持分布式和微服务场景

版本升级:2.3.0 → 2.4.0(MINOR)
2025-11-13 13:40:19 +08:00

556 lines
13 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package unit
import (
"context"
"testing"
"time"
"github.com/bytedance/sonic"
"github.com/hibiken/asynq"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/break/junhong_cmp_fiber/pkg/constants"
)
// TestQueueClientEnqueue 测试任务入队
func TestQueueClientEnqueue(t *testing.T) {
redisClient := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer redisClient.Close()
ctx := context.Background()
redisClient.FlushDB(ctx)
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: "localhost:6379",
})
defer client.Close()
payload := map[string]string{
"request_id": "test-001",
"to": "test@example.com",
}
payloadBytes, err := sonic.Marshal(payload)
require.NoError(t, err)
task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
info, err := client.Enqueue(task)
require.NoError(t, err)
assert.NotEmpty(t, info.ID)
assert.Equal(t, constants.QueueDefault, info.Queue)
}
// TestQueueClientEnqueueWithOptions 测试带选项的任务入队
func TestQueueClientEnqueueWithOptions(t *testing.T) {
redisClient := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer redisClient.Close()
ctx := context.Background()
redisClient.FlushDB(ctx)
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: "localhost:6379",
})
defer client.Close()
tests := []struct {
name string
opts []asynq.Option
verify func(*testing.T, *asynq.TaskInfo)
}{
{
name: "Custom Queue",
opts: []asynq.Option{
asynq.Queue(constants.QueueCritical),
},
verify: func(t *testing.T, info *asynq.TaskInfo) {
assert.Equal(t, constants.QueueCritical, info.Queue)
},
},
{
name: "Custom Retry",
opts: []asynq.Option{
asynq.MaxRetry(3),
},
verify: func(t *testing.T, info *asynq.TaskInfo) {
assert.Equal(t, 3, info.MaxRetry)
},
},
{
name: "Custom Timeout",
opts: []asynq.Option{
asynq.Timeout(5 * time.Minute),
},
verify: func(t *testing.T, info *asynq.TaskInfo) {
assert.Equal(t, 5*time.Minute, info.Timeout)
},
},
{
name: "Delayed Task",
opts: []asynq.Option{
asynq.ProcessIn(10 * time.Second),
},
verify: func(t *testing.T, info *asynq.TaskInfo) {
assert.True(t, info.NextProcessAt.After(time.Now()))
},
},
{
name: "Combined Options",
opts: []asynq.Option{
asynq.Queue(constants.QueueCritical),
asynq.MaxRetry(5),
asynq.Timeout(10 * time.Minute),
},
verify: func(t *testing.T, info *asynq.TaskInfo) {
assert.Equal(t, constants.QueueCritical, info.Queue)
assert.Equal(t, 5, info.MaxRetry)
assert.Equal(t, 10*time.Minute, info.Timeout)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
payload := map[string]string{
"request_id": "test-" + tt.name,
}
payloadBytes, err := sonic.Marshal(payload)
require.NoError(t, err)
task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
info, err := client.Enqueue(task, tt.opts...)
require.NoError(t, err)
tt.verify(t, info)
})
}
}
// TestQueueClientTaskUniqueness 测试任务唯一性
func TestQueueClientTaskUniqueness(t *testing.T) {
redisClient := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer redisClient.Close()
ctx := context.Background()
redisClient.FlushDB(ctx)
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: "localhost:6379",
})
defer client.Close()
payload := map[string]string{
"request_id": "unique-001",
"to": "test@example.com",
}
payloadBytes, err := sonic.Marshal(payload)
require.NoError(t, err)
// 第一次提交
task1 := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
info1, err := client.Enqueue(task1,
asynq.TaskID("unique-task-001"),
asynq.Unique(1*time.Hour),
)
require.NoError(t, err)
assert.NotNil(t, info1)
// 第二次提交(重复)
task2 := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
info2, err := client.Enqueue(task2,
asynq.TaskID("unique-task-001"),
asynq.Unique(1*time.Hour),
)
// 应该返回错误(任务已存在)
assert.Error(t, err)
assert.Nil(t, info2)
}
// TestQueuePriorityWeights 测试队列优先级权重
func TestQueuePriorityWeights(t *testing.T) {
queues := map[string]int{
constants.QueueCritical: 6,
constants.QueueDefault: 3,
constants.QueueLow: 1,
}
// 验证权重总和
totalWeight := 0
for _, weight := range queues {
totalWeight += weight
}
assert.Equal(t, 10, totalWeight)
// 验证权重比例
assert.Equal(t, 0.6, float64(queues[constants.QueueCritical])/float64(totalWeight))
assert.Equal(t, 0.3, float64(queues[constants.QueueDefault])/float64(totalWeight))
assert.Equal(t, 0.1, float64(queues[constants.QueueLow])/float64(totalWeight))
}
// TestTaskPayloadSizeLimit 测试任务载荷大小限制
func TestTaskPayloadSizeLimit(t *testing.T) {
tests := []struct {
name string
payloadSize int
shouldError bool
}{
{
name: "Small Payload (1KB)",
payloadSize: 1024,
shouldError: false,
},
{
name: "Medium Payload (100KB)",
payloadSize: 100 * 1024,
shouldError: false,
},
{
name: "Large Payload (1MB)",
payloadSize: 1024 * 1024,
shouldError: false,
},
// Redis 默认支持最大 512MB但实际应用中不建议超过 1MB
}
redisClient := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer redisClient.Close()
ctx := context.Background()
redisClient.FlushDB(ctx)
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: "localhost:6379",
})
defer client.Close()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// 创建指定大小的载荷
largeData := make([]byte, tt.payloadSize)
for i := range largeData {
largeData[i] = byte(i % 256)
}
payload := map[string]interface{}{
"request_id": "size-test-001",
"data": largeData,
}
payloadBytes, err := sonic.Marshal(payload)
require.NoError(t, err)
task := asynq.NewTask(constants.TaskTypeDataSync, payloadBytes)
info, err := client.Enqueue(task)
if tt.shouldError {
assert.Error(t, err)
} else {
require.NoError(t, err)
assert.NotNil(t, info)
}
})
}
}
// TestTaskScheduling 测试任务调度
func TestTaskScheduling(t *testing.T) {
redisClient := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer redisClient.Close()
ctx := context.Background()
redisClient.FlushDB(ctx)
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: "localhost:6379",
})
defer client.Close()
tests := []struct {
name string
scheduleOpt asynq.Option
expectedTime time.Time
}{
{
name: "Process In 5 Seconds",
scheduleOpt: asynq.ProcessIn(5 * time.Second),
expectedTime: time.Now().Add(5 * time.Second),
},
{
name: "Process At Specific Time",
scheduleOpt: asynq.ProcessAt(time.Now().Add(10 * time.Second)),
expectedTime: time.Now().Add(10 * time.Second),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
payload := map[string]string{
"request_id": "schedule-test-" + tt.name,
}
payloadBytes, err := sonic.Marshal(payload)
require.NoError(t, err)
task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
info, err := client.Enqueue(task, tt.scheduleOpt)
require.NoError(t, err)
assert.True(t, info.NextProcessAt.After(time.Now()))
// 允许 1 秒的误差
assert.WithinDuration(t, tt.expectedTime, info.NextProcessAt, 1*time.Second)
})
}
}
// TestQueueInspectorStats 测试队列统计
func TestQueueInspectorStats(t *testing.T) {
redisClient := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer redisClient.Close()
ctx := context.Background()
redisClient.FlushDB(ctx)
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: "localhost:6379",
})
defer client.Close()
// 提交一些任务
for i := 0; i < 5; i++ {
payload := map[string]string{
"request_id": "stats-test-" + string(rune(i)),
}
payloadBytes, err := sonic.Marshal(payload)
require.NoError(t, err)
task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
_, err = client.Enqueue(task)
require.NoError(t, err)
}
// 使用 Inspector 查询统计
inspector := asynq.NewInspector(asynq.RedisClientOpt{
Addr: "localhost:6379",
})
defer inspector.Close()
info, err := inspector.GetQueueInfo(constants.QueueDefault)
require.NoError(t, err)
assert.Equal(t, 5, info.Pending)
assert.Equal(t, 0, info.Active)
assert.Equal(t, 0, info.Completed)
}
// TestTaskRetention 测试任务保留策略
func TestTaskRetention(t *testing.T) {
redisClient := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer redisClient.Close()
ctx := context.Background()
redisClient.FlushDB(ctx)
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: "localhost:6379",
})
defer client.Close()
payload := map[string]string{
"request_id": "retention-test-001",
}
payloadBytes, err := sonic.Marshal(payload)
require.NoError(t, err)
// 提交任务并设置保留时间
task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
info, err := client.Enqueue(task,
asynq.Retention(24*time.Hour), // 保留 24 小时
)
require.NoError(t, err)
assert.NotNil(t, info)
}
// TestQueueDraining 测试队列暂停和恢复
func TestQueueDraining(t *testing.T) {
redisClient := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer redisClient.Close()
ctx := context.Background()
redisClient.FlushDB(ctx)
inspector := asynq.NewInspector(asynq.RedisClientOpt{
Addr: "localhost:6379",
})
defer inspector.Close()
// 暂停队列
err := inspector.PauseQueue(constants.QueueDefault)
require.NoError(t, err)
// 检查队列是否已暂停
info, err := inspector.GetQueueInfo(constants.QueueDefault)
require.NoError(t, err)
assert.True(t, info.Paused)
// 恢复队列
err = inspector.UnpauseQueue(constants.QueueDefault)
require.NoError(t, err)
// 检查队列是否已恢复
info, err = inspector.GetQueueInfo(constants.QueueDefault)
require.NoError(t, err)
assert.False(t, info.Paused)
}
// TestTaskCancellation 测试任务取消
func TestTaskCancellation(t *testing.T) {
redisClient := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer redisClient.Close()
ctx := context.Background()
redisClient.FlushDB(ctx)
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: "localhost:6379",
})
defer client.Close()
payload := map[string]string{
"request_id": "cancel-test-001",
}
payloadBytes, err := sonic.Marshal(payload)
require.NoError(t, err)
// 提交任务
task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
info, err := client.Enqueue(task)
require.NoError(t, err)
// 取消任务
inspector := asynq.NewInspector(asynq.RedisClientOpt{
Addr: "localhost:6379",
})
defer inspector.Close()
err = inspector.DeleteTask(constants.QueueDefault, info.ID)
require.NoError(t, err)
// 验证任务已删除
queueInfo, err := inspector.GetQueueInfo(constants.QueueDefault)
require.NoError(t, err)
assert.Equal(t, 0, queueInfo.Pending)
}
// TestBatchTaskEnqueue 测试批量任务入队
func TestBatchTaskEnqueue(t *testing.T) {
redisClient := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer redisClient.Close()
ctx := context.Background()
redisClient.FlushDB(ctx)
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: "localhost:6379",
})
defer client.Close()
// 批量创建任务
batchSize := 100
for i := 0; i < batchSize; i++ {
payload := map[string]string{
"request_id": "batch-" + string(rune(i)),
}
payloadBytes, err := sonic.Marshal(payload)
require.NoError(t, err)
task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
_, err = client.Enqueue(task)
require.NoError(t, err)
}
// 验证任务数量
inspector := asynq.NewInspector(asynq.RedisClientOpt{
Addr: "localhost:6379",
})
defer inspector.Close()
info, err := inspector.GetQueueInfo(constants.QueueDefault)
require.NoError(t, err)
assert.Equal(t, batchSize, info.Pending)
}
// TestTaskGrouping 测试任务分组
func TestTaskGrouping(t *testing.T) {
redisClient := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer redisClient.Close()
ctx := context.Background()
redisClient.FlushDB(ctx)
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: "localhost:6379",
})
defer client.Close()
// 提交分组任务
groupKey := "email-batch-001"
for i := 0; i < 5; i++ {
payload := map[string]string{
"request_id": "group-" + string(rune(i)),
"group": groupKey,
}
payloadBytes, err := sonic.Marshal(payload)
require.NoError(t, err)
task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
_, err = client.Enqueue(task,
asynq.Group(groupKey),
)
require.NoError(t, err)
}
// 验证任务已按组提交
inspector := asynq.NewInspector(asynq.RedisClientOpt{
Addr: "localhost:6379",
})
defer inspector.Close()
info, err := inspector.GetQueueInfo(constants.QueueDefault)
require.NoError(t, err)
assert.GreaterOrEqual(t, info.Pending, 5)
}