Files
junhong_cmp_fiber/internal/task/auto_purchase.go
huang 9bd55a1695 feat: 实现客户端核心业务接口(client-core-business-api)
新增客户端资产、钱包、订单、实名、设备管理等核心业务 Handler 与 DTO:
- 客户端资产信息查询、套餐列表、套餐历史、资产刷新
- 客户端钱包详情、流水、充值校验、充值订单、充值记录
- 客户端订单创建、列表、详情
- 客户端实名认证链接获取
- 客户端设备卡列表、重启、恢复出厂、WiFi配置、切卡
- 客户端订单服务(含微信/支付宝支付流程)
- 强充自动代购异步任务处理
- 数据库迁移 000084:充值记录增加自动代购状态字段
2026-03-19 13:28:04 +08:00

557 lines
16 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package task
import (
"context"
"errors"
"time"
"github.com/bytedance/sonic"
"github.com/hibiken/asynq"
"github.com/redis/go-redis/v9"
"go.uber.org/zap"
"gorm.io/gorm"
"github.com/break/junhong_cmp_fiber/internal/model"
packagepkg "github.com/break/junhong_cmp_fiber/internal/service/package"
"github.com/break/junhong_cmp_fiber/internal/store/postgres"
"github.com/break/junhong_cmp_fiber/pkg/constants"
)
// AutoPurchasePayload is the payload carried by the post-recharge automatic
// package purchase task.
type AutoPurchasePayload struct {
	// RechargeRecordID is the ID of the recharge record the task works on.
	// A zero value is rejected by the task handler.
	RechargeRecordID uint `json:"recharge_record_id"`
}
// AutoPurchaseHandler processes post-recharge automatic package purchase
// tasks.
type AutoPurchaseHandler struct {
	// db runs the purchase transaction and the direct status updates.
	db *gorm.DB

	// orderStore generates order numbers for the orders created by the task.
	orderStore *postgres.OrderStore
	// rechargeRecordStore loads the recharge record referenced by the payload.
	rechargeRecordStore *postgres.AssetRechargeStore
	// walletStore loads the asset wallet and deducts its balance.
	walletStore *postgres.AssetWalletStore
	// walletTransactionStore persists the wallet transaction of the deduction.
	walletTransactionStore *postgres.AssetWalletTransactionStore
	// packageUsageStore is kept on the handler; the methods in this file
	// create package usages through the transaction handle directly.
	packageUsageStore *postgres.PackageUsageStore

	// redis is the client handed to stores that the constructor builds itself.
	redis *redis.Client
	// logger receives the handler's structured log output.
	logger *zap.Logger
}
// NewAutoPurchaseHandler builds the handler for post-recharge automatic
// package purchase tasks.
//
// Any store passed as nil is replaced by one constructed from db and
// redisClient, so callers may supply only the dependencies they want to
// override.
func NewAutoPurchaseHandler(
	db *gorm.DB,
	orderStore *postgres.OrderStore,
	rechargeRecordStore *postgres.AssetRechargeStore,
	walletStore *postgres.AssetWalletStore,
	walletTransactionStore *postgres.AssetWalletTransactionStore,
	packageUsageStore *postgres.PackageUsageStore,
	redisClient *redis.Client,
	logger *zap.Logger,
) *AutoPurchaseHandler {
	handler := &AutoPurchaseHandler{
		db:                     db,
		orderStore:             orderStore,
		rechargeRecordStore:    rechargeRecordStore,
		walletStore:            walletStore,
		walletTransactionStore: walletTransactionStore,
		packageUsageStore:      packageUsageStore,
		redis:                  redisClient,
		logger:                 logger,
	}

	// Fill in defaults for every store the caller left out.
	if handler.orderStore == nil {
		handler.orderStore = postgres.NewOrderStore(db, redisClient)
	}
	if handler.rechargeRecordStore == nil {
		handler.rechargeRecordStore = postgres.NewAssetRechargeStore(db, redisClient)
	}
	if handler.walletStore == nil {
		handler.walletStore = postgres.NewAssetWalletStore(db, redisClient)
	}
	if handler.walletTransactionStore == nil {
		handler.walletTransactionStore = postgres.NewAssetWalletTransactionStore(db, redisClient)
	}
	if handler.packageUsageStore == nil {
		handler.packageUsageStore = postgres.NewPackageUsageStore(db, redisClient)
	}

	return handler
}
// ProcessTask handles one post-recharge automatic package purchase task.
//
// It decodes the payload, loads the recharge record and, unless the record is
// already marked success or failed, loads the linked packages and runs one
// database transaction that deducts the wallet, creates the order with its
// items and the wallet transaction, activates the packages and marks the
// record as successful.
//
// Return values:
//   - asynq.SkipRetry for input that can never succeed (undecodable payload,
//     zero record ID, missing record, unparsable or empty linked package IDs);
//   - nil when the record is already finalized or the purchase succeeded;
//   - the underlying error otherwise, so that asynq retries the task.
func (h *AutoPurchaseHandler) ProcessTask(ctx context.Context, task *asynq.Task) error {
	var payload AutoPurchasePayload
	if err := sonic.Unmarshal(task.Payload(), &payload); err != nil {
		h.logger.Error("解析自动购包任务载荷失败", zap.Error(err))
		return asynq.SkipRetry
	}
	if payload.RechargeRecordID == 0 {
		h.logger.Error("自动购包任务载荷无效", zap.Uint("recharge_record_id", payload.RechargeRecordID))
		return asynq.SkipRetry
	}

	rechargeRecord, err := h.rechargeRecordStore.GetByID(ctx, payload.RechargeRecordID)
	if err != nil {
		if errors.Is(err, gorm.ErrRecordNotFound) {
			h.logger.Warn("充值记录不存在,跳过自动购包", zap.Uint("recharge_record_id", payload.RechargeRecordID))
			return asynq.SkipRetry
		}
		h.logger.Error("查询充值记录失败", zap.Uint("recharge_record_id", payload.RechargeRecordID), zap.Error(err))
		return err
	}

	// A finalized record makes the task a no-op, which keeps re-deliveries safe.
	if rechargeRecord.AutoPurchaseStatus == constants.AutoPurchaseStatusSuccess ||
		rechargeRecord.AutoPurchaseStatus == constants.AutoPurchaseStatusFailed {
		return nil
	}

	packageIDs, err := parseLinkedPackageIDs(rechargeRecord.LinkedPackageIDs)
	if err != nil {
		h.logger.Error("解析关联套餐ID失败", zap.Uint("recharge_record_id", rechargeRecord.ID), zap.Error(err))
		// SkipRetry archives the task right away, so this attempt is the last
		// one regardless of the retry counter: mark the record unconditionally.
		h.markAutoPurchaseFailedNoRetry(ctx, rechargeRecord.ID)
		return asynq.SkipRetry
	}
	if len(packageIDs) == 0 {
		h.logger.Error("关联套餐ID为空无法自动购包", zap.Uint("recharge_record_id", rechargeRecord.ID))
		h.markAutoPurchaseFailedNoRetry(ctx, rechargeRecord.ID)
		return asynq.SkipRetry
	}

	packages, totalAmount, err := h.loadPackages(ctx, packageIDs)
	if err != nil {
		h.logger.Error("加载关联套餐失败", zap.Uint("recharge_record_id", rechargeRecord.ID), zap.Error(err))
		h.markAutoPurchaseFailedIfFinalRetry(ctx, rechargeRecord.ID)
		return err
	}

	txErr := h.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
		wallet, err := h.walletStore.GetByID(ctx, rechargeRecord.AssetWalletID)
		if err != nil {
			if errors.Is(err, gorm.ErrRecordNotFound) {
				return errors.New("资产钱包不存在")
			}
			return err
		}
		if wallet.GetAvailableBalance() < totalAmount {
			return errors.New("钱包余额不足")
		}
		// wallet.Version is handed to the store, presumably for an optimistic
		// lock on the balance update — confirm against the store.
		if err := h.walletStore.DeductBalanceWithTx(ctx, tx, wallet.ID, totalAmount, wallet.Version); err != nil {
			return err
		}

		now := time.Now()
		order, orderItems, err := h.buildOrderAndItems(rechargeRecord, packages, totalAmount, now)
		if err != nil {
			return err
		}
		if err := tx.Create(order).Error; err != nil {
			return err
		}
		// The order ID is only known after the insert above.
		for _, item := range orderItems {
			item.OrderID = order.ID
		}
		if err := tx.CreateInBatches(orderItems, 100).Error; err != nil {
			return err
		}

		refType := constants.ReferenceTypeOrder
		walletTx := &model.AssetWalletTransaction{
			AssetWalletID:   wallet.ID,
			ResourceType:    wallet.ResourceType,
			ResourceID:      wallet.ResourceID,
			UserID:          rechargeRecord.UserID,
			TransactionType: constants.AssetTransactionTypeDeduct,
			Amount:          -totalAmount,
			BalanceBefore:   wallet.Balance,
			BalanceAfter:    wallet.Balance - totalAmount,
			Status:          constants.TransactionStatusSuccess,
			ReferenceType:   &refType,
			ReferenceNo:     &order.OrderNo,
			Creator:         rechargeRecord.UserID,
			ShopIDTag:       wallet.ShopIDTag,
			EnterpriseIDTag: wallet.EnterpriseIDTag,
		}
		if err := h.walletTransactionStore.CreateWithTx(ctx, tx, walletTx); err != nil {
			return err
		}

		if err := h.activatePackages(ctx, tx, order, packages, now); err != nil {
			return err
		}

		// Finalize the record inside the same transaction as the purchase.
		return tx.Model(&model.AssetRechargeRecord{}).
			Where("id = ?", rechargeRecord.ID).
			Update("auto_purchase_status", constants.AutoPurchaseStatusSuccess).Error
	})
	if txErr != nil {
		h.logger.Error("自动购包任务执行失败",
			zap.Uint("recharge_record_id", rechargeRecord.ID),
			zap.Error(txErr),
		)
		h.markAutoPurchaseFailedIfFinalRetry(ctx, rechargeRecord.ID)
		return txErr
	}

	h.logger.Info("自动购包任务执行成功", zap.Uint("recharge_record_id", rechargeRecord.ID))
	return nil
}

// markAutoPurchaseFailedNoRetry sets the record's auto purchase status to
// failed without consulting the retry counter. It is meant for failures that
// are reported with asynq.SkipRetry, where no further attempt will follow.
// A failed update is logged and otherwise ignored (best effort).
func (h *AutoPurchaseHandler) markAutoPurchaseFailedNoRetry(ctx context.Context, rechargeRecordID uint) {
	if err := h.db.WithContext(ctx).
		Model(&model.AssetRechargeRecord{}).
		Where("id = ?", rechargeRecordID).
		Update("auto_purchase_status", constants.AutoPurchaseStatusFailed).Error; err != nil {
		h.logger.Error("更新自动购包失败状态失败",
			zap.Uint("recharge_record_id", rechargeRecordID),
			zap.Error(err),
		)
		return
	}
	h.logger.Warn("自动购包数据无效不可重试,已标记失败", zap.Uint("recharge_record_id", rechargeRecordID))
}
// NewAutoPurchaseTask creates the asynq task that performs the automatic
// package purchase for the given recharge record.
//
// The task is created with asynq.MaxRetry(3), a two-minute timeout and the
// default queue. An error is returned only when the payload cannot be encoded.
func NewAutoPurchaseTask(rechargeRecordID uint) (*asynq.Task, error) {
	payload := AutoPurchasePayload{RechargeRecordID: rechargeRecordID}
	encoded, err := sonic.Marshal(payload)
	if err != nil {
		return nil, err
	}

	opts := []asynq.Option{
		asynq.MaxRetry(3),
		asynq.Timeout(2 * time.Minute),
		asynq.Queue(constants.QueueDefault),
	}
	return asynq.NewTask(constants.TaskTypeAutoPurchaseAfterRecharge, encoded, opts...), nil
}
// markAutoPurchaseFailedIfFinalRetry marks the recharge record's auto purchase
// status as failed, but only when the current execution is the task's last
// attempt.
//
// It does nothing when ctx carries no asynq retry metadata. A failed status
// update is logged and otherwise ignored (best effort).
func (h *AutoPurchaseHandler) markAutoPurchaseFailedIfFinalRetry(ctx context.Context, rechargeRecordID uint) {
	retryCount, ok := asynq.GetRetryCount(ctx)
	if !ok {
		return
	}
	maxRetry, ok := asynq.GetMaxRetry(ctx)
	if !ok {
		return
	}
	// retryCount is the number of retries already performed, and asynq keeps
	// retrying while it is below maxRetry. The last attempt is therefore the
	// one with retryCount == maxRetry; marking earlier would flag the record
	// as failed while another retry is still scheduled.
	if retryCount < maxRetry {
		return
	}

	if err := h.db.WithContext(ctx).
		Model(&model.AssetRechargeRecord{}).
		Where("id = ?", rechargeRecordID).
		Update("auto_purchase_status", constants.AutoPurchaseStatusFailed).Error; err != nil {
		h.logger.Error("更新自动购包失败状态失败",
			zap.Uint("recharge_record_id", rechargeRecordID),
			zap.Error(err),
		)
		return
	}
	h.logger.Warn("自动购包达到最大重试次数,已标记失败", zap.Uint("recharge_record_id", rechargeRecordID))
}
// loadPackages fetches the packages with the given IDs and returns them
// together with the sum of their SuggestedRetailPrice values.
//
// It returns gorm.ErrRecordNotFound when the number of rows found differs
// from the number of requested IDs, and the validation error when the set
// mixes formal and add-on packages.
func (h *AutoPurchaseHandler) loadPackages(ctx context.Context, packageIDs []uint) ([]*model.Package, int64, error) {
	found := make([]*model.Package, 0, len(packageIDs))
	query := h.db.WithContext(ctx).Where("id IN ?", packageIDs)
	if err := query.Find(&found).Error; err != nil {
		return nil, 0, err
	}
	if len(found) != len(packageIDs) {
		return nil, 0, gorm.ErrRecordNotFound
	}

	if err := validatePackageTypeMix(found); err != nil {
		return nil, 0, err
	}

	var sum int64
	for i := range found {
		sum += found[i].SuggestedRetailPrice
	}
	return found, sum, nil
}
// buildOrderAndItems assembles, without persisting anything, the wallet-paid
// order and one order item per package for the given recharge record.
//
// The carrier (card or device) is resolved from the record's linked carrier
// fields; an error is returned when it cannot be resolved. A non-positive
// record generation is replaced by 1. Every item has quantity 1 and is priced
// at the package's SuggestedRetailPrice. Item OrderID values are left unset
// for the caller to fill in once the order has been inserted.
func (h *AutoPurchaseHandler) buildOrderAndItems(
	rechargeRecord *model.AssetRechargeRecord,
	packages []*model.Package,
	totalAmount int64,
	now time.Time,
) (*model.Order, []*model.OrderItem, error) {
	orderType, cardID, devID, err := parseLinkedCarrier(
		rechargeRecord.LinkedOrderType,
		rechargeRecord.LinkedCarrierType,
		rechargeRecord.LinkedCarrierID,
	)
	if err != nil {
		return nil, nil, err
	}

	gen := rechargeRecord.Generation
	if gen <= 0 {
		gen = 1
	}

	// Separate copy so the order's pointer does not alias the parameter.
	actualPaid := totalAmount
	buyer := rechargeRecord.UserID

	order := &model.Order{
		BaseModel:               model.BaseModel{Creator: buyer, Updater: buyer},
		OrderNo:                 h.orderStore.GenerateOrderNo(),
		OrderType:               orderType,
		BuyerType:               model.BuyerTypePersonal,
		BuyerID:                 buyer,
		IotCardID:               cardID,
		DeviceID:                devID,
		TotalAmount:             totalAmount,
		PaymentMethod:           model.PaymentMethodWallet,
		PaymentStatus:           model.PaymentStatusPaid,
		PaidAt:                  &now,
		CommissionStatus:        model.CommissionStatusPending,
		CommissionConfigVersion: 0,
		Source:                  constants.OrderSourceClient,
		Generation:              gen,
		ActualPaidAmount:        &actualPaid,
		SellerShopID:            &rechargeRecord.ShopIDTag,
	}

	orderItems := make([]*model.OrderItem, len(packages))
	for i, pkg := range packages {
		orderItems[i] = &model.OrderItem{
			BaseModel:   model.BaseModel{Creator: buyer, Updater: buyer},
			PackageID:   pkg.ID,
			PackageName: pkg.PackageName,
			Quantity:    1,
			UnitPrice:   pkg.SuggestedRetailPrice,
			Amount:      pkg.SuggestedRetailPrice,
		}
	}
	return order, orderItems, nil
}
// activatePackages creates the package usages for every package of order
// inside the transaction tx.
//
// The carrier is derived from the order: the IoT card for single-card orders,
// the device for device orders; any other combination is rejected. A package
// that already has a usage row for this order is skipped. Formal packages go
// through activateMainPackage, add-on packages through activateAddonPackage;
// packages of any other type are left untouched.
func (h *AutoPurchaseHandler) activatePackages(
	ctx context.Context,
	tx *gorm.DB,
	order *model.Order,
	packages []*model.Package,
	now time.Time,
) error {
	var (
		carrierType string
		carrierID   uint
	)
	switch {
	case order.OrderType == model.OrderTypeSingleCard && order.IotCardID != nil:
		carrierType = constants.AssetWalletResourceTypeIotCard
		carrierID = *order.IotCardID
	case order.OrderType == model.OrderTypeDevice && order.DeviceID != nil:
		carrierType = constants.AssetWalletResourceTypeDevice
		carrierID = *order.DeviceID
	default:
		return errors.New("无效的订单载体")
	}

	for _, pkg := range packages {
		var existingUsage model.PackageUsage
		err := tx.Where("order_id = ? AND package_id = ?", order.ID, pkg.ID).First(&existingUsage).Error
		if err == nil {
			// A usage for this order and package already exists.
			continue
		}
		// errors.Is also recognizes a wrapped not-found sentinel.
		if !errors.Is(err, gorm.ErrRecordNotFound) {
			return err
		}

		switch pkg.PackageType {
		case constants.PackageTypeFormal:
			if err := h.activateMainPackage(ctx, tx, order, pkg, carrierType, carrierID, now); err != nil {
				return err
			}
		case constants.PackageTypeAddon:
			if err := h.activateAddonPackage(ctx, tx, order, pkg, carrierType, carrierID, now); err != nil {
				return err
			}
		}
	}
	return nil
}
// activateMainPackage creates the PackageUsage row for a formal (main)
// package bought by order on the given carrier.
//
// If the carrier already has an active main usage (status active and
// master_usage_id IS NULL), the new usage is stored as pending with a
// priority one above the carrier's current maximum. Otherwise it becomes
// active with priority 1, and its expiry and next reset time are computed
// from now. Packages with EnableRealnameActivation are always stored as
// pending with PendingRealnameActivation set.
//
// carrierType is used as a column prefix ("<carrierType>_id"), so it must
// come from trusted constants and never from user input. Database errors
// from the lookups are returned instead of being treated as "nothing found".
func (h *AutoPurchaseHandler) activateMainPackage(
	ctx context.Context,
	tx *gorm.DB,
	order *model.Order,
	pkg *model.Package,
	carrierType string,
	carrierID uint,
	now time.Time,
) error {
	// ctx is unused here; queries run on tx as handed in by the caller.
	_ = ctx

	var activeMainPackage model.PackageUsage
	err := tx.Where("status = ?", constants.PackageUsageStatusActive).
		Where("master_usage_id IS NULL").
		Where(carrierType+"_id = ?", carrierID).
		Order("priority ASC").
		First(&activeMainPackage).Error
	// Only "not found" means there is no active main package; any other
	// error must abort instead of silently activating a second main package.
	if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
		return err
	}
	hasActiveMain := err == nil

	var status int
	var priority int
	var activatedAt time.Time
	var expiresAt time.Time
	var nextResetAt *time.Time
	var pendingRealnameActivation bool

	if hasActiveMain {
		status = constants.PackageUsageStatusPending
		var maxPriority int
		if err = tx.Model(&model.PackageUsage{}).
			Where(carrierType+"_id = ?", carrierID).
			Select("COALESCE(MAX(priority), 0)").
			Scan(&maxPriority).Error; err != nil {
			return err
		}
		priority = maxPriority + 1
	} else {
		status = constants.PackageUsageStatusActive
		priority = 1
		activatedAt = now
		expiresAt = packagepkg.CalculateExpiryTime(pkg.CalendarType, activatedAt, pkg.DurationMonths, pkg.DurationDays)
		nextResetAt = packagepkg.CalculateNextResetTime(pkg.DataResetCycle, pkg.CalendarType, now, activatedAt)
	}

	// Real-name activation overrides the status chosen above.
	if pkg.EnableRealnameActivation {
		status = constants.PackageUsageStatusPending
		pendingRealnameActivation = true
	}

	usage := &model.PackageUsage{
		BaseModel: model.BaseModel{
			Creator: order.Creator,
			Updater: order.Creator,
		},
		OrderID:                   order.ID,
		PackageID:                 pkg.ID,
		UsageType:                 order.OrderType,
		DataLimitMB:               pkg.RealDataMB,
		Status:                    status,
		Priority:                  priority,
		DataResetCycle:            pkg.DataResetCycle,
		PendingRealnameActivation: pendingRealnameActivation,
		Generation:                order.Generation,
	}
	if carrierType == constants.AssetWalletResourceTypeIotCard {
		usage.IotCardID = carrierID
	} else {
		usage.DeviceID = carrierID
	}
	// Time fields are only meaningful for a usage that is active right away.
	if status == constants.PackageUsageStatusActive {
		usage.ActivatedAt = activatedAt
		usage.ExpiresAt = expiresAt
		usage.NextResetAt = nextResetAt
	}

	// status and pending_realname_activation are left out of the INSERT and
	// written by a follow-up UPDATE — presumably so zero values are not
	// replaced by column defaults; confirm against the model definition.
	if err = tx.Omit("status", "pending_realname_activation").Create(usage).Error; err != nil {
		return err
	}
	return tx.Model(usage).Updates(map[string]any{
		"status":                      usage.Status,
		"pending_realname_activation": usage.PendingRealnameActivation,
	}).Error
}
// activateAddonPackage creates an active PackageUsage row for an add-on
// package bought by order on the given carrier.
//
// The add-on is attached to the carrier's main usage with the lowest
// priority value among those that are pending or active
// (master_usage_id IS NULL); an error is returned when no such usage exists.
// The add-on copies the main usage's ExpiresAt, is activated at now, and
// gets a priority one above the carrier's current maximum.
//
// carrierType is used as a column prefix ("<carrierType>_id"), so it must
// come from trusted constants and never from user input.
func (h *AutoPurchaseHandler) activateAddonPackage(
	ctx context.Context,
	tx *gorm.DB,
	order *model.Order,
	pkg *model.Package,
	carrierType string,
	carrierID uint,
	now time.Time,
) error {
	// ctx is unused here; queries run on tx as handed in by the caller.
	_ = ctx

	var mainPackage model.PackageUsage
	err := tx.Where("status IN ?", []int{constants.PackageUsageStatusPending, constants.PackageUsageStatusActive}).
		Where("master_usage_id IS NULL").
		Where(carrierType+"_id = ?", carrierID).
		Order("priority ASC").
		First(&mainPackage).Error
	if errors.Is(err, gorm.ErrRecordNotFound) {
		return errors.New("必须有主套餐才能购买加油包")
	}
	if err != nil {
		return err
	}

	// A failed scan must abort: continuing would silently assign priority 1.
	var maxPriority int
	if err = tx.Model(&model.PackageUsage{}).
		Where(carrierType+"_id = ?", carrierID).
		Select("COALESCE(MAX(priority), 0)").
		Scan(&maxPriority).Error; err != nil {
		return err
	}

	usage := &model.PackageUsage{
		BaseModel: model.BaseModel{
			Creator: order.Creator,
			Updater: order.Creator,
		},
		OrderID:        order.ID,
		PackageID:      pkg.ID,
		UsageType:      order.OrderType,
		DataLimitMB:    pkg.RealDataMB,
		Status:         constants.PackageUsageStatusActive,
		Priority:       maxPriority + 1,
		MasterUsageID:  &mainPackage.ID,
		ActivatedAt:    now,
		ExpiresAt:      mainPackage.ExpiresAt,
		DataResetCycle: pkg.DataResetCycle,
		Generation:     order.Generation,
	}
	if carrierType == constants.AssetWalletResourceTypeIotCard {
		usage.IotCardID = carrierID
	} else {
		usage.DeviceID = carrierID
	}
	return tx.Create(usage).Error
}
// parseLinkedPackageIDs decodes a JSON array of package IDs.
//
// Empty input yields (nil, nil); a decoding failure is returned as-is.
func parseLinkedPackageIDs(raw []byte) ([]uint, error) {
	if len(raw) == 0 {
		return nil, nil
	}

	var ids []uint
	err := sonic.Unmarshal(raw, &ids)
	if err != nil {
		return nil, err
	}
	return ids, nil
}
// parseLinkedCarrier resolves the order type and carrier of a linked
// purchase.
//
// It returns, in order: the order type, the IoT card ID (set for card
// carriers only), the device ID (set for device carriers only) and an error.
// A nil or zero carrier ID is rejected. Card indicators are checked before
// device indicators, so input matching both is treated as a card. The
// returned ID pointer refers to a copy of *linkedCarrierID.
func parseLinkedCarrier(linkedOrderType string, linkedCarrierType string, linkedCarrierID *uint) (string, *uint, *uint, error) {
	if linkedCarrierID == nil || *linkedCarrierID == 0 {
		return "", nil, nil, errors.New("关联载体ID为空")
	}
	carrierID := *linkedCarrierID

	isCard := linkedOrderType == model.OrderTypeSingleCard ||
		linkedCarrierType == "card" ||
		linkedCarrierType == constants.AssetWalletResourceTypeIotCard
	isDevice := linkedOrderType == model.OrderTypeDevice ||
		linkedCarrierType == "device" ||
		linkedCarrierType == constants.AssetWalletResourceTypeDevice

	switch {
	case isCard:
		return model.OrderTypeSingleCard, &carrierID, nil, nil
	case isDevice:
		return model.OrderTypeDevice, nil, &carrierID, nil
	default:
		return "", nil, nil, errors.New("关联载体类型无效")
	}
}
// validatePackageTypeMix rejects a package list that contains both a formal
// package and an add-on package. Packages of any other type are ignored.
// It returns nil for an empty list.
func validatePackageTypeMix(packages []*model.Package) error {
	var sawFormal, sawAddon bool
	for i := range packages {
		if kind := packages[i].PackageType; kind == constants.PackageTypeFormal {
			sawFormal = true
		} else if kind == constants.PackageTypeAddon {
			sawAddon = true
		}
		// Stop at the first point where both kinds have been seen.
		if sawFormal && sawAddon {
			return errors.New("不允许在同一订单中同时购买正式套餐和加油包")
		}
	}
	return nil
}