# Research: 数据持久化与异步任务处理集成 **Feature**: 002-gorm-postgres-asynq **Date**: 2025-11-12 **Purpose**: 记录技术选型决策、最佳实践和架构考量 ## 概述 本文档记录了 GORM + PostgreSQL + Asynq 集成的技术研究成果,包括技术选型理由、配置建议、最佳实践和常见陷阱。 --- ## 1. GORM 与 PostgreSQL 集成 ### 决策:选择 GORM 作为 ORM 框架 **理由**: - **官方支持**:GORM 是 Go 生态系统中最流行的 ORM,社区活跃,文档完善 - **PostgreSQL 原生支持**:提供专门的 PostgreSQL 驱动和方言 - **功能完整**:支持复杂查询、关联关系、事务、钩子、软删除等 - **性能优秀**:支持预编译语句、批量操作、连接池管理 - **符合 Constitution**:项目技术栈要求使用 GORM **替代方案**: - **sqlx**:更轻量,但功能不够完整,需要手写更多 SQL - **ent**:Facebook 开发,功能强大,但学习曲线陡峭,且不符合项目技术栈要求 ### GORM 最佳实践 #### 1.1 连接初始化 ```go // pkg/database/postgres.go import ( "gorm.io/driver/postgres" "gorm.io/gorm" "gorm.io/gorm/logger" ) func InitPostgres(cfg *config.DatabaseConfig, log *zap.Logger) (*gorm.DB, error) { dsn := fmt.Sprintf( "host=%s port=%d user=%s password=%s dbname=%s sslmode=%s", cfg.Host, cfg.Port, cfg.User, cfg.Password, cfg.DBName, cfg.SSLMode, ) // GORM 配置 gormConfig := &gorm.Config{ Logger: logger.Default.LogMode(logger.Silent), // 使用 Zap 替代 GORM 日志 NamingStrategy: schema.NamingStrategy{ TablePrefix: "tb_", // 表名前缀 SingularTable: true, // 使用单数表名 }, PrepareStmt: true, // 启用预编译语句缓存 } db, err := gorm.Open(postgres.Open(dsn), gormConfig) if err != nil { return nil, fmt.Errorf("连接 PostgreSQL 失败: %w", err) } // 获取底层 sql.DB 进行连接池配置 sqlDB, err := db.DB() if err != nil { return nil, fmt.Errorf("获取 sql.DB 失败: %w", err) } // 连接池配置(参考 Constitution 性能要求) sqlDB.SetMaxOpenConns(cfg.MaxOpenConns) // 最大连接数:25 sqlDB.SetMaxIdleConns(cfg.MaxIdleConns) // 最大空闲连接:10 sqlDB.SetConnMaxLifetime(cfg.ConnMaxLifetime) // 连接最大生命周期:5m // 验证连接 if err := sqlDB.Ping(); err != nil { return nil, fmt.Errorf("PostgreSQL 连接验证失败: %w", err) } log.Info("PostgreSQL 连接成功", zap.String("host", cfg.Host), zap.Int("port", cfg.Port), zap.String("database", cfg.DBName)) return db, nil } ``` #### 1.2 连接池配置建议 | 参数 | 推荐值 | 理由 | |------|--------|------| | MaxOpenConns | 25 | 平衡性能和资源,避免 PostgreSQL 连接耗尽 | | MaxIdleConns | 10 | 保持足够的空闲连接以应对突发流量 | | ConnMaxLifetime | 5m | 定期回收连接,避免长连接问题 | **计算公式**: ``` MaxOpenConns = (可用内存 / 每连接内存) * 安全系数 每连接内存 ≈ 10MB(PostgreSQL 典型值) 安全系数 = 0.7(为其他进程预留资源) ``` #### 1.3 模型定义规范 ```go // internal/model/user.go type User struct { ID uint `gorm:"primarykey" json:"id"` CreatedAt time.Time `json:"created_at"` UpdatedAt time.Time `json:"updated_at"` DeletedAt gorm.DeletedAt `gorm:"index" json:"-"` // 软删除 Username string `gorm:"uniqueIndex;not null;size:50" json:"username"` Email string `gorm:"uniqueIndex;not null;size:100" json:"email"` Status string `gorm:"not null;size:20;default:'active'" json:"status"` // 关联关系示例(如果需要) // Orders []Order `gorm:"foreignKey:UserID" json:"orders,omitempty"` } // TableName 指定表名(如果不使用默认命名) func (User) TableName() string { return "tb_user" // 遵循 NamingStrategy 的 TablePrefix } ``` **命名规范**: - 字段名使用 PascalCase(Go 约定) - 数据库列名自动转换为 snake_case - 表名使用 `tb_` 前缀(可配置) - JSON tag 使用 snake_case #### 1.4 事务处理 ```go // internal/store/postgres/transaction.go func (s *Store) Transaction(ctx context.Context, fn func(*Store) error) error { return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { // 创建事务内的 Store 实例 txStore := &Store{db: tx, logger: s.logger} return fn(txStore) }) } // 使用示例 err := store.Transaction(ctx, func(tx *Store) error { if err := tx.User.Create(ctx, user); err != nil { return err // 自动回滚 } if err := tx.Order.Create(ctx, order); err != nil { return err // 自动回滚 } return nil // 自动提交 }) ``` **事务最佳实践**: - 使用 `context.Context` 传递超时和取消信号 - 事务内操作尽可能快(< 50ms),避免长事务锁表 - 事务失败自动回滚,无需手动处理 - 避免事务嵌套(GORM 使用 SavePoint 处理嵌套事务) --- ## 2. 数据库迁移:golang-migrate ### 决策:使用 golang-migrate 而非 GORM AutoMigrate **理由**: - **版本控制**:迁移文件版本化,可追溯数据库 schema 变更历史 - **可回滚**:每个迁移包含 up/down 脚本,支持安全回滚 - **生产安全**:明确的 SQL 语句,避免 AutoMigrate 的意外变更 - **团队协作**:迁移文件可 code review,减少数据库变更风险 - **符合 Constitution**:项目规范要求使用外部迁移工具 **GORM AutoMigrate 的问题**: - 无法回滚 - 无法删除列(只能添加和修改) - 不支持复杂的 schema 变更(如重命名列) - 生产环境风险高 ### golang-migrate 使用指南 #### 2.1 安装 ```bash # macOS brew install golang-migrate # Linux curl -L https://github.com/golang-migrate/migrate/releases/download/v4.15.2/migrate.linux-amd64.tar.gz | tar xvz sudo mv migrate /usr/local/bin/ # Go install go install -tags 'postgres' github.com/golang-migrate/migrate/v4/cmd/migrate@latest ``` #### 2.2 创建迁移文件 ```bash # 创建新迁移 migrate create -ext sql -dir migrations -seq init_schema # 生成文件: # migrations/000001_init_schema.up.sql # migrations/000001_init_schema.down.sql ``` #### 2.3 迁移文件示例 ```sql -- migrations/000001_init_schema.up.sql CREATE TABLE IF NOT EXISTS tb_user ( id SERIAL PRIMARY KEY, created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, deleted_at TIMESTAMP, username VARCHAR(50) NOT NULL UNIQUE, email VARCHAR(100) NOT NULL UNIQUE, status VARCHAR(20) NOT NULL DEFAULT 'active' ); CREATE INDEX idx_user_deleted_at ON tb_user(deleted_at); CREATE INDEX idx_user_status ON tb_user(status); -- migrations/000001_init_schema.down.sql DROP TABLE IF EXISTS tb_user; ``` #### 2.4 执行迁移 ```bash # 向上迁移(应用所有未执行的迁移) migrate -path migrations -database "postgresql://user:password@localhost:5432/dbname?sslmode=disable" up # 回滚最后一次迁移 migrate -path migrations -database "postgresql://user:password@localhost:5432/dbname?sslmode=disable" down 1 # 迁移到指定版本 migrate -path migrations -database "postgresql://user:password@localhost:5432/dbname?sslmode=disable" goto 3 # 强制设置版本(修复脏迁移) migrate -path migrations -database "postgresql://user:password@localhost:5432/dbname?sslmode=disable" force 2 ``` #### 2.5 迁移脚本封装 ```bash #!/bin/bash # scripts/migrate.sh set -e DB_USER=${DB_USER:-"postgres"} DB_PASSWORD=${DB_PASSWORD:-"password"} DB_HOST=${DB_HOST:-"localhost"} DB_PORT=${DB_PORT:-"5432"} DB_NAME=${DB_NAME:-"junhong_cmp"} DATABASE_URL="postgresql://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:${DB_PORT}/${DB_NAME}?sslmode=disable" case "$1" in up) migrate -path migrations -database "$DATABASE_URL" up ;; down) migrate -path migrations -database "$DATABASE_URL" down ${2:-1} ;; create) migrate create -ext sql -dir migrations -seq "$2" ;; version) migrate -path migrations -database "$DATABASE_URL" version ;; *) echo "Usage: $0 {up|down [n]|create |version}" exit 1 esac ``` --- ## 3. Asynq 任务队列 ### 决策:选择 Asynq 作为异步任务队列 **理由**: - **Redis 原生支持**:基于 Redis,无需额外中间件 - **功能完整**:支持任务重试、优先级、定时任务、唯一性约束 - **高性能**:支持并发处理,可配置 worker 数量 - **可观测性**:提供 Web UI 监控面板(asynqmon) - **符合 Constitution**:项目技术栈要求使用 Asynq **替代方案**: - **Machinery**:功能类似,但社区活跃度不如 Asynq - **RabbitMQ + amqp091-go**:更重量级,需要额外部署 RabbitMQ - **Kafka**:适合大规模流处理,对本项目过于复杂 ### Asynq 架构设计 #### 3.1 Client(任务提交) ```go // pkg/queue/client.go import ( "github.com/hibiken/asynq" "github.com/redis/go-redis/v9" ) type Client struct { client *asynq.Client logger *zap.Logger } func NewClient(rdb *redis.Client, logger *zap.Logger) *Client { return &Client{ client: asynq.NewClient(asynq.RedisClientOpt{Addr: rdb.Options().Addr}), logger: logger, } } func (c *Client) EnqueueTask(ctx context.Context, taskType string, payload []byte, opts ...asynq.Option) error { task := asynq.NewTask(taskType, payload, opts...) info, err := c.client.EnqueueContext(ctx, task) if err != nil { c.logger.Error("任务入队失败", zap.String("task_type", taskType), zap.Error(err)) return err } c.logger.Info("任务入队成功", zap.String("task_id", info.ID), zap.String("queue", info.Queue)) return nil } ``` #### 3.2 Server(任务处理) ```go // pkg/queue/server.go func NewServer(rdb *redis.Client, cfg *config.QueueConfig, logger *zap.Logger) *asynq.Server { return asynq.NewServer( asynq.RedisClientOpt{Addr: rdb.Options().Addr}, asynq.Config{ Concurrency: cfg.Concurrency, // 并发数(默认 10) Queues: map[string]int{ "critical": 6, // 权重:60% "default": 3, // 权重:30% "low": 1, // 权重:10% }, ErrorHandler: asynq.ErrorHandlerFunc(func(ctx context.Context, task *asynq.Task, err error) { logger.Error("任务执行失败", zap.String("task_type", task.Type()), zap.Error(err)) }), Logger: &AsynqLogger{logger: logger}, // 自定义日志适配器 }, ) } // cmd/worker/main.go func main() { // ... 初始化配置、日志、Redis srv := queue.NewServer(rdb, cfg.Queue, logger) mux := asynq.NewServeMux() // 注册任务处理器 mux.HandleFunc(constants.TaskTypeEmailSend, task.HandleEmailSend) mux.HandleFunc(constants.TaskTypeDataSync, task.HandleDataSync) if err := srv.Run(mux); err != nil { logger.Fatal("Worker 启动失败", zap.Error(err)) } } ``` #### 3.3 任务处理器(Handler) ```go // internal/task/email.go func HandleEmailSend(ctx context.Context, t *asynq.Task) error { var payload EmailPayload if err := json.Unmarshal(t.Payload(), &payload); err != nil { return fmt.Errorf("解析任务参数失败: %w", err) } // 幂等性检查(使用 Redis 或数据库) key := constants.RedisTaskLockKey(payload.RequestID) if exists, _ := rdb.Exists(ctx, key).Result(); exists > 0 { logger.Info("任务已处理,跳过", zap.String("request_id", payload.RequestID)) return nil // 返回 nil 表示成功,避免重试 } // 执行任务 if err := sendEmail(ctx, payload); err != nil { return fmt.Errorf("发送邮件失败: %w", err) // 返回错误触发重试 } // 标记任务已完成(设置过期时间,避免内存泄漏) rdb.SetEx(ctx, key, "1", 24*time.Hour) logger.Info("邮件发送成功", zap.String("to", payload.To), zap.String("request_id", payload.RequestID)) return nil } ``` ### Asynq 配置建议 #### 3.4 重试策略 ```go // 默认重试策略:指数退避 task := asynq.NewTask( constants.TaskTypeDataSync, payload, asynq.MaxRetry(5), // 最大重试 5 次 asynq.Timeout(10*time.Minute), // 任务超时 10 分钟 asynq.Queue("default"), // 队列名称 asynq.Retention(24*time.Hour), // 保留成功任务 24 小时 ) // 自定义重试延迟(指数退避:1s, 2s, 4s, 8s, 16s) asynq.RetryDelayFunc(func(n int, e error, t *asynq.Task) time.Duration { return time.Duration(1<