Files
junhong_cmp_fiber/docs/002-gorm-postgres-asynq/使用指南.md
huang 984ccccc63 docs(constitution): 新增数据库设计原则(v2.4.0)
在项目宪章中新增第九条原则"数据库设计原则",明确禁止使用数据库外键约束和ORM关联标签。

主要变更:
- 新增原则IX:数据库设计原则(Database Design Principles)
- 强制要求:数据库表不得使用外键约束
- 强制要求:GORM模型不得使用ORM关联标签(foreignKey、hasMany等)
- 强制要求:表关系必须通过ID字段手动维护
- 强制要求:关联数据查询必须显式编写,避免ORM魔法
- 强制要求:时间字段由GORM处理,不使用数据库触发器

设计理念:
- 提升业务逻辑灵活性(无数据库约束限制)
- 优化高并发性能(无外键检查开销)
- 增强代码可读性(显式查询,无隐式预加载)
- 简化数据库架构和迁移流程
- 支持分布式和微服务场景

版本升级:2.3.0 → 2.4.0(MINOR)
2025-11-13 13:40:19 +08:00

12 KiB
Raw Blame History

数据持久化与异步任务处理集成 - 使用指南

功能编号: 002-gorm-postgres-asynq
更新日期: 2025-11-13


快速开始

详细的快速开始指南请参考:Quick Start Guide

本文档提供核心使用场景和最佳实践。


核心使用场景

1. 数据库 CRUD 操作

创建用户

curl -X POST http://localhost:8080/api/v1/users \
  -H "Content-Type: application/json" \
  -H "token: your_token" \
  -d '{
    "username": "testuser",
    "email": "test@example.com",
    "password": "password123"
  }'

查询用户

# 根据 ID 查询
curl http://localhost:8080/api/v1/users/1 \
  -H "token: your_token"

# 列表查询(分页)
curl "http://localhost:8080/api/v1/users?page=1&page_size=20" \
  -H "token: your_token"

更新用户

curl -X PUT http://localhost:8080/api/v1/users/1 \
  -H "Content-Type: application/json" \
  -H "token: your_token" \
  -d '{
    "email": "newemail@example.com",
    "status": "inactive"
  }'

删除用户(软删除)

curl -X DELETE http://localhost:8080/api/v1/users/1 \
  -H "token: your_token"

2. 异步任务提交

发送邮件任务

curl -X POST http://localhost:8080/api/v1/tasks/email \
  -H "Content-Type: application/json" \
  -H "token: your_token" \
  -d '{
    "to": "user@example.com",
    "subject": "欢迎",
    "body": "欢迎使用君鸿卡管系统"
  }'

数据同步任务

curl -X POST http://localhost:8080/api/v1/tasks/sync \
  -H "Content-Type: application/json" \
  -H "token: your_token" \
  -d '{
    "sync_type": "sim_status",
    "start_date": "2025-11-01",
    "end_date": "2025-11-13",
    "priority": "critical"
  }'

3. 代码中使用

Service 层使用数据库

package user

import (
    "context"
    "github.com/break/junhong_cmp_fiber/internal/model"
    "github.com/break/junhong_cmp_fiber/internal/store/postgres"
)

type Service struct {
    store  *postgres.Store
    logger *zap.Logger
}

// 创建用户
func (s *Service) CreateUser(ctx context.Context, req *model.CreateUserRequest) (*model.User, error) {
    user := &model.User{
        Username: req.Username,
        Email:    req.Email,
        Password: hashPassword(req.Password),
        Status:   constants.UserStatusActive,
    }
    
    if err := s.store.User.Create(ctx, user); err != nil {
        return nil, err
    }
    
    return user, nil
}

// 事务处理
func (s *Service) CreateOrderWithUser(ctx context.Context, req *CreateOrderRequest) error {
    return s.store.Transaction(ctx, func(tx *postgres.Store) error {
        // 创建订单
        order := &model.Order{...}
        if err := tx.Order.Create(ctx, order); err != nil {
            return err
        }
        
        // 更新用户统计
        user, _ := tx.User.GetByID(ctx, req.UserID)
        user.OrderCount++
        if err := tx.User.Update(ctx, user); err != nil {
            return err
        }
        
        return nil // 提交事务
    })
}

Service 层提交异步任务

package email

import (
    "context"
    "github.com/break/junhong_cmp_fiber/internal/task"
    "github.com/break/junhong_cmp_fiber/pkg/queue"
)

type Service struct {
    queueClient *queue.Client
    logger      *zap.Logger
}

// 发送欢迎邮件
func (s *Service) SendWelcomeEmail(ctx context.Context, userID uint, email string) error {
    payload := &task.EmailPayload{
        RequestID: fmt.Sprintf("welcome-%d", userID),
        To:        email,
        Subject:   "欢迎加入",
        Body:      "感谢您注册我们的服务!",
    }
    
    payloadBytes, _ := json.Marshal(payload)
    
    return s.queueClient.EnqueueTask(
        ctx,
        constants.TaskTypeEmailSend,
        payloadBytes,
        asynq.Queue(constants.QueueDefault),
        asynq.MaxRetry(constants.DefaultRetryMax),
    )
}

配置管理

环境配置文件

configs/
├── config.yaml         # 默认配置
├── config.dev.yaml     # 开发环境
├── config.staging.yaml # 预发布环境
└── config.prod.yaml    # 生产环境

切换环境

# 开发环境
export CONFIG_ENV=dev
go run cmd/api/main.go

# 生产环境
export CONFIG_ENV=prod
export DB_PASSWORD=secure_password  # 使用环境变量覆盖密码
go run cmd/api/main.go

数据库配置

database:
  host: localhost
  port: 5432
  user: postgres
  password: password      # 生产环境使用 ${DB_PASSWORD}
  dbname: junhong_cmp
  sslmode: disable        # 生产环境使用 require
  max_open_conns: 25
  max_idle_conns: 10
  conn_max_lifetime: 5m

队列配置

queue:
  concurrency: 10         # Worker 并发数
  queues:
    critical: 6           # 高优先级60%
    default: 3            # 默认优先级30%
    low: 1                # 低优先级10%
  retry_max: 5
  timeout: 10m

数据库迁移

使用迁移脚本

# 赋予执行权限
chmod +x scripts/migrate.sh

# 向上迁移(应用所有迁移)
./scripts/migrate.sh up

# 回滚最后一次迁移
./scripts/migrate.sh down 1

# 查看当前版本
./scripts/migrate.sh version

# 创建新迁移
./scripts/migrate.sh create add_new_table

创建迁移文件

# 1. 创建迁移
./scripts/migrate.sh create add_sim_card_table

# 2. 编辑生成的文件
# migrations/000002_add_sim_card_table.up.sql
# migrations/000002_add_sim_card_table.down.sql

# 3. 执行迁移
./scripts/migrate.sh up

监控与调试

健康检查

curl http://localhost:8080/health

响应示例:

{
  "status": "healthy",
  "timestamp": "2025-11-13T12:00:00+08:00",
  "services": {
    "postgres": {
      "status": "up",
      "open_conns": 5,
      "in_use": 2,
      "idle": 3
    },
    "redis": {
      "status": "up",
      "total_conns": 10,
      "idle_conns": 7
    }
  }
}

查看任务队列状态

使用 asynqmon推荐

# 安装
go install github.com/hibiken/asynqmon@latest

# 启动监控面板
asynqmon --redis-addr=localhost:6379

# 访问 http://localhost:8080

使用 Redis CLI

# 查看所有队列
redis-cli KEYS "asynq:*"

# 查看 default 队列长度
redis-cli LLEN "asynq:{default}:pending"

# 查看任务详情
redis-cli HGETALL "asynq:task:{task_id}"

查看日志

# 实时查看应用日志
tail -f logs/app.log | jq .

# 过滤错误日志
tail -f logs/app.log | jq 'select(.level == "error")'

# 查看访问日志
tail -f logs/access.log | jq .

# 过滤慢查询(> 100ms
tail -f logs/app.log | jq 'select(.duration_ms > 100)'

性能调优

数据库连接池

根据服务器资源调整:

database:
  max_open_conns: 50     # 增大以支持更多并发
  max_idle_conns: 20     # 保持足够的空闲连接
  conn_max_lifetime: 5m  # 定期回收连接

计算公式

max_open_conns = (可用内存 / 10MB) * 0.7

Worker 并发数

根据任务类型调整:

queue:
  concurrency: 20        # I/O 密集型CPU 核心数 × 2
  # concurrency: 8       # CPU 密集型CPU 核心数

队列优先级

根据业务需求调整:

queue:
  queues:
    critical: 8          # 提高关键任务权重
    default: 2
    low: 1

故障排查

问题 1: 数据库连接失败

错误: dial tcp 127.0.0.1:5432: connect: connection refused

解决方案:

# 1. 检查 PostgreSQL 是否运行
docker ps | grep postgres

# 2. 检查端口占用
lsof -i :5432

# 3. 重启 PostgreSQL
docker restart postgres-dev

问题 2: Worker 无法连接 Redis

错误: dial tcp 127.0.0.1:6379: connect: connection refused

解决方案:

# 1. 检查 Redis 是否运行
docker ps | grep redis

# 2. 测试连接
redis-cli ping

# 3. 重启 Redis
docker restart redis-dev

问题 3: 任务一直重试

原因: 任务处理函数返回错误

解决方案:

  1. 检查 Worker 日志:tail -f logs/app.log | jq 'select(.level == "error")'
  2. 使用 asynqmon 查看失败详情
  3. 检查任务幂等性实现
  4. 验证 Redis 锁键是否正确设置

问题 4: 数据库迁移失败

错误: Dirty database version 1. Fix and force version.

解决方案:

# 1. 强制设置版本
export DATABASE_URL="postgresql://user:password@localhost:5432/dbname?sslmode=disable"
migrate -path migrations -database "$DATABASE_URL" force 1

# 2. 重新运行迁移
./scripts/migrate.sh up

最佳实践

1. 数据库操作

  • 使用 GORM 的参数化查询(自动防 SQL 注入)
  • 事务尽量快(< 50ms避免长事务锁表
  • 批量操作使用 CreateInBatches() 提高性能
  • 列表查询实现分页(默认 20 条,最大 100 条)
  • 避免使用 db.Raw() 拼接 SQL

2. 异步任务

  • 任务处理函数必须幂等
  • 使用 Redis 锁或数据库唯一约束防重复执行
  • 关键任务使用 critical 队列
  • 设置合理的超时时间
  • 避免在任务中执行长时间阻塞操作

3. 错误处理

  • Service 层转换为业务错误码
  • Handler 层使用统一响应格式
  • 记录详细的错误日志
  • 避免滥用 panic

4. 日志记录

  • 使用结构化日志Zap
  • 日志消息使用中文
  • 敏感信息不输出到日志(如密码)
  • 记录关键操作(创建、更新、删除)

部署建议

Docker Compose 部署

version: '3.8'
services:
  postgres:
    image: postgres:14
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
      POSTGRES_DB: junhong_cmp
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  api:
    build: .
    command: ./bin/api
    ports:
      - "8080:8080"
    depends_on:
      - postgres
      - redis
    environment:
      - CONFIG_ENV=prod
      - DB_PASSWORD=${DB_PASSWORD}

  worker:
    build: .
    command: ./bin/worker
    depends_on:
      - postgres
      - redis
    environment:
      - CONFIG_ENV=prod
      - DB_PASSWORD=${DB_PASSWORD}

volumes:
  postgres_data:

生产环境检查清单

  • 使用环境变量存储敏感信息
  • 数据库启用 SSL 连接
  • 配置连接池参数
  • 启用访问日志和错误日志
  • 配置日志轮转(防止磁盘满)
  • 设置健康检查端点
  • 配置优雅关闭SIGTERM
  • 准备数据库备份策略
  • 配置监控和告警

参考文档


常见问题FAQ

Q: 如何添加新的数据库表?
A: 使用 ./scripts/migrate.sh create table_name 创建迁移文件,编辑 SQL然后运行 ./scripts/migrate.sh up

Q: 任务失败后会怎样?
A: 根据配置自动重试(默认 5 次指数退避。5 次后仍失败会进入死信队列,可在 asynqmon 中查看。

Q: 如何保证任务幂等性?
A: 使用 Redis 锁或数据库唯一约束。参考 internal/task/email.go 中的实现。

Q: 如何扩展 Worker
A: 启动多个 Worker 进程(不同机器或容器),连接同一个 Redis。Asynq 自动负载均衡。

Q: 如何监控任务执行情况?
A: 使用 asynqmon Web UI 或通过 Redis CLI 查看队列状态。


文档维护: 如使用方法有变更,请同步更新本文档。