# Quick Start Guide: 数据持久化与异步任务处理集成 **Feature**: 002-gorm-postgres-asynq **Date**: 2025-11-12 **Purpose**: 快速开始指南和使用示例 ## 概述 本指南帮助开发者快速搭建和使用 GORM + PostgreSQL + Asynq 集成的数据持久化和异步任务处理功能。 --- ## 前置要求 ### 系统要求 - Go 1.25.4+ - PostgreSQL 14+ - Redis 6.0+ - golang-migrate CLI 工具 ### 安装依赖 ```bash # 安装 Go 依赖 go mod tidy # 安装 golang-migrate(macOS) brew install golang-migrate # 安装 golang-migrate(Linux) curl -L https://github.com/golang-migrate/migrate/releases/download/v4.15.2/migrate.linux-amd64.tar.gz | tar xvz sudo mv migrate /usr/local/bin/ # 或使用 Go install go install -tags 'postgres' github.com/golang-migrate/migrate/v4/cmd/migrate@latest ``` --- ## 步骤 1: 启动 PostgreSQL ### 使用 Docker(推荐) ```bash # 启动 PostgreSQL 容器 docker run --name postgres-dev \ -e POSTGRES_USER=postgres \ -e POSTGRES_PASSWORD=password \ -e POSTGRES_DB=junhong_cmp \ -p 5432:5432 \ -d postgres:14 # 验证运行状态 docker ps | grep postgres-dev ``` ### 使用本地安装 ```bash # macOS brew install postgresql@14 brew services start postgresql@14 # 创建数据库 createdb junhong_cmp ``` ### 验证连接 ```bash # 测试连接 psql -h localhost -p 5432 -U postgres -d junhong_cmp # 如果成功,会进入 PostgreSQL 命令行 # 输入 \q 退出 ``` --- ## 步骤 2: 启动 Redis ```bash # 使用 Docker docker run --name redis-dev \ -p 6379:6379 \ -d redis:7-alpine # 或使用本地安装(macOS) brew install redis brew services start redis # 验证 Redis redis-cli ping # 应返回: PONG ``` --- ## 步骤 3: 配置数据库连接 编辑配置文件 `configs/config.yaml`,添加数据库和队列配置: ```yaml # configs/config.yaml # 数据库配置 database: host: localhost port: 5432 user: postgres password: password # 开发环境明文存储,生产环境使用环境变量 dbname: junhong_cmp sslmode: disable # 开发环境禁用 SSL,生产环境使用 require max_open_conns: 25 max_idle_conns: 10 conn_max_lifetime: 5m # 任务队列配置 queue: concurrency: 10 # Worker 并发数 queues: # 队列优先级(权重) critical: 6 # 关键任务:60% default: 3 # 普通任务:30% low: 1 # 低优先级:10% retry_max: 5 # 最大重试次数 timeout: 10m # 任务超时时间 ``` --- ## 步骤 4: 运行数据库迁移 ### 方法 1: 使用迁移脚本(推荐) ```bash # 赋予执行权限 chmod +x scripts/migrate.sh # 向上迁移(应用所有迁移) ./scripts/migrate.sh up # 查看当前版本 ./scripts/migrate.sh version # 回滚最后一次迁移 ./scripts/migrate.sh down 1 # 创建新迁移 ./scripts/migrate.sh create add_sim_table ``` ### 方法 2: 直接使用 migrate CLI ```bash # 设置数据库 URL export DATABASE_URL="postgresql://postgres:password@localhost:5432/junhong_cmp?sslmode=disable" # 向上迁移 migrate -path migrations -database "$DATABASE_URL" up # 查看版本 migrate -path migrations -database "$DATABASE_URL" version ``` ### 验证迁移成功 ```bash # 连接数据库 psql -h localhost -p 5432 -U postgres -d junhong_cmp # 查看表 \dt # 应该看到: # tb_user # tb_order # schema_migrations(由 golang-migrate 创建) # 退出 \q ``` --- ## 步骤 5: 启动 API 服务 ```bash # 从项目根目录运行 go run cmd/api/main.go # 预期输出: # {"level":"info","timestamp":"...","message":"PostgreSQL 连接成功","host":"localhost","port":5432} # {"level":"info","timestamp":"...","message":"Redis 连接成功","addr":"localhost:6379"} # {"level":"info","timestamp":"...","message":"服务启动成功","host":"0.0.0.0","port":8080} ``` ### 验证 API 服务 ```bash # 测试健康检查 curl http://localhost:8080/health # 预期响应: # { # "status": "ok", # "postgres": "up", # "redis": "up" # } ``` --- ## 步骤 6: 启动 Worker 服务 打开新的终端窗口: ```bash # 从项目根目录运行 go run cmd/worker/main.go # 预期输出: # {"level":"info","timestamp":"...","message":"PostgreSQL 连接成功","host":"localhost","port":5432} # {"level":"info","timestamp":"...","message":"Redis 连接成功","addr":"localhost:6379"} # {"level":"info","timestamp":"...","message":"Worker 启动成功","concurrency":10} ``` --- ## 使用示例 ### 示例 1: 数据库 CRUD 操作 #### 创建用户 ```bash curl -X POST http://localhost:8080/api/v1/users \ -H "Content-Type: application/json" \ -H "token: valid_token_here" \ -d '{ "username": "testuser", "email": "test@example.com", "password": "password123" }' # 响应: # { # "code": 0, # "msg": "success", # "data": { # "id": 1, # "username": "testuser", # "email": "test@example.com", # "status": "active", # "created_at": "2025-11-12T16:00:00+08:00", # "updated_at": "2025-11-12T16:00:00+08:00" # }, # "timestamp": "2025-11-12T16:00:00+08:00" # } ``` #### 查询用户 ```bash curl http://localhost:8080/api/v1/users/1 \ -H "token: valid_token_here" # 响应: # { # "code": 0, # "msg": "success", # "data": { # "id": 1, # "username": "testuser", # "email": "test@example.com", # "status": "active", # ... # } # } ``` #### 更新用户 ```bash curl -X PUT http://localhost:8080/api/v1/users/1 \ -H "Content-Type: application/json" \ -H "token: valid_token_here" \ -d '{ "email": "newemail@example.com", "status": "inactive" }' ``` #### 列表查询(分页) ```bash curl "http://localhost:8080/api/v1/users?page=1&page_size=20" \ -H "token: valid_token_here" # 响应: # { # "code": 0, # "msg": "success", # "data": { # "users": [...], # "page": 1, # "page_size": 20, # "total": 100, # "total_pages": 5 # } # } ``` #### 删除用户(软删除) ```bash curl -X DELETE http://localhost:8080/api/v1/users/1 \ -H "token: valid_token_here" ``` ### 示例 2: 提交异步任务 #### 提交邮件发送任务 ```bash curl -X POST http://localhost:8080/api/v1/tasks/email \ -H "Content-Type: application/json" \ -H "token: valid_token_here" \ -d '{ "to": "user@example.com", "subject": "Welcome", "body": "Welcome to our service!" }' # 响应: # { # "code": 0, # "msg": "任务已提交", # "data": { # "task_id": "550e8400-e29b-41d4-a716-446655440000", # "queue": "default" # } # } ``` #### 提交数据同步任务(高优先级) ```bash curl -X POST http://localhost:8080/api/v1/tasks/sync \ -H "Content-Type: application/json" \ -H "token: valid_token_here" \ -d '{ "sync_type": "sim_status", "start_date": "2025-11-01", "end_date": "2025-11-12", "priority": "critical" }' ``` ### 示例 3: 直接在代码中使用数据库 ```go // internal/service/user/service.go package user import ( "context" "github.com/break/junhong_cmp_fiber/internal/model" "github.com/break/junhong_cmp_fiber/internal/store/postgres" "github.com/break/junhong_cmp_fiber/pkg/constants" ) type Service struct { store *postgres.Store logger *zap.Logger } // CreateUser 创建用户 func (s *Service) CreateUser(ctx context.Context, req *model.CreateUserRequest) (*model.User, error) { // 参数验证 if err := validate.Struct(req); err != nil { return nil, err } // 密码哈希 hashedPassword, err := bcrypt.GenerateFromPassword([]byte(req.Password), bcrypt.DefaultCost) if err != nil { return nil, err } // 创建用户 user := &model.User{ Username: req.Username, Email: req.Email, Password: string(hashedPassword), Status: constants.UserStatusActive, } if err := s.store.User.Create(ctx, user); err != nil { s.logger.Error("创建用户失败", zap.String("username", req.Username), zap.Error(err)) return nil, err } s.logger.Info("用户创建成功", zap.Uint("user_id", user.ID), zap.String("username", user.Username)) return user, nil } // GetUserByID 根据 ID 获取用户 func (s *Service) GetUserByID(ctx context.Context, id uint) (*model.User, error) { user, err := s.store.User.GetByID(ctx, id) if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { return nil, errors.New(errors.CodeNotFound, "用户不存在") } return nil, err } return user, nil } ``` ### 示例 4: 在代码中提交异步任务 ```go // internal/service/email/service.go package email import ( "context" "encoding/json" "github.com/break/junhong_cmp_fiber/internal/task" "github.com/break/junhong_cmp_fiber/pkg/constants" "github.com/break/junhong_cmp_fiber/pkg/queue" "github.com/hibiken/asynq" ) type Service struct { queueClient *queue.Client logger *zap.Logger } // SendWelcomeEmail 发送欢迎邮件(异步) func (s *Service) SendWelcomeEmail(ctx context.Context, userID uint, email string) error { // 构造任务载荷 payload := &task.EmailPayload{ RequestID: fmt.Sprintf("welcome-%d", userID), To: email, Subject: "欢迎加入", Body: "感谢您注册我们的服务!", } payloadBytes, err := json.Marshal(payload) if err != nil { return err } // 提交任务到队列 err = s.queueClient.EnqueueTask( ctx, constants.TaskTypeEmailSend, payloadBytes, asynq.Queue(constants.QueueDefault), asynq.MaxRetry(constants.DefaultRetryMax), ) if err != nil { s.logger.Error("提交邮件任务失败", zap.Uint("user_id", userID), zap.String("email", email), zap.Error(err)) return err } s.logger.Info("欢迎邮件任务已提交", zap.Uint("user_id", userID), zap.String("email", email)) return nil } ``` ### 示例 5: 事务处理 ```go // internal/service/order/service.go package order // CreateOrderWithUser 创建订单并更新用户统计(事务) func (s *Service) CreateOrderWithUser(ctx context.Context, req *CreateOrderRequest) (*model.Order, error) { var order *model.Order // 使用事务 err := s.store.Transaction(ctx, func(tx *postgres.Store) error { // 1. 创建订单 order = &model.Order{ OrderID: generateOrderID(), UserID: req.UserID, Amount: req.Amount, Status: constants.OrderStatusPending, } if err := tx.Order.Create(ctx, order); err != nil { return err } // 2. 更新用户订单计数 user, err := tx.User.GetByID(ctx, req.UserID) if err != nil { return err } user.OrderCount++ if err := tx.User.Update(ctx, user); err != nil { return err } return nil // 提交事务 }) if err != nil { s.logger.Error("创建订单失败", zap.Uint("user_id", req.UserID), zap.Error(err)) return nil, err } return order, nil } ``` --- ## 监控和调试 ### 查看数据库数据 ```bash # 连接数据库 psql -h localhost -p 5432 -U postgres -d junhong_cmp # 查询用户 SELECT * FROM tb_user; # 查询订单 SELECT * FROM tb_order WHERE user_id = 1; # 查看迁移历史 SELECT * FROM schema_migrations; ``` ### 查看任务队列状态 #### 使用 asynqmon(Web UI) ```bash # 安装 asynqmon go install github.com/hibiken/asynqmon@latest # 启动监控面板 asynqmon --redis-addr=localhost:6379 # 访问 http://localhost:8080 # 可以查看: # - 队列统计 # - 任务状态(pending, active, completed, failed) # - 重试历史 # - 失败任务详情 ``` #### 使用 Redis CLI ```bash # 查看所有队列 redis-cli KEYS "asynq:*" # 查看 default 队列长度 redis-cli LLEN "asynq:{default}:pending" # 查看任务详情 redis-cli HGETALL "asynq:task:{task_id}" ``` ### 查看日志 ```bash # 实时查看应用日志 tail -f logs/app.log | jq . # 过滤错误日志 tail -f logs/app.log | jq 'select(.level == "error")' # 查看访问日志 tail -f logs/access.log | jq . # 过滤慢查询 tail -f logs/app.log | jq 'select(.duration_ms > 100)' ``` --- ## 测试 ### 单元测试 ```bash # 运行所有测试 go test ./... # 运行特定包的测试 go test ./internal/store/postgres/... # 带覆盖率 go test -cover ./... # 详细输出 go test -v ./... ``` ### 集成测试 ```bash # 运行集成测试(需要 PostgreSQL 和 Redis) go test -v ./tests/integration/... # 单独测试数据库功能 go test -v ./tests/integration/database_test.go # 单独测试任务队列 go test -v ./tests/integration/task_test.go ``` ### 使用 Testcontainers(推荐) 集成测试会自动启动 PostgreSQL 和 Redis 容器: ```go // tests/integration/database_test.go func TestUserCRUD(t *testing.T) { // 自动启动 PostgreSQL 容器 // 运行测试 // 自动清理容器 } ``` --- ## 故障排查 ### 问题 1: 数据库连接失败 **错误**: `dial tcp 127.0.0.1:5432: connect: connection refused` **解决方案**: ```bash # 检查 PostgreSQL 是否运行 docker ps | grep postgres # 检查端口占用 lsof -i :5432 # 重启 PostgreSQL docker restart postgres-dev ``` ### 问题 2: 迁移失败 **错误**: `Dirty database version 1. Fix and force version.` **解决方案**: ```bash # 强制设置版本 migrate -path migrations -database "$DATABASE_URL" force 1 # 然后重新运行迁移 migrate -path migrations -database "$DATABASE_URL" up ``` ### 问题 3: Worker 无法连接 Redis **错误**: `dial tcp 127.0.0.1:6379: connect: connection refused` **解决方案**: ```bash # 检查 Redis 是否运行 docker ps | grep redis # 测试连接 redis-cli ping # 重启 Redis docker restart redis-dev ``` ### 问题 4: 任务一直重试 **原因**: 任务处理函数返回错误 **解决方案**: 1. 检查 Worker 日志:`tail -f logs/app.log | jq 'select(.level == "error")'` 2. 使用 asynqmon 查看失败详情 3. 检查任务幂等性实现 4. 验证 Redis 锁键是否正确设置 --- ## 环境配置 ### 开发环境 ```bash export CONFIG_ENV=dev go run cmd/api/main.go ``` ### 预发布环境 ```bash export CONFIG_ENV=staging go run cmd/api/main.go ``` ### 生产环境 ```bash export CONFIG_ENV=prod export DB_PASSWORD=secure_password # 使用环境变量 go run cmd/api/main.go ``` --- ## 性能调优建议 ### 数据库连接池 根据服务器资源调整: ```yaml database: max_open_conns: 25 # 增大以支持更多并发 max_idle_conns: 10 # 保持足够的空闲连接 conn_max_lifetime: 5m # 定期回收连接 ``` ### Worker 并发数 根据任务类型调整: ```yaml queue: concurrency: 20 # I/O 密集型:CPU 核心数 × 2 # concurrency: 8 # CPU 密集型:CPU 核心数 ``` ### 队列优先级 根据业务需求调整: ```yaml queue: queues: critical: 8 # 提高关键任务权重 default: 2 low: 1 ``` --- ## 下一步 1. **添加业务模型**: 参考 `internal/model/user.go` 创建 SIM 卡、订单等业务实体 2. **实现业务逻辑**: 在 Service 层实现具体业务逻辑 3. **添加迁移文件**: 使用 `./scripts/migrate.sh create` 添加新表 4. **创建异步任务**: 参考 `internal/task/email.go` 创建新的任务处理器 5. **编写测试**: 为所有 Service 层业务逻辑编写单元测试 --- ## 参考资料 - [GORM 官方文档](https://gorm.io/docs/) - [Asynq 官方文档](https://github.com/hibiken/asynq) - [golang-migrate 文档](https://github.com/golang-migrate/migrate) - [PostgreSQL 文档](https://www.postgresql.org/docs/) - [项目 Constitution](../../.specify/memory/constitution.md) --- ## 常见问题(FAQ) **Q: 如何添加新的数据库表?** A: 使用 `./scripts/migrate.sh create table_name` 创建迁移文件,编辑 SQL,然后运行 `./scripts/migrate.sh up`。 **Q: 任务失败后会怎样?** A: 根据配置自动重试(默认 5 次,指数退避)。5 次后仍失败会进入死信队列,可在 asynqmon 中查看。 **Q: 如何保证任务幂等性?** A: 使用 Redis 锁或数据库唯一约束。参考 `research.md` 中的幂等性设计模式。 **Q: 如何扩展 Worker?** A: 启动多个 Worker 进程(不同机器或容器),连接同一个 Redis。Asynq 自动负载均衡。 **Q: 数据库密码如何安全存储?** A: 生产环境使用环境变量:`export DB_PASSWORD=xxx`,配置文件中使用 `${DB_PASSWORD}`。 **Q: 如何监控任务执行情况?** A: 使用 asynqmon Web UI 或通过 Redis CLI 查看队列状态。 --- ## 总结 本指南涵盖了: - ✅ 环境搭建(PostgreSQL、Redis) - ✅ 数据库迁移 - ✅ 服务启动(API + Worker) - ✅ CRUD 操作示例 - ✅ 异步任务提交和处理 - ✅ 事务处理 - ✅ 监控和调试 - ✅ 故障排查 - ✅ 性能调优 **推荐开发流程**: 1. 设计数据模型 → 2. 创建迁移文件 → 3. 实现 Store 层 → 4. 实现 Service 层 → 5. 实现 Handler 层 → 6. 编写测试 → 7. 运行和验证