Files
junhong_cmp_fiber/specs/002-gorm-postgres-asynq/quickstart.md
huang 984ccccc63 docs(constitution): 新增数据库设计原则(v2.4.0)
在项目宪章中新增第九条原则"数据库设计原则",明确禁止使用数据库外键约束和ORM关联标签。

主要变更:
- 新增原则IX:数据库设计原则(Database Design Principles)
- 强制要求:数据库表不得使用外键约束
- 强制要求:GORM模型不得使用ORM关联标签(foreignKey、hasMany等)
- 强制要求:表关系必须通过ID字段手动维护
- 强制要求:关联数据查询必须显式编写,避免ORM魔法
- 强制要求:时间字段由GORM处理,不使用数据库触发器

设计理念:
- 提升业务逻辑灵活性(无数据库约束限制)
- 优化高并发性能(无外键检查开销)
- 增强代码可读性(显式查询,无隐式预加载)
- 简化数据库架构和迁移流程
- 支持分布式和微服务场景

版本升级:2.3.0 → 2.4.0(MINOR)
2025-11-13 13:40:19 +08:00

17 KiB
Raw Blame History

Quick Start Guide: 数据持久化与异步任务处理集成

Feature: 002-gorm-postgres-asynq
Date: 2025-11-12
Purpose: 快速开始指南和使用示例

概述

本指南帮助开发者快速搭建和使用 GORM + PostgreSQL + Asynq 集成的数据持久化和异步任务处理功能。


前置要求

系统要求

  • Go 1.25.4+
  • PostgreSQL 14+
  • Redis 6.0+
  • golang-migrate CLI 工具

安装依赖

# 安装 Go 依赖
go mod tidy

# 安装 golang-migratemacOS
brew install golang-migrate

# 安装 golang-migrateLinux
curl -L https://github.com/golang-migrate/migrate/releases/download/v4.15.2/migrate.linux-amd64.tar.gz | tar xvz
sudo mv migrate /usr/local/bin/

# 或使用 Go install
go install -tags 'postgres' github.com/golang-migrate/migrate/v4/cmd/migrate@latest

步骤 1: 启动 PostgreSQL

使用 Docker推荐

# 启动 PostgreSQL 容器
docker run --name postgres-dev \
  -e POSTGRES_USER=postgres \
  -e POSTGRES_PASSWORD=password \
  -e POSTGRES_DB=junhong_cmp \
  -p 5432:5432 \
  -d postgres:14

# 验证运行状态
docker ps | grep postgres-dev

使用本地安装

# macOS
brew install postgresql@14
brew services start postgresql@14

# 创建数据库
createdb junhong_cmp

验证连接

# 测试连接
psql -h localhost -p 5432 -U postgres -d junhong_cmp

# 如果成功,会进入 PostgreSQL 命令行
# 输入 \q 退出

步骤 2: 启动 Redis

# 使用 Docker
docker run --name redis-dev \
  -p 6379:6379 \
  -d redis:7-alpine

# 或使用本地安装macOS
brew install redis
brew services start redis

# 验证 Redis
redis-cli ping
# 应返回: PONG

步骤 3: 配置数据库连接

编辑配置文件 configs/config.yaml,添加数据库和队列配置:

# configs/config.yaml

# 数据库配置
database:
  host: localhost
  port: 5432
  user: postgres
  password: password          # 开发环境明文存储,生产环境使用环境变量
  dbname: junhong_cmp
  sslmode: disable            # 开发环境禁用 SSL生产环境使用 require
  max_open_conns: 25
  max_idle_conns: 10
  conn_max_lifetime: 5m

# 任务队列配置
queue:
  concurrency: 10             # Worker 并发数
  queues:                     # 队列优先级(权重)
    critical: 6               # 关键任务60%
    default: 3                # 普通任务30%
    low: 1                    # 低优先级10%
  retry_max: 5                # 最大重试次数
  timeout: 10m                # 任务超时时间

步骤 4: 运行数据库迁移

方法 1: 使用迁移脚本(推荐)

# 赋予执行权限
chmod +x scripts/migrate.sh

# 向上迁移(应用所有迁移)
./scripts/migrate.sh up

# 查看当前版本
./scripts/migrate.sh version

# 回滚最后一次迁移
./scripts/migrate.sh down 1

# 创建新迁移
./scripts/migrate.sh create add_sim_table

方法 2: 直接使用 migrate CLI

# 设置数据库 URL
export DATABASE_URL="postgresql://postgres:password@localhost:5432/junhong_cmp?sslmode=disable"

# 向上迁移
migrate -path migrations -database "$DATABASE_URL" up

# 查看版本
migrate -path migrations -database "$DATABASE_URL" version

验证迁移成功

# 连接数据库
psql -h localhost -p 5432 -U postgres -d junhong_cmp

# 查看表
\dt

# 应该看到:
# tb_user
# tb_order
# schema_migrations由 golang-migrate 创建)

# 退出
\q

步骤 5: 启动 API 服务

# 从项目根目录运行
go run cmd/api/main.go

# 预期输出:
# {"level":"info","timestamp":"...","message":"PostgreSQL 连接成功","host":"localhost","port":5432}
# {"level":"info","timestamp":"...","message":"Redis 连接成功","addr":"localhost:6379"}
# {"level":"info","timestamp":"...","message":"服务启动成功","host":"0.0.0.0","port":8080}

验证 API 服务

# 测试健康检查
curl http://localhost:8080/health

# 预期响应:
# {
#   "status": "ok",
#   "postgres": "up",
#   "redis": "up"
# }

步骤 6: 启动 Worker 服务

打开新的终端窗口:

# 从项目根目录运行
go run cmd/worker/main.go

# 预期输出:
# {"level":"info","timestamp":"...","message":"PostgreSQL 连接成功","host":"localhost","port":5432}
# {"level":"info","timestamp":"...","message":"Redis 连接成功","addr":"localhost:6379"}
# {"level":"info","timestamp":"...","message":"Worker 启动成功","concurrency":10}

使用示例

示例 1: 数据库 CRUD 操作

创建用户

curl -X POST http://localhost:8080/api/v1/users \
  -H "Content-Type: application/json" \
  -H "token: valid_token_here" \
  -d '{
    "username": "testuser",
    "email": "test@example.com",
    "password": "password123"
  }'

# 响应:
# {
#   "code": 0,
#   "msg": "success",
#   "data": {
#     "id": 1,
#     "username": "testuser",
#     "email": "test@example.com",
#     "status": "active",
#     "created_at": "2025-11-12T16:00:00+08:00",
#     "updated_at": "2025-11-12T16:00:00+08:00"
#   },
#   "timestamp": "2025-11-12T16:00:00+08:00"
# }

查询用户

curl http://localhost:8080/api/v1/users/1 \
  -H "token: valid_token_here"

# 响应:
# {
#   "code": 0,
#   "msg": "success",
#   "data": {
#     "id": 1,
#     "username": "testuser",
#     "email": "test@example.com",
#     "status": "active",
#     ...
#   }
# }

更新用户

curl -X PUT http://localhost:8080/api/v1/users/1 \
  -H "Content-Type: application/json" \
  -H "token: valid_token_here" \
  -d '{
    "email": "newemail@example.com",
    "status": "inactive"
  }'

列表查询(分页)

curl "http://localhost:8080/api/v1/users?page=1&page_size=20" \
  -H "token: valid_token_here"

# 响应:
# {
#   "code": 0,
#   "msg": "success",
#   "data": {
#     "users": [...],
#     "page": 1,
#     "page_size": 20,
#     "total": 100,
#     "total_pages": 5
#   }
# }

删除用户(软删除)

curl -X DELETE http://localhost:8080/api/v1/users/1 \
  -H "token: valid_token_here"

示例 2: 提交异步任务

提交邮件发送任务

curl -X POST http://localhost:8080/api/v1/tasks/email \
  -H "Content-Type: application/json" \
  -H "token: valid_token_here" \
  -d '{
    "to": "user@example.com",
    "subject": "Welcome",
    "body": "Welcome to our service!"
  }'

# 响应:
# {
#   "code": 0,
#   "msg": "任务已提交",
#   "data": {
#     "task_id": "550e8400-e29b-41d4-a716-446655440000",
#     "queue": "default"
#   }
# }

提交数据同步任务(高优先级)

curl -X POST http://localhost:8080/api/v1/tasks/sync \
  -H "Content-Type: application/json" \
  -H "token: valid_token_here" \
  -d '{
    "sync_type": "sim_status",
    "start_date": "2025-11-01",
    "end_date": "2025-11-12",
    "priority": "critical"
  }'

示例 3: 直接在代码中使用数据库

// internal/service/user/service.go
package user

import (
    "context"
    "github.com/break/junhong_cmp_fiber/internal/model"
    "github.com/break/junhong_cmp_fiber/internal/store/postgres"
    "github.com/break/junhong_cmp_fiber/pkg/constants"
)

type Service struct {
    store  *postgres.Store
    logger *zap.Logger
}

// CreateUser 创建用户
func (s *Service) CreateUser(ctx context.Context, req *model.CreateUserRequest) (*model.User, error) {
    // 参数验证
    if err := validate.Struct(req); err != nil {
        return nil, err
    }
    
    // 密码哈希
    hashedPassword, err := bcrypt.GenerateFromPassword([]byte(req.Password), bcrypt.DefaultCost)
    if err != nil {
        return nil, err
    }
    
    // 创建用户
    user := &model.User{
        Username: req.Username,
        Email:    req.Email,
        Password: string(hashedPassword),
        Status:   constants.UserStatusActive,
    }
    
    if err := s.store.User.Create(ctx, user); err != nil {
        s.logger.Error("创建用户失败",
            zap.String("username", req.Username),
            zap.Error(err))
        return nil, err
    }
    
    s.logger.Info("用户创建成功",
        zap.Uint("user_id", user.ID),
        zap.String("username", user.Username))
    
    return user, nil
}

// GetUserByID 根据 ID 获取用户
func (s *Service) GetUserByID(ctx context.Context, id uint) (*model.User, error) {
    user, err := s.store.User.GetByID(ctx, id)
    if err != nil {
        if errors.Is(err, gorm.ErrRecordNotFound) {
            return nil, errors.New(errors.CodeNotFound, "用户不存在")
        }
        return nil, err
    }
    return user, nil
}

示例 4: 在代码中提交异步任务

// internal/service/email/service.go
package email

import (
    "context"
    "encoding/json"
    "github.com/break/junhong_cmp_fiber/internal/task"
    "github.com/break/junhong_cmp_fiber/pkg/constants"
    "github.com/break/junhong_cmp_fiber/pkg/queue"
    "github.com/hibiken/asynq"
)

type Service struct {
    queueClient *queue.Client
    logger      *zap.Logger
}

// SendWelcomeEmail 发送欢迎邮件(异步)
func (s *Service) SendWelcomeEmail(ctx context.Context, userID uint, email string) error {
    // 构造任务载荷
    payload := &task.EmailPayload{
        RequestID: fmt.Sprintf("welcome-%d", userID),
        To:        email,
        Subject:   "欢迎加入",
        Body:      "感谢您注册我们的服务!",
    }
    
    payloadBytes, err := json.Marshal(payload)
    if err != nil {
        return err
    }
    
    // 提交任务到队列
    err = s.queueClient.EnqueueTask(
        ctx,
        constants.TaskTypeEmailSend,
        payloadBytes,
        asynq.Queue(constants.QueueDefault),
        asynq.MaxRetry(constants.DefaultRetryMax),
    )
    
    if err != nil {
        s.logger.Error("提交邮件任务失败",
            zap.Uint("user_id", userID),
            zap.String("email", email),
            zap.Error(err))
        return err
    }
    
    s.logger.Info("欢迎邮件任务已提交",
        zap.Uint("user_id", userID),
        zap.String("email", email))
    
    return nil
}

示例 5: 事务处理

// internal/service/order/service.go
package order

// CreateOrderWithUser 创建订单并更新用户统计(事务)
func (s *Service) CreateOrderWithUser(ctx context.Context, req *CreateOrderRequest) (*model.Order, error) {
    var order *model.Order
    
    // 使用事务
    err := s.store.Transaction(ctx, func(tx *postgres.Store) error {
        // 1. 创建订单
        order = &model.Order{
            OrderID: generateOrderID(),
            UserID:  req.UserID,
            Amount:  req.Amount,
            Status:  constants.OrderStatusPending,
        }
        
        if err := tx.Order.Create(ctx, order); err != nil {
            return err
        }
        
        // 2. 更新用户订单计数
        user, err := tx.User.GetByID(ctx, req.UserID)
        if err != nil {
            return err
        }
        
        user.OrderCount++
        if err := tx.User.Update(ctx, user); err != nil {
            return err
        }
        
        return nil // 提交事务
    })
    
    if err != nil {
        s.logger.Error("创建订单失败",
            zap.Uint("user_id", req.UserID),
            zap.Error(err))
        return nil, err
    }
    
    return order, nil
}

监控和调试

查看数据库数据

# 连接数据库
psql -h localhost -p 5432 -U postgres -d junhong_cmp

# 查询用户
SELECT * FROM tb_user;

# 查询订单
SELECT * FROM tb_order WHERE user_id = 1;

# 查看迁移历史
SELECT * FROM schema_migrations;

查看任务队列状态

使用 asynqmonWeb UI

# 安装 asynqmon
go install github.com/hibiken/asynqmon@latest

# 启动监控面板
asynqmon --redis-addr=localhost:6379

# 访问 http://localhost:8080
# 可以查看:
# - 队列统计
# - 任务状态pending, active, completed, failed
# - 重试历史
# - 失败任务详情

使用 Redis CLI

# 查看所有队列
redis-cli KEYS "asynq:*"

# 查看 default 队列长度
redis-cli LLEN "asynq:{default}:pending"

# 查看任务详情
redis-cli HGETALL "asynq:task:{task_id}"

查看日志

# 实时查看应用日志
tail -f logs/app.log | jq .

# 过滤错误日志
tail -f logs/app.log | jq 'select(.level == "error")'

# 查看访问日志
tail -f logs/access.log | jq .

# 过滤慢查询
tail -f logs/app.log | jq 'select(.duration_ms > 100)'

测试

单元测试

# 运行所有测试
go test ./...

# 运行特定包的测试
go test ./internal/store/postgres/...

# 带覆盖率
go test -cover ./...

# 详细输出
go test -v ./...

集成测试

# 运行集成测试(需要 PostgreSQL 和 Redis
go test -v ./tests/integration/...

# 单独测试数据库功能
go test -v ./tests/integration/database_test.go

# 单独测试任务队列
go test -v ./tests/integration/task_test.go

使用 Testcontainers推荐

集成测试会自动启动 PostgreSQL 和 Redis 容器:

// tests/integration/database_test.go
func TestUserCRUD(t *testing.T) {
    // 自动启动 PostgreSQL 容器
    // 运行测试
    // 自动清理容器
}

故障排查

问题 1: 数据库连接失败

错误: dial tcp 127.0.0.1:5432: connect: connection refused

解决方案:

# 检查 PostgreSQL 是否运行
docker ps | grep postgres

# 检查端口占用
lsof -i :5432

# 重启 PostgreSQL
docker restart postgres-dev

问题 2: 迁移失败

错误: Dirty database version 1. Fix and force version.

解决方案:

# 强制设置版本
migrate -path migrations -database "$DATABASE_URL" force 1

# 然后重新运行迁移
migrate -path migrations -database "$DATABASE_URL" up

问题 3: Worker 无法连接 Redis

错误: dial tcp 127.0.0.1:6379: connect: connection refused

解决方案:

# 检查 Redis 是否运行
docker ps | grep redis

# 测试连接
redis-cli ping

# 重启 Redis
docker restart redis-dev

问题 4: 任务一直重试

原因: 任务处理函数返回错误

解决方案:

  1. 检查 Worker 日志:tail -f logs/app.log | jq 'select(.level == "error")'
  2. 使用 asynqmon 查看失败详情
  3. 检查任务幂等性实现
  4. 验证 Redis 锁键是否正确设置

环境配置

开发环境

export CONFIG_ENV=dev
go run cmd/api/main.go

预发布环境

export CONFIG_ENV=staging
go run cmd/api/main.go

生产环境

export CONFIG_ENV=prod
export DB_PASSWORD=secure_password  # 使用环境变量
go run cmd/api/main.go

性能调优建议

数据库连接池

根据服务器资源调整:

database:
  max_open_conns: 25     # 增大以支持更多并发
  max_idle_conns: 10     # 保持足够的空闲连接
  conn_max_lifetime: 5m  # 定期回收连接

Worker 并发数

根据任务类型调整:

queue:
  concurrency: 20        # I/O 密集型CPU 核心数 × 2
  # concurrency: 8       # CPU 密集型CPU 核心数

队列优先级

根据业务需求调整:

queue:
  queues:
    critical: 8          # 提高关键任务权重
    default: 2
    low: 1

下一步

  1. 添加业务模型: 参考 internal/model/user.go 创建 SIM 卡、订单等业务实体
  2. 实现业务逻辑: 在 Service 层实现具体业务逻辑
  3. 添加迁移文件: 使用 ./scripts/migrate.sh create 添加新表
  4. 创建异步任务: 参考 internal/task/email.go 创建新的任务处理器
  5. 编写测试: 为所有 Service 层业务逻辑编写单元测试

参考资料


常见问题FAQ

Q: 如何添加新的数据库表?
A: 使用 ./scripts/migrate.sh create table_name 创建迁移文件,编辑 SQL然后运行 ./scripts/migrate.sh up

Q: 任务失败后会怎样?
A: 根据配置自动重试(默认 5 次指数退避。5 次后仍失败会进入死信队列,可在 asynqmon 中查看。

Q: 如何保证任务幂等性?
A: 使用 Redis 锁或数据库唯一约束。参考 research.md 中的幂等性设计模式。

Q: 如何扩展 Worker
A: 启动多个 Worker 进程(不同机器或容器),连接同一个 Redis。Asynq 自动负载均衡。

Q: 数据库密码如何安全存储?
A: 生产环境使用环境变量:export DB_PASSWORD=xxx,配置文件中使用 ${DB_PASSWORD}

Q: 如何监控任务执行情况?
A: 使用 asynqmon Web UI 或通过 Redis CLI 查看队列状态。


总结

本指南涵盖了:

  • 环境搭建PostgreSQL、Redis
  • 数据库迁移
  • 服务启动API + Worker
  • CRUD 操作示例
  • 异步任务提交和处理
  • 事务处理
  • 监控和调试
  • 故障排查
  • 性能调优

推荐开发流程

  1. 设计数据模型 → 2. 创建迁移文件 → 3. 实现 Store 层 → 4. 实现 Service 层 → 5. 实现 Handler 层 → 6. 编写测试 → 7. 运行和验证