Files
junhong_cmp_fiber/openspec/changes/archive/2026-02-10-polling-system-implementation/design.md
huang 804145332b
All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 44s
chore: 归档轮询系统实现变更 (polling-system-implementation)
已完成千万级卡规模轮询系统的完整实现和集成测试验证,将变更归档到 openspec/changes/archive/2026-02-10-polling-system-implementation/

主要成果:
- 三大轮询任务:实名检查、卡流量检查、套餐流量检查
- 快速启动(<10秒)和渐进式初始化
- 完整运营工具:配置管理、并发控制、监控面板、告警系统、数据清理、手动触发
- 任务完成度:215/216(99.5%)
- 所有 24 个新增接口已生成 OpenAPI 文档

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-10 10:28:47 +08:00

22 KiB
Raw Blame History

Context

背景

系统当前管理 1000 万+的 IoT 卡资产,需要定期检查:

  1. 实名状态未实名卡需要高频检查30-60秒已实名卡低频检查1小时
  2. 流量使用:已激活卡需要监控流量消耗,防止超额使用
  3. 套餐流量:检查套餐是否用完或过期,及时停机

当前状态

  • 已有 Gateway Client 封装(internal/gateway),提供实名查询、流量查询、停复机等 HTTP 接口
  • 已有 Asynq 任务队列基础设施(pkg/queue
  • 已有 IoT 卡、套餐、设备等数据模型
  • 缺失:轮询调度机制,无法自动定期检查,依赖人工或外部触发

约束

  • 规模约束1000万+ 卡量,未来持续增长
  • 性能约束
    • 数据库查询延迟 < 50ms
    • Redis 内存配置16 GB
    • Gateway API 无明确限流,但需控制并发避免打挂
  • 业务约束
    • 不同卡状态需要不同轮询策略(梯度配置)
    • Gateway 返回的流量是自然月总量每月1号重置
    • 行业卡无需实名检查
    • 并发数需要动态调整,无需重启
  • 架构约束:严格遵守 Handler → Service → Store → Model 分层

利益相关方

  • 运营团队:需要轮询配置管理接口,调整检查策略
  • 开发团队:需要监控面板,查看轮询任务执行情况
  • 运维团队:需要告警机制,及时发现问题

Goals / Non-Goals

Goals目标

  1. 高性能轮询调度支持百万级卡的高效调度Worker 启动时间 < 10秒
  2. 灵活配置管理:支持按卡状态、卡类型、运营商配置不同的轮询策略
  3. 动态并发控制:支持实时调整并发数,无需重启 Worker
  4. 准确的流量计算:正确处理 Gateway 返回的月总量,计算跨月流量
  5. 完善的监控告警:实时监控队列状态、任务执行情况,支持告警通知
  6. 数据生命周期管理:定期清理历史数据,避免数据膨胀

Non-Goals非目标

  1. 不支持分布式调度(单 Worker 进程调度,多 Worker 并发执行任务)
  2. 不支持实时流量监控(轮询间隔最短 30 秒,非实时)
  3. 不实现 Gateway API 限流(在并发控制层面控制调用频率)
  4. 不支持跨运营商批量查询Gateway API 当前不支持)
  5. 不支持历史流量数据分析报表(只记录原始数据,报表需单独开发)

Decisions

决策 1使用 Redis Sorted Set 实现轮询队列

问题:百万级卡量如何高效调度?

选择:使用 Redis Sorted Set 存储 {card_id: next_check_timestamp}Score 为下次检查的 Unix 时间戳。

理由

  • 性能Redis Sorted Set 的 ZRANGEBYSCORE 操作时间复杂度 O(log(N)+M),可以高效查询到期的卡
  • 内存可控1000万卡 × 20字节Score + Member≈ 200 MB三个队列共 600 MB可接受
  • 自然语义Score 即下次检查时间,直观且易于调试

替代方案

  • 数据库轮询SELECT * FROM iot_cards WHERE last_check_at <= NOW() - interval
    • 问题:百万行扫描,即使有索引也慢;高频查询打爆数据库
  • Redis List:只能 FIFO无法按时间排序
  • 延迟队列DelayQueue:需要额外组件,增加复杂度

权衡

  • 高性能,低延迟
  • 易于实现优先级Score 越小越优先)
  • ⚠️ Redis 内存占用增加(但在可接受范围内)
  • ⚠️ 需要保持 Redis 和数据库数据一致性(通过定期同步和懒加载机制)

决策 2渐进式初始化 + 懒加载

问题1000万卡全量初始化到 Redis 需要 10-20 分钟Worker 启动时间太长。

选择:三阶段初始化策略

阶段 1快速启动10秒内

  • 只加载轮询配置到 Redis
  • 启动调度器 Goroutine
  • Worker 进程立即可用

阶段 2后台渐进式初始化20-30分钟

  • 异步任务分批加载卡数据(每批 10万张
  • 每批处理后 sleep 1秒避免打爆数据库
  • 使用游标(主键范围)而不是 OFFSET提升性能
  • 进度存储在 Redis支持断点续传

阶段 3懒加载机制运行时

  • 如果卡未初始化但被触发操作API 调用、手动触发),实时加载
  • 保证热点卡优先初始化

理由

  • 快速启动Worker 10秒可用不阻塞服务
  • 平缓负载:数据库压力平滑,不会突发高峰
  • 支持中断恢复Worker 重启不会重新初始化
  • 热点优先:频繁访问的卡优先加载

替代方案

  • 全量初始化:启动时间 10-20 分钟,不可接受
  • 完全懒加载:第一次访问时加载,会有延迟

权衡

  • 启动快速,用户体验好
  • 数据库负载平滑
  • ⚠️ 初始化期间,部分卡可能还未入队(通过懒加载补偿)
  • ⚠️ 增加系统复杂度(需要管理初始化进度)

决策 3自定义并发控制而非 Asynq 原生并发

问题:需要动态调整并发数(通过管理接口),但 Asynq 的并发数在启动时固定。

选择:基于 Redis 信号量自定义并发控制。

实现

// 获取信号量
maxConcurrency := redis.Get("polling:concurrency:config:realname")
current := redis.Incr("polling:concurrency:current:realname")
if current > maxConcurrency {
    redis.Decr("polling:concurrency:current:realname")
    return false  // 并发已满
}

// 执行任务
defer redis.Decr("polling:concurrency:current:realname")

理由

  • 动态调整:管理员可以通过接口实时修改并发数,立即生效
  • 分类控制:不同类型任务(实名、流量、套餐)独立配置并发数
  • 简单实现:基于 Redis 原子操作,无需复杂分布式锁

替代方案

  • Asynq 原生并发控制:启动时固定,需要重启 Worker 才能调整
  • 信号 + 优雅重启:修改配置后发送 SIGHUP 重启 Worker
    • 问题:重启有服务中断风险,操作复杂

权衡

  • 实时调整,无需重启
  • 灵活性高,支持精细化控制
  • ⚠️ 需要在每个 Handler 开头获取信号量(轻微性能开销)
  • ⚠️ 如果 Redis 故障,并发控制失效(通过默认值兜底)

决策 4跨月流量计算方案

问题Gateway 返回的是自然月总量每月1号重置如何计算增量和累计流量

选择:在 iot_cards 表增加三个字段:

  • current_month_usage_mb:本月已用流量
  • current_month_start_date:本月开始日期
  • last_month_total_mb:上月结束时的总流量

流程

1. 查询 Gateway 获取本月总量(如 1024 MB
2. 判断是否跨月:
   - current_month_start_date != 本月1号 → 跨月了
3. 如果跨月:
   - 增量 = last_month_total_mb + current_month_total_mb
   - 更新 last_month_total_mb = current_month_usage_mb上月结束值
   - 更新 current_month_start_date = 本月1号
   - 更新 current_month_usage_mb = 当前值
4. 如果同月:
   - 增量 = 当前值 - current_month_usage_mb
   - 更新 current_month_usage_mb = 当前值
5. 累计流量 += 增量

理由

  • 准确计算:即使跨月时未轮询到,也不会漏掉上月最后的流量
  • 简单实现:只需要三个字段,逻辑清晰
  • 支持调试:保留月度数据,便于排查问题

替代方案

  • 记录上次查询值:如果跨月时未轮询,会漏掉上月最后的流量
  • 根据激活日期计算账单周期Gateway 返回的是自然月,不是账单周期

权衡

  • 计算准确,不漏流量
  • 支持跨月检测
  • ⚠️ 增加三个数据库字段(开销很小)

决策 5套餐检查混合模式即时 + 定期)

问题:套餐流量检查何时触发?

选择:混合模式

  1. 即时触发:卡流量检查完成后,立即触发关联套餐的检查
  2. 定期扫描Scheduler 定期扫描所有生效中的套餐(兜底)

理由

  • 实时性:流量增加后立即检查套餐,超额立即停机
  • 可靠性:定期扫描兜底,避免漏检(比如卡流量检查失败)

替代方案

  • 只即时触发:如果卡流量检查失败,套餐永远不会检查
  • 只定期扫描:实时性差,超额后延迟停机

权衡

  • 实时性好,可靠性高
  • ⚠️ 可能有重复检查(但套餐检查逻辑幂等,无影响)

决策 6轮询配置匹配机制

问题:一张卡可能匹配多个配置(如"未实名卡"和"未实名移动卡"),如何选择?

选择:优先级机制(数字越小优先级越高)

匹配规则

  1. 查询所有启用的配置(status = 1),按 priority ASC 排序
  2. 逐个检查配置的匹配条件:
    • card_condition卡状态条件not_real_name/real_name/activated/suspended
    • card_category卡业务类型normal/industry
    • carrier_id:运营商 ID
  3. 返回第一个匹配的配置

示例

配置 1未实名移动卡priority=10
配置 2未实名卡priority=20

卡A未实名 + 移动 → 匹配配置1优先级更高
卡B未实名 + 联通 → 匹配配置2

理由

  • 灵活性:可以针对特定运营商设置特殊策略
  • 简单实现:优先级排序,第一个匹配即返回
  • 易于调试:配置优先级清晰可见

替代方案

  • 最精确匹配:条件最多的配置优先
    • 问题:定义"精确度"复杂,难以理解
  • 多配置合并:同时应用多个配置
    • 问题:合并逻辑复杂,冲突难以处理

权衡

  • 简单直观,易于理解
  • 灵活性高,支持特殊策略
  • ⚠️ 配置顺序很重要,需要文档说明

决策 7卡生命周期管理

问题:新增、删除、状态变更的卡如何同步到轮询系统?

选择:在 Service 层集成 PollingService提供生命周期回调

  • OnCardCreated(card):新卡创建时调用
  • OnBatchCardsCreated(cards):批量卡导入时调用
  • OnCardStatusChanged(cardID):卡状态变化时调用
  • OnCardDeleted(cardID):删除卡时调用
  • OnCardDisabled(cardID):禁用轮询时调用
  • OnCardEnabled(cardID):启用轮询时调用

实现

// IotCardService.Create()
func (s *IotCardService) Create(ctx context.Context, req *CreateReq) (*IotCard, error) {
    // 1. 创建卡
    card := &IotCard{...}
    if err := s.store.Create(ctx, card); err != nil {
        return nil, err
    }

    // 2. 加入轮询系统
    if card.EnablePolling {
        s.pollingService.OnCardCreated(ctx, card)
    }

    return card, nil
}

// RealNameCheckHandler 检测到状态变化
func (h *RealNameCheckHandler) HandleRealNameCheck(...) {
    // ...
    if newStatus != oldStatus {
        h.pollingService.OnCardStatusChanged(ctx, cardID)
    }
    // ...
}

理由

  • 自动化:无需手动干预,卡变化自动同步到轮询系统
  • 解耦业务逻辑和轮询系统分离Service 只需调用回调
  • 可测试PollingService 可以独立测试

替代方案

  • 数据库触发器Go 生态不推荐使用触发器,调试困难
  • 定期全量同步:延迟高,资源浪费

权衡

  • 实时同步,无延迟
  • 易于维护和测试
  • ⚠️ 需要在多个 Service 方法中调用回调(可以通过拦截器优化)

决策 8监控统计数据存储

问题:监控指标(成功率、平均耗时、队列长度)如何存储和计算?

选择Redis Hash 存储统计数据,每次任务执行后更新。

数据结构

polling:stats:realname → {
  queue_size: 1234567,        # 从 Sorted Set 读取
  processing: 50,             # 从并发控制读取
  success_count_1h: 12345,    # 最近1小时成功次数
  failure_count_1h: 123,      # 最近1小时失败次数
  total_duration_1h: 1234567, # 最近1小时总耗时(ms)
  last_reset: "2026-02-04 10:00:00"
}

计算

  • 成功率 = success_count / (success_count + failure_count)
  • 平均耗时 = total_duration / success_count

定期重置:每小时重置计数器,保持时间窗口滚动。

理由

  • 高性能Redis Hash 读写快,支持原子操作
  • 简单实现:无需复杂的时序数据库
  • 实时性:每次任务执行后立即更新

替代方案

  • 时序数据库InfluxDB/Prometheus:需要额外组件,过度设计
  • 数据库统计表:写入性能差,延迟高

权衡

  • 简单高效
  • 实时性好
  • ⚠️ 只保留最近1小时数据长期数据需要归档
  • ⚠️ Redis 重启后数据丢失(可以通过持久化缓解)

决策 9告警检查频率

问题:告警规则多久检查一次?

选择独立的告警检查器AlertChecker每 1 分钟运行一次。

流程

  1. 读取所有启用的告警规则
  2. 从 Redis 读取对应的监控指标
  3. 判断是否满足告警条件(如 queue_size > 1000000
  4. 如果满足条件且持续时间达到阈值(如 5 分钟),发送告警
  5. 记录告警历史,避免重复发送(冷却期)

理由

  • 独立运行:不阻塞轮询任务
  • 可配置:告警规则灵活配置
  • 避免误报:持续时间阈值避免短暂波动触发告警

替代方案

  • 实时告警:每次任务执行后检查
    • 问题:频率太高,性能开销大
  • 定时任务Cron:依赖外部调度
    • 问题:增加依赖,不够灵活

权衡

  • 平衡性能和实时性
  • 易于实现和维护
  • ⚠️ 1 分钟延迟(对告警来说可接受)

决策 10数据清理策略

问题:流量历史记录(data_usage_records)会快速增长,如何清理?

选择:定时清理任务,每天凌晨 2 点运行。

流程

  1. 读取清理配置(tb_data_cleanup_config
  2. 对每个配置的表,删除超过保留天数的数据
    DELETE FROM tb_data_usage_record
    WHERE created_at < NOW() - INTERVAL '90 days'
    LIMIT 10000;  -- 分批删除,避免锁表
    
  3. 记录清理日志

理由

  • 避免数据膨胀:定期清理历史数据,控制表大小
  • 可配置:保留天数可配置
  • 分批删除:避免长时间锁表

替代方案

  • 分区表Partition:按月自动删除旧分区
    • 问题:需要数据库层面支持,配置复杂
  • 手动清理:依赖人工操作,容易遗忘

权衡

  • 简单可靠
  • 支持灵活配置
  • ⚠️ 删除期间表可能有轻微性能影响(通过 LIMIT 控制)

Risks / Trade-offs

风险 1Redis 内存不足

风险1000万卡 × 200字节 = 2 GB 缓存 + 600 MB 队列 = ~3 GB如果卡量增长到 2000万需要 6 GB。

缓解措施

  • 监控 Redis 内存使用率,设置告警(超过 80%
  • 卡信息缓存设置 TTL7天自动淘汰
  • 如果内存不足可以只缓存热点卡LRU 策略)

风险 2Redis 和数据库数据不一致

风险Redis 缓存的卡信息可能与数据库不同步(如卡状态变更但未更新 Redis

缓解措施

  • 定时同步任务(每小时):从数据库读取最近更新的卡,更新 Redis
  • 懒加载机制:如果缓存未命中,从数据库读取并更新缓存
  • 卡状态变更时主动更新 RedisOnCardStatusChanged

风险 3Gateway API 调用失败

风险Gateway 不可用或超时,导致轮询任务失败。

缓解措施

  • 任务失败不重试(MaxRetry = 0),避免重复调用打挂 Gateway
  • 失败任务重新入队(按原计划下次检查)
  • 记录失败统计,触发告警(失败率 > 5%
  • Gateway 调用设置超时30秒

风险 4渐进式初始化期间卡未入队

风险:初始化未完成时,部分卡还未加入轮询队列。

缓解措施

  • 懒加载机制:卡被访问时自动加载
  • 监控初始化进度,提供管理接口查看
  • 支持手动触发检查(优先级最高)

风险 5并发控制 Redis 故障

风险Redis 故障导致并发控制失效,可能有大量任务同时执行。

缓解措施

  • Redis 连接失败时使用默认并发数50
  • Asynq 队列本身有并发控制(作为二级保护)
  • 监控 Gateway 负载,设置告警

Trade-off 1实时性 vs 资源消耗

权衡轮询间隔越短实时性越好但资源消耗数据库、Redis、Gateway API越高。

选择:支持灵活配置,根据卡状态动态调整间隔

  • 未实名卡30-60秒需要及时发现实名完成
  • 已实名卡1小时状态变化少
  • 已激活卡30分钟流量监控

Trade-off 2缓存一致性 vs 性能

权衡:强一致性需要每次从数据库读取,性能差;最终一致性性能好,但可能有短暂不一致。

选择:最终一致性

  • 通过定时同步和懒加载保证最终一致
  • 对业务影响小(轮询任务本身就是定期的,短暂不一致可接受)

Trade-off 3自定义并发控制 vs Asynq 原生

权衡自定义并发控制灵活性高但增加复杂度Asynq 原生简单,但不支持动态调整。

选择:自定义并发控制

  • 业务需求明确(需要动态调整)
  • 实现简单(基于 Redis 原子操作)
  • 性能开销小(每个任务只需 1 次 INCR/DECR

Migration Plan

部署步骤

阶段 1数据库迁移无服务中断

# 1. 执行数据库迁移(新增表和字段)
go run cmd/migrate/main.go up

# 2. 验证迁移成功
psql -U user -d database -c "\d tb_polling_config"
psql -U user -d database -c "\d tb_iot_card"

迁移内容

  • 新增表:tb_polling_configtb_polling_concurrency_configtb_polling_alert_ruletb_data_cleanup_config
  • 修改表:tb_iot_card 增加字段 current_month_usage_mbcurrent_month_start_datelast_month_total_mb

影响:无,新增字段有默认值,不影响已有数据

阶段 2初始化配置数据

# 执行配置初始化脚本
psql -U user -d database -f scripts/init_polling_config.sql

初始化内容

  • 创建默认轮询配置(未实名卡、已实名卡、行业卡等)
  • 创建默认并发控制配置
  • 创建数据清理配置

阶段 3部署新版本 Worker灰度发布

# 1. 先部署一台 Worker 测试
# 停止旧 Worker
kill -TERM <worker_pid>

# 启动新 Worker
./bin/worker

# 2. 观察日志,确认初始化成功
tail -f logs/app.log | grep "轮询系统"

# 3. 检查 Redis 数据
redis-cli
> ZCARD polling:queue:realname
> HGETALL polling:card:1
> GET polling:configs

# 4. 逐步部署所有 Worker

关键检查点

  • Worker 启动时间 < 10秒
  • 渐进式初始化正常运行
  • Redis 队列有数据
  • 轮询任务正常执行

阶段 4部署 API 服务(新增管理接口)

# 部署新版本 API 服务
./bin/api

# 验证管理接口
curl http://localhost:8080/api/admin/polling-configs
curl http://localhost:8080/api/admin/polling-stats

阶段 5启用告警

# 通过管理接口创建告警规则
curl -X POST http://localhost:8080/api/admin/polling-alert-rules \
  -d '{
    "rule_name": "实名检查队列积压告警",
    "task_type": "realname",
    "metric_type": "queue_size",
    "operator": "gt",
    "threshold": 1000000,
    "alert_level": "warning"
  }'

回滚策略

回滚 API 服务

# 部署旧版本 API不包含轮询管理接口
./bin/api-old

# 影响:轮询管理接口不可用,但轮询系统仍正常运行

回滚 Worker 进程

# 停止新 Worker
kill -TERM <worker_pid>

# 启动旧 Worker
./bin/worker-old

# 影响:轮询系统停止工作,但不影响其他业务

回滚数据库(慎用)

# 只有在数据异常时才回滚数据库
go run cmd/migrate/main.go down

# 影响:
# - 删除轮询相关表
# - 删除 iot_cards 表的新增字段(数据丢失!)

建议:除非数据严重错误,否则不回滚数据库,新增字段不影响旧版本代码。

数据迁移(如果需要)

场景:如果已有卡的流量数据需要迁移到新字段。

-- 初始化新字段(如果需要)
UPDATE tb_iot_card
SET current_month_start_date = DATE_TRUNC('month', CURRENT_DATE),
    current_month_usage_mb = 0,
    last_month_total_mb = 0
WHERE current_month_start_date IS NULL;

验证清单

  • 数据库迁移成功,表和字段创建完成
  • 轮询配置初始化成功,有默认配置
  • Worker 启动时间 < 10秒
  • 渐进式初始化正常运行,进度可查询
  • Redis 队列有数据,卡信息缓存正常
  • 轮询任务正常执行,日志无错误
  • 管理接口可用,可以查询配置和统计
  • 手动触发功能正常
  • 并发控制生效,可以动态调整
  • 监控面板显示正确数据
  • 告警规则配置成功,告警通知正常

Open Questions

问题 1告警通知渠道的实现细节

问题邮件、短信、Webhook 的发送如何实现?

待决策

  • 是否复用现有的邮件发送服务?
  • 短信服务使用哪个供应商(阿里云、腾讯云)?
  • Webhook 是否需要签名验证?

影响:告警功能的完整性


问题 2分区表优化

问题data_usage_records 表是否使用 PostgreSQL 分区表(按月分区)?

待决策

  • 分区表可以提升查询和删除性能
  • 但增加配置复杂度

影响:数据清理性能


问题 3分布式部署支持

问题:是否需要支持多个 Worker 进程部署(分布式调度)?

当前方案:单 Worker 调度,多 Worker 执行任务(通过 Asynq 队列)

待决策

  • 如果卡量增长到亿级,单 Worker 可能成为瓶颈
  • 可以通过分片Sharding支持多 Worker 调度

影响:系统扩展性