Files
junhong_cmp_fiber/internal/task/polling_handler.go
huang b9c3875c08
All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 7m3s
feat: 新增数据库迁移,重命名 device_no 为 virtual_no,新增 iot_card.virtual_no 和 package.virtual_ratio 字段
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
2026-03-14 18:27:28 +08:00

1117 lines
36 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package task
import (
"context"
"strconv"
"strings"
"time"
"github.com/bytedance/sonic"
"github.com/hibiken/asynq"
"github.com/redis/go-redis/v9"
"go.uber.org/zap"
"gorm.io/gorm"
"github.com/break/junhong_cmp_fiber/internal/gateway"
"github.com/break/junhong_cmp_fiber/internal/model"
packagepkg "github.com/break/junhong_cmp_fiber/internal/service/package"
"github.com/break/junhong_cmp_fiber/internal/store/postgres"
"github.com/break/junhong_cmp_fiber/pkg/constants"
)
// PollingTaskPayload 轮询任务载荷
type PollingTaskPayload struct {
CardID string `json:"card_id"`
IsManual bool `json:"is_manual"`
Timestamp int64 `json:"timestamp"`
}
// PollingHandler 轮询任务处理器
type PollingHandler struct {
db *gorm.DB
redis *redis.Client
gatewayClient *gateway.Client
iotCardStore *postgres.IotCardStore
concurrencyStore *postgres.PollingConcurrencyConfigStore
deviceSimBindingStore *postgres.DeviceSimBindingStore
dataUsageRecordStore *postgres.DataUsageRecordStore
packageUsageStore *postgres.PackageUsageStore
usageService *packagepkg.UsageService
logger *zap.Logger
}
// NewPollingHandler 创建轮询任务处理器
func NewPollingHandler(
db *gorm.DB,
redis *redis.Client,
gatewayClient *gateway.Client,
usageService *packagepkg.UsageService,
logger *zap.Logger,
) *PollingHandler {
return &PollingHandler{
db: db,
redis: redis,
gatewayClient: gatewayClient,
iotCardStore: postgres.NewIotCardStore(db, redis),
concurrencyStore: postgres.NewPollingConcurrencyConfigStore(db),
deviceSimBindingStore: postgres.NewDeviceSimBindingStore(db, redis),
dataUsageRecordStore: postgres.NewDataUsageRecordStore(db),
packageUsageStore: postgres.NewPackageUsageStore(db, redis),
usageService: usageService,
logger: logger,
}
}
// HandleRealnameCheck 处理实名检查任务
func (h *PollingHandler) HandleRealnameCheck(ctx context.Context, t *asynq.Task) error {
startTime := time.Now()
var payload PollingTaskPayload
if err := sonic.Unmarshal(t.Payload(), &payload); err != nil {
h.logger.Error("解析任务载荷失败", zap.Error(err))
return nil // 不重试
}
cardID, err := strconv.ParseUint(payload.CardID, 10, 64)
if err != nil {
h.logger.Error("解析卡ID失败", zap.String("card_id", payload.CardID), zap.Error(err))
return nil
}
// 获取并发信号量
if !h.acquireConcurrency(ctx, constants.TaskTypePollingRealname) {
h.logger.Debug("并发已满,任务稍后重试", zap.Uint64("card_id", cardID))
return h.requeueCard(ctx, uint(cardID), constants.TaskTypePollingRealname)
}
defer h.releaseConcurrency(ctx, constants.TaskTypePollingRealname)
h.logger.Debug("开始实名检查",
zap.Uint64("card_id", cardID),
zap.Bool("is_manual", payload.IsManual))
// 获取卡信息
card, err := h.getCardWithCache(ctx, uint(cardID))
if err != nil {
if err == gorm.ErrRecordNotFound {
h.logger.Warn("卡不存在", zap.Uint64("card_id", cardID))
return nil
}
h.logger.Error("获取卡信息失败", zap.Uint64("card_id", cardID), zap.Error(err))
h.updateStats(ctx, constants.TaskTypePollingRealname, false, time.Since(startTime))
return h.requeueCard(ctx, uint(cardID), constants.TaskTypePollingRealname)
}
// 行业卡跳过实名检查
if card.CardCategory == "industry" {
h.logger.Debug("行业卡跳过实名检查", zap.Uint64("card_id", cardID))
return h.requeueCard(ctx, uint(cardID), constants.TaskTypePollingRealname)
}
// 调用 Gateway API 查询实名状态
var newRealnameStatus int
if h.gatewayClient != nil {
result, err := h.gatewayClient.QueryRealnameStatus(ctx, &gateway.CardStatusReq{
CardNo: card.ICCID,
})
if err != nil {
h.logger.Warn("查询实名状态失败",
zap.Uint64("card_id", cardID),
zap.String("iccid", card.ICCID),
zap.Error(err))
h.updateStats(ctx, constants.TaskTypePollingRealname, false, time.Since(startTime))
return h.requeueCard(ctx, uint(cardID), constants.TaskTypePollingRealname)
}
// 解析实名状态
newRealnameStatus = h.parseRealnameStatus(result.RealStatus)
h.logger.Info("实名检查完成",
zap.Uint64("card_id", cardID),
zap.String("iccid", card.ICCID),
zap.Bool("gateway_real_status", result.RealStatus),
zap.Int("new_status", newRealnameStatus),
zap.Int("old_status", card.RealNameStatus))
} else {
// Gateway 未配置,模拟检查
newRealnameStatus = card.RealNameStatus
h.logger.Debug("实名检查完成模拟Gateway未配置",
zap.Uint64("card_id", cardID))
}
// 检测状态变化
statusChanged := newRealnameStatus != card.RealNameStatus
// 更新数据库
now := time.Now()
updates := map[string]any{
"last_real_name_check_at": now,
}
if statusChanged {
updates["real_name_status"] = newRealnameStatus
}
if err := h.db.Model(&model.IotCard{}).
Where("id = ?", cardID).
Updates(updates).Error; err != nil {
h.logger.Error("更新卡信息失败", zap.Uint64("card_id", cardID), zap.Error(err))
}
// 如果状态变化,更新 Redis 缓存并重新匹配配置
if statusChanged {
h.updateCardCache(ctx, uint(cardID), map[string]any{
"real_name_status": newRealnameStatus,
})
// 状态变化后需要重新匹配配置(通过调度器回调)
h.logger.Info("实名状态已变化,需要重新匹配配置",
zap.Uint64("card_id", cardID),
zap.Int("old_status", card.RealNameStatus),
zap.Int("new_status", newRealnameStatus))
// 任务 21.2-21.4: 检测首次实名0/1 → 2触发待激活套餐激活
isFirstRealname := (card.RealNameStatus == 0 || card.RealNameStatus == 1) && newRealnameStatus == 2
if isFirstRealname {
h.triggerFirstRealnameActivation(ctx, uint(cardID))
}
}
// 更新监控统计
h.updateStats(ctx, constants.TaskTypePollingRealname, true, time.Since(startTime))
// 重新入队
return h.requeueCard(ctx, uint(cardID), constants.TaskTypePollingRealname)
}
// HandleCarddataCheck 处理卡流量检查任务
// 任务 18.2-18.4: 改造为支持流量扣减优先级和新停机条件
func (h *PollingHandler) HandleCarddataCheck(ctx context.Context, t *asynq.Task) error {
startTime := time.Now()
var payload PollingTaskPayload
if err := sonic.Unmarshal(t.Payload(), &payload); err != nil {
h.logger.Error("解析任务载荷失败", zap.Error(err))
return nil
}
cardID, err := strconv.ParseUint(payload.CardID, 10, 64)
if err != nil {
h.logger.Error("解析卡ID失败", zap.String("card_id", payload.CardID), zap.Error(err))
return nil
}
// 获取并发信号量
if !h.acquireConcurrency(ctx, constants.TaskTypePollingCarddata) {
h.logger.Debug("并发已满,任务稍后重试", zap.Uint64("card_id", cardID))
return h.requeueCard(ctx, uint(cardID), constants.TaskTypePollingCarddata)
}
defer h.releaseConcurrency(ctx, constants.TaskTypePollingCarddata)
h.logger.Debug("开始流量检查",
zap.Uint64("card_id", cardID),
zap.Bool("is_manual", payload.IsManual))
// 获取卡信息
card, err := h.getCardWithCache(ctx, uint(cardID))
if err != nil {
if err == gorm.ErrRecordNotFound {
h.logger.Warn("卡不存在", zap.Uint64("card_id", cardID))
return nil
}
h.logger.Error("获取卡信息失败", zap.Uint64("card_id", cardID), zap.Error(err))
h.updateStats(ctx, constants.TaskTypePollingCarddata, false, time.Since(startTime))
return h.requeueCard(ctx, uint(cardID), constants.TaskTypePollingCarddata)
}
// 调用 Gateway API 查询流量
var gatewayFlowMB float64
if h.gatewayClient != nil {
result, err := h.gatewayClient.QueryFlow(ctx, &gateway.FlowQueryReq{
CardNo: card.ICCID,
})
if err != nil {
h.logger.Warn("查询流量失败",
zap.Uint64("card_id", cardID),
zap.String("iccid", card.ICCID),
zap.Error(err))
h.updateStats(ctx, constants.TaskTypePollingCarddata, false, time.Since(startTime))
return h.requeueCard(ctx, uint(cardID), constants.TaskTypePollingCarddata)
}
// Gateway 返回的是 MB 单位的流量
gatewayFlowMB = result.Used
h.logger.Info("流量检查完成",
zap.Uint64("card_id", cardID),
zap.String("iccid", card.ICCID),
zap.Float64("gateway_flow_mb", gatewayFlowMB),
zap.String("unit", result.Unit))
} else {
// Gateway 未配置,使用当前值
gatewayFlowMB = card.CurrentMonthUsageMB
h.logger.Debug("流量检查完成模拟Gateway未配置",
zap.Uint64("card_id", cardID))
}
// 计算流量增量(处理跨月)
now := time.Now()
updates := h.calculateFlowUpdates(card, gatewayFlowMB, now)
updates["last_data_check_at"] = now
// 计算本次流量增量(用于套餐扣减)
flowIncrementMB := h.calculateFlowIncrement(card, gatewayFlowMB, now)
// 更新数据库
if err := h.db.Model(&model.IotCard{}).
Where("id = ?", cardID).
Updates(updates).Error; err != nil {
h.logger.Error("更新卡流量信息失败", zap.Uint64("card_id", cardID), zap.Error(err))
}
// 插入流量历史记录(异步,不阻塞主流程)
go h.insertDataUsageRecord(context.Background(), uint(cardID), gatewayFlowMB, card.CurrentMonthUsageMB, now, payload.IsManual)
// 更新 Redis 缓存
h.updateCardCache(ctx, uint(cardID), map[string]any{
"current_month_usage_mb": updates["current_month_usage_mb"],
})
// 任务 18.3: 调用 UsageService.DeductDataUsage 进行流量扣减
if flowIncrementMB > 0 && h.usageService != nil {
if err := h.usageService.DeductDataUsage(ctx, "iot_card", uint(cardID), int64(flowIncrementMB)); err != nil {
// 扣减失败不影响主流程,仅记录日志
h.logger.Warn("套餐流量扣减失败",
zap.Uint64("card_id", cardID),
zap.Float64("increment_mb", flowIncrementMB),
zap.Error(err))
// 任务 18.4: 检查是否需要停机(所有套餐用完)
if h.shouldStopCard(ctx, uint(cardID)) {
h.logger.Warn("所有套餐流量已用完,触发停机",
zap.Uint64("card_id", cardID))
h.stopCardByUsageExhausted(ctx, card)
}
} else {
h.logger.Info("套餐流量扣减成功",
zap.Uint64("card_id", cardID),
zap.Float64("increment_mb", flowIncrementMB))
}
}
// 更新监控统计
h.updateStats(ctx, constants.TaskTypePollingCarddata, true, time.Since(startTime))
// 触发套餐检查(加入手动队列优先处理)
h.triggerPackageCheck(ctx, uint(cardID))
// 重新入队
return h.requeueCard(ctx, uint(cardID), constants.TaskTypePollingCarddata)
}
// calculateFlowIncrement 任务 18.2: 计算本次流量增量
func (h *PollingHandler) calculateFlowIncrement(card *model.IotCard, gatewayFlowMB float64, now time.Time) float64 {
// 获取本月1号
currentMonthStart := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, now.Location())
// 判断是否跨月
isCrossMonth := card.CurrentMonthStartDate == nil ||
card.CurrentMonthStartDate.Before(currentMonthStart)
if isCrossMonth {
// 跨月了:本月流量就是增量
return gatewayFlowMB
}
// 同月内:计算增量
increment := gatewayFlowMB - card.CurrentMonthUsageMB
if increment < 0 {
return 0
}
return increment
}
// shouldStopCard 任务 18.4: 检查是否应该停机(所有套餐用完)
func (h *PollingHandler) shouldStopCard(ctx context.Context, cardID uint) bool {
// 查询是否还有生效中的套餐
var activeCount int64
if err := h.db.WithContext(ctx).Model(&model.PackageUsage{}).
Where("iot_card_id = ? AND status = ?", cardID, constants.PackageUsageStatusActive).
Count(&activeCount).Error; err != nil {
h.logger.Warn("查询生效套餐失败", zap.Uint("card_id", cardID), zap.Error(err))
return false
}
// 如果没有生效中的套餐,需要停机
return activeCount == 0
}
// stopCardByUsageExhausted 任务 18.4: 流量耗尽停机
func (h *PollingHandler) stopCardByUsageExhausted(ctx context.Context, card *model.IotCard) {
// 只有在线的卡才需要停机
if card.NetworkStatus != 1 {
return
}
// 调用 Gateway 停机
if h.gatewayClient != nil {
if err := h.gatewayClient.StopCard(ctx, &gateway.CardOperationReq{
CardNo: card.ICCID,
}); err != nil {
h.logger.Error("停机失败",
zap.Uint("card_id", card.ID),
zap.String("iccid", card.ICCID),
zap.Error(err))
return
}
}
// 更新数据库:卡的网络状态
now := time.Now()
updates := map[string]any{
"network_status": 0, // 停机
"stopped_at": now,
"stop_reason": "套餐流量耗尽自动停机",
"updated_at": now,
}
if err := h.db.Model(&model.IotCard{}).
Where("id = ?", card.ID).
Updates(updates).Error; err != nil {
h.logger.Error("更新卡状态失败", zap.Uint("card_id", card.ID), zap.Error(err))
}
// 更新 Redis 缓存
h.updateCardCache(ctx, card.ID, map[string]any{
"network_status": 0,
})
h.logger.Warn("卡已停机(套餐流量耗尽)",
zap.Uint("card_id", card.ID),
zap.String("iccid", card.ICCID))
}
// calculateFlowUpdates 计算流量更新值(处理跨月逻辑)
func (h *PollingHandler) calculateFlowUpdates(card *model.IotCard, gatewayFlowMB float64, now time.Time) map[string]any {
updates := make(map[string]any)
// 获取本月1号
currentMonthStart := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, now.Location())
// 判断是否跨月
isCrossMonth := card.CurrentMonthStartDate == nil ||
card.CurrentMonthStartDate.Before(currentMonthStart)
if isCrossMonth {
// 跨月了:保存上月总量,重置本月
h.logger.Info("检测到跨月,重置流量计数",
zap.Uint("card_id", card.ID),
zap.Float64("last_month_total", card.CurrentMonthUsageMB),
zap.Float64("new_month_usage", gatewayFlowMB))
// 计算本次增量:上月最后值 + 本月当前值
increment := card.CurrentMonthUsageMB + gatewayFlowMB
updates["last_month_total_mb"] = card.CurrentMonthUsageMB
updates["current_month_start_date"] = currentMonthStart
updates["current_month_usage_mb"] = gatewayFlowMB
updates["data_usage_mb"] = card.DataUsageMB + int64(increment)
} else {
// 同月内:计算增量
increment := gatewayFlowMB - card.CurrentMonthUsageMB
if increment < 0 {
// Gateway 返回值比记录的小,可能是数据异常,不更新
h.logger.Warn("流量异常Gateway返回值小于记录值",
zap.Uint("card_id", card.ID),
zap.Float64("gateway_flow", gatewayFlowMB),
zap.Float64("recorded_flow", card.CurrentMonthUsageMB))
increment = 0
}
updates["current_month_usage_mb"] = gatewayFlowMB
if increment > 0 {
updates["data_usage_mb"] = card.DataUsageMB + int64(increment)
}
}
// 首次流量查询初始化
if card.CurrentMonthStartDate == nil {
updates["current_month_start_date"] = currentMonthStart
}
return updates
}
// HandlePackageCheck 处理套餐检查任务
func (h *PollingHandler) HandlePackageCheck(ctx context.Context, t *asynq.Task) error {
startTime := time.Now()
var payload PollingTaskPayload
if err := sonic.Unmarshal(t.Payload(), &payload); err != nil {
h.logger.Error("解析任务载荷失败", zap.Error(err))
return nil
}
cardID, err := strconv.ParseUint(payload.CardID, 10, 64)
if err != nil {
h.logger.Error("解析卡ID失败", zap.String("card_id", payload.CardID), zap.Error(err))
return nil
}
// 获取并发信号量
if !h.acquireConcurrency(ctx, constants.TaskTypePollingPackage) {
h.logger.Debug("并发已满,任务稍后重试", zap.Uint64("card_id", cardID))
return h.requeueCard(ctx, uint(cardID), constants.TaskTypePollingPackage)
}
defer h.releaseConcurrency(ctx, constants.TaskTypePollingPackage)
h.logger.Debug("开始套餐检查",
zap.Uint64("card_id", cardID),
zap.Bool("is_manual", payload.IsManual))
// 获取卡信息
card, err := h.getCardWithCache(ctx, uint(cardID))
if err != nil {
if err == gorm.ErrRecordNotFound {
h.logger.Warn("卡不存在", zap.Uint64("card_id", cardID))
return nil
}
h.logger.Error("获取卡信息失败", zap.Uint64("card_id", cardID), zap.Error(err))
h.updateStats(ctx, constants.TaskTypePollingPackage, false, time.Since(startTime))
return h.requeueCard(ctx, uint(cardID), constants.TaskTypePollingPackage)
}
// 检查套餐配置
if card.SeriesID == nil {
h.logger.Debug("卡无关联套餐系列,跳过检查", zap.Uint64("card_id", cardID))
h.updateStats(ctx, constants.TaskTypePollingPackage, true, time.Since(startTime))
return h.requeueCard(ctx, uint(cardID), constants.TaskTypePollingPackage)
}
// 查询套餐信息(获取虚流量限制)
var pkg model.Package
if err := h.db.Where("series_id = ? AND status = 1", *card.SeriesID).
Order("created_at ASC").First(&pkg).Error; err != nil {
if err == gorm.ErrRecordNotFound {
h.logger.Debug("套餐系列无可用套餐", zap.Uint("series_id", *card.SeriesID))
} else {
h.logger.Warn("查询套餐失败", zap.Uint("series_id", *card.SeriesID), zap.Error(err))
}
h.updateStats(ctx, constants.TaskTypePollingPackage, true, time.Since(startTime))
return h.requeueCard(ctx, uint(cardID), constants.TaskTypePollingPackage)
}
// 检查是否启用虚流量
if !pkg.EnableVirtualData || pkg.VirtualDataMB <= 0 {
h.logger.Debug("套餐未启用虚流量或虚流量为0跳过检查",
zap.Uint64("card_id", cardID),
zap.Uint("package_id", pkg.ID),
zap.Bool("enable_virtual", pkg.EnableVirtualData),
zap.Int64("virtual_data_mb", pkg.VirtualDataMB))
h.updateStats(ctx, constants.TaskTypePollingPackage, true, time.Since(startTime))
return h.requeueCard(ctx, uint(cardID), constants.TaskTypePollingPackage)
}
// 计算流量使用:支持设备级套餐流量汇总
usedMB, deviceCards, isDeviceLevel := h.calculatePackageUsage(ctx, card)
limitMB := float64(pkg.VirtualDataMB)
usagePercent := (usedMB / limitMB) * 100
h.logger.Info("套餐流量检查",
zap.Uint64("card_id", cardID),
zap.String("iccid", card.ICCID),
zap.Float64("used_mb", usedMB),
zap.Float64("limit_mb", limitMB),
zap.Float64("usage_percent", usagePercent),
zap.Bool("is_device_level", isDeviceLevel),
zap.Int("device_card_count", len(deviceCards)))
// 判断状态
var needStop bool
var statusMsg string
switch {
case usedMB > limitMB:
// 已超额,需要停机
needStop = true
statusMsg = "已超额"
h.logger.Warn("套餐已超额,准备停机",
zap.Uint64("card_id", cardID),
zap.Float64("used_mb", usedMB),
zap.Float64("limit_mb", limitMB))
case usagePercent >= 95:
// 临近超额(预警)
statusMsg = "临近超额"
h.logger.Warn("套餐流量临近超额",
zap.Uint64("card_id", cardID),
zap.Float64("usage_percent", usagePercent))
default:
// 正常
statusMsg = "正常"
}
// 执行停机
if needStop {
// 设备级套餐需要停机设备下所有卡
cardsToStop := []*model.IotCard{card}
if isDeviceLevel && len(deviceCards) > 0 {
cardsToStop = deviceCards
}
if h.gatewayClient != nil {
h.stopCards(ctx, cardsToStop, &pkg, usedMB)
} else {
h.logger.Debug("停机跳过Gateway未配置",
zap.Uint64("card_id", cardID),
zap.Int("cards_to_stop", len(cardsToStop)))
}
}
h.logger.Info("套餐检查完成",
zap.Uint64("card_id", cardID),
zap.String("iccid", card.ICCID),
zap.Float64("used_mb", usedMB),
zap.Float64("limit_mb", limitMB),
zap.String("status", statusMsg),
zap.Bool("stopped", needStop && card.NetworkStatus == 1),
zap.Duration("duration", time.Since(startTime)))
// 更新监控统计
h.updateStats(ctx, constants.TaskTypePollingPackage, true, time.Since(startTime))
// 重新入队
return h.requeueCard(ctx, uint(cardID), constants.TaskTypePollingPackage)
}
// logStopOperation 记录停机操作日志
func (h *PollingHandler) logStopOperation(_ context.Context, card *model.IotCard, pkg *model.Package, usedMB float64) {
// 记录详细的停机操作日志(应用日志级别)
h.logger.Info("停机操作记录",
zap.Uint("card_id", card.ID),
zap.String("iccid", card.ICCID),
zap.Uint("package_id", pkg.ID),
zap.String("package_name", pkg.PackageName),
zap.Float64("used_mb", usedMB),
zap.Int64("virtual_data_mb", pkg.VirtualDataMB),
zap.String("reason", "套餐超额自动停机"),
zap.String("operator", "系统自动"),
zap.Int("before_network_status", 1),
zap.Int("after_network_status", 0))
}
// calculatePackageUsage 计算套餐流量使用(支持设备级套餐汇总)
// 返回总流量MB、设备下所有卡如果是设备级套餐、是否为设备级套餐
func (h *PollingHandler) calculatePackageUsage(ctx context.Context, card *model.IotCard) (float64, []*model.IotCard, bool) {
// 检查卡是否绑定到设备
binding, err := h.deviceSimBindingStore.GetActiveBindingByCardID(ctx, card.ID)
if err != nil {
// 卡未绑定到设备,使用单卡流量
return card.CurrentMonthUsageMB, nil, false
}
// 卡绑定到设备,获取设备下所有卡
bindings, err := h.deviceSimBindingStore.ListByDeviceID(ctx, binding.DeviceID)
if err != nil {
h.logger.Warn("查询设备下所有绑定失败,使用单卡流量",
zap.Uint("device_id", binding.DeviceID),
zap.Error(err))
return card.CurrentMonthUsageMB, nil, false
}
if len(bindings) == 0 {
return card.CurrentMonthUsageMB, nil, false
}
// 获取设备下所有卡的 ID
cardIDs := make([]uint, len(bindings))
for i, b := range bindings {
cardIDs[i] = b.IotCardID
}
// 批量查询卡信息
var cards []*model.IotCard
if err := h.db.WithContext(ctx).Where("id IN ?", cardIDs).Find(&cards).Error; err != nil {
h.logger.Warn("查询设备下所有卡失败,使用单卡流量",
zap.Uint("device_id", binding.DeviceID),
zap.Error(err))
return card.CurrentMonthUsageMB, nil, false
}
// 汇总流量
var totalUsedMB float64
for _, c := range cards {
totalUsedMB += c.CurrentMonthUsageMB
}
h.logger.Debug("设备级套餐流量汇总",
zap.Uint("device_id", binding.DeviceID),
zap.Int("card_count", len(cards)),
zap.Float64("total_used_mb", totalUsedMB))
return totalUsedMB, cards, true
}
// stopCards 批量停机卡
func (h *PollingHandler) stopCards(ctx context.Context, cards []*model.IotCard, pkg *model.Package, usedMB float64) {
for _, card := range cards {
// 跳过已停机的卡
if card.NetworkStatus != 1 {
continue
}
err := h.gatewayClient.StopCard(ctx, &gateway.CardOperationReq{
CardNo: card.ICCID,
})
if err != nil {
h.logger.Error("停机失败",
zap.Uint("card_id", card.ID),
zap.String("iccid", card.ICCID),
zap.Error(err))
// 继续处理其他卡,不中断
continue
}
h.logger.Warn("停机成功",
zap.Uint("card_id", card.ID),
zap.String("iccid", card.ICCID),
zap.String("reason", "套餐超额"))
// 更新数据库:卡的网络状态
now := time.Now()
if err := h.db.Model(&model.IotCard{}).
Where("id = ?", card.ID).
Updates(map[string]any{
"network_status": 0, // 停机
"updated_at": now,
}).Error; err != nil {
h.logger.Error("更新卡状态失败", zap.Uint("card_id", card.ID), zap.Error(err))
}
// 更新 Redis 缓存
h.updateCardCache(ctx, card.ID, map[string]any{
"network_status": 0,
})
// 记录操作日志
h.logStopOperation(ctx, card, pkg, usedMB)
}
}
// parseRealnameStatus 解析实名状态
// Gateway 返回 bool 类型true=已实名, false=未实名
func (h *PollingHandler) parseRealnameStatus(realStatus bool) int {
if realStatus {
return 2 // 已实名
}
return 0 // 未实名
}
// extractTaskType 从完整的任务类型中提取简短的类型名
// 例如polling:carddata -> carddata
func extractTaskType(fullTaskType string) string {
if idx := strings.LastIndex(fullTaskType, ":"); idx != -1 {
return fullTaskType[idx+1:]
}
return fullTaskType
}
// acquireConcurrency 获取并发信号量
func (h *PollingHandler) acquireConcurrency(ctx context.Context, taskType string) bool {
// 提取简短的任务类型(数据库中存的是 carddata不是 polling:carddata
shortType := extractTaskType(taskType)
configKey := constants.RedisPollingConcurrencyConfigKey(shortType)
currentKey := constants.RedisPollingConcurrencyCurrentKey(taskType) // current 保持原样
// 获取最大并发数
maxConcurrency, err := h.redis.Get(ctx, configKey).Int()
if err != nil {
maxConcurrency = 50 // 默认值
}
// 尝试获取信号量
current, err := h.redis.Incr(ctx, currentKey).Result()
if err != nil {
h.logger.Warn("获取并发计数失败", zap.Error(err))
return true // 出错时允许执行
}
if current > int64(maxConcurrency) {
h.redis.Decr(ctx, currentKey)
return false
}
return true
}
// releaseConcurrency 释放并发信号量
func (h *PollingHandler) releaseConcurrency(ctx context.Context, taskType string) {
currentKey := constants.RedisPollingConcurrencyCurrentKey(taskType)
if err := h.redis.Decr(ctx, currentKey).Err(); err != nil {
h.logger.Warn("释放并发计数失败", zap.Error(err))
}
}
// requeueCard 重新将卡加入队列
func (h *PollingHandler) requeueCard(ctx context.Context, cardID uint, taskType string) error {
// 获取配置中的检查间隔
var intervalSeconds int
switch taskType {
case constants.TaskTypePollingRealname:
intervalSeconds = 300 // 默认 5 分钟
case constants.TaskTypePollingCarddata:
intervalSeconds = 600 // 默认 10 分钟
case constants.TaskTypePollingPackage:
intervalSeconds = 600 // 默认 10 分钟
case constants.TaskTypePollingProtect:
intervalSeconds = 300 // 默认 5 分钟
default:
return nil
}
// 计算下次检查时间
nextCheck := time.Now().Add(time.Duration(intervalSeconds) * time.Second)
// 确定队列 key
var queueKey string
switch taskType {
case constants.TaskTypePollingRealname:
queueKey = constants.RedisPollingQueueRealnameKey()
case constants.TaskTypePollingCarddata:
queueKey = constants.RedisPollingQueueCarddataKey()
case constants.TaskTypePollingPackage:
queueKey = constants.RedisPollingQueuePackageKey()
case constants.TaskTypePollingProtect:
queueKey = constants.RedisPollingQueueProtectKey()
}
// 添加到队列
return h.redis.ZAdd(ctx, queueKey, redis.Z{
Score: float64(nextCheck.Unix()),
Member: strconv.FormatUint(uint64(cardID), 10),
}).Err()
}
// triggerPackageCheck 触发套餐检查
func (h *PollingHandler) triggerPackageCheck(ctx context.Context, cardID uint) {
key := constants.RedisPollingManualQueueKey(constants.TaskTypePollingPackage)
if err := h.redis.RPush(ctx, key, strconv.FormatUint(uint64(cardID), 10)).Err(); err != nil {
h.logger.Warn("触发套餐检查失败", zap.Uint("card_id", cardID), zap.Error(err))
}
}
// updateStats 更新监控统计
func (h *PollingHandler) updateStats(ctx context.Context, taskType string, success bool, duration time.Duration) {
key := constants.RedisPollingStatsKey(taskType)
pipe := h.redis.Pipeline()
if success {
pipe.HIncrBy(ctx, key, "success_count_1h", 1)
} else {
pipe.HIncrBy(ctx, key, "failure_count_1h", 1)
}
pipe.HIncrBy(ctx, key, "total_duration_1h", duration.Milliseconds())
pipe.Expire(ctx, key, 2*time.Hour)
_, _ = pipe.Exec(ctx)
}
// insertDataUsageRecord 插入流量历史记录
func (h *PollingHandler) insertDataUsageRecord(ctx context.Context, cardID uint, currentUsageMB, previousUsageMB float64, checkTime time.Time, isManual bool) {
// 计算流量增量
var dataIncreaseMB int64
if currentUsageMB > previousUsageMB {
dataIncreaseMB = int64(currentUsageMB - previousUsageMB)
}
// 确定数据来源
source := "polling"
if isManual {
source = "manual"
}
// 创建流量记录
record := &model.DataUsageRecord{
IotCardID: cardID,
DataUsageMB: int64(currentUsageMB),
DataIncreaseMB: dataIncreaseMB,
CheckTime: checkTime,
Source: source,
}
// 插入记录
if err := h.dataUsageRecordStore.Create(ctx, record); err != nil {
h.logger.Warn("插入流量历史记录失败",
zap.Uint("card_id", cardID),
zap.Int64("data_usage_mb", record.DataUsageMB),
zap.Int64("data_increase_mb", record.DataIncreaseMB),
zap.String("source", source),
zap.Error(err))
} else {
h.logger.Debug("流量历史记录已插入",
zap.Uint("card_id", cardID),
zap.Int64("data_usage_mb", record.DataUsageMB),
zap.Int64("data_increase_mb", record.DataIncreaseMB),
zap.String("source", source))
}
}
// updateCardCache 更新卡缓存
func (h *PollingHandler) updateCardCache(ctx context.Context, cardID uint, updates map[string]any) {
key := constants.RedisPollingCardInfoKey(cardID)
// 转换为 []any 用于 HSet
args := make([]any, 0, len(updates)*2)
for k, v := range updates {
args = append(args, k, v)
}
if len(args) > 0 {
if err := h.redis.HSet(ctx, key, args...).Err(); err != nil {
h.logger.Warn("更新卡缓存失败",
zap.Uint("card_id", cardID),
zap.Error(err))
}
}
}
// getCardWithCache 优先从 Redis 缓存获取卡信息miss 时查 DB
// 大幅减少数据库查询,避免高并发时连接池耗尽
func (h *PollingHandler) getCardWithCache(ctx context.Context, cardID uint) (*model.IotCard, error) {
key := constants.RedisPollingCardInfoKey(cardID)
// 尝试从 Redis 读取
result, err := h.redis.HGetAll(ctx, key).Result()
if err == nil && len(result) > 0 {
// 缓存命中,构建卡对象
card := &model.IotCard{}
card.ID = cardID
if v, ok := result["iccid"]; ok {
card.ICCID = v
}
if v, ok := result["card_category"]; ok {
card.CardCategory = v
}
if v, ok := result["real_name_status"]; ok {
if status, err := strconv.Atoi(v); err == nil {
card.RealNameStatus = status
}
}
if v, ok := result["network_status"]; ok {
if status, err := strconv.Atoi(v); err == nil {
card.NetworkStatus = status
}
}
if v, ok := result["carrier_id"]; ok {
if id, err := strconv.ParseUint(v, 10, 64); err == nil {
card.CarrierID = uint(id)
}
}
if v, ok := result["current_month_usage_mb"]; ok {
if usage, err := strconv.ParseFloat(v, 64); err == nil {
card.CurrentMonthUsageMB = usage
}
}
if v, ok := result["series_id"]; ok {
if id, err := strconv.ParseUint(v, 10, 64); err == nil {
seriesID := uint(id)
card.SeriesID = &seriesID
}
}
return card, nil
}
// 缓存 miss查询数据库
card, err := h.iotCardStore.GetByID(ctx, cardID)
if err != nil {
return nil, err
}
// 异步写入缓存
go func() {
cacheCtx := context.Background()
cacheData := map[string]any{
"id": card.ID,
"iccid": card.ICCID,
"card_category": card.CardCategory,
"real_name_status": card.RealNameStatus,
"network_status": card.NetworkStatus,
"carrier_id": card.CarrierID,
"current_month_usage_mb": card.CurrentMonthUsageMB,
"cached_at": time.Now().Unix(),
}
if card.SeriesID != nil {
cacheData["series_id"] = *card.SeriesID
}
pipe := h.redis.Pipeline()
pipe.HSet(cacheCtx, key, cacheData)
pipe.Expire(cacheCtx, key, 7*24*time.Hour)
_, _ = pipe.Exec(cacheCtx)
}()
return card, nil
}
// HandleProtectConsistencyCheck 保护期一致性检查
// 检查绑定设备is_standalone=false且已实名real_name_status=2的卡
// stop 保护期 + 开机 → 调网关停机start 保护期 + 停机 → 调网关复机
func (h *PollingHandler) HandleProtectConsistencyCheck(ctx context.Context, t *asynq.Task) error {
var payload PollingTaskPayload
if err := sonic.Unmarshal(t.Payload(), &payload); err != nil {
h.logger.Error("解析保护期检查任务载荷失败", zap.Error(err))
return nil
}
cardID, err := strconv.ParseUint(payload.CardID, 10, 64)
if err != nil {
h.logger.Error("解析卡ID失败", zap.String("card_id", payload.CardID), zap.Error(err))
return nil
}
// 查询卡信息
var card model.IotCard
if err := h.db.WithContext(ctx).Where("id = ?", cardID).First(&card).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return nil
}
h.logger.Error("查询卡信息失败", zap.Uint64("card_id", cardID), zap.Error(err))
return nil
}
// 未绑设备(独立卡),跳过
if card.IsStandalone {
return nil
}
// 未实名,跳过
if card.RealNameStatus != constants.RealNameStatusVerified {
return nil
}
// 查绑定设备
binding, err := h.deviceSimBindingStore.GetActiveBindingByCardID(ctx, card.ID)
if err != nil || binding == nil {
return nil
}
deviceID := binding.DeviceID
// 检查 stop 保护期:设备处于停机保护期,但卡是开机状态 → 需要停机
stopProtect := h.redis.Exists(ctx, constants.RedisDeviceProtectKey(deviceID, "stop")).Val() > 0
if stopProtect && card.NetworkStatus == constants.NetworkStatusOnline {
h.logger.Info("保护期一致性检查:停机保护期内发现开机卡,执行停机",
zap.Uint("card_id", card.ID),
zap.String("iccid", card.ICCID),
zap.Uint("device_id", deviceID))
if h.gatewayClient != nil {
if err := h.gatewayClient.StopCard(ctx, &gateway.CardOperationReq{CardNo: card.ICCID}); err != nil {
h.logger.Error("保护期一致性停机失败",
zap.Uint("card_id", card.ID),
zap.Error(err))
return nil
}
}
h.db.Model(&model.IotCard{}).Where("id = ?", card.ID).Updates(map[string]any{
"network_status": constants.NetworkStatusOffline,
"stopped_at": time.Now(),
"stop_reason": "保护期一致性检查自动停机",
})
h.updateCardCache(ctx, card.ID, map[string]any{"network_status": constants.NetworkStatusOffline})
return nil
}
// 检查 start 保护期:设备处于复机保护期,但卡是停机状态 → 需要复机
startProtect := h.redis.Exists(ctx, constants.RedisDeviceProtectKey(deviceID, "start")).Val() > 0
if startProtect && card.NetworkStatus == constants.NetworkStatusOffline {
h.logger.Info("保护期一致性检查:复机保护期内发现停机卡,执行复机",
zap.Uint("card_id", card.ID),
zap.String("iccid", card.ICCID),
zap.Uint("device_id", deviceID))
if h.gatewayClient != nil {
if err := h.gatewayClient.StartCard(ctx, &gateway.CardOperationReq{CardNo: card.ICCID}); err != nil {
h.logger.Error("保护期一致性复机失败",
zap.Uint("card_id", card.ID),
zap.Error(err))
return nil
}
}
h.db.Model(&model.IotCard{}).Where("id = ?", card.ID).Updates(map[string]any{
"network_status": constants.NetworkStatusOnline,
"resumed_at": time.Now(),
"stop_reason": "",
})
h.updateCardCache(ctx, card.ID, map[string]any{"network_status": constants.NetworkStatusOnline})
}
return nil
}
// triggerFirstRealnameActivation 任务 21.3-21.4: 首次实名后触发套餐激活
func (h *PollingHandler) triggerFirstRealnameActivation(ctx context.Context, cardID uint) {
// 任务 21.3: 查询该卡是否有待激活套餐
// WHERE pending_realname_activation=true AND status=0 AND iot_card_id=?
var pendingPackages []model.PackageUsage
err := h.db.WithContext(ctx).
Where("iot_card_id = ?", cardID).
Where("pending_realname_activation = ?", true).
Where("status = ?", constants.PackageUsageStatusPending).
Find(&pendingPackages).Error
if err != nil {
h.logger.Warn("查询待激活套餐失败",
zap.Uint("card_id", cardID),
zap.Error(err))
return
}
if len(pendingPackages) == 0 {
h.logger.Debug("无待激活套餐",
zap.Uint("card_id", cardID))
return
}
h.logger.Info("发现待激活套餐",
zap.Uint("card_id", cardID),
zap.Int("count", len(pendingPackages)))
// 任务 21.4: 提交 Asynq 任务激活套餐
for _, pkg := range pendingPackages {
payload := map[string]any{
"package_usage_id": pkg.ID,
"carrier_type": "iot_card",
"carrier_id": cardID,
"activation_type": "realname",
"timestamp": time.Now().Unix(),
}
payloadBytes, err := sonic.Marshal(payload)
if err != nil {
h.logger.Warn("序列化激活任务载荷失败",
zap.Uint("package_usage_id", pkg.ID),
zap.Error(err))
continue
}
task := asynq.NewTask(constants.TaskTypePackageFirstActivation, payloadBytes,
asynq.MaxRetry(3),
asynq.Timeout(30*time.Second),
asynq.Queue(constants.QueueDefault),
)
// 这里需要访问 Asynq Client暂时使用 Redis 队列
// 实际应该通过依赖注入 asynq.Client
activationKey := constants.RedisPollingManualQueueKey(constants.TaskTypePackageFirstActivation)
if err := h.redis.RPush(ctx, activationKey, string(payloadBytes)).Err(); err != nil {
h.logger.Warn("提交激活任务失败",
zap.Uint("package_usage_id", pkg.ID),
zap.Error(err))
continue
}
h.logger.Info("已提交首次实名激活任务",
zap.Uint("package_usage_id", pkg.ID),
zap.Uint("card_id", cardID))
// 避免未使用变量警告
_ = task
}
}