Files
huang 931e140e8e
All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 6m35s
feat: 实现 IoT 卡轮询系统(支持千万级卡规模)
实现功能:
- 实名状态检查轮询(可配置间隔)
- 卡流量检查轮询(支持跨月流量追踪)
- 套餐检查与超额自动停机
- 分布式并发控制(Redis 信号量)
- 手动触发轮询(单卡/批量/条件筛选)
- 数据清理配置与执行
- 告警规则与历史记录
- 实时监控统计(队列/性能/并发)

性能优化:
- Redis 缓存卡信息,减少 DB 查询
- Pipeline 批量写入 Redis
- 异步流量记录写入
- 渐进式初始化(10万卡/批)

压测工具(scripts/benchmark/):
- Mock Gateway 模拟上游服务
- 测试卡生成器
- 配置初始化脚本
- 实时监控脚本

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-05 17:32:44 +08:00

229 lines
5.9 KiB
Go

package polling
import (
"context"
"time"
"go.uber.org/zap"
"github.com/break/junhong_cmp_fiber/internal/model"
"github.com/break/junhong_cmp_fiber/pkg/constants"
)
// OnCardCreated 卡创建时的回调
// 将新卡加入轮询队列
func (s *Scheduler) OnCardCreated(ctx context.Context, card *model.IotCard) {
if card == nil {
return
}
s.logger.Debug("卡创建回调", zap.Uint("card_id", card.ID))
if err := s.initCardPolling(ctx, card); err != nil {
s.logger.Error("初始化新卡轮询失败",
zap.Uint("card_id", card.ID),
zap.Error(err))
}
}
// OnBatchCardsCreated 批量卡创建时的回调
func (s *Scheduler) OnBatchCardsCreated(ctx context.Context, cards []*model.IotCard) {
if len(cards) == 0 {
return
}
s.logger.Info("批量卡创建回调", zap.Int("count", len(cards)))
for _, card := range cards {
if err := s.initCardPolling(ctx, card); err != nil {
s.logger.Warn("初始化批量导入卡轮询失败",
zap.Uint("card_id", card.ID),
zap.Error(err))
}
}
}
// OnCardStatusChanged 卡状态变化时的回调
// 重新匹配配置并更新轮询队列
func (s *Scheduler) OnCardStatusChanged(ctx context.Context, cardID uint) {
s.logger.Debug("卡状态变化回调", zap.Uint("card_id", cardID))
// 从数据库重新加载卡信息
card, err := s.iotCardStore.GetByID(ctx, cardID)
if err != nil {
s.logger.Error("加载卡信息失败",
zap.Uint("card_id", cardID),
zap.Error(err))
return
}
// 先从所有队列中移除
s.removeFromAllQueues(ctx, cardID)
// 重新初始化轮询
if err := s.initCardPolling(ctx, card); err != nil {
s.logger.Error("重新初始化卡轮询失败",
zap.Uint("card_id", cardID),
zap.Error(err))
}
}
// OnCardDeleted 卡删除时的回调
// 从轮询队列中移除
func (s *Scheduler) OnCardDeleted(ctx context.Context, cardID uint) {
s.logger.Debug("卡删除回调", zap.Uint("card_id", cardID))
// 从所有队列中移除
s.removeFromAllQueues(ctx, cardID)
// 删除缓存
key := constants.RedisPollingCardInfoKey(cardID)
if err := s.redis.Del(ctx, key).Err(); err != nil {
s.logger.Warn("删除卡缓存失败",
zap.Uint("card_id", cardID),
zap.Error(err))
}
}
// OnCardEnabled 卡启用轮询时的回调
func (s *Scheduler) OnCardEnabled(ctx context.Context, cardID uint) {
s.logger.Debug("卡启用轮询回调", zap.Uint("card_id", cardID))
// 从数据库加载卡信息
card, err := s.iotCardStore.GetByID(ctx, cardID)
if err != nil {
s.logger.Error("加载卡信息失败",
zap.Uint("card_id", cardID),
zap.Error(err))
return
}
// 初始化轮询
if err := s.initCardPolling(ctx, card); err != nil {
s.logger.Error("启用卡轮询失败",
zap.Uint("card_id", cardID),
zap.Error(err))
}
}
// OnCardDisabled 卡禁用轮询时的回调
func (s *Scheduler) OnCardDisabled(ctx context.Context, cardID uint) {
s.logger.Debug("卡禁用轮询回调", zap.Uint("card_id", cardID))
// 从所有队列中移除
s.removeFromAllQueues(ctx, cardID)
}
// removeFromAllQueues 从所有轮询队列中移除卡
func (s *Scheduler) removeFromAllQueues(ctx context.Context, cardID uint) {
member := formatUint(cardID)
queues := []string{
constants.RedisPollingQueueRealnameKey(),
constants.RedisPollingQueueCarddataKey(),
constants.RedisPollingQueuePackageKey(),
}
for _, queueKey := range queues {
if err := s.redis.ZRem(ctx, queueKey, member).Err(); err != nil {
s.logger.Warn("从队列移除卡失败",
zap.String("queue", queueKey),
zap.Uint("card_id", cardID),
zap.Error(err))
}
}
}
// RequeueCard 重新将卡加入队列
// 用于任务完成后重新入队
func (s *Scheduler) RequeueCard(ctx context.Context, cardID uint, taskType string) error {
// 从数据库加载卡信息
card, err := s.iotCardStore.GetByID(ctx, cardID)
if err != nil {
return err
}
// 匹配配置
config := s.MatchConfig(card)
if config == nil {
return nil
}
now := time.Now()
var queueKey string
var intervalSeconds int
switch taskType {
case constants.TaskTypePollingRealname:
if config.RealnameCheckInterval == nil {
return nil
}
queueKey = constants.RedisPollingQueueRealnameKey()
intervalSeconds = *config.RealnameCheckInterval
case constants.TaskTypePollingCarddata:
if config.CarddataCheckInterval == nil {
return nil
}
queueKey = constants.RedisPollingQueueCarddataKey()
intervalSeconds = *config.CarddataCheckInterval
case constants.TaskTypePollingPackage:
if config.PackageCheckInterval == nil {
return nil
}
queueKey = constants.RedisPollingQueuePackageKey()
intervalSeconds = *config.PackageCheckInterval
default:
return nil
}
nextCheck := now.Add(time.Duration(intervalSeconds) * time.Second)
return s.addToQueue(ctx, queueKey, cardID, nextCheck)
}
// TriggerManualCheck 触发手动检查
func (s *Scheduler) TriggerManualCheck(ctx context.Context, cardID uint, taskType string) error {
key := constants.RedisPollingManualQueueKey(taskType)
return s.redis.RPush(ctx, key, formatUint(cardID)).Err()
}
// TriggerBatchManualCheck 批量触发手动检查
func (s *Scheduler) TriggerBatchManualCheck(ctx context.Context, cardIDs []uint, taskType string) error {
if len(cardIDs) == 0 {
return nil
}
key := constants.RedisPollingManualQueueKey(taskType)
// 转换为 interface{} 切片
members := make([]interface{}, len(cardIDs))
for i, id := range cardIDs {
members[i] = formatUint(id)
}
return s.redis.RPush(ctx, key, members...).Err()
}
// LazyLoad 懒加载卡信息
// 当卡未初始化但被访问时调用
func (s *Scheduler) LazyLoad(ctx context.Context, cardID uint) error {
// 检查是否已在缓存中
key := constants.RedisPollingCardInfoKey(cardID)
exists, err := s.redis.Exists(ctx, key).Result()
if err != nil {
return err
}
if exists > 0 {
return nil // 已缓存
}
// 从数据库加载
card, err := s.iotCardStore.GetByID(ctx, cardID)
if err != nil {
return err
}
// 初始化轮询
return s.initCardPolling(ctx, card)
}