主要功能: - 实现完整的 RBAC 权限系统(账号、角色、权限的多对多关联) - 基于 owner_id + shop_id 的自动数据权限过滤 - 使用 PostgreSQL WITH RECURSIVE 查询下级账号 - Redis 缓存优化下级账号查询性能(30分钟过期) - 支持多租户数据隔离和层级权限管理 技术实现: - 新增 Account、Role、Permission 模型及关联关系表 - 实现 GORM Scopes 自动应用数据权限过滤 - 添加数据库迁移脚本(000002_rbac_data_permission、000003_add_owner_id_shop_id) - 完善错误码定义(1010-1027 为 RBAC 相关错误) - 重构 main.go 采用函数拆分提高可读性 测试覆盖: - 添加 Account、Role、Permission 的集成测试 - 添加数据权限过滤的单元测试和集成测试 - 添加下级账号查询和缓存的单元测试 - 添加 API 回归测试确保向后兼容 文档更新: - 更新 README.md 添加 RBAC 功能说明 - 更新 CLAUDE.md 添加技术栈和开发原则 - 添加 docs/004-rbac-data-permission/ 功能总结和使用指南 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
556 lines
13 KiB
Go
556 lines
13 KiB
Go
package unit
|
||
|
||
import (
|
||
"context"
|
||
"testing"
|
||
"time"
|
||
|
||
"github.com/bytedance/sonic"
|
||
"github.com/hibiken/asynq"
|
||
"github.com/redis/go-redis/v9"
|
||
"github.com/stretchr/testify/assert"
|
||
"github.com/stretchr/testify/require"
|
||
|
||
"github.com/break/junhong_cmp_fiber/pkg/constants"
|
||
)
|
||
|
||
// TestQueueClientEnqueue 测试任务入队
|
||
func TestQueueClientEnqueue(t *testing.T) {
|
||
redisClient := redis.NewClient(&redis.Options{
|
||
Addr: "localhost:6379",
|
||
})
|
||
defer func() { _ = redisClient.Close() }()
|
||
|
||
ctx := context.Background()
|
||
redisClient.FlushDB(ctx)
|
||
|
||
client := asynq.NewClient(asynq.RedisClientOpt{
|
||
Addr: "localhost:6379",
|
||
})
|
||
defer func() { _ = client.Close() }()
|
||
|
||
payload := map[string]string{
|
||
"request_id": "test-001",
|
||
"to": "test@example.com",
|
||
}
|
||
|
||
payloadBytes, err := sonic.Marshal(payload)
|
||
require.NoError(t, err)
|
||
|
||
task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
|
||
info, err := client.Enqueue(task)
|
||
|
||
require.NoError(t, err)
|
||
assert.NotEmpty(t, info.ID)
|
||
assert.Equal(t, constants.QueueDefault, info.Queue)
|
||
}
|
||
|
||
// TestQueueClientEnqueueWithOptions 测试带选项的任务入队
|
||
func TestQueueClientEnqueueWithOptions(t *testing.T) {
|
||
redisClient := redis.NewClient(&redis.Options{
|
||
Addr: "localhost:6379",
|
||
})
|
||
defer func() { _ = redisClient.Close() }()
|
||
|
||
ctx := context.Background()
|
||
redisClient.FlushDB(ctx)
|
||
|
||
client := asynq.NewClient(asynq.RedisClientOpt{
|
||
Addr: "localhost:6379",
|
||
})
|
||
defer func() { _ = client.Close() }()
|
||
|
||
tests := []struct {
|
||
name string
|
||
opts []asynq.Option
|
||
verify func(*testing.T, *asynq.TaskInfo)
|
||
}{
|
||
{
|
||
name: "Custom Queue",
|
||
opts: []asynq.Option{
|
||
asynq.Queue(constants.QueueCritical),
|
||
},
|
||
verify: func(t *testing.T, info *asynq.TaskInfo) {
|
||
assert.Equal(t, constants.QueueCritical, info.Queue)
|
||
},
|
||
},
|
||
{
|
||
name: "Custom Retry",
|
||
opts: []asynq.Option{
|
||
asynq.MaxRetry(3),
|
||
},
|
||
verify: func(t *testing.T, info *asynq.TaskInfo) {
|
||
assert.Equal(t, 3, info.MaxRetry)
|
||
},
|
||
},
|
||
{
|
||
name: "Custom Timeout",
|
||
opts: []asynq.Option{
|
||
asynq.Timeout(5 * time.Minute),
|
||
},
|
||
verify: func(t *testing.T, info *asynq.TaskInfo) {
|
||
assert.Equal(t, 5*time.Minute, info.Timeout)
|
||
},
|
||
},
|
||
{
|
||
name: "Delayed Task",
|
||
opts: []asynq.Option{
|
||
asynq.ProcessIn(10 * time.Second),
|
||
},
|
||
verify: func(t *testing.T, info *asynq.TaskInfo) {
|
||
assert.True(t, info.NextProcessAt.After(time.Now()))
|
||
},
|
||
},
|
||
{
|
||
name: "Combined Options",
|
||
opts: []asynq.Option{
|
||
asynq.Queue(constants.QueueCritical),
|
||
asynq.MaxRetry(5),
|
||
asynq.Timeout(10 * time.Minute),
|
||
},
|
||
verify: func(t *testing.T, info *asynq.TaskInfo) {
|
||
assert.Equal(t, constants.QueueCritical, info.Queue)
|
||
assert.Equal(t, 5, info.MaxRetry)
|
||
assert.Equal(t, 10*time.Minute, info.Timeout)
|
||
},
|
||
},
|
||
}
|
||
|
||
for _, tt := range tests {
|
||
t.Run(tt.name, func(t *testing.T) {
|
||
payload := map[string]string{
|
||
"request_id": "test-" + tt.name,
|
||
}
|
||
|
||
payloadBytes, err := sonic.Marshal(payload)
|
||
require.NoError(t, err)
|
||
|
||
task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
|
||
info, err := client.Enqueue(task, tt.opts...)
|
||
|
||
require.NoError(t, err)
|
||
tt.verify(t, info)
|
||
})
|
||
}
|
||
}
|
||
|
||
// TestQueueClientTaskUniqueness 测试任务唯一性
|
||
func TestQueueClientTaskUniqueness(t *testing.T) {
|
||
redisClient := redis.NewClient(&redis.Options{
|
||
Addr: "localhost:6379",
|
||
})
|
||
defer func() { _ = redisClient.Close() }()
|
||
|
||
ctx := context.Background()
|
||
redisClient.FlushDB(ctx)
|
||
|
||
client := asynq.NewClient(asynq.RedisClientOpt{
|
||
Addr: "localhost:6379",
|
||
})
|
||
defer func() { _ = client.Close() }()
|
||
|
||
payload := map[string]string{
|
||
"request_id": "unique-001",
|
||
"to": "test@example.com",
|
||
}
|
||
|
||
payloadBytes, err := sonic.Marshal(payload)
|
||
require.NoError(t, err)
|
||
|
||
// 第一次提交
|
||
task1 := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
|
||
info1, err := client.Enqueue(task1,
|
||
asynq.TaskID("unique-task-001"),
|
||
asynq.Unique(1*time.Hour),
|
||
)
|
||
require.NoError(t, err)
|
||
assert.NotNil(t, info1)
|
||
|
||
// 第二次提交(重复)
|
||
task2 := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
|
||
info2, err := client.Enqueue(task2,
|
||
asynq.TaskID("unique-task-001"),
|
||
asynq.Unique(1*time.Hour),
|
||
)
|
||
|
||
// 应该返回错误(任务已存在)
|
||
assert.Error(t, err)
|
||
assert.Nil(t, info2)
|
||
}
|
||
|
||
// TestQueuePriorityWeights 测试队列优先级权重
|
||
func TestQueuePriorityWeights(t *testing.T) {
|
||
queues := map[string]int{
|
||
constants.QueueCritical: 6,
|
||
constants.QueueDefault: 3,
|
||
constants.QueueLow: 1,
|
||
}
|
||
|
||
// 验证权重总和
|
||
totalWeight := 0
|
||
for _, weight := range queues {
|
||
totalWeight += weight
|
||
}
|
||
assert.Equal(t, 10, totalWeight)
|
||
|
||
// 验证权重比例
|
||
assert.Equal(t, 0.6, float64(queues[constants.QueueCritical])/float64(totalWeight))
|
||
assert.Equal(t, 0.3, float64(queues[constants.QueueDefault])/float64(totalWeight))
|
||
assert.Equal(t, 0.1, float64(queues[constants.QueueLow])/float64(totalWeight))
|
||
}
|
||
|
||
// TestTaskPayloadSizeLimit 测试任务载荷大小限制
|
||
func TestTaskPayloadSizeLimit(t *testing.T) {
|
||
tests := []struct {
|
||
name string
|
||
payloadSize int
|
||
shouldError bool
|
||
}{
|
||
{
|
||
name: "Small Payload (1KB)",
|
||
payloadSize: 1024,
|
||
shouldError: false,
|
||
},
|
||
{
|
||
name: "Medium Payload (100KB)",
|
||
payloadSize: 100 * 1024,
|
||
shouldError: false,
|
||
},
|
||
{
|
||
name: "Large Payload (1MB)",
|
||
payloadSize: 1024 * 1024,
|
||
shouldError: false,
|
||
},
|
||
// Redis 默认支持最大 512MB,但实际应用中不建议超过 1MB
|
||
}
|
||
|
||
redisClient := redis.NewClient(&redis.Options{
|
||
Addr: "localhost:6379",
|
||
})
|
||
defer func() { _ = redisClient.Close() }()
|
||
|
||
ctx := context.Background()
|
||
redisClient.FlushDB(ctx)
|
||
|
||
client := asynq.NewClient(asynq.RedisClientOpt{
|
||
Addr: "localhost:6379",
|
||
})
|
||
defer func() { _ = client.Close() }()
|
||
|
||
for _, tt := range tests {
|
||
t.Run(tt.name, func(t *testing.T) {
|
||
// 创建指定大小的载荷
|
||
largeData := make([]byte, tt.payloadSize)
|
||
for i := range largeData {
|
||
largeData[i] = byte(i % 256)
|
||
}
|
||
|
||
payload := map[string]interface{}{
|
||
"request_id": "size-test-001",
|
||
"data": largeData,
|
||
}
|
||
|
||
payloadBytes, err := sonic.Marshal(payload)
|
||
require.NoError(t, err)
|
||
|
||
task := asynq.NewTask(constants.TaskTypeDataSync, payloadBytes)
|
||
info, err := client.Enqueue(task)
|
||
|
||
if tt.shouldError {
|
||
assert.Error(t, err)
|
||
} else {
|
||
require.NoError(t, err)
|
||
assert.NotNil(t, info)
|
||
}
|
||
})
|
||
}
|
||
}
|
||
|
||
// TestTaskScheduling 测试任务调度
|
||
func TestTaskScheduling(t *testing.T) {
|
||
redisClient := redis.NewClient(&redis.Options{
|
||
Addr: "localhost:6379",
|
||
})
|
||
defer func() { _ = redisClient.Close() }()
|
||
|
||
ctx := context.Background()
|
||
redisClient.FlushDB(ctx)
|
||
|
||
client := asynq.NewClient(asynq.RedisClientOpt{
|
||
Addr: "localhost:6379",
|
||
})
|
||
defer func() { _ = client.Close() }()
|
||
|
||
tests := []struct {
|
||
name string
|
||
scheduleOpt asynq.Option
|
||
expectedTime time.Time
|
||
}{
|
||
{
|
||
name: "Process In 5 Seconds",
|
||
scheduleOpt: asynq.ProcessIn(5 * time.Second),
|
||
expectedTime: time.Now().Add(5 * time.Second),
|
||
},
|
||
{
|
||
name: "Process At Specific Time",
|
||
scheduleOpt: asynq.ProcessAt(time.Now().Add(10 * time.Second)),
|
||
expectedTime: time.Now().Add(10 * time.Second),
|
||
},
|
||
}
|
||
|
||
for _, tt := range tests {
|
||
t.Run(tt.name, func(t *testing.T) {
|
||
payload := map[string]string{
|
||
"request_id": "schedule-test-" + tt.name,
|
||
}
|
||
|
||
payloadBytes, err := sonic.Marshal(payload)
|
||
require.NoError(t, err)
|
||
|
||
task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
|
||
info, err := client.Enqueue(task, tt.scheduleOpt)
|
||
|
||
require.NoError(t, err)
|
||
assert.True(t, info.NextProcessAt.After(time.Now()))
|
||
// 允许 1 秒的误差
|
||
assert.WithinDuration(t, tt.expectedTime, info.NextProcessAt, 1*time.Second)
|
||
})
|
||
}
|
||
}
|
||
|
||
// TestQueueInspectorStats 测试队列统计
|
||
func TestQueueInspectorStats(t *testing.T) {
|
||
redisClient := redis.NewClient(&redis.Options{
|
||
Addr: "localhost:6379",
|
||
})
|
||
defer func() { _ = redisClient.Close() }()
|
||
|
||
ctx := context.Background()
|
||
redisClient.FlushDB(ctx)
|
||
|
||
client := asynq.NewClient(asynq.RedisClientOpt{
|
||
Addr: "localhost:6379",
|
||
})
|
||
defer func() { _ = client.Close() }()
|
||
|
||
// 提交一些任务
|
||
for i := 0; i < 5; i++ {
|
||
payload := map[string]string{
|
||
"request_id": "stats-test-" + string(rune(i)),
|
||
}
|
||
|
||
payloadBytes, err := sonic.Marshal(payload)
|
||
require.NoError(t, err)
|
||
|
||
task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
|
||
_, err = client.Enqueue(task)
|
||
require.NoError(t, err)
|
||
}
|
||
|
||
// 使用 Inspector 查询统计
|
||
inspector := asynq.NewInspector(asynq.RedisClientOpt{
|
||
Addr: "localhost:6379",
|
||
})
|
||
defer func() { _ = inspector.Close() }()
|
||
|
||
info, err := inspector.GetQueueInfo(constants.QueueDefault)
|
||
require.NoError(t, err)
|
||
|
||
assert.Equal(t, 5, info.Pending)
|
||
assert.Equal(t, 0, info.Active)
|
||
assert.Equal(t, 0, info.Completed)
|
||
}
|
||
|
||
// TestTaskRetention 测试任务保留策略
|
||
func TestTaskRetention(t *testing.T) {
|
||
redisClient := redis.NewClient(&redis.Options{
|
||
Addr: "localhost:6379",
|
||
})
|
||
defer func() { _ = redisClient.Close() }()
|
||
|
||
ctx := context.Background()
|
||
redisClient.FlushDB(ctx)
|
||
|
||
client := asynq.NewClient(asynq.RedisClientOpt{
|
||
Addr: "localhost:6379",
|
||
})
|
||
defer func() { _ = client.Close() }()
|
||
|
||
payload := map[string]string{
|
||
"request_id": "retention-test-001",
|
||
}
|
||
|
||
payloadBytes, err := sonic.Marshal(payload)
|
||
require.NoError(t, err)
|
||
|
||
// 提交任务并设置保留时间
|
||
task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
|
||
info, err := client.Enqueue(task,
|
||
asynq.Retention(24*time.Hour), // 保留 24 小时
|
||
)
|
||
|
||
require.NoError(t, err)
|
||
assert.NotNil(t, info)
|
||
}
|
||
|
||
// TestQueueDraining 测试队列暂停和恢复
|
||
func TestQueueDraining(t *testing.T) {
|
||
redisClient := redis.NewClient(&redis.Options{
|
||
Addr: "localhost:6379",
|
||
})
|
||
defer func() { _ = redisClient.Close() }()
|
||
|
||
ctx := context.Background()
|
||
redisClient.FlushDB(ctx)
|
||
|
||
inspector := asynq.NewInspector(asynq.RedisClientOpt{
|
||
Addr: "localhost:6379",
|
||
})
|
||
defer func() { _ = inspector.Close() }()
|
||
|
||
// 暂停队列
|
||
err := inspector.PauseQueue(constants.QueueDefault)
|
||
require.NoError(t, err)
|
||
|
||
// 检查队列是否已暂停
|
||
info, err := inspector.GetQueueInfo(constants.QueueDefault)
|
||
require.NoError(t, err)
|
||
assert.True(t, info.Paused)
|
||
|
||
// 恢复队列
|
||
err = inspector.UnpauseQueue(constants.QueueDefault)
|
||
require.NoError(t, err)
|
||
|
||
// 检查队列是否已恢复
|
||
info, err = inspector.GetQueueInfo(constants.QueueDefault)
|
||
require.NoError(t, err)
|
||
assert.False(t, info.Paused)
|
||
}
|
||
|
||
// TestTaskCancellation 测试任务取消
|
||
func TestTaskCancellation(t *testing.T) {
|
||
redisClient := redis.NewClient(&redis.Options{
|
||
Addr: "localhost:6379",
|
||
})
|
||
defer func() { _ = redisClient.Close() }()
|
||
|
||
ctx := context.Background()
|
||
redisClient.FlushDB(ctx)
|
||
|
||
client := asynq.NewClient(asynq.RedisClientOpt{
|
||
Addr: "localhost:6379",
|
||
})
|
||
defer func() { _ = client.Close() }()
|
||
|
||
payload := map[string]string{
|
||
"request_id": "cancel-test-001",
|
||
}
|
||
|
||
payloadBytes, err := sonic.Marshal(payload)
|
||
require.NoError(t, err)
|
||
|
||
// 提交任务
|
||
task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
|
||
info, err := client.Enqueue(task)
|
||
require.NoError(t, err)
|
||
|
||
// 取消任务
|
||
inspector := asynq.NewInspector(asynq.RedisClientOpt{
|
||
Addr: "localhost:6379",
|
||
})
|
||
defer func() { _ = inspector.Close() }()
|
||
|
||
err = inspector.DeleteTask(constants.QueueDefault, info.ID)
|
||
require.NoError(t, err)
|
||
|
||
// 验证任务已删除
|
||
queueInfo, err := inspector.GetQueueInfo(constants.QueueDefault)
|
||
require.NoError(t, err)
|
||
assert.Equal(t, 0, queueInfo.Pending)
|
||
}
|
||
|
||
// TestBatchTaskEnqueue 测试批量任务入队
|
||
func TestBatchTaskEnqueue(t *testing.T) {
|
||
redisClient := redis.NewClient(&redis.Options{
|
||
Addr: "localhost:6379",
|
||
})
|
||
defer func() { _ = redisClient.Close() }()
|
||
|
||
ctx := context.Background()
|
||
redisClient.FlushDB(ctx)
|
||
|
||
client := asynq.NewClient(asynq.RedisClientOpt{
|
||
Addr: "localhost:6379",
|
||
})
|
||
defer func() { _ = client.Close() }()
|
||
|
||
// 批量创建任务
|
||
batchSize := 100
|
||
for i := 0; i < batchSize; i++ {
|
||
payload := map[string]string{
|
||
"request_id": "batch-" + string(rune(i)),
|
||
}
|
||
|
||
payloadBytes, err := sonic.Marshal(payload)
|
||
require.NoError(t, err)
|
||
|
||
task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
|
||
_, err = client.Enqueue(task)
|
||
require.NoError(t, err)
|
||
}
|
||
|
||
// 验证任务数量
|
||
inspector := asynq.NewInspector(asynq.RedisClientOpt{
|
||
Addr: "localhost:6379",
|
||
})
|
||
defer func() { _ = inspector.Close() }()
|
||
|
||
info, err := inspector.GetQueueInfo(constants.QueueDefault)
|
||
require.NoError(t, err)
|
||
assert.Equal(t, batchSize, info.Pending)
|
||
}
|
||
|
||
// TestTaskGrouping 测试任务分组
|
||
func TestTaskGrouping(t *testing.T) {
|
||
redisClient := redis.NewClient(&redis.Options{
|
||
Addr: "localhost:6379",
|
||
})
|
||
defer func() { _ = redisClient.Close() }()
|
||
|
||
ctx := context.Background()
|
||
redisClient.FlushDB(ctx)
|
||
|
||
client := asynq.NewClient(asynq.RedisClientOpt{
|
||
Addr: "localhost:6379",
|
||
})
|
||
defer func() { _ = client.Close() }()
|
||
|
||
// 提交分组任务
|
||
groupKey := "email-batch-001"
|
||
for i := 0; i < 5; i++ {
|
||
payload := map[string]string{
|
||
"request_id": "group-" + string(rune(i)),
|
||
"group": groupKey,
|
||
}
|
||
|
||
payloadBytes, err := sonic.Marshal(payload)
|
||
require.NoError(t, err)
|
||
|
||
task := asynq.NewTask(constants.TaskTypeEmailSend, payloadBytes)
|
||
_, err = client.Enqueue(task,
|
||
asynq.Group(groupKey),
|
||
)
|
||
require.NoError(t, err)
|
||
}
|
||
|
||
// 验证任务已按组提交
|
||
inspector := asynq.NewInspector(asynq.RedisClientOpt{
|
||
Addr: "localhost:6379",
|
||
})
|
||
defer func() { _ = inspector.Close() }()
|
||
|
||
info, err := inspector.GetQueueInfo(constants.QueueDefault)
|
||
require.NoError(t, err)
|
||
assert.GreaterOrEqual(t, info.Pending, 5)
|
||
}
|