package task import ( "context" "errors" "time" "github.com/bytedance/sonic" "github.com/hibiken/asynq" "github.com/redis/go-redis/v9" "go.uber.org/zap" "gorm.io/gorm" "github.com/break/junhong_cmp_fiber/internal/model" packagepkg "github.com/break/junhong_cmp_fiber/internal/service/package" "github.com/break/junhong_cmp_fiber/internal/store/postgres" "github.com/break/junhong_cmp_fiber/pkg/constants" ) // AutoPurchasePayload 充值后自动购包任务载荷 type AutoPurchasePayload struct { RechargeRecordID uint `json:"recharge_record_id"` } // AutoPurchaseHandler 充值后自动购包任务处理器 type AutoPurchaseHandler struct { db *gorm.DB orderStore *postgres.OrderStore rechargeRecordStore *postgres.AssetRechargeStore walletStore *postgres.AssetWalletStore walletTransactionStore *postgres.AssetWalletTransactionStore packageUsageStore *postgres.PackageUsageStore redis *redis.Client logger *zap.Logger } // NewAutoPurchaseHandler 创建充值后自动购包处理器 func NewAutoPurchaseHandler( db *gorm.DB, orderStore *postgres.OrderStore, rechargeRecordStore *postgres.AssetRechargeStore, walletStore *postgres.AssetWalletStore, walletTransactionStore *postgres.AssetWalletTransactionStore, packageUsageStore *postgres.PackageUsageStore, redisClient *redis.Client, logger *zap.Logger, ) *AutoPurchaseHandler { if orderStore == nil { orderStore = postgres.NewOrderStore(db, redisClient) } if rechargeRecordStore == nil { rechargeRecordStore = postgres.NewAssetRechargeStore(db, redisClient) } if walletStore == nil { walletStore = postgres.NewAssetWalletStore(db, redisClient) } if walletTransactionStore == nil { walletTransactionStore = postgres.NewAssetWalletTransactionStore(db, redisClient) } if packageUsageStore == nil { packageUsageStore = postgres.NewPackageUsageStore(db, redisClient) } return &AutoPurchaseHandler{ db: db, orderStore: orderStore, rechargeRecordStore: rechargeRecordStore, walletStore: walletStore, walletTransactionStore: walletTransactionStore, packageUsageStore: packageUsageStore, redis: redisClient, logger: logger, } } // ProcessTask 处理充值后自动购包任务 func (h *AutoPurchaseHandler) ProcessTask(ctx context.Context, task *asynq.Task) error { var payload AutoPurchasePayload if err := sonic.Unmarshal(task.Payload(), &payload); err != nil { h.logger.Error("解析自动购包任务载荷失败", zap.Error(err)) return asynq.SkipRetry } if payload.RechargeRecordID == 0 { h.logger.Error("自动购包任务载荷无效", zap.Uint("recharge_record_id", payload.RechargeRecordID)) return asynq.SkipRetry } rechargeRecord, err := h.rechargeRecordStore.GetByID(ctx, payload.RechargeRecordID) if err != nil { if err == gorm.ErrRecordNotFound { h.logger.Warn("充值记录不存在,跳过自动购包", zap.Uint("recharge_record_id", payload.RechargeRecordID)) return asynq.SkipRetry } h.logger.Error("查询充值记录失败", zap.Uint("recharge_record_id", payload.RechargeRecordID), zap.Error(err)) return err } if rechargeRecord.AutoPurchaseStatus == constants.AutoPurchaseStatusSuccess { return nil } if rechargeRecord.AutoPurchaseStatus == constants.AutoPurchaseStatusFailed { return nil } packageIDs, err := parseLinkedPackageIDs(rechargeRecord.LinkedPackageIDs) if err != nil { h.logger.Error("解析关联套餐ID失败", zap.Uint("recharge_record_id", rechargeRecord.ID), zap.Error(err)) h.markAutoPurchaseFailedIfFinalRetry(ctx, rechargeRecord.ID) return asynq.SkipRetry } if len(packageIDs) == 0 { h.logger.Error("关联套餐ID为空,无法自动购包", zap.Uint("recharge_record_id", rechargeRecord.ID)) h.markAutoPurchaseFailedIfFinalRetry(ctx, rechargeRecord.ID) return asynq.SkipRetry } packages, totalAmount, err := h.loadPackages(ctx, packageIDs) if err != nil { h.logger.Error("加载关联套餐失败", zap.Uint("recharge_record_id", rechargeRecord.ID), zap.Error(err)) h.markAutoPurchaseFailedIfFinalRetry(ctx, rechargeRecord.ID) return err } if err := h.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { wallet, walletErr := h.walletStore.GetByID(ctx, rechargeRecord.AssetWalletID) if walletErr != nil { if walletErr == gorm.ErrRecordNotFound { return errors.New("资产钱包不存在") } return walletErr } if wallet.GetAvailableBalance() < totalAmount { return errors.New("钱包余额不足") } if err = h.walletStore.DeductBalanceWithTx(ctx, tx, wallet.ID, totalAmount, wallet.Version); err != nil { return err } now := time.Now() order, orderItems, buildErr := h.buildOrderAndItems(rechargeRecord, packages, totalAmount, now) if buildErr != nil { return buildErr } if err = tx.Create(order).Error; err != nil { return err } for _, item := range orderItems { item.OrderID = order.ID } if err = tx.CreateInBatches(orderItems, 100).Error; err != nil { return err } refType := constants.ReferenceTypeOrder walletTx := &model.AssetWalletTransaction{ AssetWalletID: wallet.ID, ResourceType: wallet.ResourceType, ResourceID: wallet.ResourceID, UserID: rechargeRecord.UserID, TransactionType: constants.AssetTransactionTypeDeduct, Amount: -totalAmount, BalanceBefore: wallet.Balance, BalanceAfter: wallet.Balance - totalAmount, Status: constants.TransactionStatusSuccess, ReferenceType: &refType, ReferenceNo: &order.OrderNo, Creator: rechargeRecord.UserID, ShopIDTag: wallet.ShopIDTag, EnterpriseIDTag: wallet.EnterpriseIDTag, } if err = h.walletTransactionStore.CreateWithTx(ctx, tx, walletTx); err != nil { return err } if err = h.activatePackages(ctx, tx, order, packages, now); err != nil { return err } if err = tx.Model(&model.AssetRechargeRecord{}). Where("id = ?", rechargeRecord.ID). Update("auto_purchase_status", constants.AutoPurchaseStatusSuccess).Error; err != nil { return err } return nil }); err != nil { h.logger.Error("自动购包任务执行失败", zap.Uint("recharge_record_id", rechargeRecord.ID), zap.Error(err), ) h.markAutoPurchaseFailedIfFinalRetry(ctx, rechargeRecord.ID) return err } h.logger.Info("自动购包任务执行成功", zap.Uint("recharge_record_id", rechargeRecord.ID)) return nil } // NewAutoPurchaseTask 创建充值后自动购包任务 func NewAutoPurchaseTask(rechargeRecordID uint) (*asynq.Task, error) { payloadBytes, err := sonic.Marshal(AutoPurchasePayload{RechargeRecordID: rechargeRecordID}) if err != nil { return nil, err } return asynq.NewTask(constants.TaskTypeAutoPurchaseAfterRecharge, payloadBytes, asynq.MaxRetry(3), asynq.Timeout(2*time.Minute), asynq.Queue(constants.QueueDefault), ), nil } func (h *AutoPurchaseHandler) markAutoPurchaseFailedIfFinalRetry(ctx context.Context, rechargeRecordID uint) { retryCount, ok := asynq.GetRetryCount(ctx) if !ok { return } maxRetry, ok := asynq.GetMaxRetry(ctx) if !ok { return } if retryCount < maxRetry-1 { return } if err := h.db.WithContext(ctx). Model(&model.AssetRechargeRecord{}). Where("id = ?", rechargeRecordID). Update("auto_purchase_status", constants.AutoPurchaseStatusFailed).Error; err != nil { h.logger.Error("更新自动购包失败状态失败", zap.Uint("recharge_record_id", rechargeRecordID), zap.Error(err), ) return } h.logger.Warn("自动购包达到最大重试次数,已标记失败", zap.Uint("recharge_record_id", rechargeRecordID)) } func (h *AutoPurchaseHandler) loadPackages(ctx context.Context, packageIDs []uint) ([]*model.Package, int64, error) { packages := make([]*model.Package, 0, len(packageIDs)) if err := h.db.WithContext(ctx).Where("id IN ?", packageIDs).Find(&packages).Error; err != nil { return nil, 0, err } if len(packages) != len(packageIDs) { return nil, 0, gorm.ErrRecordNotFound } totalAmount := int64(0) for _, pkg := range packages { totalAmount += pkg.SuggestedRetailPrice } if err := validatePackageTypeMix(packages); err != nil { return nil, 0, err } return packages, totalAmount, nil } func (h *AutoPurchaseHandler) buildOrderAndItems( rechargeRecord *model.AssetRechargeRecord, packages []*model.Package, totalAmount int64, now time.Time, ) (*model.Order, []*model.OrderItem, error) { orderType, iotCardID, deviceID, err := parseLinkedCarrier(rechargeRecord.LinkedOrderType, rechargeRecord.LinkedCarrierType, rechargeRecord.LinkedCarrierID) if err != nil { return nil, nil, err } generation := rechargeRecord.Generation if generation <= 0 { generation = 1 } paidAmount := totalAmount order := &model.Order{ BaseModel: model.BaseModel{ Creator: rechargeRecord.UserID, Updater: rechargeRecord.UserID, }, OrderNo: h.orderStore.GenerateOrderNo(), OrderType: orderType, BuyerType: model.BuyerTypePersonal, BuyerID: rechargeRecord.UserID, IotCardID: iotCardID, DeviceID: deviceID, TotalAmount: totalAmount, PaymentMethod: model.PaymentMethodWallet, PaymentStatus: model.PaymentStatusPaid, PaidAt: &now, CommissionStatus: model.CommissionStatusPending, CommissionConfigVersion: 0, Source: constants.OrderSourceClient, Generation: generation, ActualPaidAmount: &paidAmount, SellerShopID: &rechargeRecord.ShopIDTag, } items := make([]*model.OrderItem, 0, len(packages)) for _, pkg := range packages { items = append(items, &model.OrderItem{ BaseModel: model.BaseModel{ Creator: rechargeRecord.UserID, Updater: rechargeRecord.UserID, }, PackageID: pkg.ID, PackageName: pkg.PackageName, Quantity: 1, UnitPrice: pkg.SuggestedRetailPrice, Amount: pkg.SuggestedRetailPrice, }) } return order, items, nil } func (h *AutoPurchaseHandler) activatePackages( ctx context.Context, tx *gorm.DB, order *model.Order, packages []*model.Package, now time.Time, ) error { carrierType := constants.AssetWalletResourceTypeIotCard carrierID := uint(0) if order.OrderType == model.OrderTypeSingleCard && order.IotCardID != nil { carrierID = *order.IotCardID } else if order.OrderType == model.OrderTypeDevice && order.DeviceID != nil { carrierType = constants.AssetWalletResourceTypeDevice carrierID = *order.DeviceID } else { return errors.New("无效的订单载体") } for _, pkg := range packages { var existingUsage model.PackageUsage err := tx.Where("order_id = ? AND package_id = ?", order.ID, pkg.ID).First(&existingUsage).Error if err == nil { continue } if err != gorm.ErrRecordNotFound { return err } if pkg.PackageType == constants.PackageTypeFormal { if err = h.activateMainPackage(ctx, tx, order, pkg, carrierType, carrierID, now); err != nil { return err } continue } if pkg.PackageType == constants.PackageTypeAddon { if err = h.activateAddonPackage(ctx, tx, order, pkg, carrierType, carrierID, now); err != nil { return err } } } return nil } func (h *AutoPurchaseHandler) activateMainPackage( ctx context.Context, tx *gorm.DB, order *model.Order, pkg *model.Package, carrierType string, carrierID uint, now time.Time, ) error { _ = ctx var activeMainPackage model.PackageUsage err := tx.Where("status = ?", constants.PackageUsageStatusActive). Where("master_usage_id IS NULL"). Where(carrierType+"_id = ?", carrierID). Order("priority ASC"). First(&activeMainPackage).Error hasActiveMain := err == nil var status int var priority int var activatedAt time.Time var expiresAt time.Time var nextResetAt *time.Time var pendingRealnameActivation bool if hasActiveMain { status = constants.PackageUsageStatusPending var maxPriority int tx.Model(&model.PackageUsage{}). Where(carrierType+"_id = ?", carrierID). Select("COALESCE(MAX(priority), 0)"). Scan(&maxPriority) priority = maxPriority + 1 } else { status = constants.PackageUsageStatusActive priority = 1 activatedAt = now expiresAt = packagepkg.CalculateExpiryTime(pkg.CalendarType, activatedAt, pkg.DurationMonths, pkg.DurationDays) nextResetAt = packagepkg.CalculateNextResetTime(pkg.DataResetCycle, pkg.CalendarType, now, activatedAt) } if pkg.EnableRealnameActivation { status = constants.PackageUsageStatusPending pendingRealnameActivation = true } usage := &model.PackageUsage{ BaseModel: model.BaseModel{ Creator: order.Creator, Updater: order.Creator, }, OrderID: order.ID, PackageID: pkg.ID, UsageType: order.OrderType, DataLimitMB: pkg.RealDataMB, Status: status, Priority: priority, DataResetCycle: pkg.DataResetCycle, PendingRealnameActivation: pendingRealnameActivation, Generation: order.Generation, } if carrierType == constants.AssetWalletResourceTypeIotCard { usage.IotCardID = carrierID } else { usage.DeviceID = carrierID } if status == constants.PackageUsageStatusActive { usage.ActivatedAt = activatedAt usage.ExpiresAt = expiresAt usage.NextResetAt = nextResetAt } if err = tx.Omit("status", "pending_realname_activation").Create(usage).Error; err != nil { return err } return tx.Model(usage).Updates(map[string]any{ "status": usage.Status, "pending_realname_activation": usage.PendingRealnameActivation, }).Error } func (h *AutoPurchaseHandler) activateAddonPackage( ctx context.Context, tx *gorm.DB, order *model.Order, pkg *model.Package, carrierType string, carrierID uint, now time.Time, ) error { _ = ctx var mainPackage model.PackageUsage err := tx.Where("status IN ?", []int{constants.PackageUsageStatusPending, constants.PackageUsageStatusActive}). Where("master_usage_id IS NULL"). Where(carrierType+"_id = ?", carrierID). Order("priority ASC"). First(&mainPackage).Error if err == gorm.ErrRecordNotFound { return errors.New("必须有主套餐才能购买加油包") } if err != nil { return err } var maxPriority int tx.Model(&model.PackageUsage{}). Where(carrierType+"_id = ?", carrierID). Select("COALESCE(MAX(priority), 0)"). Scan(&maxPriority) priority := maxPriority + 1 expiresAt := mainPackage.ExpiresAt usage := &model.PackageUsage{ BaseModel: model.BaseModel{ Creator: order.Creator, Updater: order.Creator, }, OrderID: order.ID, PackageID: pkg.ID, UsageType: order.OrderType, DataLimitMB: pkg.RealDataMB, Status: constants.PackageUsageStatusActive, Priority: priority, MasterUsageID: &mainPackage.ID, ActivatedAt: now, ExpiresAt: expiresAt, DataResetCycle: pkg.DataResetCycle, Generation: order.Generation, } if carrierType == constants.AssetWalletResourceTypeIotCard { usage.IotCardID = carrierID } else { usage.DeviceID = carrierID } return tx.Create(usage).Error } func parseLinkedPackageIDs(raw []byte) ([]uint, error) { var packageIDs []uint if len(raw) == 0 { return nil, nil } if err := sonic.Unmarshal(raw, &packageIDs); err != nil { return nil, err } return packageIDs, nil } func parseLinkedCarrier(linkedOrderType string, linkedCarrierType string, linkedCarrierID *uint) (string, *uint, *uint, error) { if linkedCarrierID == nil || *linkedCarrierID == 0 { return "", nil, nil, errors.New("关联载体ID为空") } if linkedOrderType == model.OrderTypeSingleCard || linkedCarrierType == "card" || linkedCarrierType == constants.AssetWalletResourceTypeIotCard { id := *linkedCarrierID return model.OrderTypeSingleCard, &id, nil, nil } if linkedOrderType == model.OrderTypeDevice || linkedCarrierType == "device" || linkedCarrierType == constants.AssetWalletResourceTypeDevice { id := *linkedCarrierID return model.OrderTypeDevice, nil, &id, nil } return "", nil, nil, errors.New("关联载体类型无效") } func validatePackageTypeMix(packages []*model.Package) error { hasFormal := false hasAddon := false for _, pkg := range packages { switch pkg.PackageType { case constants.PackageTypeFormal: hasFormal = true case constants.PackageTypeAddon: hasAddon = true } if hasFormal && hasAddon { return errors.New("不允许在同一订单中同时购买正式套餐和加油包") } } return nil }