All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 6m35s
实现功能: - 实名状态检查轮询(可配置间隔) - 卡流量检查轮询(支持跨月流量追踪) - 套餐检查与超额自动停机 - 分布式并发控制(Redis 信号量) - 手动触发轮询(单卡/批量/条件筛选) - 数据清理配置与执行 - 告警规则与历史记录 - 实时监控统计(队列/性能/并发) 性能优化: - Redis 缓存卡信息,减少 DB 查询 - Pipeline 批量写入 Redis - 异步流量记录写入 - 渐进式初始化(10万卡/批) 压测工具(scripts/benchmark/): - Mock Gateway 模拟上游服务 - 测试卡生成器 - 配置初始化脚本 - 实时监控脚本 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
229 lines
5.9 KiB
Go
229 lines
5.9 KiB
Go
package polling
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/break/junhong_cmp_fiber/internal/model"
|
|
"github.com/break/junhong_cmp_fiber/pkg/constants"
|
|
)
|
|
|
|
// OnCardCreated 卡创建时的回调
|
|
// 将新卡加入轮询队列
|
|
func (s *Scheduler) OnCardCreated(ctx context.Context, card *model.IotCard) {
|
|
if card == nil {
|
|
return
|
|
}
|
|
|
|
s.logger.Debug("卡创建回调", zap.Uint("card_id", card.ID))
|
|
|
|
if err := s.initCardPolling(ctx, card); err != nil {
|
|
s.logger.Error("初始化新卡轮询失败",
|
|
zap.Uint("card_id", card.ID),
|
|
zap.Error(err))
|
|
}
|
|
}
|
|
|
|
// OnBatchCardsCreated 批量卡创建时的回调
|
|
func (s *Scheduler) OnBatchCardsCreated(ctx context.Context, cards []*model.IotCard) {
|
|
if len(cards) == 0 {
|
|
return
|
|
}
|
|
|
|
s.logger.Info("批量卡创建回调", zap.Int("count", len(cards)))
|
|
|
|
for _, card := range cards {
|
|
if err := s.initCardPolling(ctx, card); err != nil {
|
|
s.logger.Warn("初始化批量导入卡轮询失败",
|
|
zap.Uint("card_id", card.ID),
|
|
zap.Error(err))
|
|
}
|
|
}
|
|
}
|
|
|
|
// OnCardStatusChanged 卡状态变化时的回调
|
|
// 重新匹配配置并更新轮询队列
|
|
func (s *Scheduler) OnCardStatusChanged(ctx context.Context, cardID uint) {
|
|
s.logger.Debug("卡状态变化回调", zap.Uint("card_id", cardID))
|
|
|
|
// 从数据库重新加载卡信息
|
|
card, err := s.iotCardStore.GetByID(ctx, cardID)
|
|
if err != nil {
|
|
s.logger.Error("加载卡信息失败",
|
|
zap.Uint("card_id", cardID),
|
|
zap.Error(err))
|
|
return
|
|
}
|
|
|
|
// 先从所有队列中移除
|
|
s.removeFromAllQueues(ctx, cardID)
|
|
|
|
// 重新初始化轮询
|
|
if err := s.initCardPolling(ctx, card); err != nil {
|
|
s.logger.Error("重新初始化卡轮询失败",
|
|
zap.Uint("card_id", cardID),
|
|
zap.Error(err))
|
|
}
|
|
}
|
|
|
|
// OnCardDeleted 卡删除时的回调
|
|
// 从轮询队列中移除
|
|
func (s *Scheduler) OnCardDeleted(ctx context.Context, cardID uint) {
|
|
s.logger.Debug("卡删除回调", zap.Uint("card_id", cardID))
|
|
|
|
// 从所有队列中移除
|
|
s.removeFromAllQueues(ctx, cardID)
|
|
|
|
// 删除缓存
|
|
key := constants.RedisPollingCardInfoKey(cardID)
|
|
if err := s.redis.Del(ctx, key).Err(); err != nil {
|
|
s.logger.Warn("删除卡缓存失败",
|
|
zap.Uint("card_id", cardID),
|
|
zap.Error(err))
|
|
}
|
|
}
|
|
|
|
// OnCardEnabled 卡启用轮询时的回调
|
|
func (s *Scheduler) OnCardEnabled(ctx context.Context, cardID uint) {
|
|
s.logger.Debug("卡启用轮询回调", zap.Uint("card_id", cardID))
|
|
|
|
// 从数据库加载卡信息
|
|
card, err := s.iotCardStore.GetByID(ctx, cardID)
|
|
if err != nil {
|
|
s.logger.Error("加载卡信息失败",
|
|
zap.Uint("card_id", cardID),
|
|
zap.Error(err))
|
|
return
|
|
}
|
|
|
|
// 初始化轮询
|
|
if err := s.initCardPolling(ctx, card); err != nil {
|
|
s.logger.Error("启用卡轮询失败",
|
|
zap.Uint("card_id", cardID),
|
|
zap.Error(err))
|
|
}
|
|
}
|
|
|
|
// OnCardDisabled 卡禁用轮询时的回调
|
|
func (s *Scheduler) OnCardDisabled(ctx context.Context, cardID uint) {
|
|
s.logger.Debug("卡禁用轮询回调", zap.Uint("card_id", cardID))
|
|
|
|
// 从所有队列中移除
|
|
s.removeFromAllQueues(ctx, cardID)
|
|
}
|
|
|
|
// removeFromAllQueues 从所有轮询队列中移除卡
|
|
func (s *Scheduler) removeFromAllQueues(ctx context.Context, cardID uint) {
|
|
member := formatUint(cardID)
|
|
|
|
queues := []string{
|
|
constants.RedisPollingQueueRealnameKey(),
|
|
constants.RedisPollingQueueCarddataKey(),
|
|
constants.RedisPollingQueuePackageKey(),
|
|
}
|
|
|
|
for _, queueKey := range queues {
|
|
if err := s.redis.ZRem(ctx, queueKey, member).Err(); err != nil {
|
|
s.logger.Warn("从队列移除卡失败",
|
|
zap.String("queue", queueKey),
|
|
zap.Uint("card_id", cardID),
|
|
zap.Error(err))
|
|
}
|
|
}
|
|
}
|
|
|
|
// RequeueCard 重新将卡加入队列
|
|
// 用于任务完成后重新入队
|
|
func (s *Scheduler) RequeueCard(ctx context.Context, cardID uint, taskType string) error {
|
|
// 从数据库加载卡信息
|
|
card, err := s.iotCardStore.GetByID(ctx, cardID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// 匹配配置
|
|
config := s.MatchConfig(card)
|
|
if config == nil {
|
|
return nil
|
|
}
|
|
|
|
now := time.Now()
|
|
var queueKey string
|
|
var intervalSeconds int
|
|
|
|
switch taskType {
|
|
case constants.TaskTypePollingRealname:
|
|
if config.RealnameCheckInterval == nil {
|
|
return nil
|
|
}
|
|
queueKey = constants.RedisPollingQueueRealnameKey()
|
|
intervalSeconds = *config.RealnameCheckInterval
|
|
case constants.TaskTypePollingCarddata:
|
|
if config.CarddataCheckInterval == nil {
|
|
return nil
|
|
}
|
|
queueKey = constants.RedisPollingQueueCarddataKey()
|
|
intervalSeconds = *config.CarddataCheckInterval
|
|
case constants.TaskTypePollingPackage:
|
|
if config.PackageCheckInterval == nil {
|
|
return nil
|
|
}
|
|
queueKey = constants.RedisPollingQueuePackageKey()
|
|
intervalSeconds = *config.PackageCheckInterval
|
|
default:
|
|
return nil
|
|
}
|
|
|
|
nextCheck := now.Add(time.Duration(intervalSeconds) * time.Second)
|
|
return s.addToQueue(ctx, queueKey, cardID, nextCheck)
|
|
}
|
|
|
|
// TriggerManualCheck 触发手动检查
|
|
func (s *Scheduler) TriggerManualCheck(ctx context.Context, cardID uint, taskType string) error {
|
|
key := constants.RedisPollingManualQueueKey(taskType)
|
|
return s.redis.RPush(ctx, key, formatUint(cardID)).Err()
|
|
}
|
|
|
|
// TriggerBatchManualCheck 批量触发手动检查
|
|
func (s *Scheduler) TriggerBatchManualCheck(ctx context.Context, cardIDs []uint, taskType string) error {
|
|
if len(cardIDs) == 0 {
|
|
return nil
|
|
}
|
|
|
|
key := constants.RedisPollingManualQueueKey(taskType)
|
|
|
|
// 转换为 interface{} 切片
|
|
members := make([]interface{}, len(cardIDs))
|
|
for i, id := range cardIDs {
|
|
members[i] = formatUint(id)
|
|
}
|
|
|
|
return s.redis.RPush(ctx, key, members...).Err()
|
|
}
|
|
|
|
// LazyLoad 懒加载卡信息
|
|
// 当卡未初始化但被访问时调用
|
|
func (s *Scheduler) LazyLoad(ctx context.Context, cardID uint) error {
|
|
// 检查是否已在缓存中
|
|
key := constants.RedisPollingCardInfoKey(cardID)
|
|
exists, err := s.redis.Exists(ctx, key).Result()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if exists > 0 {
|
|
return nil // 已缓存
|
|
}
|
|
|
|
// 从数据库加载
|
|
card, err := s.iotCardStore.GetByID(ctx, cardID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// 初始化轮询
|
|
return s.initCardPolling(ctx, card)
|
|
}
|