- 添加 IoT 核心业务表:运营商、IoT 卡、设备、号卡、套餐、订单等 - 添加分佣系统表:分佣规则、分佣记录、运营商结算等 - 添加轮询和流量管理表:轮询配置、流量使用记录等 - 添加财务和系统管理表:佣金提现、换卡申请等 - 实现完整的 GORM 模型和常量定义 - 添加数据库迁移脚本和详细文档 - 集成 OpenSpec 工作流工具(opsx 命令和 skills) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
21 KiB
IoT SIM 管理系统 - 轮询机制说明
概述
IoT SIM 管理系统实现了一套灵活的三层轮询机制,用于定期检查 IoT 卡的实名状态、流量使用情况和套餐流量情况。轮询机制支持梯度策略配置,可以针对不同卡状态、不同运营商设置不同的轮询间隔和优先级。
轮询架构
三层轮询体系
┌──────────────────────────────────────────────────────────┐
│ 轮询调度器 │
│ (Polling Scheduler) │
│ - 读取轮询配置表 │
│ - 按优先级和间隔时间调度任务 │
│ - 使用 Asynq 异步任务队列 │
└──────────────────────────────────────────────────────────┘
│
┌──────────────────┼──────────────────┐
│ │ │
▼ ▼ ▼
┌────────────────┐ ┌────────────────┐ ┌────────────────┐
│ 实名检查进程 │ │ 卡流量检查进程 │ │ 套餐流量检查进程 │
│ (Real Name) │ │ (Card Data) │ │ (Package Data) │
├────────────────┤ ├────────────────┤ ├────────────────┤
│ - 查询未实名卡 │ │ - 查询激活的卡 │ │ - 查询生效中套餐 │
│ - 调用运营商API │ │ - 同步流量使用 │ │ - 检查流量使用 │
│ - 更新实名状态 │ │ - 更新 IoT 卡 │ │ - 判断是否停机 │
└────────────────┘ └────────────────┘ └────────────────┘
轮询配置表
表结构: polling_configs
轮询配置表支持灵活的梯度轮询策略:
CREATE TABLE polling_configs (
id BIGSERIAL PRIMARY KEY,
config_name VARCHAR(100) UNIQUE NOT NULL,
description VARCHAR(500),
card_condition VARCHAR(50),
carrier_id BIGINT,
real_name_check_enabled BOOLEAN DEFAULT false,
real_name_check_interval INT DEFAULT 60,
card_data_check_enabled BOOLEAN DEFAULT false,
card_data_check_interval INT DEFAULT 60,
package_check_enabled BOOLEAN DEFAULT false,
package_check_interval INT DEFAULT 60,
priority INT NOT NULL DEFAULT 100,
status INT NOT NULL DEFAULT 1,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
配置字段说明
| 字段名 | 类型 | 说明 |
|---|---|---|
| config_name | VARCHAR(100) | 配置名称,如"未实名卡-移动"、"已激活卡-联通" |
| card_condition | VARCHAR(50) | 卡状态条件: not_real_name/real_name/activated/suspended |
| carrier_id | BIGINT | 运营商ID,NULL 表示所有运营商 |
| real_name_check_enabled | BOOLEAN | 是否启用实名检查 |
| real_name_check_interval | INT | 实名检查间隔(秒) |
| card_data_check_enabled | BOOLEAN | 是否启用卡流量检查 |
| card_data_check_interval | INT | 卡流量检查间隔(秒) |
| package_check_enabled | BOOLEAN | 是否启用套餐流量检查 |
| package_check_interval | INT | 套餐流量检查间隔(秒) |
| priority | INT | 优先级(数字越小优先级越高) |
| status | INT | 状态: 1-启用, 2-禁用 |
轮询配置示例
示例 1: 未实名卡快速轮询
INSERT INTO polling_configs (
config_name,
description,
card_condition,
carrier_id,
real_name_check_enabled,
real_name_check_interval,
card_data_check_enabled,
card_data_check_interval,
priority,
status
) VALUES (
'未实名卡-快速轮询',
'对未实名卡每30秒检查一次实名状态',
'not_real_name',
NULL, -- 所有运营商
true, -- 启用实名检查
30, -- 30秒间隔
false, -- 不检查流量
0,
10, -- 高优先级
1 -- 启用
);
说明: 未实名卡需要频繁检查实名状态,以便及时发现已完成实名认证的卡。
示例 2: 已激活卡流量监控
INSERT INTO polling_configs (
config_name,
description,
card_condition,
carrier_id,
real_name_check_enabled,
real_name_check_interval,
card_data_check_enabled,
card_data_check_interval,
package_check_enabled,
package_check_interval,
priority,
status
) VALUES (
'已激活卡-流量监控',
'对已激活卡每60秒检查流量使用',
'activated',
NULL,
false, -- 不检查实名
0,
true, -- 启用卡流量检查
60, -- 60秒间隔
true, -- 启用套餐流量检查
60, -- 60秒间隔
20, -- 中优先级
1
);
说明: 已激活卡需要监控流量使用,防止超额使用和及时停机。
示例 3: 移动运营商特殊策略
INSERT INTO polling_configs (
config_name,
description,
card_condition,
carrier_id,
real_name_check_enabled,
real_name_check_interval,
card_data_check_enabled,
card_data_check_interval,
priority,
status
) VALUES (
'移动-已激活卡-慢速轮询',
'移动运营商已激活卡每180秒检查一次流量',
'activated',
1, -- 中国移动 carrier_id
false,
0,
true,
180, -- 180秒间隔
50, -- 低优先级
1
);
说明: 可以针对特定运营商设置不同的轮询策略,优化 API 调用频率。
三种轮询进程
1. 实名检查进程 (Real Name Check)
目标: 检查未实名的 IoT 卡是否已完成实名认证
工作流程:
- 查询符合条件的 IoT 卡:
card_category = 'normal'(普通卡需要实名)real_name_status = 0(未实名)enable_polling = true(参与轮询)- 根据
last_real_name_check_at判断是否到达检查间隔
- 调用运营商 Gateway API 查询实名状态
- 更新 IoT 卡的
real_name_status和last_real_name_check_at - 记录日志和异常情况
轮询间隔控制:
// 伪代码
if time.Since(card.LastRealNameCheckAt) >= config.RealNameCheckInterval {
// 执行实名检查
result := gateway.CheckRealName(card.ICCID)
card.RealNameStatus = result.Status
card.LastRealNameCheckAt = time.Now()
db.Save(&card)
}
配置参数:
real_name_check_enabled: 是否启用real_name_check_interval: 检查间隔(秒)
注意事项:
- 行业卡 (
card_category = 'industry') 无需实名检查 - 已实名的卡 (
real_name_status = 1) 不再参与轮询
2. 卡流量检查进程 (Card Data Check)
目标: 同步 IoT 卡的流量使用情况
工作流程:
- 查询符合条件的 IoT 卡:
activation_status = 1(已激活)enable_polling = true(参与轮询)- 根据
last_data_check_at判断是否到达检查间隔
- 调用运营商 Gateway API 查询流量使用
- 更新 IoT 卡的
data_usage_mb和last_data_check_at - 记录流量使用历史到
data_usage_records表 - 判断是否需要停机:
- 如果流量超过套餐的虚流量额度,触发停机逻辑
轮询间隔控制:
// 伪代码
if time.Since(card.LastDataCheckAt) >= config.CardDataCheckInterval {
// 执行流量检查
usage := gateway.GetDataUsage(card.ICCID)
card.DataUsageMB = usage.TotalMB
card.LastDataCheckAt = time.Now()
db.Save(&card)
// 记录历史数据
record := DataUsageRecord{
IotCardID: card.ID,
UsageDate: time.Now().Format("2006-01-02"),
DataUsageMB: usage.TodayMB,
CarrierSyncData: usage.RawData,
SyncedAt: time.Now(),
}
db.Create(&record)
}
配置参数:
card_data_check_enabled: 是否启用card_data_check_interval: 检查间隔(秒)
停机判断逻辑:
// 伪代码
if card.DataUsageMB >= package.VirtualDataMB {
// 触发停机
card.NetworkStatus = 0 // 停机
gateway.SuspendCard(card.ICCID)
}
3. 套餐流量检查进程 (Package Check)
目标: 检查套餐流量使用情况,判断套餐状态
工作流程:
- 查询符合条件的套餐使用记录:
status = 1(生效中)expires_at > NOW()(未过期)- 根据
last_package_check_at判断是否到达检查间隔
- 计算套餐的流量使用情况:
- 单卡套餐: 统计该卡的流量使用
- 设备级套餐: 统计该设备绑定的所有卡的流量使用
- 更新套餐使用记录的
data_usage_mb和last_package_check_at - 判断套餐状态:
- 如果流量用完:
status = 2(已用完) - 如果时间过期:
status = 3(已过期)
- 如果流量用完:
- 如果套餐用完或过期,触发停机逻辑
轮询间隔控制:
// 伪代码
if time.Since(packageUsage.LastPackageCheckAt) >= config.PackageCheckInterval {
// 执行套餐检查
var totalUsage int64
if packageUsage.UsageType == "single_card" {
// 单卡套餐
card := db.FindIotCardByID(packageUsage.IotCardID)
totalUsage = card.DataUsageMB
} else {
// 设备级套餐
bindings := db.FindDeviceSimBindings(packageUsage.DeviceID)
for _, binding := range bindings {
card := db.FindIotCardByID(binding.IotCardID)
totalUsage += card.DataUsageMB
}
}
packageUsage.DataUsageMB = totalUsage
packageUsage.LastPackageCheckAt = time.Now()
// 判断状态
if totalUsage >= packageUsage.DataLimitMB {
packageUsage.Status = 2 // 已用完
// 触发停机
}
if time.Now().After(packageUsage.ExpiresAt) {
packageUsage.Status = 3 // 已过期
// 触发停机
}
db.Save(&packageUsage)
}
配置参数:
package_check_enabled: 是否启用package_check_interval: 检查间隔(秒)
停机判断逻辑:
// 伪代码
if packageUsage.Status == 2 || packageUsage.Status == 3 {
// 套餐用完或过期,触发停机
if packageUsage.UsageType == "single_card" {
card := db.FindIotCardByID(packageUsage.IotCardID)
card.NetworkStatus = 0 // 停机
gateway.SuspendCard(card.ICCID)
} else {
// 设备级套餐,停掉所有绑定的卡
bindings := db.FindDeviceSimBindings(packageUsage.DeviceID)
for _, binding := range bindings {
card := db.FindIotCardByID(binding.IotCardID)
card.NetworkStatus = 0
gateway.SuspendCard(card.ICCID)
}
}
}
轮询调度器设计
调度器架构
// 伪代码
type PollingScheduler struct {
db *gorm.DB
queue *asynq.Client
}
func (s *PollingScheduler) Start() {
// 启动三个独立的调度协程
go s.scheduleRealNameCheck()
go s.scheduleCardDataCheck()
go s.schedulePackageCheck()
}
func (s *PollingScheduler) scheduleRealNameCheck() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for range ticker.C {
configs := s.loadPollingConfigs("real_name_check_enabled = true")
for _, config := range configs {
// 查询需要检查的卡
cards := s.findCardsForRealNameCheck(config)
for _, card := range cards {
// 使用 Asynq 异步任务队列
task := asynq.NewTask("iot:realname:check", map[string]interface{}{
"card_id": card.ID,
"config_id": config.ID,
})
s.queue.Enqueue(task, asynq.ProcessIn(0))
}
}
}
}
任务队列设计
使用 Asynq 异步任务队列处理轮询任务:
// 伪代码
type RealNameCheckHandler struct {
db *gorm.DB
gateway *CarrierGateway
}
func (h *RealNameCheckHandler) ProcessTask(ctx context.Context, task *asynq.Task) error {
var payload struct {
CardID uint `json:"card_id"`
ConfigID uint `json:"config_id"`
}
if err := json.Unmarshal(task.Payload(), &payload); err != nil {
return err
}
// 加载卡信息
card := h.db.FindIotCardByID(payload.CardID)
// 调用运营商 API
result, err := h.gateway.CheckRealName(card.ICCID)
if err != nil {
return err
}
// 更新卡状态
card.RealNameStatus = result.Status
card.LastRealNameCheckAt = time.Now()
h.db.Save(&card)
return nil
}
轮询优先级和并发控制
优先级机制
轮询配置表的 priority 字段控制执行优先级:
- 高优先级 (1-30): 紧急任务,如未实名卡检查
- 中优先级 (31-70): 常规任务,如流量监控
- 低优先级 (71-100): 非紧急任务,如历史数据同步
调度器按优先级排序执行:
SELECT * FROM polling_configs
WHERE status = 1
ORDER BY priority ASC, id ASC;
并发控制
使用 Asynq 的并发控制功能:
// 伪代码
queue := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
// 设置队列并发数
queues := map[string]int{
"iot:realname": 10, // 实名检查队列,10个并发
"iot:carddata": 20, // 卡流量检查队列,20个并发
"iot:package": 20, // 套餐检查队列,20个并发
}
server := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Queues: queues},
)
限流保护
为了避免过度调用运营商 API,需要实现限流保护:
// 伪代码
type RateLimiter struct {
limiter *rate.Limiter
}
func NewRateLimiter(carrierID uint) *RateLimiter {
// 每秒最多 10 次 API 调用
return &RateLimiter{
limiter: rate.NewLimiter(rate.Limit(10), 10),
}
}
func (r *RateLimiter) Wait(ctx context.Context) error {
return r.limiter.Wait(ctx)
}
轮询间隔策略
推荐配置
| 卡状态 | 实名检查间隔 | 卡流量检查间隔 | 套餐流量检查间隔 | 优先级 |
|---|---|---|---|---|
| 未实名卡 | 30秒 | - | - | 10 (高) |
| 已实名未激活 | - | - | - | - |
| 已激活卡(正常) | - | 60秒 | 60秒 | 20 (中) |
| 已激活卡(套餐即将用完) | - | 30秒 | 30秒 | 15 (高) |
| 已停用卡 | - | - | - | - |
动态调整策略
根据卡的流量使用情况动态调整轮询间隔:
// 伪代码
func calculateCheckInterval(packageUsage *PackageUsage) int {
usagePercent := float64(packageUsage.DataUsageMB) / float64(packageUsage.DataLimitMB)
if usagePercent >= 0.9 {
return 30 // 90%以上,30秒检查一次
} else if usagePercent >= 0.7 {
return 60 // 70-90%,60秒检查一次
} else {
return 180 // 70%以下,180秒检查一次
}
}
轮询开关控制
全局开关
IoT 卡的 enable_polling 字段控制是否参与轮询:
-- 禁用某张卡的轮询
UPDATE iot_cards SET enable_polling = false WHERE iccid = '89860123456789012345';
-- 启用某张卡的轮询
UPDATE iot_cards SET enable_polling = true WHERE iccid = '89860123456789012345';
配置开关
轮询配置表的 status 字段控制整个配置是否启用:
-- 禁用某个轮询配置
UPDATE polling_configs SET status = 2 WHERE config_name = '未实名卡-快速轮询';
-- 启用某个轮询配置
UPDATE polling_configs SET status = 1 WHERE config_name = '未实名卡-快速轮询';
单项开关
轮询配置表的 *_check_enabled 字段控制具体检查类型:
-- 只启用实名检查,禁用流量检查
UPDATE polling_configs
SET real_name_check_enabled = true,
card_data_check_enabled = false,
package_check_enabled = false
WHERE config_name = '未实名卡-快速轮询';
错误处理和重试
错误处理策略
轮询任务可能因为以下原因失败:
- 运营商 API 超时
- 运营商 API 返回错误
- 数据库连接失败
- 网络故障
使用 Asynq 的重试机制:
// 伪代码
task := asynq.NewTask("iot:realname:check", payload)
// 设置重试策略
opts := []asynq.Option{
asynq.MaxRetry(3), // 最多重试 3 次
asynq.Timeout(30 * time.Second), // 任务超时时间 30 秒
}
queue.Enqueue(task, opts...)
失败日志记录
记录失败的轮询任务:
// 伪代码
type PollingLog struct {
ID uint `gorm:"primaryKey"`
TaskType string // realname/carddata/package
CardID uint
ConfigID uint
Success bool
ErrorMsg string
ExecutedAt time.Time
CreatedAt time.Time
}
func logPollingResult(taskType string, cardID, configID uint, err error) {
log := PollingLog{
TaskType: taskType,
CardID: cardID,
ConfigID: configID,
Success: err == nil,
ErrorMsg: fmt.Sprintf("%v", err),
ExecutedAt: time.Now(),
CreatedAt: time.Now(),
}
db.Create(&log)
}
监控和告警
监控指标
-
轮询任务执行成功率
- 实名检查成功率
- 卡流量检查成功率
- 套餐流量检查成功率
-
轮询任务延迟
- 任务入队时间到执行时间的延迟
- 平均延迟、P95、P99
-
运营商 API 调用统计
- 每分钟 API 调用次数
- API 响应时间
- API 错误率
-
卡状态统计
- 未实名卡数量
- 已激活卡数量
- 已停用卡数量
告警规则
-
高失败率告警
- 如果某类轮询任务 5 分钟内失败率超过 50%,触发告警
-
高延迟告警
- 如果轮询任务延迟超过 5 分钟,触发告警
-
API 异常告警
- 如果运营商 API 连续失败 10 次,触发告警
-
流量异常告警
- 如果某张卡流量使用突增(1小时内增加超过 100MB),触发告警
最佳实践
1. 合理设置轮询间隔
- 频繁轮询的代价: 增加运营商 API 调用次数,可能触发限流
- 稀疏轮询的风险: 流量超额检测不及时,可能导致停机延迟
- 建议: 根据业务需求和运营商 API 限制平衡轮询频率
2. 使用批量查询
// 不推荐: 逐个查询
for _, card := range cards {
usage := gateway.GetDataUsage(card.ICCID)
card.DataUsageMB = usage.TotalMB
db.Save(&card)
}
// 推荐: 批量查询
iccids := []string{}
for _, card := range cards {
iccids = append(iccids, card.ICCID)
}
usages := gateway.BatchGetDataUsage(iccids) // 批量查询
for _, card := range cards {
card.DataUsageMB = usages[card.ICCID]
}
db.Save(&cards) // 批量更新
3. 实现幂等性
轮询任务可能会重复执行,必须保证幂等性:
// 伪代码
func ProcessRealNameCheck(cardID uint) error {
// 加锁,防止重复执行
lockKey := fmt.Sprintf("iot:realname:lock:%d", cardID)
lock := redis.SetNX(lockKey, "1", 60*time.Second)
if !lock {
return errors.New("task already running")
}
defer redis.Del(lockKey)
// 执行检查
card := db.FindIotCardByID(cardID)
result := gateway.CheckRealName(card.ICCID)
card.RealNameStatus = result.Status
card.LastRealNameCheckAt = time.Now()
db.Save(&card)
return nil
}
4. 记录详细日志
// 伪代码
logger.Info("开始实名检查",
zap.Uint("card_id", card.ID),
zap.String("iccid", card.ICCID),
zap.Uint("config_id", config.ID),
)
result, err := gateway.CheckRealName(card.ICCID)
if err != nil {
logger.Error("实名检查失败",
zap.Uint("card_id", card.ID),
zap.String("iccid", card.ICCID),
zap.Error(err),
)
return err
}
logger.Info("实名检查成功",
zap.Uint("card_id", card.ID),
zap.String("iccid", card.ICCID),
zap.Int("real_name_status", result.Status),
)
总结
IoT SIM 管理系统的轮询机制具有以下特点:
- 三层轮询体系: 实名检查、卡流量检查、套餐流量检查相互独立
- 灵活配置: 支持按卡状态、运营商、优先级配置不同的轮询策略
- 异步任务队列: 使用 Asynq 实现高并发、可重试的任务处理
- 梯度策略: 支持根据流量使用情况动态调整轮询间隔
- 开关控制: 支持全局、配置、单项的轮询开关
- 错误处理: 完善的重试机制和错误日志记录
- 监控告警: 实时监控轮询任务执行情况和运营商 API 调用
通过合理配置和使用轮询机制,可以实现 IoT 卡的自动化管理,提高运营效率,降低人工成本。
文档版本: v1.0 最后更新: 2026-01-12 维护人员: Claude Sonnet 4.5