在项目宪章中新增第九条原则"数据库设计原则",明确禁止使用数据库外键约束和ORM关联标签。 主要变更: - 新增原则IX:数据库设计原则(Database Design Principles) - 强制要求:数据库表不得使用外键约束 - 强制要求:GORM模型不得使用ORM关联标签(foreignKey、hasMany等) - 强制要求:表关系必须通过ID字段手动维护 - 强制要求:关联数据查询必须显式编写,避免ORM魔法 - 强制要求:时间字段由GORM处理,不使用数据库触发器 设计理念: - 提升业务逻辑灵活性(无数据库约束限制) - 优化高并发性能(无外键检查开销) - 增强代码可读性(显式查询,无隐式预加载) - 简化数据库架构和迁移流程 - 支持分布式和微服务场景 版本升级:2.3.0 → 2.4.0(MINOR)
17 KiB
Quick Start Guide: 数据持久化与异步任务处理集成
Feature: 002-gorm-postgres-asynq
Date: 2025-11-12
Purpose: 快速开始指南和使用示例
概述
本指南帮助开发者快速搭建和使用 GORM + PostgreSQL + Asynq 集成的数据持久化和异步任务处理功能。
前置要求
系统要求
- Go 1.25.4+
- PostgreSQL 14+
- Redis 6.0+
- golang-migrate CLI 工具
安装依赖
# 安装 Go 依赖
go mod tidy
# 安装 golang-migrate(macOS)
brew install golang-migrate
# 安装 golang-migrate(Linux)
curl -L https://github.com/golang-migrate/migrate/releases/download/v4.15.2/migrate.linux-amd64.tar.gz | tar xvz
sudo mv migrate /usr/local/bin/
# 或使用 Go install
go install -tags 'postgres' github.com/golang-migrate/migrate/v4/cmd/migrate@latest
步骤 1: 启动 PostgreSQL
使用 Docker(推荐)
# 启动 PostgreSQL 容器
docker run --name postgres-dev \
-e POSTGRES_USER=postgres \
-e POSTGRES_PASSWORD=password \
-e POSTGRES_DB=junhong_cmp \
-p 5432:5432 \
-d postgres:14
# 验证运行状态
docker ps | grep postgres-dev
使用本地安装
# macOS
brew install postgresql@14
brew services start postgresql@14
# 创建数据库
createdb junhong_cmp
验证连接
# 测试连接
psql -h localhost -p 5432 -U postgres -d junhong_cmp
# 如果成功,会进入 PostgreSQL 命令行
# 输入 \q 退出
步骤 2: 启动 Redis
# 使用 Docker
docker run --name redis-dev \
-p 6379:6379 \
-d redis:7-alpine
# 或使用本地安装(macOS)
brew install redis
brew services start redis
# 验证 Redis
redis-cli ping
# 应返回: PONG
步骤 3: 配置数据库连接
编辑配置文件 configs/config.yaml,添加数据库和队列配置:
# configs/config.yaml
# 数据库配置
database:
host: localhost
port: 5432
user: postgres
password: password # 开发环境明文存储,生产环境使用环境变量
dbname: junhong_cmp
sslmode: disable # 开发环境禁用 SSL,生产环境使用 require
max_open_conns: 25
max_idle_conns: 10
conn_max_lifetime: 5m
# 任务队列配置
queue:
concurrency: 10 # Worker 并发数
queues: # 队列优先级(权重)
critical: 6 # 关键任务:60%
default: 3 # 普通任务:30%
low: 1 # 低优先级:10%
retry_max: 5 # 最大重试次数
timeout: 10m # 任务超时时间
步骤 4: 运行数据库迁移
方法 1: 使用迁移脚本(推荐)
# 赋予执行权限
chmod +x scripts/migrate.sh
# 向上迁移(应用所有迁移)
./scripts/migrate.sh up
# 查看当前版本
./scripts/migrate.sh version
# 回滚最后一次迁移
./scripts/migrate.sh down 1
# 创建新迁移
./scripts/migrate.sh create add_sim_table
方法 2: 直接使用 migrate CLI
# 设置数据库 URL
export DATABASE_URL="postgresql://postgres:password@localhost:5432/junhong_cmp?sslmode=disable"
# 向上迁移
migrate -path migrations -database "$DATABASE_URL" up
# 查看版本
migrate -path migrations -database "$DATABASE_URL" version
验证迁移成功
# 连接数据库
psql -h localhost -p 5432 -U postgres -d junhong_cmp
# 查看表
\dt
# 应该看到:
# tb_user
# tb_order
# schema_migrations(由 golang-migrate 创建)
# 退出
\q
步骤 5: 启动 API 服务
# 从项目根目录运行
go run cmd/api/main.go
# 预期输出:
# {"level":"info","timestamp":"...","message":"PostgreSQL 连接成功","host":"localhost","port":5432}
# {"level":"info","timestamp":"...","message":"Redis 连接成功","addr":"localhost:6379"}
# {"level":"info","timestamp":"...","message":"服务启动成功","host":"0.0.0.0","port":8080}
验证 API 服务
# 测试健康检查
curl http://localhost:8080/health
# 预期响应:
# {
# "status": "ok",
# "postgres": "up",
# "redis": "up"
# }
步骤 6: 启动 Worker 服务
打开新的终端窗口:
# 从项目根目录运行
go run cmd/worker/main.go
# 预期输出:
# {"level":"info","timestamp":"...","message":"PostgreSQL 连接成功","host":"localhost","port":5432}
# {"level":"info","timestamp":"...","message":"Redis 连接成功","addr":"localhost:6379"}
# {"level":"info","timestamp":"...","message":"Worker 启动成功","concurrency":10}
使用示例
示例 1: 数据库 CRUD 操作
创建用户
curl -X POST http://localhost:8080/api/v1/users \
-H "Content-Type: application/json" \
-H "token: valid_token_here" \
-d '{
"username": "testuser",
"email": "test@example.com",
"password": "password123"
}'
# 响应:
# {
# "code": 0,
# "msg": "success",
# "data": {
# "id": 1,
# "username": "testuser",
# "email": "test@example.com",
# "status": "active",
# "created_at": "2025-11-12T16:00:00+08:00",
# "updated_at": "2025-11-12T16:00:00+08:00"
# },
# "timestamp": "2025-11-12T16:00:00+08:00"
# }
查询用户
curl http://localhost:8080/api/v1/users/1 \
-H "token: valid_token_here"
# 响应:
# {
# "code": 0,
# "msg": "success",
# "data": {
# "id": 1,
# "username": "testuser",
# "email": "test@example.com",
# "status": "active",
# ...
# }
# }
更新用户
curl -X PUT http://localhost:8080/api/v1/users/1 \
-H "Content-Type: application/json" \
-H "token: valid_token_here" \
-d '{
"email": "newemail@example.com",
"status": "inactive"
}'
列表查询(分页)
curl "http://localhost:8080/api/v1/users?page=1&page_size=20" \
-H "token: valid_token_here"
# 响应:
# {
# "code": 0,
# "msg": "success",
# "data": {
# "users": [...],
# "page": 1,
# "page_size": 20,
# "total": 100,
# "total_pages": 5
# }
# }
删除用户(软删除)
curl -X DELETE http://localhost:8080/api/v1/users/1 \
-H "token: valid_token_here"
示例 2: 提交异步任务
提交邮件发送任务
curl -X POST http://localhost:8080/api/v1/tasks/email \
-H "Content-Type: application/json" \
-H "token: valid_token_here" \
-d '{
"to": "user@example.com",
"subject": "Welcome",
"body": "Welcome to our service!"
}'
# 响应:
# {
# "code": 0,
# "msg": "任务已提交",
# "data": {
# "task_id": "550e8400-e29b-41d4-a716-446655440000",
# "queue": "default"
# }
# }
提交数据同步任务(高优先级)
curl -X POST http://localhost:8080/api/v1/tasks/sync \
-H "Content-Type: application/json" \
-H "token: valid_token_here" \
-d '{
"sync_type": "sim_status",
"start_date": "2025-11-01",
"end_date": "2025-11-12",
"priority": "critical"
}'
示例 3: 直接在代码中使用数据库
// internal/service/user/service.go
package user
import (
"context"
"github.com/break/junhong_cmp_fiber/internal/model"
"github.com/break/junhong_cmp_fiber/internal/store/postgres"
"github.com/break/junhong_cmp_fiber/pkg/constants"
)
type Service struct {
store *postgres.Store
logger *zap.Logger
}
// CreateUser 创建用户
func (s *Service) CreateUser(ctx context.Context, req *model.CreateUserRequest) (*model.User, error) {
// 参数验证
if err := validate.Struct(req); err != nil {
return nil, err
}
// 密码哈希
hashedPassword, err := bcrypt.GenerateFromPassword([]byte(req.Password), bcrypt.DefaultCost)
if err != nil {
return nil, err
}
// 创建用户
user := &model.User{
Username: req.Username,
Email: req.Email,
Password: string(hashedPassword),
Status: constants.UserStatusActive,
}
if err := s.store.User.Create(ctx, user); err != nil {
s.logger.Error("创建用户失败",
zap.String("username", req.Username),
zap.Error(err))
return nil, err
}
s.logger.Info("用户创建成功",
zap.Uint("user_id", user.ID),
zap.String("username", user.Username))
return user, nil
}
// GetUserByID 根据 ID 获取用户
func (s *Service) GetUserByID(ctx context.Context, id uint) (*model.User, error) {
user, err := s.store.User.GetByID(ctx, id)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, errors.New(errors.CodeNotFound, "用户不存在")
}
return nil, err
}
return user, nil
}
示例 4: 在代码中提交异步任务
// internal/service/email/service.go
package email
import (
"context"
"encoding/json"
"github.com/break/junhong_cmp_fiber/internal/task"
"github.com/break/junhong_cmp_fiber/pkg/constants"
"github.com/break/junhong_cmp_fiber/pkg/queue"
"github.com/hibiken/asynq"
)
type Service struct {
queueClient *queue.Client
logger *zap.Logger
}
// SendWelcomeEmail 发送欢迎邮件(异步)
func (s *Service) SendWelcomeEmail(ctx context.Context, userID uint, email string) error {
// 构造任务载荷
payload := &task.EmailPayload{
RequestID: fmt.Sprintf("welcome-%d", userID),
To: email,
Subject: "欢迎加入",
Body: "感谢您注册我们的服务!",
}
payloadBytes, err := json.Marshal(payload)
if err != nil {
return err
}
// 提交任务到队列
err = s.queueClient.EnqueueTask(
ctx,
constants.TaskTypeEmailSend,
payloadBytes,
asynq.Queue(constants.QueueDefault),
asynq.MaxRetry(constants.DefaultRetryMax),
)
if err != nil {
s.logger.Error("提交邮件任务失败",
zap.Uint("user_id", userID),
zap.String("email", email),
zap.Error(err))
return err
}
s.logger.Info("欢迎邮件任务已提交",
zap.Uint("user_id", userID),
zap.String("email", email))
return nil
}
示例 5: 事务处理
// internal/service/order/service.go
package order
// CreateOrderWithUser 创建订单并更新用户统计(事务)
func (s *Service) CreateOrderWithUser(ctx context.Context, req *CreateOrderRequest) (*model.Order, error) {
var order *model.Order
// 使用事务
err := s.store.Transaction(ctx, func(tx *postgres.Store) error {
// 1. 创建订单
order = &model.Order{
OrderID: generateOrderID(),
UserID: req.UserID,
Amount: req.Amount,
Status: constants.OrderStatusPending,
}
if err := tx.Order.Create(ctx, order); err != nil {
return err
}
// 2. 更新用户订单计数
user, err := tx.User.GetByID(ctx, req.UserID)
if err != nil {
return err
}
user.OrderCount++
if err := tx.User.Update(ctx, user); err != nil {
return err
}
return nil // 提交事务
})
if err != nil {
s.logger.Error("创建订单失败",
zap.Uint("user_id", req.UserID),
zap.Error(err))
return nil, err
}
return order, nil
}
监控和调试
查看数据库数据
# 连接数据库
psql -h localhost -p 5432 -U postgres -d junhong_cmp
# 查询用户
SELECT * FROM tb_user;
# 查询订单
SELECT * FROM tb_order WHERE user_id = 1;
# 查看迁移历史
SELECT * FROM schema_migrations;
查看任务队列状态
使用 asynqmon(Web UI)
# 安装 asynqmon
go install github.com/hibiken/asynqmon@latest
# 启动监控面板
asynqmon --redis-addr=localhost:6379
# 访问 http://localhost:8080
# 可以查看:
# - 队列统计
# - 任务状态(pending, active, completed, failed)
# - 重试历史
# - 失败任务详情
使用 Redis CLI
# 查看所有队列
redis-cli KEYS "asynq:*"
# 查看 default 队列长度
redis-cli LLEN "asynq:{default}:pending"
# 查看任务详情
redis-cli HGETALL "asynq:task:{task_id}"
查看日志
# 实时查看应用日志
tail -f logs/app.log | jq .
# 过滤错误日志
tail -f logs/app.log | jq 'select(.level == "error")'
# 查看访问日志
tail -f logs/access.log | jq .
# 过滤慢查询
tail -f logs/app.log | jq 'select(.duration_ms > 100)'
测试
单元测试
# 运行所有测试
go test ./...
# 运行特定包的测试
go test ./internal/store/postgres/...
# 带覆盖率
go test -cover ./...
# 详细输出
go test -v ./...
集成测试
# 运行集成测试(需要 PostgreSQL 和 Redis)
go test -v ./tests/integration/...
# 单独测试数据库功能
go test -v ./tests/integration/database_test.go
# 单独测试任务队列
go test -v ./tests/integration/task_test.go
使用 Testcontainers(推荐)
集成测试会自动启动 PostgreSQL 和 Redis 容器:
// tests/integration/database_test.go
func TestUserCRUD(t *testing.T) {
// 自动启动 PostgreSQL 容器
// 运行测试
// 自动清理容器
}
故障排查
问题 1: 数据库连接失败
错误: dial tcp 127.0.0.1:5432: connect: connection refused
解决方案:
# 检查 PostgreSQL 是否运行
docker ps | grep postgres
# 检查端口占用
lsof -i :5432
# 重启 PostgreSQL
docker restart postgres-dev
问题 2: 迁移失败
错误: Dirty database version 1. Fix and force version.
解决方案:
# 强制设置版本
migrate -path migrations -database "$DATABASE_URL" force 1
# 然后重新运行迁移
migrate -path migrations -database "$DATABASE_URL" up
问题 3: Worker 无法连接 Redis
错误: dial tcp 127.0.0.1:6379: connect: connection refused
解决方案:
# 检查 Redis 是否运行
docker ps | grep redis
# 测试连接
redis-cli ping
# 重启 Redis
docker restart redis-dev
问题 4: 任务一直重试
原因: 任务处理函数返回错误
解决方案:
- 检查 Worker 日志:
tail -f logs/app.log | jq 'select(.level == "error")' - 使用 asynqmon 查看失败详情
- 检查任务幂等性实现
- 验证 Redis 锁键是否正确设置
环境配置
开发环境
export CONFIG_ENV=dev
go run cmd/api/main.go
预发布环境
export CONFIG_ENV=staging
go run cmd/api/main.go
生产环境
export CONFIG_ENV=prod
export DB_PASSWORD=secure_password # 使用环境变量
go run cmd/api/main.go
性能调优建议
数据库连接池
根据服务器资源调整:
database:
max_open_conns: 25 # 增大以支持更多并发
max_idle_conns: 10 # 保持足够的空闲连接
conn_max_lifetime: 5m # 定期回收连接
Worker 并发数
根据任务类型调整:
queue:
concurrency: 20 # I/O 密集型:CPU 核心数 × 2
# concurrency: 8 # CPU 密集型:CPU 核心数
队列优先级
根据业务需求调整:
queue:
queues:
critical: 8 # 提高关键任务权重
default: 2
low: 1
下一步
- 添加业务模型: 参考
internal/model/user.go创建 SIM 卡、订单等业务实体 - 实现业务逻辑: 在 Service 层实现具体业务逻辑
- 添加迁移文件: 使用
./scripts/migrate.sh create添加新表 - 创建异步任务: 参考
internal/task/email.go创建新的任务处理器 - 编写测试: 为所有 Service 层业务逻辑编写单元测试
参考资料
常见问题(FAQ)
Q: 如何添加新的数据库表?
A: 使用 ./scripts/migrate.sh create table_name 创建迁移文件,编辑 SQL,然后运行 ./scripts/migrate.sh up。
Q: 任务失败后会怎样?
A: 根据配置自动重试(默认 5 次,指数退避)。5 次后仍失败会进入死信队列,可在 asynqmon 中查看。
Q: 如何保证任务幂等性?
A: 使用 Redis 锁或数据库唯一约束。参考 research.md 中的幂等性设计模式。
Q: 如何扩展 Worker?
A: 启动多个 Worker 进程(不同机器或容器),连接同一个 Redis。Asynq 自动负载均衡。
Q: 数据库密码如何安全存储?
A: 生产环境使用环境变量:export DB_PASSWORD=xxx,配置文件中使用 ${DB_PASSWORD}。
Q: 如何监控任务执行情况?
A: 使用 asynqmon Web UI 或通过 Redis CLI 查看队列状态。
总结
本指南涵盖了:
- ✅ 环境搭建(PostgreSQL、Redis)
- ✅ 数据库迁移
- ✅ 服务启动(API + Worker)
- ✅ CRUD 操作示例
- ✅ 异步任务提交和处理
- ✅ 事务处理
- ✅ 监控和调试
- ✅ 故障排查
- ✅ 性能调优
推荐开发流程:
- 设计数据模型 → 2. 创建迁移文件 → 3. 实现 Store 层 → 4. 实现 Service 层 → 5. 实现 Handler 层 → 6. 编写测试 → 7. 运行和验证