package polling import ( "context" "time" "github.com/bytedance/sonic" "github.com/hibiken/asynq" "github.com/redis/go-redis/v9" "go.uber.org/zap" "gorm.io/gorm" "github.com/break/junhong_cmp_fiber/internal/model" packagepkg "github.com/break/junhong_cmp_fiber/internal/service/package" "github.com/break/junhong_cmp_fiber/internal/store/postgres" "github.com/break/junhong_cmp_fiber/pkg/constants" ) // PackageActivationHandler 套餐激活检查处理器 // 任务 19: 处理主套餐过期、加油包级联失效、待生效主套餐激活 type PackageActivationHandler struct { db *gorm.DB redis *redis.Client queueClient *asynq.Client packageUsageStore *postgres.PackageUsageStore activationService *packagepkg.ActivationService logger *zap.Logger } // PackageActivationPayload 套餐激活任务载荷 type PackageActivationPayload struct { PackageUsageID uint `json:"package_usage_id"` CarrierType string `json:"carrier_type"` // "iot_card" 或 "device" CarrierID uint `json:"carrier_id"` ActivationType string `json:"activation_type"` // "queue" 或 "realname" Timestamp int64 `json:"timestamp"` } // NewPackageActivationHandler 创建套餐激活检查处理器 func NewPackageActivationHandler( db *gorm.DB, redis *redis.Client, queueClient *asynq.Client, activationService *packagepkg.ActivationService, logger *zap.Logger, ) *PackageActivationHandler { return &PackageActivationHandler{ db: db, redis: redis, queueClient: queueClient, packageUsageStore: postgres.NewPackageUsageStore(db, redis), activationService: activationService, logger: logger, } } // HandlePackageActivationCheck 任务 19.2-19.5: 处理套餐激活检查 // 每 10 秒调度一次,检查过期主套餐并激活下一个待生效主套餐 func (h *PackageActivationHandler) HandlePackageActivationCheck(ctx context.Context) error { startTime := time.Now() // 任务 19.2: 查询已过期的主套餐(status=1 AND expires_at <= NOW) expiredPackages, err := h.findExpiredMainPackages(ctx) if err != nil { h.logger.Error("查询过期主套餐失败", zap.Error(err)) return err } if len(expiredPackages) == 0 { return nil } h.logger.Info("发现过期主套餐", zap.Int("count", len(expiredPackages)), zap.Duration("check_duration", time.Since(startTime))) // 处理每个过期的主套餐 for _, pkg := range expiredPackages { if err := h.processExpiredPackage(ctx, pkg); err != nil { h.logger.Error("处理过期套餐失败", zap.Uint("package_usage_id", pkg.ID), zap.Error(err)) // 继续处理下一个,不中断 continue } } h.logger.Info("套餐激活检查完成", zap.Int("processed", len(expiredPackages)), zap.Duration("total_duration", time.Since(startTime))) return nil } // findExpiredMainPackages 任务 19.2: 查询已过期的主套餐 func (h *PackageActivationHandler) findExpiredMainPackages(ctx context.Context) ([]*model.PackageUsage, error) { var packages []*model.PackageUsage now := time.Now() // 查询 status=1 (生效中) AND expires_at <= NOW AND master_usage_id IS NULL (主套餐) err := h.db.WithContext(ctx). Where("status = ?", constants.PackageUsageStatusActive). Where("expires_at <= ?", now). Where("master_usage_id IS NULL"). // 主套餐没有 master_usage_id Limit(1000). // 每次最多处理 1000 个,避免长事务 Find(&packages).Error return packages, err } // processExpiredPackage 处理单个过期套餐 func (h *PackageActivationHandler) processExpiredPackage(ctx context.Context, pkg *model.PackageUsage) error { return h.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { // 任务 19.3: 更新过期主套餐状态为 Expired (status=3) if err := tx.Model(pkg).Update("status", constants.PackageUsageStatusExpired).Error; err != nil { return err } h.logger.Info("主套餐已过期", zap.Uint("package_usage_id", pkg.ID), zap.Time("expires_at", pkg.ExpiresAt)) // 任务 19.4: 加油包级联失效 if err := h.invalidateAddons(ctx, tx, pkg.ID); err != nil { h.logger.Warn("加油包级联失效失败", zap.Uint("master_usage_id", pkg.ID), zap.Error(err)) // 不返回错误,继续处理 } // 任务 19.5: 查询并激活下一个待生效主套餐 carrierType, carrierID := h.getCarrierInfo(pkg) if carrierType != "" && carrierID > 0 { if err := h.activateNextPackage(ctx, tx, carrierType, carrierID); err != nil { h.logger.Warn("激活下一个待生效套餐失败", zap.String("carrier_type", carrierType), zap.Uint("carrier_id", carrierID), zap.Error(err)) // 不返回错误,继续处理 } } return nil }) } // invalidateAddons 任务 19.4: 加油包级联失效 func (h *PackageActivationHandler) invalidateAddons(ctx context.Context, tx *gorm.DB, masterUsageID uint) error { // 查询主套餐下的所有加油包(status IN (0,1,2) 的加油包) result := tx.Model(&model.PackageUsage{}). Where("master_usage_id = ?", masterUsageID). Where("status IN ?", []int{ constants.PackageUsageStatusPending, constants.PackageUsageStatusActive, constants.PackageUsageStatusDepleted, }). Update("status", constants.PackageUsageStatusInvalidated) if result.Error != nil { return result.Error } if result.RowsAffected > 0 { h.logger.Info("加油包已级联失效", zap.Uint("master_usage_id", masterUsageID), zap.Int64("invalidated_count", result.RowsAffected)) } return nil } // getCarrierInfo 获取载体信息 func (h *PackageActivationHandler) getCarrierInfo(pkg *model.PackageUsage) (string, uint) { if pkg.IotCardID > 0 { return "iot_card", pkg.IotCardID } if pkg.DeviceID > 0 { return "device", pkg.DeviceID } return "", 0 } // activateNextPackage 任务 19.5: 激活下一个待生效主套餐 func (h *PackageActivationHandler) activateNextPackage(ctx context.Context, tx *gorm.DB, carrierType string, carrierID uint) error { // 查询下一个待生效主套餐 // WHERE status=0 AND master_usage_id IS NULL ORDER BY priority ASC LIMIT 1 var nextPkg model.PackageUsage query := tx.Where("status = ?", constants.PackageUsageStatusPending). Where("master_usage_id IS NULL"). // 主套餐 Order("priority ASC"). Limit(1) if carrierType == "iot_card" { query = query.Where("iot_card_id = ?", carrierID) } else if carrierType == "device" { query = query.Where("device_id = ?", carrierID) } if err := query.First(&nextPkg).Error; err != nil { if err == gorm.ErrRecordNotFound { // 没有待生效套餐,正常情况 return nil } return err } // 提交 Asynq 任务进行激活(避免长事务) return h.enqueueActivationTask(ctx, nextPkg.ID, carrierType, carrierID, "queue") } // enqueueActivationTask 提交套餐激活任务到 Asynq func (h *PackageActivationHandler) enqueueActivationTask(ctx context.Context, packageUsageID uint, carrierType string, carrierID uint, activationType string) error { payload := PackageActivationPayload{ PackageUsageID: packageUsageID, CarrierType: carrierType, CarrierID: carrierID, ActivationType: activationType, Timestamp: time.Now().Unix(), } payloadBytes, err := sonic.Marshal(payload) if err != nil { return err } task := asynq.NewTask(constants.TaskTypePackageQueueActivation, payloadBytes, asynq.MaxRetry(3), asynq.Timeout(30*time.Second), asynq.Queue(constants.QueueDefault), ) _, err = h.queueClient.Enqueue(task) if err != nil { h.logger.Error("提交套餐激活任务失败", zap.Uint("package_usage_id", packageUsageID), zap.Error(err)) return err } h.logger.Info("已提交套餐激活任务", zap.Uint("package_usage_id", packageUsageID), zap.String("activation_type", activationType)) return nil } // HandlePackageQueueActivation 处理套餐排队激活任务(Asynq Handler) // 任务 23: 由 Asynq 调用,执行实际的套餐激活逻辑 func (h *PackageActivationHandler) HandlePackageQueueActivation(ctx context.Context, t *asynq.Task) error { var payload PackageActivationPayload if err := sonic.Unmarshal(t.Payload(), &payload); err != nil { h.logger.Error("解析套餐激活任务载荷失败", zap.Error(err)) return nil // 不重试 } h.logger.Info("开始执行套餐激活", zap.Uint("package_usage_id", payload.PackageUsageID), zap.String("activation_type", payload.ActivationType)) // 查询套餐使用记录 var pkg model.PackageUsage if err := h.db.First(&pkg, payload.PackageUsageID).Error; err != nil { if err == gorm.ErrRecordNotFound { h.logger.Warn("套餐使用记录不存在", zap.Uint("package_usage_id", payload.PackageUsageID)) return nil } return err } // 幂等性检查:如果已经是生效状态,跳过 if pkg.Status == constants.PackageUsageStatusActive { h.logger.Info("套餐已激活,跳过", zap.Uint("package_usage_id", payload.PackageUsageID)) return nil } // 调用 ActivationService 执行激活 if h.activationService != nil { if err := h.activationService.ActivateQueuedPackage(ctx, payload.CarrierType, payload.CarrierID); err != nil { h.logger.Error("套餐激活失败", zap.Uint("package_usage_id", payload.PackageUsageID), zap.String("carrier_type", payload.CarrierType), zap.Uint("carrier_id", payload.CarrierID), zap.Error(err)) return err } } else { // ActivationService 未注入,直接更新状态 now := time.Now() if err := h.db.Model(&pkg).Updates(map[string]interface{}{ "status": constants.PackageUsageStatusActive, "activated_at": now, }).Error; err != nil { return err } } h.logger.Info("套餐激活成功", zap.Uint("package_usage_id", payload.PackageUsageID)) return nil } // HandlePackageFirstActivation 处理首次实名激活任务(Asynq Handler) // 任务 22: 由 Asynq 调用,执行首次实名后的套餐激活 func (h *PackageActivationHandler) HandlePackageFirstActivation(ctx context.Context, t *asynq.Task) error { var payload PackageActivationPayload if err := sonic.Unmarshal(t.Payload(), &payload); err != nil { h.logger.Error("解析首次实名激活任务载荷失败", zap.Error(err)) return nil // 不重试 } h.logger.Info("开始执行首次实名激活", zap.Uint("package_usage_id", payload.PackageUsageID), zap.String("carrier_type", payload.CarrierType), zap.Uint("carrier_id", payload.CarrierID)) // 任务 22.4: 幂等性检查 var pkg model.PackageUsage if err := h.db.First(&pkg, payload.PackageUsageID).Error; err != nil { if err == gorm.ErrRecordNotFound { h.logger.Warn("套餐使用记录不存在", zap.Uint("package_usage_id", payload.PackageUsageID)) return nil } return err } // 检查 pending_realname_activation 是否已为 false(已处理过) if !pkg.PendingRealnameActivation { h.logger.Info("套餐已处理过首次实名激活,跳过", zap.Uint("package_usage_id", payload.PackageUsageID)) return nil } // 如果已经是生效状态,跳过 if pkg.Status == constants.PackageUsageStatusActive { h.logger.Info("套餐已激活,跳过", zap.Uint("package_usage_id", payload.PackageUsageID)) return nil } // 任务 22.3: 调用 ActivationService.ActivateByRealname 激活套餐 if h.activationService != nil { if err := h.activationService.ActivateByRealname(ctx, payload.CarrierType, payload.CarrierID); err != nil { h.logger.Error("首次实名激活失败", zap.Uint("package_usage_id", payload.PackageUsageID), zap.String("carrier_type", payload.CarrierType), zap.Uint("carrier_id", payload.CarrierID), zap.Error(err)) return err } } else { // ActivationService 未注入,直接更新状态(备用逻辑) now := time.Now() if err := h.db.Model(&pkg).Updates(map[string]any{ "status": constants.PackageUsageStatusActive, "activated_at": now, "pending_realname_activation": false, }).Error; err != nil { return err } } h.logger.Info("首次实名激活成功", zap.Uint("package_usage_id", payload.PackageUsageID)) return nil }