在项目宪章中新增第九条原则"数据库设计原则",明确禁止使用数据库外键约束和ORM关联标签。 主要变更: - 新增原则IX:数据库设计原则(Database Design Principles) - 强制要求:数据库表不得使用外键约束 - 强制要求:GORM模型不得使用ORM关联标签(foreignKey、hasMany等) - 强制要求:表关系必须通过ID字段手动维护 - 强制要求:关联数据查询必须显式编写,避免ORM魔法 - 强制要求:时间字段由GORM处理,不使用数据库触发器 设计理念: - 提升业务逻辑灵活性(无数据库约束限制) - 优化高并发性能(无外键检查开销) - 增强代码可读性(显式查询,无隐式预加载) - 简化数据库架构和迁移流程 - 支持分布式和微服务场景 版本升级:2.3.0 → 2.4.0(MINOR)
25 KiB
25 KiB
Research: 数据持久化与异步任务处理集成
Feature: 002-gorm-postgres-asynq
Date: 2025-11-12
Purpose: 记录技术选型决策、最佳实践和架构考量
概述
本文档记录了 GORM + PostgreSQL + Asynq 集成的技术研究成果,包括技术选型理由、配置建议、最佳实践和常见陷阱。
1. GORM 与 PostgreSQL 集成
决策:选择 GORM 作为 ORM 框架
理由:
- 官方支持:GORM 是 Go 生态系统中最流行的 ORM,社区活跃,文档完善
- PostgreSQL 原生支持:提供专门的 PostgreSQL 驱动和方言
- 功能完整:支持复杂查询、关联关系、事务、钩子、软删除等
- 性能优秀:支持预编译语句、批量操作、连接池管理
- 符合 Constitution:项目技术栈要求使用 GORM
替代方案:
- sqlx:更轻量,但功能不够完整,需要手写更多 SQL
- ent:Facebook 开发,功能强大,但学习曲线陡峭,且不符合项目技术栈要求
GORM 最佳实践
1.1 连接初始化
// pkg/database/postgres.go
import (
"gorm.io/driver/postgres"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
func InitPostgres(cfg *config.DatabaseConfig, log *zap.Logger) (*gorm.DB, error) {
dsn := fmt.Sprintf(
"host=%s port=%d user=%s password=%s dbname=%s sslmode=%s",
cfg.Host, cfg.Port, cfg.User, cfg.Password, cfg.DBName, cfg.SSLMode,
)
// GORM 配置
gormConfig := &gorm.Config{
Logger: logger.Default.LogMode(logger.Silent), // 使用 Zap 替代 GORM 日志
NamingStrategy: schema.NamingStrategy{
TablePrefix: "tb_", // 表名前缀
SingularTable: true, // 使用单数表名
},
PrepareStmt: true, // 启用预编译语句缓存
}
db, err := gorm.Open(postgres.Open(dsn), gormConfig)
if err != nil {
return nil, fmt.Errorf("连接 PostgreSQL 失败: %w", err)
}
// 获取底层 sql.DB 进行连接池配置
sqlDB, err := db.DB()
if err != nil {
return nil, fmt.Errorf("获取 sql.DB 失败: %w", err)
}
// 连接池配置(参考 Constitution 性能要求)
sqlDB.SetMaxOpenConns(cfg.MaxOpenConns) // 最大连接数:25
sqlDB.SetMaxIdleConns(cfg.MaxIdleConns) // 最大空闲连接:10
sqlDB.SetConnMaxLifetime(cfg.ConnMaxLifetime) // 连接最大生命周期:5m
// 验证连接
if err := sqlDB.Ping(); err != nil {
return nil, fmt.Errorf("PostgreSQL 连接验证失败: %w", err)
}
log.Info("PostgreSQL 连接成功",
zap.String("host", cfg.Host),
zap.Int("port", cfg.Port),
zap.String("database", cfg.DBName))
return db, nil
}
1.2 连接池配置建议
| 参数 | 推荐值 | 理由 |
|---|---|---|
| MaxOpenConns | 25 | 平衡性能和资源,避免 PostgreSQL 连接耗尽 |
| MaxIdleConns | 10 | 保持足够的空闲连接以应对突发流量 |
| ConnMaxLifetime | 5m | 定期回收连接,避免长连接问题 |
计算公式:
MaxOpenConns = (可用内存 / 每连接内存) * 安全系数
每连接内存 ≈ 10MB(PostgreSQL 典型值)
安全系数 = 0.7(为其他进程预留资源)
1.3 模型定义规范
// internal/model/user.go
type User struct {
ID uint `gorm:"primarykey" json:"id"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
DeletedAt gorm.DeletedAt `gorm:"index" json:"-"` // 软删除
Username string `gorm:"uniqueIndex;not null;size:50" json:"username"`
Email string `gorm:"uniqueIndex;not null;size:100" json:"email"`
Status string `gorm:"not null;size:20;default:'active'" json:"status"`
// 关联关系示例(如果需要)
// Orders []Order `gorm:"foreignKey:UserID" json:"orders,omitempty"`
}
// TableName 指定表名(如果不使用默认命名)
func (User) TableName() string {
return "tb_user" // 遵循 NamingStrategy 的 TablePrefix
}
命名规范:
- 字段名使用 PascalCase(Go 约定)
- 数据库列名自动转换为 snake_case
- 表名使用
tb_前缀(可配置) - JSON tag 使用 snake_case
1.4 事务处理
// internal/store/postgres/transaction.go
func (s *Store) Transaction(ctx context.Context, fn func(*Store) error) error {
return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
// 创建事务内的 Store 实例
txStore := &Store{db: tx, logger: s.logger}
return fn(txStore)
})
}
// 使用示例
err := store.Transaction(ctx, func(tx *Store) error {
if err := tx.User.Create(ctx, user); err != nil {
return err // 自动回滚
}
if err := tx.Order.Create(ctx, order); err != nil {
return err // 自动回滚
}
return nil // 自动提交
})
事务最佳实践:
- 使用
context.Context传递超时和取消信号 - 事务内操作尽可能快(< 50ms),避免长事务锁表
- 事务失败自动回滚,无需手动处理
- 避免事务嵌套(GORM 使用 SavePoint 处理嵌套事务)
2. 数据库迁移:golang-migrate
决策:使用 golang-migrate 而非 GORM AutoMigrate
理由:
- 版本控制:迁移文件版本化,可追溯数据库 schema 变更历史
- 可回滚:每个迁移包含 up/down 脚本,支持安全回滚
- 生产安全:明确的 SQL 语句,避免 AutoMigrate 的意外变更
- 团队协作:迁移文件可 code review,减少数据库变更风险
- 符合 Constitution:项目规范要求使用外部迁移工具
GORM AutoMigrate 的问题:
- 无法回滚
- 无法删除列(只能添加和修改)
- 不支持复杂的 schema 变更(如重命名列)
- 生产环境风险高
golang-migrate 使用指南
2.1 安装
# macOS
brew install golang-migrate
# Linux
curl -L https://github.com/golang-migrate/migrate/releases/download/v4.15.2/migrate.linux-amd64.tar.gz | tar xvz
sudo mv migrate /usr/local/bin/
# Go install
go install -tags 'postgres' github.com/golang-migrate/migrate/v4/cmd/migrate@latest
2.2 创建迁移文件
# 创建新迁移
migrate create -ext sql -dir migrations -seq init_schema
# 生成文件:
# migrations/000001_init_schema.up.sql
# migrations/000001_init_schema.down.sql
2.3 迁移文件示例
-- migrations/000001_init_schema.up.sql
CREATE TABLE IF NOT EXISTS tb_user (
id SERIAL PRIMARY KEY,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
deleted_at TIMESTAMP,
username VARCHAR(50) NOT NULL UNIQUE,
email VARCHAR(100) NOT NULL UNIQUE,
status VARCHAR(20) NOT NULL DEFAULT 'active'
);
CREATE INDEX idx_user_deleted_at ON tb_user(deleted_at);
CREATE INDEX idx_user_status ON tb_user(status);
-- migrations/000001_init_schema.down.sql
DROP TABLE IF EXISTS tb_user;
2.4 执行迁移
# 向上迁移(应用所有未执行的迁移)
migrate -path migrations -database "postgresql://user:password@localhost:5432/dbname?sslmode=disable" up
# 回滚最后一次迁移
migrate -path migrations -database "postgresql://user:password@localhost:5432/dbname?sslmode=disable" down 1
# 迁移到指定版本
migrate -path migrations -database "postgresql://user:password@localhost:5432/dbname?sslmode=disable" goto 3
# 强制设置版本(修复脏迁移)
migrate -path migrations -database "postgresql://user:password@localhost:5432/dbname?sslmode=disable" force 2
2.5 迁移脚本封装
#!/bin/bash
# scripts/migrate.sh
set -e
DB_USER=${DB_USER:-"postgres"}
DB_PASSWORD=${DB_PASSWORD:-"password"}
DB_HOST=${DB_HOST:-"localhost"}
DB_PORT=${DB_PORT:-"5432"}
DB_NAME=${DB_NAME:-"junhong_cmp"}
DATABASE_URL="postgresql://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:${DB_PORT}/${DB_NAME}?sslmode=disable"
case "$1" in
up)
migrate -path migrations -database "$DATABASE_URL" up
;;
down)
migrate -path migrations -database "$DATABASE_URL" down ${2:-1}
;;
create)
migrate create -ext sql -dir migrations -seq "$2"
;;
version)
migrate -path migrations -database "$DATABASE_URL" version
;;
*)
echo "Usage: $0 {up|down [n]|create <name>|version}"
exit 1
esac
3. Asynq 任务队列
决策:选择 Asynq 作为异步任务队列
理由:
- Redis 原生支持:基于 Redis,无需额外中间件
- 功能完整:支持任务重试、优先级、定时任务、唯一性约束
- 高性能:支持并发处理,可配置 worker 数量
- 可观测性:提供 Web UI 监控面板(asynqmon)
- 符合 Constitution:项目技术栈要求使用 Asynq
替代方案:
- Machinery:功能类似,但社区活跃度不如 Asynq
- RabbitMQ + amqp091-go:更重量级,需要额外部署 RabbitMQ
- Kafka:适合大规模流处理,对本项目过于复杂
Asynq 架构设计
3.1 Client(任务提交)
// pkg/queue/client.go
import (
"github.com/hibiken/asynq"
"github.com/redis/go-redis/v9"
)
type Client struct {
client *asynq.Client
logger *zap.Logger
}
func NewClient(rdb *redis.Client, logger *zap.Logger) *Client {
return &Client{
client: asynq.NewClient(asynq.RedisClientOpt{Addr: rdb.Options().Addr}),
logger: logger,
}
}
func (c *Client) EnqueueTask(ctx context.Context, taskType string, payload []byte, opts ...asynq.Option) error {
task := asynq.NewTask(taskType, payload, opts...)
info, err := c.client.EnqueueContext(ctx, task)
if err != nil {
c.logger.Error("任务入队失败",
zap.String("task_type", taskType),
zap.Error(err))
return err
}
c.logger.Info("任务入队成功",
zap.String("task_id", info.ID),
zap.String("queue", info.Queue))
return nil
}
3.2 Server(任务处理)
// pkg/queue/server.go
func NewServer(rdb *redis.Client, cfg *config.QueueConfig, logger *zap.Logger) *asynq.Server {
return asynq.NewServer(
asynq.RedisClientOpt{Addr: rdb.Options().Addr},
asynq.Config{
Concurrency: cfg.Concurrency, // 并发数(默认 10)
Queues: map[string]int{
"critical": 6, // 权重:60%
"default": 3, // 权重:30%
"low": 1, // 权重:10%
},
ErrorHandler: asynq.ErrorHandlerFunc(func(ctx context.Context, task *asynq.Task, err error) {
logger.Error("任务执行失败",
zap.String("task_type", task.Type()),
zap.Error(err))
}),
Logger: &AsynqLogger{logger: logger}, // 自定义日志适配器
},
)
}
// cmd/worker/main.go
func main() {
// ... 初始化配置、日志、Redis
srv := queue.NewServer(rdb, cfg.Queue, logger)
mux := asynq.NewServeMux()
// 注册任务处理器
mux.HandleFunc(constants.TaskTypeEmailSend, task.HandleEmailSend)
mux.HandleFunc(constants.TaskTypeDataSync, task.HandleDataSync)
if err := srv.Run(mux); err != nil {
logger.Fatal("Worker 启动失败", zap.Error(err))
}
}
3.3 任务处理器(Handler)
// internal/task/email.go
func HandleEmailSend(ctx context.Context, t *asynq.Task) error {
var payload EmailPayload
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
return fmt.Errorf("解析任务参数失败: %w", err)
}
// 幂等性检查(使用 Redis 或数据库)
key := constants.RedisTaskLockKey(payload.RequestID)
if exists, _ := rdb.Exists(ctx, key).Result(); exists > 0 {
logger.Info("任务已处理,跳过",
zap.String("request_id", payload.RequestID))
return nil // 返回 nil 表示成功,避免重试
}
// 执行任务
if err := sendEmail(ctx, payload); err != nil {
return fmt.Errorf("发送邮件失败: %w", err) // 返回错误触发重试
}
// 标记任务已完成(设置过期时间,避免内存泄漏)
rdb.SetEx(ctx, key, "1", 24*time.Hour)
logger.Info("邮件发送成功",
zap.String("to", payload.To),
zap.String("request_id", payload.RequestID))
return nil
}
Asynq 配置建议
3.4 重试策略
// 默认重试策略:指数退避
task := asynq.NewTask(
constants.TaskTypeDataSync,
payload,
asynq.MaxRetry(5), // 最大重试 5 次
asynq.Timeout(10*time.Minute), // 任务超时 10 分钟
asynq.Queue("default"), // 队列名称
asynq.Retention(24*time.Hour), // 保留成功任务 24 小时
)
// 自定义重试延迟(指数退避:1s, 2s, 4s, 8s, 16s)
asynq.RetryDelayFunc(func(n int, e error, t *asynq.Task) time.Duration {
return time.Duration(1<<uint(n)) * time.Second
})
3.5 并发配置
| 场景 | 并发数 | 理由 |
|---|---|---|
| CPU 密集型任务 | CPU 核心数 | 避免上下文切换开销 |
| I/O 密集型任务 | CPU 核心数 × 2 | 充分利用等待时间 |
| 混合任务 | 10(默认) | 平衡性能和资源 |
水平扩展:
- 启动多个 Worker 进程(不同机器或容器)
- 所有 Worker 连接同一个 Redis
- Asynq 自动负载均衡
3.6 监控与调试
# 安装 asynqmon(Web UI)
go install github.com/hibiken/asynqmon@latest
# 启动监控面板
asynqmon --redis-addr=localhost:6379
# 访问 http://localhost:8080
# 查看任务状态、队列统计、失败任务、重试历史
4. 幂等性设计
4.1 为什么需要幂等性?
场景:
- 系统重启时,Asynq 自动重新排队未完成的任务
- 任务执行失败后自动重试
- 网络抖动导致任务重复提交
风险:
- 重复发送邮件/短信
- 重复扣款/充值
- 重复创建订单
4.2 幂等性实现模式
模式 1:唯一键去重(推荐)
func HandleOrderCreate(ctx context.Context, t *asynq.Task) error {
var payload OrderPayload
json.Unmarshal(t.Payload(), &payload)
// 使用业务唯一键(如订单号)去重
key := constants.RedisTaskLockKey(payload.OrderID)
// SetNX:仅当 key 不存在时设置
ok, err := rdb.SetNX(ctx, key, "1", 24*time.Hour).Result()
if err != nil {
return fmt.Errorf("Redis 操作失败: %w", err)
}
if !ok {
logger.Info("订单已创建,跳过",
zap.String("order_id", payload.OrderID))
return nil // 幂等返回
}
// 执行业务逻辑
if err := createOrder(ctx, payload); err != nil {
rdb.Del(ctx, key) // 失败时删除锁,允许重试
return err
}
return nil
}
模式 2:数据库唯一约束
CREATE TABLE tb_order (
id SERIAL PRIMARY KEY,
order_id VARCHAR(50) NOT NULL UNIQUE, -- 业务唯一键
status VARCHAR(20) NOT NULL,
created_at TIMESTAMP NOT NULL
);
func createOrder(ctx context.Context, payload OrderPayload) error {
order := &model.Order{
OrderID: payload.OrderID,
Status: constants.OrderStatusPending,
}
// GORM 插入,如果 order_id 重复则返回错误
if err := db.WithContext(ctx).Create(order).Error; err != nil {
if errors.Is(err, gorm.ErrDuplicatedKey) {
logger.Info("订单已存在,跳过", zap.String("order_id", payload.OrderID))
return nil // 幂等返回
}
return err
}
return nil
}
模式 3:状态机(复杂业务)
func HandleOrderProcess(ctx context.Context, t *asynq.Task) error {
var payload OrderPayload
json.Unmarshal(t.Payload(), &payload)
// 加载订单
order, err := store.Order.GetByID(ctx, payload.OrderID)
if err != nil {
return err
}
// 状态检查:仅处理特定状态的订单
if order.Status != constants.OrderStatusPending {
logger.Info("订单状态不匹配,跳过",
zap.String("order_id", payload.OrderID),
zap.String("current_status", order.Status))
return nil // 幂等返回
}
// 状态转换
order.Status = constants.OrderStatusProcessing
if err := store.Order.Update(ctx, order); err != nil {
return err
}
// 执行业务逻辑
// ...
order.Status = constants.OrderStatusCompleted
return store.Order.Update(ctx, order)
}
5. 配置管理
5.1 数据库配置结构
// pkg/config/config.go
type DatabaseConfig struct {
Host string `mapstructure:"host"`
Port int `mapstructure:"port"`
User string `mapstructure:"user"`
Password string `mapstructure:"password"` // 明文存储(按需求)
DBName string `mapstructure:"dbname"`
SSLMode string `mapstructure:"sslmode"`
MaxOpenConns int `mapstructure:"max_open_conns"`
MaxIdleConns int `mapstructure:"max_idle_conns"`
ConnMaxLifetime time.Duration `mapstructure:"conn_max_lifetime"`
}
type QueueConfig struct {
Concurrency int `mapstructure:"concurrency"`
Queues map[string]int `mapstructure:"queues"`
RetryMax int `mapstructure:"retry_max"`
Timeout time.Duration `mapstructure:"timeout"`
}
5.2 配置文件示例
# configs/config.yaml
database:
host: localhost
port: 5432
user: postgres
password: password # 明文存储(生产环境建议使用环境变量)
dbname: junhong_cmp
sslmode: disable
max_open_conns: 25
max_idle_conns: 10
conn_max_lifetime: 5m
queue:
concurrency: 10
queues:
critical: 6
default: 3
low: 1
retry_max: 5
timeout: 10m
6. 性能优化建议
6.1 数据库查询优化
索引策略:
- 为 WHERE、JOIN、ORDER BY 常用字段添加索引
- 复合索引按选择性从高到低排列
- 避免过多索引(影响写入性能)
-- 单列索引
CREATE INDEX idx_user_status ON tb_user(status);
-- 复合索引(状态 + 创建时间)
CREATE INDEX idx_user_status_created ON tb_user(status, created_at);
-- 部分索引(仅索引活跃用户)
CREATE INDEX idx_user_active ON tb_user(status) WHERE status = 'active';
批量操作:
// 避免 N+1 查询
// ❌ 错误
for _, orderID := range orderIDs {
order, _ := db.Where("id = ?", orderID).First(&Order{}).Error
}
// ✅ 正确
var orders []Order
db.Where("id IN ?", orderIDs).Find(&orders)
// 批量插入
db.CreateInBatches(users, 100) // 每批 100 条
6.2 慢查询监控
// GORM 慢查询日志
db.Logger = logger.New(
log.New(os.Stdout, "\r\n", log.LstdFlags),
logger.Config{
SlowThreshold: 100 * time.Millisecond, // 慢查询阈值
LogLevel: logger.Warn,
IgnoreRecordNotFoundError: true,
Colorful: false,
},
)
7. 故障处理与恢复
7.1 数据库连接失败
重试策略:
func InitPostgresWithRetry(cfg *config.DatabaseConfig, logger *zap.Logger) (*gorm.DB, error) {
maxRetries := 5
retryDelay := 2 * time.Second
for i := 0; i < maxRetries; i++ {
db, err := InitPostgres(cfg, logger)
if err == nil {
return db, nil
}
logger.Warn("数据库连接失败,重试中",
zap.Int("attempt", i+1),
zap.Int("max_retries", maxRetries),
zap.Error(err))
time.Sleep(retryDelay)
retryDelay *= 2 // 指数退避
}
return nil, fmt.Errorf("数据库连接失败,已重试 %d 次", maxRetries)
}
7.2 任务队列故障恢复
Redis 断线重连:
- Asynq 自动处理 Redis 断线重连
- Worker 重启后自动从 Redis 恢复未完成任务
脏任务清理:
# 使用 asynqmon 手动清理死信队列
# 或编写定时任务自动归档失败任务
8. 测试策略
8.1 数据库集成测试
// tests/integration/database_test.go
func TestUserCRUD(t *testing.T) {
// 使用 testcontainers 启动 PostgreSQL
ctx := context.Background()
postgresContainer, err := postgres.RunContainer(ctx,
testcontainers.WithImage("postgres:14"),
postgres.WithDatabase("test_db"),
postgres.WithUsername("postgres"),
postgres.WithPassword("password"),
)
require.NoError(t, err)
defer postgresContainer.Terminate(ctx)
// 连接测试数据库
connStr, _ := postgresContainer.ConnectionString(ctx)
db, _ := gorm.Open(postgres.Open(connStr), &gorm.Config{})
// 运行迁移
db.AutoMigrate(&model.User{})
// 测试 CRUD
user := &model.User{Username: "test", Email: "test@example.com"}
assert.NoError(t, db.Create(user).Error)
var found model.User
assert.NoError(t, db.Where("username = ?", "test").First(&found).Error)
assert.Equal(t, "test@example.com", found.Email)
}
8.2 任务队列测试
// tests/integration/task_test.go
func TestEmailTask(t *testing.T) {
// 启动内存模式的 Asynq(测试用)
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 1},
)
mux := asynq.NewServeMux()
mux.HandleFunc(constants.TaskTypeEmailSend, task.HandleEmailSend)
// 提交任务
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
payload, _ := json.Marshal(EmailPayload{To: "test@example.com"})
client.Enqueue(asynq.NewTask(constants.TaskTypeEmailSend, payload))
// 启动 worker 处理
go srv.Run(mux)
time.Sleep(2 * time.Second)
// 验证任务已处理
// ...
}
9. 安全考虑
9.1 SQL 注入防护
✅ GORM 自动防护:
// GORM 使用预编译语句,自动转义参数
db.Where("username = ?", userInput).First(&user)
❌ 避免原始 SQL:
// 危险:SQL 注入风险
db.Raw("SELECT * FROM users WHERE username = '" + userInput + "'").Scan(&user)
// 安全:使用参数化查询
db.Raw("SELECT * FROM users WHERE username = ?", userInput).Scan(&user)
9.2 密码存储
# configs/config.yaml
database:
password: ${DB_PASSWORD} # 从环境变量读取(生产环境推荐)
# .env 文件(不提交到 Git)
export DB_PASSWORD=secret_password
10. 部署与运维
10.1 健康检查
// internal/handler/health.go
func (h *Handler) HealthCheck(c *fiber.Ctx) error {
health := map[string]string{
"status": "ok",
}
// 检查 PostgreSQL
sqlDB, _ := h.db.DB()
if err := sqlDB.Ping(); err != nil {
health["postgres"] = "down"
health["status"] = "degraded"
} else {
health["postgres"] = "up"
}
// 检查 Redis(任务队列)
if err := h.rdb.Ping(c.Context()).Err(); err != nil {
health["redis"] = "down"
health["status"] = "degraded"
} else {
health["redis"] = "up"
}
statusCode := fiber.StatusOK
if health["status"] != "ok" {
statusCode = fiber.StatusServiceUnavailable
}
return c.Status(statusCode).JSON(health)
}
10.2 优雅关闭
// cmd/worker/main.go
func main() {
// ... 初始化
srv := queue.NewServer(rdb, cfg.Queue, logger)
// 处理信号
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-quit
logger.Info("收到关闭信号,开始优雅关闭")
// 停止接收新任务,等待现有任务完成(最多 30 秒)
srv.Shutdown()
}()
// 启动 Worker
if err := srv.Run(mux); err != nil {
logger.Fatal("Worker 运行失败", zap.Error(err))
}
}
总结
| 技术选型 | 关键决策 | 核心理由 |
|---|---|---|
| GORM | 使用 GORM 而非 sqlx | 功能完整,符合项目技术栈 |
| golang-migrate | 使用外部迁移工具而非 AutoMigrate | 版本控制,可回滚,生产安全 |
| Asynq | 使用 Asynq 而非 Machinery | Redis 原生,功能完整,监控友好 |
| 连接池 | MaxOpenConns=25, MaxIdleConns=10 | 平衡性能和资源消耗 |
| 重试策略 | 最大 5 次,指数退避 | 避免雪崩,给系统恢复时间 |
| 幂等性 | Redis 去重 + 数据库唯一约束 | 防止重复执行,确保数据一致性 |
下一步:Phase 1 设计与契约生成(data-model.md、contracts/、quickstart.md)