# IoT SIM 管理系统 - 轮询机制说明 ## 概述 IoT SIM 管理系统实现了一套灵活的三层轮询机制,用于定期检查 IoT 卡的实名状态、流量使用情况和套餐流量情况。轮询机制支持梯度策略配置,可以针对不同卡状态、不同运营商设置不同的轮询间隔和优先级。 --- ## 轮询架构 ### 三层轮询体系 ``` ┌──────────────────────────────────────────────────────────┐ │ 轮询调度器 │ │ (Polling Scheduler) │ │ - 读取轮询配置表 │ │ - 按优先级和间隔时间调度任务 │ │ - 使用 Asynq 异步任务队列 │ └──────────────────────────────────────────────────────────┘ │ ┌──────────────────┼──────────────────┐ │ │ │ ▼ ▼ ▼ ┌────────────────┐ ┌────────────────┐ ┌────────────────┐ │ 实名检查进程 │ │ 卡流量检查进程 │ │ 套餐流量检查进程 │ │ (Real Name) │ │ (Card Data) │ │ (Package Data) │ ├────────────────┤ ├────────────────┤ ├────────────────┤ │ - 查询未实名卡 │ │ - 查询激活的卡 │ │ - 查询生效中套餐 │ │ - 调用运营商API │ │ - 同步流量使用 │ │ - 检查流量使用 │ │ - 更新实名状态 │ │ - 更新 IoT 卡 │ │ - 判断是否停机 │ └────────────────┘ └────────────────┘ └────────────────┘ ``` --- ## 轮询配置表 ### 表结构: `polling_configs` 轮询配置表支持灵活的梯度轮询策略: ```sql CREATE TABLE polling_configs ( id BIGSERIAL PRIMARY KEY, config_name VARCHAR(100) UNIQUE NOT NULL, description VARCHAR(500), card_condition VARCHAR(50), carrier_id BIGINT, real_name_check_enabled BOOLEAN DEFAULT false, real_name_check_interval INT DEFAULT 60, card_data_check_enabled BOOLEAN DEFAULT false, card_data_check_interval INT DEFAULT 60, package_check_enabled BOOLEAN DEFAULT false, package_check_interval INT DEFAULT 60, priority INT NOT NULL DEFAULT 100, status INT NOT NULL DEFAULT 1, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); ``` ### 配置字段说明 | 字段名 | 类型 | 说明 | |--------|------|------| | config_name | VARCHAR(100) | 配置名称,如"未实名卡-移动"、"已激活卡-联通" | | card_condition | VARCHAR(50) | 卡状态条件: `not_real_name`/`real_name`/`activated`/`suspended` | | carrier_id | BIGINT | 运营商ID,NULL 表示所有运营商 | | real_name_check_enabled | BOOLEAN | 是否启用实名检查 | | real_name_check_interval | INT | 实名检查间隔(秒) | | card_data_check_enabled | BOOLEAN | 是否启用卡流量检查 | | card_data_check_interval | INT | 卡流量检查间隔(秒) | | package_check_enabled | BOOLEAN | 是否启用套餐流量检查 | | package_check_interval | INT | 套餐流量检查间隔(秒) | | priority | INT | 优先级(数字越小优先级越高) | | status | INT | 状态: 1-启用, 2-禁用 | --- ## 轮询配置示例 ### 示例 1: 未实名卡快速轮询 ```sql INSERT INTO polling_configs ( config_name, description, card_condition, carrier_id, real_name_check_enabled, real_name_check_interval, card_data_check_enabled, card_data_check_interval, priority, status ) VALUES ( '未实名卡-快速轮询', '对未实名卡每30秒检查一次实名状态', 'not_real_name', NULL, -- 所有运营商 true, -- 启用实名检查 30, -- 30秒间隔 false, -- 不检查流量 0, 10, -- 高优先级 1 -- 启用 ); ``` **说明**: 未实名卡需要频繁检查实名状态,以便及时发现已完成实名认证的卡。 --- ### 示例 2: 已激活卡流量监控 ```sql INSERT INTO polling_configs ( config_name, description, card_condition, carrier_id, real_name_check_enabled, real_name_check_interval, card_data_check_enabled, card_data_check_interval, package_check_enabled, package_check_interval, priority, status ) VALUES ( '已激活卡-流量监控', '对已激活卡每60秒检查流量使用', 'activated', NULL, false, -- 不检查实名 0, true, -- 启用卡流量检查 60, -- 60秒间隔 true, -- 启用套餐流量检查 60, -- 60秒间隔 20, -- 中优先级 1 ); ``` **说明**: 已激活卡需要监控流量使用,防止超额使用和及时停机。 --- ### 示例 3: 移动运营商特殊策略 ```sql INSERT INTO polling_configs ( config_name, description, card_condition, carrier_id, real_name_check_enabled, real_name_check_interval, card_data_check_enabled, card_data_check_interval, priority, status ) VALUES ( '移动-已激活卡-慢速轮询', '移动运营商已激活卡每180秒检查一次流量', 'activated', 1, -- 中国移动 carrier_id false, 0, true, 180, -- 180秒间隔 50, -- 低优先级 1 ); ``` **说明**: 可以针对特定运营商设置不同的轮询策略,优化 API 调用频率。 --- ## 三种轮询进程 ### 1. 实名检查进程 (Real Name Check) **目标**: 检查未实名的 IoT 卡是否已完成实名认证 **工作流程**: 1. 查询符合条件的 IoT 卡: - `card_category = 'normal'` (普通卡需要实名) - `real_name_status = 0` (未实名) - `enable_polling = true` (参与轮询) - 根据 `last_real_name_check_at` 判断是否到达检查间隔 2. 调用运营商 Gateway API 查询实名状态 3. 更新 IoT 卡的 `real_name_status` 和 `last_real_name_check_at` 4. 记录日志和异常情况 **轮询间隔控制**: ```go // 伪代码 if time.Since(card.LastRealNameCheckAt) >= config.RealNameCheckInterval { // 执行实名检查 result := gateway.CheckRealName(card.ICCID) card.RealNameStatus = result.Status card.LastRealNameCheckAt = time.Now() db.Save(&card) } ``` **配置参数**: - `real_name_check_enabled`: 是否启用 - `real_name_check_interval`: 检查间隔(秒) **注意事项**: - 行业卡 (`card_category = 'industry'`) 无需实名检查 - 已实名的卡 (`real_name_status = 1`) 不再参与轮询 --- ### 2. 卡流量检查进程 (Card Data Check) **目标**: 同步 IoT 卡的流量使用情况 **工作流程**: 1. 查询符合条件的 IoT 卡: - `activation_status = 1` (已激活) - `enable_polling = true` (参与轮询) - 根据 `last_data_check_at` 判断是否到达检查间隔 2. 调用运营商 Gateway API 查询流量使用 3. 更新 IoT 卡的 `data_usage_mb` 和 `last_data_check_at` 4. 记录流量使用历史到 `data_usage_records` 表 5. 判断是否需要停机: - 如果流量超过套餐的虚流量额度,触发停机逻辑 **轮询间隔控制**: ```go // 伪代码 if time.Since(card.LastDataCheckAt) >= config.CardDataCheckInterval { // 执行流量检查 usage := gateway.GetDataUsage(card.ICCID) card.DataUsageMB = usage.TotalMB card.LastDataCheckAt = time.Now() db.Save(&card) // 记录历史数据 record := DataUsageRecord{ IotCardID: card.ID, UsageDate: time.Now().Format("2006-01-02"), DataUsageMB: usage.TodayMB, CarrierSyncData: usage.RawData, SyncedAt: time.Now(), } db.Create(&record) } ``` **配置参数**: - `card_data_check_enabled`: 是否启用 - `card_data_check_interval`: 检查间隔(秒) **停机判断逻辑**: ```go // 伪代码 if card.DataUsageMB >= package.VirtualDataMB { // 触发停机 card.NetworkStatus = 0 // 停机 gateway.SuspendCard(card.ICCID) } ``` --- ### 3. 套餐流量检查进程 (Package Check) **目标**: 检查套餐流量使用情况,判断套餐状态 **工作流程**: 1. 查询符合条件的套餐使用记录: - `status = 1` (生效中) - `expires_at > NOW()` (未过期) - 根据 `last_package_check_at` 判断是否到达检查间隔 2. 计算套餐的流量使用情况: - 单卡套餐: 统计该卡的流量使用 - 设备级套餐: 统计该设备绑定的所有卡的流量使用 3. 更新套餐使用记录的 `data_usage_mb` 和 `last_package_check_at` 4. 判断套餐状态: - 如果流量用完: `status = 2` (已用完) - 如果时间过期: `status = 3` (已过期) 5. 如果套餐用完或过期,触发停机逻辑 **轮询间隔控制**: ```go // 伪代码 if time.Since(packageUsage.LastPackageCheckAt) >= config.PackageCheckInterval { // 执行套餐检查 var totalUsage int64 if packageUsage.UsageType == "single_card" { // 单卡套餐 card := db.FindIotCardByID(packageUsage.IotCardID) totalUsage = card.DataUsageMB } else { // 设备级套餐 bindings := db.FindDeviceSimBindings(packageUsage.DeviceID) for _, binding := range bindings { card := db.FindIotCardByID(binding.IotCardID) totalUsage += card.DataUsageMB } } packageUsage.DataUsageMB = totalUsage packageUsage.LastPackageCheckAt = time.Now() // 判断状态 if totalUsage >= packageUsage.DataLimitMB { packageUsage.Status = 2 // 已用完 // 触发停机 } if time.Now().After(packageUsage.ExpiresAt) { packageUsage.Status = 3 // 已过期 // 触发停机 } db.Save(&packageUsage) } ``` **配置参数**: - `package_check_enabled`: 是否启用 - `package_check_interval`: 检查间隔(秒) **停机判断逻辑**: ```go // 伪代码 if packageUsage.Status == 2 || packageUsage.Status == 3 { // 套餐用完或过期,触发停机 if packageUsage.UsageType == "single_card" { card := db.FindIotCardByID(packageUsage.IotCardID) card.NetworkStatus = 0 // 停机 gateway.SuspendCard(card.ICCID) } else { // 设备级套餐,停掉所有绑定的卡 bindings := db.FindDeviceSimBindings(packageUsage.DeviceID) for _, binding := range bindings { card := db.FindIotCardByID(binding.IotCardID) card.NetworkStatus = 0 gateway.SuspendCard(card.ICCID) } } } ``` --- ## 轮询调度器设计 ### 调度器架构 ```go // 伪代码 type PollingScheduler struct { db *gorm.DB queue *asynq.Client } func (s *PollingScheduler) Start() { // 启动三个独立的调度协程 go s.scheduleRealNameCheck() go s.scheduleCardDataCheck() go s.schedulePackageCheck() } func (s *PollingScheduler) scheduleRealNameCheck() { ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() for range ticker.C { configs := s.loadPollingConfigs("real_name_check_enabled = true") for _, config := range configs { // 查询需要检查的卡 cards := s.findCardsForRealNameCheck(config) for _, card := range cards { // 使用 Asynq 异步任务队列 task := asynq.NewTask("iot:realname:check", map[string]interface{}{ "card_id": card.ID, "config_id": config.ID, }) s.queue.Enqueue(task, asynq.ProcessIn(0)) } } } } ``` ### 任务队列设计 使用 Asynq 异步任务队列处理轮询任务: ```go // 伪代码 type RealNameCheckHandler struct { db *gorm.DB gateway *CarrierGateway } func (h *RealNameCheckHandler) ProcessTask(ctx context.Context, task *asynq.Task) error { var payload struct { CardID uint `json:"card_id"` ConfigID uint `json:"config_id"` } if err := json.Unmarshal(task.Payload(), &payload); err != nil { return err } // 加载卡信息 card := h.db.FindIotCardByID(payload.CardID) // 调用运营商 API result, err := h.gateway.CheckRealName(card.ICCID) if err != nil { return err } // 更新卡状态 card.RealNameStatus = result.Status card.LastRealNameCheckAt = time.Now() h.db.Save(&card) return nil } ``` --- ## 轮询优先级和并发控制 ### 优先级机制 轮询配置表的 `priority` 字段控制执行优先级: - **高优先级 (1-30)**: 紧急任务,如未实名卡检查 - **中优先级 (31-70)**: 常规任务,如流量监控 - **低优先级 (71-100)**: 非紧急任务,如历史数据同步 调度器按优先级排序执行: ```sql SELECT * FROM polling_configs WHERE status = 1 ORDER BY priority ASC, id ASC; ``` ### 并发控制 使用 Asynq 的并发控制功能: ```go // 伪代码 queue := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"}) // 设置队列并发数 queues := map[string]int{ "iot:realname": 10, // 实名检查队列,10个并发 "iot:carddata": 20, // 卡流量检查队列,20个并发 "iot:package": 20, // 套餐检查队列,20个并发 } server := asynq.NewServer( asynq.RedisClientOpt{Addr: "localhost:6379"}, asynq.Config{Queues: queues}, ) ``` ### 限流保护 为了避免过度调用运营商 API,需要实现限流保护: ```go // 伪代码 type RateLimiter struct { limiter *rate.Limiter } func NewRateLimiter(carrierID uint) *RateLimiter { // 每秒最多 10 次 API 调用 return &RateLimiter{ limiter: rate.NewLimiter(rate.Limit(10), 10), } } func (r *RateLimiter) Wait(ctx context.Context) error { return r.limiter.Wait(ctx) } ``` --- ## 轮询间隔策略 ### 推荐配置 | 卡状态 | 实名检查间隔 | 卡流量检查间隔 | 套餐流量检查间隔 | 优先级 | |--------|-------------|---------------|----------------|-------| | 未实名卡 | 30秒 | - | - | 10 (高) | | 已实名未激活 | - | - | - | - | | 已激活卡(正常) | - | 60秒 | 60秒 | 20 (中) | | 已激活卡(套餐即将用完) | - | 30秒 | 30秒 | 15 (高) | | 已停用卡 | - | - | - | - | ### 动态调整策略 根据卡的流量使用情况动态调整轮询间隔: ```go // 伪代码 func calculateCheckInterval(packageUsage *PackageUsage) int { usagePercent := float64(packageUsage.DataUsageMB) / float64(packageUsage.DataLimitMB) if usagePercent >= 0.9 { return 30 // 90%以上,30秒检查一次 } else if usagePercent >= 0.7 { return 60 // 70-90%,60秒检查一次 } else { return 180 // 70%以下,180秒检查一次 } } ``` --- ## 轮询开关控制 ### 全局开关 IoT 卡的 `enable_polling` 字段控制是否参与轮询: ```sql -- 禁用某张卡的轮询 UPDATE iot_cards SET enable_polling = false WHERE iccid = '89860123456789012345'; -- 启用某张卡的轮询 UPDATE iot_cards SET enable_polling = true WHERE iccid = '89860123456789012345'; ``` ### 配置开关 轮询配置表的 `status` 字段控制整个配置是否启用: ```sql -- 禁用某个轮询配置 UPDATE polling_configs SET status = 2 WHERE config_name = '未实名卡-快速轮询'; -- 启用某个轮询配置 UPDATE polling_configs SET status = 1 WHERE config_name = '未实名卡-快速轮询'; ``` ### 单项开关 轮询配置表的 `*_check_enabled` 字段控制具体检查类型: ```sql -- 只启用实名检查,禁用流量检查 UPDATE polling_configs SET real_name_check_enabled = true, card_data_check_enabled = false, package_check_enabled = false WHERE config_name = '未实名卡-快速轮询'; ``` --- ## 错误处理和重试 ### 错误处理策略 轮询任务可能因为以下原因失败: 1. 运营商 API 超时 2. 运营商 API 返回错误 3. 数据库连接失败 4. 网络故障 使用 Asynq 的重试机制: ```go // 伪代码 task := asynq.NewTask("iot:realname:check", payload) // 设置重试策略 opts := []asynq.Option{ asynq.MaxRetry(3), // 最多重试 3 次 asynq.Timeout(30 * time.Second), // 任务超时时间 30 秒 } queue.Enqueue(task, opts...) ``` ### 失败日志记录 记录失败的轮询任务: ```go // 伪代码 type PollingLog struct { ID uint `gorm:"primaryKey"` TaskType string // realname/carddata/package CardID uint ConfigID uint Success bool ErrorMsg string ExecutedAt time.Time CreatedAt time.Time } func logPollingResult(taskType string, cardID, configID uint, err error) { log := PollingLog{ TaskType: taskType, CardID: cardID, ConfigID: configID, Success: err == nil, ErrorMsg: fmt.Sprintf("%v", err), ExecutedAt: time.Now(), CreatedAt: time.Now(), } db.Create(&log) } ``` --- ## 监控和告警 ### 监控指标 1. **轮询任务执行成功率** - 实名检查成功率 - 卡流量检查成功率 - 套餐流量检查成功率 2. **轮询任务延迟** - 任务入队时间到执行时间的延迟 - 平均延迟、P95、P99 3. **运营商 API 调用统计** - 每分钟 API 调用次数 - API 响应时间 - API 错误率 4. **卡状态统计** - 未实名卡数量 - 已激活卡数量 - 已停用卡数量 ### 告警规则 1. **高失败率告警** - 如果某类轮询任务 5 分钟内失败率超过 50%,触发告警 2. **高延迟告警** - 如果轮询任务延迟超过 5 分钟,触发告警 3. **API 异常告警** - 如果运营商 API 连续失败 10 次,触发告警 4. **流量异常告警** - 如果某张卡流量使用突增(1小时内增加超过 100MB),触发告警 --- ## 最佳实践 ### 1. 合理设置轮询间隔 - **频繁轮询的代价**: 增加运营商 API 调用次数,可能触发限流 - **稀疏轮询的风险**: 流量超额检测不及时,可能导致停机延迟 - **建议**: 根据业务需求和运营商 API 限制平衡轮询频率 ### 2. 使用批量查询 ```go // 不推荐: 逐个查询 for _, card := range cards { usage := gateway.GetDataUsage(card.ICCID) card.DataUsageMB = usage.TotalMB db.Save(&card) } // 推荐: 批量查询 iccids := []string{} for _, card := range cards { iccids = append(iccids, card.ICCID) } usages := gateway.BatchGetDataUsage(iccids) // 批量查询 for _, card := range cards { card.DataUsageMB = usages[card.ICCID] } db.Save(&cards) // 批量更新 ``` ### 3. 实现幂等性 轮询任务可能会重复执行,必须保证幂等性: ```go // 伪代码 func ProcessRealNameCheck(cardID uint) error { // 加锁,防止重复执行 lockKey := fmt.Sprintf("iot:realname:lock:%d", cardID) lock := redis.SetNX(lockKey, "1", 60*time.Second) if !lock { return errors.New("task already running") } defer redis.Del(lockKey) // 执行检查 card := db.FindIotCardByID(cardID) result := gateway.CheckRealName(card.ICCID) card.RealNameStatus = result.Status card.LastRealNameCheckAt = time.Now() db.Save(&card) return nil } ``` ### 4. 记录详细日志 ```go // 伪代码 logger.Info("开始实名检查", zap.Uint("card_id", card.ID), zap.String("iccid", card.ICCID), zap.Uint("config_id", config.ID), ) result, err := gateway.CheckRealName(card.ICCID) if err != nil { logger.Error("实名检查失败", zap.Uint("card_id", card.ID), zap.String("iccid", card.ICCID), zap.Error(err), ) return err } logger.Info("实名检查成功", zap.Uint("card_id", card.ID), zap.String("iccid", card.ICCID), zap.Int("real_name_status", result.Status), ) ``` --- ## 总结 IoT SIM 管理系统的轮询机制具有以下特点: 1. **三层轮询体系**: 实名检查、卡流量检查、套餐流量检查相互独立 2. **灵活配置**: 支持按卡状态、运营商、优先级配置不同的轮询策略 3. **异步任务队列**: 使用 Asynq 实现高并发、可重试的任务处理 4. **梯度策略**: 支持根据流量使用情况动态调整轮询间隔 5. **开关控制**: 支持全局、配置、单项的轮询开关 6. **错误处理**: 完善的重试机制和错误日志记录 7. **监控告警**: 实时监控轮询任务执行情况和运营商 API 调用 通过合理配置和使用轮询机制,可以实现 IoT 卡的自动化管理,提高运营效率,降低人工成本。 --- **文档版本**: v1.0 **最后更新**: 2026-01-12 **维护人员**: Claude Sonnet 4.5