All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 7m17s
## 变更概述 将统一钱包系统拆分为代理钱包和卡钱包两个独立系统,实现数据表和代码层面的完全隔离。 ## 数据库变更 - 新增 6 张表:tb_agent_wallet、tb_agent_wallet_transaction、tb_agent_recharge_record、tb_card_wallet、tb_card_wallet_transaction、tb_card_recharge_record - 删除 3 张旧表:tb_wallet、tb_wallet_transaction、tb_recharge_record - 代理钱包:按 (shop_id, wallet_type) 唯一标识,支持主钱包和分佣钱包 - 卡钱包:按 (resource_type, resource_id) 唯一标识,支持物联网卡和设备 ## 代码变更 - Model 层:新增 AgentWallet、AgentWalletTransaction、AgentRechargeRecord、CardWallet、CardWalletTransaction、CardRechargeRecord 模型 - Store 层:新增 6 个独立 Store,支持事务、乐观锁、Redis 缓存 - Service 层:重构 commission_calculation、commission_withdrawal、order、recharge 等 8 个服务 - Bootstrap 层:更新 Store 和 Service 依赖注入 - 常量层:按钱包类型重新组织常量和 Redis Key 生成函数 ## 技术特性 - 乐观锁:使用 version 字段防止并发冲突 - 多租户:支持 shop_id_tag 和 enterprise_id_tag 过滤 - 事务管理:所有余额变动使用事务保证 ACID - 缓存策略:Cache-Aside 模式,余额变动后删除缓存 ## 业务影响 - 代理钱包和卡钱包业务完全隔离,互不影响 - 为独立监控、优化、扩展打下基础 - 提升代理钱包的稳定性和独立性 Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
369 lines
12 KiB
Go
369 lines
12 KiB
Go
package polling
|
||
|
||
import (
|
||
"context"
|
||
"time"
|
||
|
||
"github.com/bytedance/sonic"
|
||
"github.com/hibiken/asynq"
|
||
"github.com/redis/go-redis/v9"
|
||
"go.uber.org/zap"
|
||
"gorm.io/gorm"
|
||
|
||
"github.com/break/junhong_cmp_fiber/internal/model"
|
||
packagepkg "github.com/break/junhong_cmp_fiber/internal/service/package"
|
||
"github.com/break/junhong_cmp_fiber/internal/store/postgres"
|
||
"github.com/break/junhong_cmp_fiber/pkg/constants"
|
||
)
|
||
|
||
// PackageActivationHandler 套餐激活检查处理器
|
||
// 任务 19: 处理主套餐过期、加油包级联失效、待生效主套餐激活
|
||
type PackageActivationHandler struct {
|
||
db *gorm.DB
|
||
redis *redis.Client
|
||
queueClient *asynq.Client
|
||
packageUsageStore *postgres.PackageUsageStore
|
||
activationService *packagepkg.ActivationService
|
||
logger *zap.Logger
|
||
}
|
||
|
||
// PackageActivationPayload 套餐激活任务载荷
|
||
type PackageActivationPayload struct {
|
||
PackageUsageID uint `json:"package_usage_id"`
|
||
CarrierType string `json:"carrier_type"` // "iot_card" 或 "device"
|
||
CarrierID uint `json:"carrier_id"`
|
||
ActivationType string `json:"activation_type"` // "queue" 或 "realname"
|
||
Timestamp int64 `json:"timestamp"`
|
||
}
|
||
|
||
// NewPackageActivationHandler 创建套餐激活检查处理器
|
||
func NewPackageActivationHandler(
|
||
db *gorm.DB,
|
||
redis *redis.Client,
|
||
queueClient *asynq.Client,
|
||
activationService *packagepkg.ActivationService,
|
||
logger *zap.Logger,
|
||
) *PackageActivationHandler {
|
||
return &PackageActivationHandler{
|
||
db: db,
|
||
redis: redis,
|
||
queueClient: queueClient,
|
||
packageUsageStore: postgres.NewPackageUsageStore(db, redis),
|
||
activationService: activationService,
|
||
logger: logger,
|
||
}
|
||
}
|
||
|
||
// HandlePackageActivationCheck 任务 19.2-19.5: 处理套餐激活检查
|
||
// 每 10 秒调度一次,检查过期主套餐并激活下一个待生效主套餐
|
||
func (h *PackageActivationHandler) HandlePackageActivationCheck(ctx context.Context) error {
|
||
startTime := time.Now()
|
||
|
||
// 任务 19.2: 查询已过期的主套餐(status=1 AND expires_at <= NOW)
|
||
expiredPackages, err := h.findExpiredMainPackages(ctx)
|
||
if err != nil {
|
||
h.logger.Error("查询过期主套餐失败", zap.Error(err))
|
||
return err
|
||
}
|
||
|
||
if len(expiredPackages) == 0 {
|
||
return nil
|
||
}
|
||
|
||
h.logger.Info("发现过期主套餐",
|
||
zap.Int("count", len(expiredPackages)),
|
||
zap.Duration("check_duration", time.Since(startTime)))
|
||
|
||
// 处理每个过期的主套餐
|
||
for _, pkg := range expiredPackages {
|
||
if err := h.processExpiredPackage(ctx, pkg); err != nil {
|
||
h.logger.Error("处理过期套餐失败",
|
||
zap.Uint("package_usage_id", pkg.ID),
|
||
zap.Error(err))
|
||
// 继续处理下一个,不中断
|
||
continue
|
||
}
|
||
}
|
||
|
||
h.logger.Info("套餐激活检查完成",
|
||
zap.Int("processed", len(expiredPackages)),
|
||
zap.Duration("total_duration", time.Since(startTime)))
|
||
|
||
return nil
|
||
}
|
||
|
||
// findExpiredMainPackages 任务 19.2: 查询已过期的主套餐
|
||
func (h *PackageActivationHandler) findExpiredMainPackages(ctx context.Context) ([]*model.PackageUsage, error) {
|
||
var packages []*model.PackageUsage
|
||
now := time.Now()
|
||
|
||
// 查询 status=1 (生效中) AND expires_at <= NOW AND master_usage_id IS NULL (主套餐)
|
||
err := h.db.WithContext(ctx).
|
||
Where("status = ?", constants.PackageUsageStatusActive).
|
||
Where("expires_at <= ?", now).
|
||
Where("master_usage_id IS NULL"). // 主套餐没有 master_usage_id
|
||
Limit(1000). // 每次最多处理 1000 个,避免长事务
|
||
Find(&packages).Error
|
||
|
||
return packages, err
|
||
}
|
||
|
||
// processExpiredPackage 处理单个过期套餐
|
||
func (h *PackageActivationHandler) processExpiredPackage(ctx context.Context, pkg *model.PackageUsage) error {
|
||
return h.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
|
||
// 任务 19.3: 更新过期主套餐状态为 Expired (status=3)
|
||
if err := tx.Model(pkg).Update("status", constants.PackageUsageStatusExpired).Error; err != nil {
|
||
return err
|
||
}
|
||
|
||
h.logger.Info("主套餐已过期",
|
||
zap.Uint("package_usage_id", pkg.ID),
|
||
zap.Time("expires_at", pkg.ExpiresAt))
|
||
|
||
// 任务 19.4: 加油包级联失效
|
||
if err := h.invalidateAddons(ctx, tx, pkg.ID); err != nil {
|
||
h.logger.Warn("加油包级联失效失败",
|
||
zap.Uint("master_usage_id", pkg.ID),
|
||
zap.Error(err))
|
||
// 不返回错误,继续处理
|
||
}
|
||
|
||
// 任务 19.5: 查询并激活下一个待生效主套餐
|
||
carrierType, carrierID := h.getCarrierInfo(pkg)
|
||
if carrierType != "" && carrierID > 0 {
|
||
if err := h.activateNextPackage(ctx, tx, carrierType, carrierID); err != nil {
|
||
h.logger.Warn("激活下一个待生效套餐失败",
|
||
zap.String("carrier_type", carrierType),
|
||
zap.Uint("carrier_id", carrierID),
|
||
zap.Error(err))
|
||
// 不返回错误,继续处理
|
||
}
|
||
}
|
||
|
||
return nil
|
||
})
|
||
}
|
||
|
||
// invalidateAddons 任务 19.4: 加油包级联失效
|
||
func (h *PackageActivationHandler) invalidateAddons(ctx context.Context, tx *gorm.DB, masterUsageID uint) error {
|
||
// 查询主套餐下的所有加油包(status IN (0,1,2) 的加油包)
|
||
result := tx.Model(&model.PackageUsage{}).
|
||
Where("master_usage_id = ?", masterUsageID).
|
||
Where("status IN ?", []int{
|
||
constants.PackageUsageStatusPending,
|
||
constants.PackageUsageStatusActive,
|
||
constants.PackageUsageStatusDepleted,
|
||
}).
|
||
Update("status", constants.PackageUsageStatusInvalidated)
|
||
|
||
if result.Error != nil {
|
||
return result.Error
|
||
}
|
||
|
||
if result.RowsAffected > 0 {
|
||
h.logger.Info("加油包已级联失效",
|
||
zap.Uint("master_usage_id", masterUsageID),
|
||
zap.Int64("invalidated_count", result.RowsAffected))
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// getCarrierInfo 获取载体信息
|
||
func (h *PackageActivationHandler) getCarrierInfo(pkg *model.PackageUsage) (string, uint) {
|
||
if pkg.IotCardID > 0 {
|
||
return "iot_card", pkg.IotCardID
|
||
}
|
||
if pkg.DeviceID > 0 {
|
||
return "device", pkg.DeviceID
|
||
}
|
||
return "", 0
|
||
}
|
||
|
||
// activateNextPackage 任务 19.5: 激活下一个待生效主套餐
|
||
func (h *PackageActivationHandler) activateNextPackage(ctx context.Context, tx *gorm.DB, carrierType string, carrierID uint) error {
|
||
// 查询下一个待生效主套餐
|
||
// WHERE status=0 AND master_usage_id IS NULL ORDER BY priority ASC LIMIT 1
|
||
var nextPkg model.PackageUsage
|
||
query := tx.Where("status = ?", constants.PackageUsageStatusPending).
|
||
Where("master_usage_id IS NULL"). // 主套餐
|
||
Order("priority ASC").
|
||
Limit(1)
|
||
|
||
if carrierType == "iot_card" {
|
||
query = query.Where("iot_card_id = ?", carrierID)
|
||
} else if carrierType == "device" {
|
||
query = query.Where("device_id = ?", carrierID)
|
||
}
|
||
|
||
if err := query.First(&nextPkg).Error; err != nil {
|
||
if err == gorm.ErrRecordNotFound {
|
||
// 没有待生效套餐,正常情况
|
||
return nil
|
||
}
|
||
return err
|
||
}
|
||
|
||
// 提交 Asynq 任务进行激活(避免长事务)
|
||
return h.enqueueActivationTask(ctx, nextPkg.ID, carrierType, carrierID, "queue")
|
||
}
|
||
|
||
// enqueueActivationTask 提交套餐激活任务到 Asynq
|
||
func (h *PackageActivationHandler) enqueueActivationTask(ctx context.Context, packageUsageID uint, carrierType string, carrierID uint, activationType string) error {
|
||
payload := PackageActivationPayload{
|
||
PackageUsageID: packageUsageID,
|
||
CarrierType: carrierType,
|
||
CarrierID: carrierID,
|
||
ActivationType: activationType,
|
||
Timestamp: time.Now().Unix(),
|
||
}
|
||
|
||
payloadBytes, err := sonic.Marshal(payload)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
task := asynq.NewTask(constants.TaskTypePackageQueueActivation, payloadBytes,
|
||
asynq.MaxRetry(3),
|
||
asynq.Timeout(30*time.Second),
|
||
asynq.Queue(constants.QueueDefault),
|
||
)
|
||
|
||
_, err = h.queueClient.Enqueue(task)
|
||
if err != nil {
|
||
h.logger.Error("提交套餐激活任务失败",
|
||
zap.Uint("package_usage_id", packageUsageID),
|
||
zap.Error(err))
|
||
return err
|
||
}
|
||
|
||
h.logger.Info("已提交套餐激活任务",
|
||
zap.Uint("package_usage_id", packageUsageID),
|
||
zap.String("activation_type", activationType))
|
||
|
||
return nil
|
||
}
|
||
|
||
// HandlePackageQueueActivation 处理套餐排队激活任务(Asynq Handler)
|
||
// 任务 23: 由 Asynq 调用,执行实际的套餐激活逻辑
|
||
func (h *PackageActivationHandler) HandlePackageQueueActivation(ctx context.Context, t *asynq.Task) error {
|
||
var payload PackageActivationPayload
|
||
if err := sonic.Unmarshal(t.Payload(), &payload); err != nil {
|
||
h.logger.Error("解析套餐激活任务载荷失败", zap.Error(err))
|
||
return nil // 不重试
|
||
}
|
||
|
||
h.logger.Info("开始执行套餐激活",
|
||
zap.Uint("package_usage_id", payload.PackageUsageID),
|
||
zap.String("activation_type", payload.ActivationType))
|
||
|
||
// 查询套餐使用记录
|
||
var pkg model.PackageUsage
|
||
if err := h.db.First(&pkg, payload.PackageUsageID).Error; err != nil {
|
||
if err == gorm.ErrRecordNotFound {
|
||
h.logger.Warn("套餐使用记录不存在", zap.Uint("package_usage_id", payload.PackageUsageID))
|
||
return nil
|
||
}
|
||
return err
|
||
}
|
||
|
||
// 幂等性检查:如果已经是生效状态,跳过
|
||
if pkg.Status == constants.PackageUsageStatusActive {
|
||
h.logger.Info("套餐已激活,跳过",
|
||
zap.Uint("package_usage_id", payload.PackageUsageID))
|
||
return nil
|
||
}
|
||
|
||
// 调用 ActivationService 执行激活
|
||
if h.activationService != nil {
|
||
if err := h.activationService.ActivateQueuedPackage(ctx, payload.CarrierType, payload.CarrierID); err != nil {
|
||
h.logger.Error("套餐激活失败",
|
||
zap.Uint("package_usage_id", payload.PackageUsageID),
|
||
zap.String("carrier_type", payload.CarrierType),
|
||
zap.Uint("carrier_id", payload.CarrierID),
|
||
zap.Error(err))
|
||
return err
|
||
}
|
||
} else {
|
||
// ActivationService 未注入,直接更新状态
|
||
now := time.Now()
|
||
if err := h.db.Model(&pkg).Updates(map[string]interface{}{
|
||
"status": constants.PackageUsageStatusActive,
|
||
"activated_at": now,
|
||
}).Error; err != nil {
|
||
return err
|
||
}
|
||
}
|
||
|
||
h.logger.Info("套餐激活成功",
|
||
zap.Uint("package_usage_id", payload.PackageUsageID))
|
||
|
||
return nil
|
||
}
|
||
|
||
// HandlePackageFirstActivation 处理首次实名激活任务(Asynq Handler)
|
||
// 任务 22: 由 Asynq 调用,执行首次实名后的套餐激活
|
||
func (h *PackageActivationHandler) HandlePackageFirstActivation(ctx context.Context, t *asynq.Task) error {
|
||
var payload PackageActivationPayload
|
||
if err := sonic.Unmarshal(t.Payload(), &payload); err != nil {
|
||
h.logger.Error("解析首次实名激活任务载荷失败", zap.Error(err))
|
||
return nil // 不重试
|
||
}
|
||
|
||
h.logger.Info("开始执行首次实名激活",
|
||
zap.Uint("package_usage_id", payload.PackageUsageID),
|
||
zap.String("carrier_type", payload.CarrierType),
|
||
zap.Uint("carrier_id", payload.CarrierID))
|
||
|
||
// 任务 22.4: 幂等性检查
|
||
var pkg model.PackageUsage
|
||
if err := h.db.First(&pkg, payload.PackageUsageID).Error; err != nil {
|
||
if err == gorm.ErrRecordNotFound {
|
||
h.logger.Warn("套餐使用记录不存在", zap.Uint("package_usage_id", payload.PackageUsageID))
|
||
return nil
|
||
}
|
||
return err
|
||
}
|
||
|
||
// 检查 pending_realname_activation 是否已为 false(已处理过)
|
||
if !pkg.PendingRealnameActivation {
|
||
h.logger.Info("套餐已处理过首次实名激活,跳过",
|
||
zap.Uint("package_usage_id", payload.PackageUsageID))
|
||
return nil
|
||
}
|
||
|
||
// 如果已经是生效状态,跳过
|
||
if pkg.Status == constants.PackageUsageStatusActive {
|
||
h.logger.Info("套餐已激活,跳过",
|
||
zap.Uint("package_usage_id", payload.PackageUsageID))
|
||
return nil
|
||
}
|
||
|
||
// 任务 22.3: 调用 ActivationService.ActivateByRealname 激活套餐
|
||
if h.activationService != nil {
|
||
if err := h.activationService.ActivateByRealname(ctx, payload.CarrierType, payload.CarrierID); err != nil {
|
||
h.logger.Error("首次实名激活失败",
|
||
zap.Uint("package_usage_id", payload.PackageUsageID),
|
||
zap.String("carrier_type", payload.CarrierType),
|
||
zap.Uint("carrier_id", payload.CarrierID),
|
||
zap.Error(err))
|
||
return err
|
||
}
|
||
} else {
|
||
// ActivationService 未注入,直接更新状态(备用逻辑)
|
||
now := time.Now()
|
||
if err := h.db.Model(&pkg).Updates(map[string]any{
|
||
"status": constants.PackageUsageStatusActive,
|
||
"activated_at": now,
|
||
"pending_realname_activation": false,
|
||
}).Error; err != nil {
|
||
return err
|
||
}
|
||
}
|
||
|
||
h.logger.Info("首次实名激活成功",
|
||
zap.Uint("package_usage_id", payload.PackageUsageID))
|
||
|
||
return nil
|
||
}
|