Files
huang c665f32976
All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 6m54s
feat: 套餐系统升级 - Worker 重构、流量重置、文档与规范更新
- 重构 Worker 启动流程,引入 bootstrap 模块统一管理依赖注入
- 实现套餐流量重置服务(日/月/年周期重置)
- 新增套餐激活排队、加油包绑定、囤货待实名激活逻辑
- 新增订单创建幂等性防重(Redis 业务键 + 分布式锁)
- 更新 AGENTS.md/CLAUDE.md:新增注释规范、幂等性规范,移除测试要求
- 添加套餐系统升级完整文档(API文档、使用指南、功能总结、运维指南)
- 归档 OpenSpec package-system-upgrade 变更,同步 specs 到主目录
- 新增 queue types 抽象和 Redis 常量定义
2026-02-12 14:24:15 +08:00

16 KiB
Raw Permalink Blame History

Spec: 套餐流量日记录

业务背景

为什么需要流量日记录

现状问题

  • 用户需要查看每日流量使用明细(哪天用了多少流量)
  • 套餐流量重置后,历史使用数据丢失
  • 无法统计和分析用户流量使用趋势
  • 计费对账需要每日流量记录

业务目标

  • 按套餐维度记录每日流量增量
  • 支持按日期范围查询流量详单
  • 流量重置后历史记录仍可查询
  • 为计费对账和数据分析提供基础数据

业务规则

1. 日记录写入规则

每次流量扣减后,写入或更新当日记录:

写入流量日记录:
1. 获取当前日期date=today
2. 查询是否已有今日记录:
   SELECT * FROM package_usage_daily_record
   WHERE package_usage_id=? AND date=today
3. 如果存在 → UPDATE daily_usage_mb += increment
4. 如果不存在 → INSERT (package_usage_id, date, daily_usage_mb, cumulative_usage_mb)
5. 使用 UPSERTON CONFLICT UPDATE确保幂等性

2. 流量增量计算

每日流量增量 = 今日上游返回的累计流量 - 昨日记录的累计流量

特殊情况:
- 如果昨日无记录 → 增量 = 今日上游累计流量
- 如果上游重置(今日累计 < 昨日累计)→ 增量 = 今日上游累计流量

3. cumulative_usage_mb 字段

  • 定义:截止到当日的累计流量
  • 计算规则cumulative_usage_mb = 昨日 cumulative_usage_mb + 今日 daily_usage_mb
  • 首日规则:首日 cumulative_usage_mb = daily_usage_mb

4. 数据保留策略

  • 保留期限永久保留或根据业务需求保留1年/2年
  • 流量重置不删除:套餐流量重置后,日记录仍保留
  • 套餐过期不删除:套餐过期后,日记录仍保留

ADDED Requirements

Requirement: 按套餐维度记录每日流量

系统 SHALL 为每个 PackageUsage 创建每日流量记录PackageUsageDailyRecord记录每天的流量增量。

Scenario: 首次记录当日流量

  • GIVEN 套餐 ID=123 在 2026-02-10 首次产生流量 1.5GB
  • WHEN 流量扣减完成
  • THEN 系统创建 PackageUsageDailyRecord
    • package_usage_id=123
    • date=2026-02-10
    • daily_usage_mb=1536 (1.5GB)
    • cumulative_usage_mb=1536

Scenario: 同一天多次流量更新

  • GIVEN 套餐在 2026-02-10 已记录 1GB 流量
  • WHEN 再产生 0.5GB 流量
  • THEN 系统更新 PackageUsageDailyRecord
    • daily_usage_mb=15361GB+0.5GB
    • cumulative_usage_mb=1536

Scenario: 跨天流量记录

  • GIVEN 套餐在 2026-02-10 使用 2GB
  • AND 2026-02-11 使用 3GB
  • WHEN 流量扣减完成
  • THEN 系统创建两条记录:
    • 2月10日daily_usage_mb=2GB, cumulative_usage_mb=2GB
    • 2月11日daily_usage_mb=3GB, cumulative_usage_mb=5GB

Scenario: 流量重置后日记录仍保留

  • GIVEN 套餐在 2月1日至2月28日有28条日记录
  • WHEN 3月1日 00:00:00 触发流量重置
  • THEN 套餐 data_usage_mb 重置为 0
  • AND 2月的28条日记录仍存在且可查询

Requirement: 流量增量基于上游查询计算

系统 SHALL 根据上游返回的累计流量,减去昨日记录的累计流量,计算每日增量。

Scenario: 计算每日流量增量

  • GIVEN 昨日2月9日记录 cumulative_usage_mb=10GB
  • WHEN 今日2月10日上游返回 cumulative=13GB
  • THEN 今日 daily_usage_mb=3GB13GB - 10GB
  • AND 今日 cumulative_usage_mb=13GB

Scenario: 上游周期重置后流量计算

  • GIVEN 联通卡在 2月27日 00:00:00 上游重置
  • AND 昨日2月26日记录 cumulative_usage_mb=15GB
  • WHEN 今日2月27日上游返回 cumulative=2GB
  • THEN 今日 daily_usage_mb=2GB上游重置取新增量
  • AND 今日 cumulative_usage_mb=2GB

Scenario: 首日无昨日记录

  • GIVEN 套餐首次激活,无任何日记录
  • WHEN 上游返回 cumulative=5GB
  • THEN 今日 daily_usage_mb=5GB
  • AND 今日 cumulative_usage_mb=5GB

Requirement: 支持按日期查询套餐流量详单

系统 SHALL 提供 API 查询指定套餐的每日流量记录。

Scenario: 查询套餐流量详单

  • WHEN 用户通过 GET /api/admin/package-usage/:id/daily-records 查询套餐流量详单
  • THEN 系统返回按日期排序的流量记录列表:
    {
      "code": 200,
      "data": [
        {
          "date": "2026-02-01",
          "daily_usage_mb": 1024,
          "cumulative_usage_mb": 1024
        },
        {
          "date": "2026-02-02",
          "daily_usage_mb": 2048,
          "cumulative_usage_mb": 3072
        }
      ]
    }
    

Scenario: 查询指定日期范围

  • GIVEN 套餐有 2月1日 至 2月28日 的流量记录
  • WHEN 用户查询流量详单,参数 start_date=2026-02-01, end_date=2026-02-10
  • THEN 系统返回 2月1日 至 2月10日 的流量记录10条

Scenario: 客户端查询自己的流量详单

  • WHEN 客户通过 GET /api/customer/package-usage/:id/daily-records 查询
  • THEN 系统校验套餐归属后,返回流量记录列表

Requirement: 日记录索引优化

系统 SHALL 在 PackageUsageDailyRecord 表创建 (package_usage_id, date) 联合唯一索引。

Scenario: 同一套餐同一天只有一条记录

  • WHEN 系统尝试为同一 package_usage_id=123 和 date=2026-02-10 创建第二条记录
  • THEN 数据库返回唯一约束冲突错误
  • AND 使用 UPSERT 自动转为 UPDATE 操作

Scenario: 查询性能达标

  • GIVEN 套餐 ID=123 有 365 条日记录(一年数据)
  • WHEN 查询全部流量详单
  • THEN 查询响应时间 < 50ms

边界条件

1. 套餐过期后的日记录

  • 场景:套餐在 2月28日过期3月1日仍可查询历史日记录
  • 处理:日记录永久保留,不随套餐过期删除

2. 并发写入同一天记录

  • 场景:同一套餐在同一天有多个并发流量扣减请求
  • 处理:使用 UPSERTON CONFLICT UPDATE确保幂等性

3. 跨月查询日记录

  • 场景:查询 1月15日 至 2月15日 的日记录(跨月)
  • 处理:按日期范围查询,返回跨月数据

并发场景

Scenario: 并发写入同一天记录

  • GIVEN 套餐 ID=123 在 2026-02-10 10:00:00 和 10:00:01 同时扣减流量
  • WHEN 两个请求同时写入日记录
  • THEN 使用 UPSERTON CONFLICT UPDATE
    INSERT INTO package_usage_daily_record (package_usage_id, date, daily_usage_mb, cumulative_usage_mb)
    VALUES (123, '2026-02-10', 1024, 1024)
    ON CONFLICT (package_usage_id, date)
    DO UPDATE SET
      daily_usage_mb = package_usage_daily_record.daily_usage_mb + EXCLUDED.daily_usage_mb,
      cumulative_usage_mb = package_usage_daily_record.cumulative_usage_mb + EXCLUDED.daily_usage_mb;
    
  • AND 两个请求的流量累加到同一条记录

异常处理

1. 日记录写入失败

  • 错误场景:流量扣减成功,但日记录写入失败(数据库连接断开)
  • 处理流程
    1. 不回滚流量扣减(已提交)
    2. 记录 Error 日志包含套餐ID、日期、流量增量
    3. 通过定时任务补录日记录
  • 返回错误:不影响用户,日记录补录在后台进行

2. 查询日记录超时

  • 错误场景查询大量日记录时超时如查询3年数据
  • 处理流程
    1. 限制单次查询最多返回 365 条记录
    2. 如果超过限制,返回错误 400"查询日期范围过大最多查询1年"
  • 返回错误{"code": "DATE_RANGE_TOO_LARGE", "msg": "查询日期范围过大最多查询1年"}

数据一致性保证

1. 事务边界

  • 流量扣减 + 写入日记录:使用单个事务(可选,根据业务需求)
  • 查询日记录:使用只读事务

2. 唯一索引

  • 联合唯一索引UNIQUE INDEX idx_package_usage_daily_record (package_usage_id, date)
  • 确保同一套餐同一天只有一条记录

3. UPSERT 幂等性

  • 使用 ON CONFLICT UPDATE:确保并发写入时累加流量而非覆盖

性能指标

操作 目标响应时间 并发要求 数据量
写入日记录UPSERT < 10ms 1000 QPS 单条插入/更新
查询日记录(单套餐) < 50ms 100 QPS 查询365条记录
查询日记录(日期范围) < 100ms 100 QPS 查询指定范围

错误码定义

错误码 HTTP 状态码 错误消息 场景
DATE_RANGE_TOO_LARGE 400 查询日期范围过大最多查询1年 查询日记录日期范围超过365天
DAILY_RECORD_NOT_FOUND 404 未找到流量记录 查询不存在的日记录

数据迁移策略

激进策略(开发阶段,保证干净性):

1. 要删除的字段

目前 package_usage_daily_record 表中可能存在的冗余字段(需确认后删除):

  • 如果有 daily_increment 字段(旧的增量字段) → 删除,统一使用 daily_usage_mb
  • 如果有 total_usage 字段(旧的累计字段) → 删除,统一使用 cumulative_usage_mb

2. 新增的字段

package_usage_daily_record 表中确保有以下字段:

CREATE TABLE IF NOT EXISTS package_usage_daily_record (
    id BIGSERIAL PRIMARY KEY,
    package_usage_id BIGINT NOT NULL COMMENT '套餐使用记录ID',
    date DATE NOT NULL COMMENT '日期',
    daily_usage_mb INT DEFAULT 0 COMMENT '当日流量使用量MB',
    cumulative_usage_mb BIGINT DEFAULT 0 COMMENT '截止当日的累计流量MB',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    UNIQUE KEY idx_package_usage_daily_record (package_usage_id, date)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='套餐流量日记录';

CREATE INDEX idx_date ON package_usage_daily_record(date);

3. 要废弃的逻辑

  • 废弃旧的日记录写入逻辑:如果代码中存在不使用 UPSERT 的写入逻辑,全部删除
  • 废弃旧的日记录查询逻辑:统一使用新的查询接口

4. 历史数据强制转换

-- Step 1: 如果有旧的字段名,重命名
-- ALTER TABLE package_usage_daily_record CHANGE daily_increment daily_usage_mb INT;
-- ALTER TABLE package_usage_daily_record CHANGE total_usage cumulative_usage_mb BIGINT;

-- Step 2: 修复 cumulative_usage_mb如果历史数据不准确
-- 重新计算每个套餐的 cumulative_usage_mb
-- 需要按套餐ID分组按日期排序累加 daily_usage_mb

-- Step 3: 确保唯一索引存在
CREATE UNIQUE INDEX IF NOT EXISTS idx_package_usage_daily_record
ON package_usage_daily_record(package_usage_id, date);

5. 删除遗留表/字段(确认后执行)

-- 如果存在旧的日记录表,删除
-- DROP TABLE IF EXISTS iot_card_usage_daily;

-- 如果存在旧的字段,删除
-- ALTER TABLE package_usage_daily_record DROP COLUMN IF EXISTS daily_increment;
-- ALTER TABLE package_usage_daily_record DROP COLUMN IF EXISTS total_usage;

6. 验证步骤

-- 验证1所有日记录都有 daily_usage_mb 和 cumulative_usage_mb
SELECT COUNT(*)
FROM package_usage_daily_record
WHERE daily_usage_mb IS NULL OR cumulative_usage_mb IS NULL;
-- 预期结果0

-- 验证2同一套餐同一天只有一条记录
SELECT package_usage_id, date, COUNT(*)
FROM package_usage_daily_record
GROUP BY package_usage_id, date
HAVING COUNT(*) > 1;
-- 预期结果0 rows

-- 验证3累计流量单调递增同一套餐
-- (需要编写复杂查询验证,略)

测试场景矩阵

场景分类 测试用例 预期结果
写入日记录 首次记录当日流量 创建新记录
同一天多次流量更新 更新已有记录UPSERT
跨天流量记录 创建多条记录
流量增量计算 计算每日流量增量 daily_usage_mb = 今日累计 - 昨日累计
上游周期重置后计算 daily_usage_mb = 今日累计(重置后)
首日无昨日记录 daily_usage_mb = 今日累计
查询日记录 查询套餐流量详单 返回按日期排序的记录列表
查询指定日期范围 返回指定范围内的记录
客户端查询自己的详单 校验归属后返回
索引和性能 同一套餐同一天只有一条记录 唯一约束保证
查询365条记录 响应时间 < 50ms
并发 并发写入同一天记录 UPSERT 确保累加
异常 日记录写入失败 不回滚流量扣减,后台补录
查询日记录超时 限制日期范围,返回错误

实现参考

写入日记录UPSERT

// Service 层RecordDailyUsage
func (s *Service) RecordDailyUsage(ctx context.Context, usageID uint, date time.Time, dailyUsageMB int, cumulativeUsageMB int64) error {
    record := &model.PackageUsageDailyRecord{
        PackageUsageID:    usageID,
        Date:              date,
        DailyUsageMB:      dailyUsageMB,
        CumulativeUsageMB: cumulativeUsageMB,
    }

    if err := s.store.UpsertDailyRecord(ctx, record); err != nil {
        return errors.Wrap(errors.CodeInternalError, err, "写入流量日记录失败")
    }

    return nil
}

// Store 层UpsertDailyRecord
func (s *Store) UpsertDailyRecord(ctx context.Context, record *model.PackageUsageDailyRecord) error {
    // PostgreSQL UPSERT
    return s.db.WithContext(ctx).Exec(`
        INSERT INTO package_usage_daily_record (package_usage_id, date, daily_usage_mb, cumulative_usage_mb, created_at, updated_at)
        VALUES (?, ?, ?, ?, NOW(), NOW())
        ON CONFLICT (package_usage_id, date)
        DO UPDATE SET
            daily_usage_mb = package_usage_daily_record.daily_usage_mb + EXCLUDED.daily_usage_mb,
            cumulative_usage_mb = package_usage_daily_record.cumulative_usage_mb + (EXCLUDED.daily_usage_mb),
            updated_at = NOW()
    `, record.PackageUsageID, record.Date, record.DailyUsageMB, record.CumulativeUsageMB).Error
}

查询日记录

// Handler: GetDailyRecords
func (h *Handler) GetDailyRecords(c *fiber.Ctx) error {
    usageID, _ := c.ParamsInt("id")
    startDate := c.Query("start_date", "")
    endDate := c.Query("end_date", "")

    // 查询日记录
    records, err := h.service.GetDailyRecords(c.UserContext(), uint(usageID), startDate, endDate)
    if err != nil {
        return err
    }

    return response.Success(c, records)
}

// Service 层GetDailyRecords
func (s *Service) GetDailyRecords(ctx context.Context, usageID uint, startDate, endDate string) ([]*model.PackageUsageDailyRecord, error) {
    // 参数校验
    start, err := time.Parse("2006-01-02", startDate)
    if err != nil {
        return nil, errors.New(errors.CodeInvalidParam, "起始日期格式错误")
    }

    end, err := time.Parse("2006-01-02", endDate)
    if err != nil {
        return nil, errors.New(errors.CodeInvalidParam, "结束日期格式错误")
    }

    // 限制查询范围
    if end.Sub(start).Hours() > 365*24 {
        return nil, errors.New(errors.CodeInvalidParam, "查询日期范围过大最多查询1年")
    }

    // 查询日记录
    return s.store.ListDailyRecords(ctx, usageID, start, end)
}

// Store 层ListDailyRecords
func (s *Store) ListDailyRecords(ctx context.Context, usageID uint, startDate, endDate time.Time) ([]*model.PackageUsageDailyRecord, error) {
    var records []*model.PackageUsageDailyRecord
    err := s.db.WithContext(ctx).
        Where("package_usage_id = ? AND date >= ? AND date <= ?", usageID, startDate, endDate).
        Order("date ASC").
        Find(&records).Error
    return records, err
}

本 Spec 完成,包含:

  • 业务背景和业务规则
  • 详细场景(写入、查询、增量计算)
  • 边界条件和并发场景
  • 异常处理和数据一致性保证
  • 性能指标和错误码定义
  • 激进的数据迁移策略(明确删除字段、废弃逻辑、强制转换)
  • 测试场景矩阵和实现参考