Files
junhong_cmp_fiber/internal/service/commission_calculation/service.go
huang 18daeae65a
All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 7m17s
feat: 钱包系统分离 - 代理钱包与卡钱包完全隔离
## 变更概述
将统一钱包系统拆分为代理钱包和卡钱包两个独立系统,实现数据表和代码层面的完全隔离。

## 数据库变更
- 新增 6 张表:tb_agent_wallet、tb_agent_wallet_transaction、tb_agent_recharge_record、tb_card_wallet、tb_card_wallet_transaction、tb_card_recharge_record
- 删除 3 张旧表:tb_wallet、tb_wallet_transaction、tb_recharge_record
- 代理钱包:按 (shop_id, wallet_type) 唯一标识,支持主钱包和分佣钱包
- 卡钱包:按 (resource_type, resource_id) 唯一标识,支持物联网卡和设备

## 代码变更
- Model 层:新增 AgentWallet、AgentWalletTransaction、AgentRechargeRecord、CardWallet、CardWalletTransaction、CardRechargeRecord 模型
- Store 层:新增 6 个独立 Store,支持事务、乐观锁、Redis 缓存
- Service 层:重构 commission_calculation、commission_withdrawal、order、recharge 等 8 个服务
- Bootstrap 层:更新 Store 和 Service 依赖注入
- 常量层:按钱包类型重新组织常量和 Redis Key 生成函数

## 技术特性
- 乐观锁:使用 version 字段防止并发冲突
- 多租户:支持 shop_id_tag 和 enterprise_id_tag 过滤
- 事务管理:所有余额变动使用事务保证 ACID
- 缓存策略:Cache-Aside 模式,余额变动后删除缓存

## 业务影响
- 代理钱包和卡钱包业务完全隔离,互不影响
- 为独立监控、优化、扩展打下基础
- 提升代理钱包的稳定性和独立性

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-25 09:51:00 +08:00

621 lines
20 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package commission_calculation
import (
"context"
"fmt"
"time"
"github.com/break/junhong_cmp_fiber/internal/model"
"github.com/break/junhong_cmp_fiber/internal/service/commission_stats"
"github.com/break/junhong_cmp_fiber/internal/store/postgres"
"github.com/break/junhong_cmp_fiber/pkg/errors"
"go.uber.org/zap"
"gorm.io/gorm"
)
// Service calculates commissions for orders and credits them to agent
// commission wallets. It bundles the database handle, the stores it depends
// on, and a logger.
type Service struct {
	// db is used to open the transactions that wrap commission writes.
	db *gorm.DB

	// Commission records and the shop hierarchy.
	commissionRecordStore *postgres.CommissionRecordStore
	shopStore             *postgres.ShopStore

	// Package / series allocations and series configuration.
	shopPackageAllocationStore *postgres.ShopPackageAllocationStore
	shopSeriesAllocationStore  *postgres.ShopSeriesAllocationStore
	packageSeriesStore         *postgres.PackageSeriesStore

	// Resources an order can be placed for.
	iotCardStore *postgres.IotCardStore
	deviceStore  *postgres.DeviceStore

	// Agent wallets and their transaction log.
	agentWalletStore            *postgres.AgentWalletStore
	agentWalletTransactionStore *postgres.AgentWalletTransactionStore

	// Orders, order items and packages.
	orderStore     *postgres.OrderStore
	orderItemStore *postgres.OrderItemStore
	packageStore   *postgres.PackageStore

	// Sales statistics used when matching tiered one-time commissions.
	commissionStatsStore   *postgres.ShopSeriesCommissionStatsStore
	commissionStatsService *commission_stats.Service

	logger *zap.Logger
}
// New wires a Service from its dependencies. Every argument is stored as-is
// in the field of the same name; no validation is performed.
func New(
	db *gorm.DB,
	commissionRecordStore *postgres.CommissionRecordStore,
	shopStore *postgres.ShopStore,
	shopPackageAllocationStore *postgres.ShopPackageAllocationStore,
	shopSeriesAllocationStore *postgres.ShopSeriesAllocationStore,
	packageSeriesStore *postgres.PackageSeriesStore,
	iotCardStore *postgres.IotCardStore,
	deviceStore *postgres.DeviceStore,
	agentWalletStore *postgres.AgentWalletStore,
	agentWalletTransactionStore *postgres.AgentWalletTransactionStore,
	orderStore *postgres.OrderStore,
	orderItemStore *postgres.OrderItemStore,
	packageStore *postgres.PackageStore,
	commissionStatsStore *postgres.ShopSeriesCommissionStatsStore,
	commissionStatsService *commission_stats.Service,
	logger *zap.Logger,
) *Service {
	svc := new(Service)

	svc.db = db
	svc.logger = logger

	svc.commissionRecordStore = commissionRecordStore
	svc.shopStore = shopStore
	svc.shopPackageAllocationStore = shopPackageAllocationStore
	svc.shopSeriesAllocationStore = shopSeriesAllocationStore
	svc.packageSeriesStore = packageSeriesStore

	svc.iotCardStore = iotCardStore
	svc.deviceStore = deviceStore

	svc.agentWalletStore = agentWalletStore
	svc.agentWalletTransactionStore = agentWalletTransactionStore

	svc.orderStore = orderStore
	svc.orderItemStore = orderItemStore
	svc.packageStore = packageStore

	svc.commissionStatsStore = commissionStatsStore
	svc.commissionStatsService = commissionStatsService

	return svc
}
// CalculateCommission computes and credits all commissions owed for the order
// identified by orderID, inside a single database transaction.
//
// Steps:
//  1. Load the order; if its commission status is already "calculated", log
//     and return nil without doing anything.
//  2. Create and credit the cost-difference commission records.
//  3. Unless the order was purchased on behalf of someone, trigger the
//     one-time commission for the order's card or device.
//  4. Mark the order as calculated.
//
// Step 4 is a compare-and-set: the status is only flipped when it is not
// already "calculated". The check in step 1 reads the order without a lock, so
// two concurrent runs for the same order could both pass it; whichever run
// reaches step 4 second finds no row to update, returns an error, and its
// whole transaction (records and wallet credits included) is rolled back
// instead of paying the commission twice.
//
// Returns a wrapped error on any failure; the transaction is rolled back in
// that case.
func (s *Service) CalculateCommission(ctx context.Context, orderID uint) error {
	return s.db.Transaction(func(tx *gorm.DB) error {
		order, err := s.orderStore.GetByID(ctx, orderID)
		if err != nil {
			return errors.Wrap(errors.CodeDatabaseError, err, "获取订单失败")
		}
		if order.CommissionStatus == model.CommissionStatusCalculated {
			s.logger.Warn("订单佣金已计算,跳过", zap.Uint("order_id", orderID))
			return nil
		}

		costDiffRecords, err := s.CalculateCostDiffCommission(ctx, order)
		if err != nil {
			return errors.Wrap(errors.CodeInternalError, err, "计算成本价差佣金失败")
		}
		for _, record := range costDiffRecords {
			if err := tx.Create(record).Error; err != nil {
				return errors.Wrap(errors.CodeDatabaseError, err, "创建成本价差佣金记录失败")
			}
			if err := s.creditCommissionInTx(ctx, tx, record); err != nil {
				return errors.Wrap(errors.CodeInternalError, err, "佣金入账失败")
			}
		}

		// Purchase-on-behalf orders trigger neither the one-time commission
		// nor the accumulated-recharge update.
		if !order.IsPurchaseOnBehalf {
			if order.OrderType == model.OrderTypeSingleCard && order.IotCardID != nil {
				if err := s.triggerOneTimeCommissionForCardInTx(ctx, tx, order, *order.IotCardID); err != nil {
					return errors.Wrap(errors.CodeInternalError, err, "触发单卡一次性佣金失败")
				}
			} else if order.OrderType == model.OrderTypeDevice && order.DeviceID != nil {
				if err := s.triggerOneTimeCommissionForDeviceInTx(ctx, tx, order, *order.DeviceID); err != nil {
					return errors.Wrap(errors.CodeInternalError, err, "触发设备一次性佣金失败")
				}
			}
		}

		// Conditional update: only succeeds if nobody else has marked the
		// order as calculated in the meantime.
		result := tx.Model(&model.Order{}).
			Where("id = ? AND (commission_status IS NULL OR commission_status <> ?)", orderID, model.CommissionStatusCalculated).
			Update("commission_status", model.CommissionStatusCalculated)
		if result.Error != nil {
			return errors.Wrap(errors.CodeDatabaseError, result.Error, "更新订单佣金状态失败")
		}
		if result.RowsAffected == 0 {
			// Returning an error rolls back everything written above.
			return errors.Wrap(
				errors.CodeInternalError,
				fmt.Errorf("order %d commission already calculated by a concurrent run", orderID),
				"订单佣金已被并发计算",
			)
		}
		return nil
	})
}
// CalculateCostDiffCommission builds (but does not persist) the
// cost-difference commission records for an order.
//
// The selling shop earns order.TotalAmount - order.SellerCostPrice. Each
// ancestor shop then earns the difference between the cost price of the shop
// below it and its own allocated cost price for the order's package. Only
// positive differences produce a record.
//
// It returns an empty result without error when the order has no seller shop
// or no series ID. If the order items cannot be loaded or are empty, the
// records built so far are returned. The upward walk stops silently (after
// logging) at the first ancestor that cannot be loaded or has no allocation
// for the package. An error is returned only when the selling shop itself
// cannot be loaded.
func (s *Service) CalculateCostDiffCommission(ctx context.Context, order *model.Order) ([]*model.CommissionRecord, error) {
	var records []*model.CommissionRecord

	switch {
	case order.SellerShopID == nil:
		return records, nil
	case order.SeriesID == nil:
		s.logger.Warn("订单缺少系列ID跳过成本价差佣金计算", zap.Uint("order_id", order.ID))
		return records, nil
	}

	sellerShop, err := s.shopStore.GetByID(ctx, *order.SellerShopID)
	if err != nil {
		return nil, errors.Wrap(errors.CodeDatabaseError, err, "获取销售店铺失败")
	}

	// addRecord appends a released cost-difference record for the given shop.
	addRecord := func(shopID uint, amount int64) {
		records = append(records, &model.CommissionRecord{
			BaseModel: model.BaseModel{
				Creator: order.Creator,
				Updater: order.Updater,
			},
			ShopID:           shopID,
			OrderID:          order.ID,
			IotCardID:        order.IotCardID,
			DeviceID:         order.DeviceID,
			CommissionSource: model.CommissionSourceCostDiff,
			Amount:           amount,
			Status:           model.CommissionStatusReleased,
		})
	}

	if margin := order.TotalAmount - order.SellerCostPrice; margin > 0 {
		addRecord(*order.SellerShopID, margin)
	}

	// The package ID needed for cost-price lookups comes from the first
	// order item.
	items, err := s.orderItemStore.ListByOrderID(ctx, order.ID)
	if err != nil || len(items) == 0 {
		s.logger.Warn("获取订单明细失败或订单无明细,跳过成本价差佣金计算", zap.Uint("order_id", order.ID), zap.Error(err))
		return records, nil
	}
	packageID := items[0].PackageID

	// Walk up the shop hierarchy; lowerCost is the cost price of the shop
	// one level below the one currently being examined.
	lowerCost := order.SellerCostPrice
	for parentID := sellerShop.ParentID; parentID != nil && *parentID > 0; {
		parent, err := s.shopStore.GetByID(ctx, *parentID)
		if err != nil {
			s.logger.Error("获取上级店铺失败", zap.Uint("shop_id", *parentID), zap.Error(err))
			break
		}

		allocation, err := s.shopPackageAllocationStore.GetByShopAndPackage(ctx, parent.ID, packageID)
		if err != nil {
			s.logger.Warn("上级店铺未分配该套餐,跳过", zap.Uint("shop_id", parent.ID), zap.Uint("package_id", packageID))
			break
		}

		if margin := lowerCost - allocation.CostPrice; margin > 0 {
			addRecord(parent.ID, margin)
		}

		lowerCost = allocation.CostPrice
		parentID = parent.ParentID
	}

	return records, nil
}
// triggerOneTimeCommissionForCardInTx evaluates and, when due, pays the
// one-time commission for an IoT card within the supplied transaction.
//
// It is a no-op (returns nil) when the order is a purchase-on-behalf, the card
// has no series or shop, the series has no enabled one-time commission config,
// the config has expired, or the commission was already triggered for this
// series. For the "first recharge" trigger type the card's accumulated
// recharge for the series is increased by order.TotalAmount and persisted
// first; the commission is then paid only once the new total reaches
// config.Threshold. On payout the commission records are created and credited,
// and the card is flagged as triggered for the series.
func (s *Service) triggerOneTimeCommissionForCardInTx(ctx context.Context, tx *gorm.DB, order *model.Order, cardID uint) error {
	if order.IsPurchaseOnBehalf {
		return nil
	}

	card, err := s.iotCardStore.GetByID(ctx, cardID)
	if err != nil {
		return errors.Wrap(errors.CodeDatabaseError, err, "获取卡信息失败")
	}
	if card.SeriesID == nil || card.ShopID == nil {
		return nil
	}
	seriesID := *card.SeriesID

	series, err := s.packageSeriesStore.GetByID(ctx, seriesID)
	if err != nil {
		return errors.Wrap(errors.CodeDatabaseError, err, "获取套餐系列失败")
	}
	config, err := series.GetOneTimeCommissionConfig()
	if err != nil || config == nil || !config.Enable {
		return nil
	}

	if s.isOneTimeCommissionExpired(config, card.ActivatedAt) {
		s.logger.Info("一次性佣金规则已过期,跳过",
			zap.Uint("card_id", cardID),
			zap.Uint("series_id", seriesID),
			zap.String("validity_type", config.ValidityType))
		return nil
	}

	if config.TriggerType == model.OneTimeCommissionTriggerFirstRecharge {
		// Record the recharge even if the commission ends up not being paid.
		rechargedTotal := card.GetAccumulatedRechargeBySeries(seriesID) + order.TotalAmount
		card.AddAccumulatedRechargeBySeries(seriesID, order.TotalAmount)

		saveAccumulated := tx.Model(&model.IotCard{}).
			Where("id = ?", cardID).
			Update("accumulated_recharge_by_series", card.AccumulatedRechargeBySeriesJSON)
		if saveAccumulated.Error != nil {
			return errors.Wrap(errors.CodeDatabaseError, saveAccumulated.Error, "更新卡累计充值金额失败")
		}

		if rechargedTotal < config.Threshold {
			return nil
		}
	}

	// The commission is paid at most once per card and series.
	if card.IsFirstRechargeTriggeredBySeries(seriesID) {
		return nil
	}

	commissionRecords, err := s.calculateChainOneTimeCommission(ctx, *card.ShopID, seriesID, order, &cardID, nil)
	if err != nil {
		return err
	}
	for _, rec := range commissionRecords {
		if createErr := tx.Create(rec).Error; createErr != nil {
			return errors.Wrap(errors.CodeDatabaseError, createErr, "创建一次性佣金记录失败")
		}
		if creditErr := s.creditCommissionInTx(ctx, tx, rec); creditErr != nil {
			return errors.Wrap(errors.CodeInternalError, creditErr, "一次性佣金入账失败")
		}
	}

	card.SetFirstRechargeTriggeredBySeries(seriesID, true)
	saveTriggered := tx.Model(&model.IotCard{}).
		Where("id = ?", cardID).
		Update("first_recharge_triggered_by_series", card.FirstRechargeTriggeredBySeriesJSON)
	if saveTriggered.Error != nil {
		return errors.Wrap(errors.CodeDatabaseError, saveTriggered.Error, "更新卡佣金发放状态失败")
	}
	return nil
}
// TriggerOneTimeCommissionForCard runs the card one-time commission logic for
// the given order and card in a transaction of its own.
func (s *Service) TriggerOneTimeCommissionForCard(ctx context.Context, order *model.Order, cardID uint) error {
	trigger := func(tx *gorm.DB) error {
		return s.triggerOneTimeCommissionForCardInTx(ctx, tx, order, cardID)
	}
	return s.db.Transaction(trigger)
}
// triggerOneTimeCommissionForDeviceInTx evaluates and, when due, pays the
// one-time commission for a device within the supplied transaction.
//
// It is a no-op (returns nil) when the order is a purchase-on-behalf, the
// device has no series or shop, the series has no enabled one-time commission
// config, the config has expired, or the commission was already triggered for
// this series. For the "first recharge" trigger type the device's accumulated
// recharge for the series is increased by order.TotalAmount and persisted
// first; the commission is then paid only once the new total reaches
// config.Threshold. On payout the commission records are created and credited,
// and the device is flagged as triggered for the series.
func (s *Service) triggerOneTimeCommissionForDeviceInTx(ctx context.Context, tx *gorm.DB, order *model.Order, deviceID uint) error {
	if order.IsPurchaseOnBehalf {
		return nil
	}

	device, err := s.deviceStore.GetByID(ctx, deviceID)
	if err != nil {
		return errors.Wrap(errors.CodeDatabaseError, err, "获取设备信息失败")
	}
	if device.SeriesID == nil || device.ShopID == nil {
		return nil
	}
	seriesID := *device.SeriesID

	series, err := s.packageSeriesStore.GetByID(ctx, seriesID)
	if err != nil {
		return errors.Wrap(errors.CodeDatabaseError, err, "获取套餐系列失败")
	}
	config, err := series.GetOneTimeCommissionConfig()
	if err != nil || config == nil || !config.Enable {
		return nil
	}

	if s.isOneTimeCommissionExpired(config, device.ActivatedAt) {
		s.logger.Info("一次性佣金规则已过期,跳过",
			zap.Uint("device_id", deviceID),
			zap.Uint("series_id", seriesID),
			zap.String("validity_type", config.ValidityType))
		return nil
	}

	if config.TriggerType == model.OneTimeCommissionTriggerFirstRecharge {
		// Record the recharge even if the commission ends up not being paid.
		rechargedTotal := device.GetAccumulatedRechargeBySeries(seriesID) + order.TotalAmount
		device.AddAccumulatedRechargeBySeries(seriesID, order.TotalAmount)

		saveAccumulated := tx.Model(&model.Device{}).
			Where("id = ?", deviceID).
			Update("accumulated_recharge_by_series", device.AccumulatedRechargeBySeriesJSON)
		if saveAccumulated.Error != nil {
			return errors.Wrap(errors.CodeDatabaseError, saveAccumulated.Error, "更新设备累计充值金额失败")
		}

		if rechargedTotal < config.Threshold {
			return nil
		}
	}

	// The commission is paid at most once per device and series.
	if device.IsFirstRechargeTriggeredBySeries(seriesID) {
		return nil
	}

	commissionRecords, err := s.calculateChainOneTimeCommission(ctx, *device.ShopID, seriesID, order, nil, &deviceID)
	if err != nil {
		return err
	}
	for _, rec := range commissionRecords {
		if createErr := tx.Create(rec).Error; createErr != nil {
			return errors.Wrap(errors.CodeDatabaseError, createErr, "创建一次性佣金记录失败")
		}
		if creditErr := s.creditCommissionInTx(ctx, tx, rec); creditErr != nil {
			return errors.Wrap(errors.CodeInternalError, creditErr, "一次性佣金入账失败")
		}
	}

	device.SetFirstRechargeTriggeredBySeries(seriesID, true)
	saveTriggered := tx.Model(&model.Device{}).
		Where("id = ?", deviceID).
		Update("first_recharge_triggered_by_series", device.FirstRechargeTriggeredBySeriesJSON)
	if saveTriggered.Error != nil {
		return errors.Wrap(errors.CodeDatabaseError, saveTriggered.Error, "更新设备佣金发放状态失败")
	}
	return nil
}
// TriggerOneTimeCommissionForDevice runs the device one-time commission logic
// for the given order and device in a transaction of its own.
func (s *Service) TriggerOneTimeCommissionForDevice(ctx context.Context, order *model.Order, deviceID uint) error {
	trigger := func(tx *gorm.DB) error {
		return s.triggerOneTimeCommissionForDeviceInTx(ctx, tx, order, deviceID)
	}
	return s.db.Transaction(trigger)
}
// isOneTimeCommissionExpired reports whether a one-time commission rule is no
// longer in effect at the current time.
//
// A nil config counts as expired. Otherwise the result depends on
// config.ValidityType:
//   - permanent: never expires.
//   - fixed date: config.ValidityValue is a "2006-01-02" date; the rule is
//     valid through the end of that calendar day and expired from the start of
//     the following day. The date is interpreted in the same time zone as
//     time.Now() (the server's local zone) rather than UTC, so the cut-off
//     falls on local midnight.
//   - relative: config.ValidityValue is a positive number of months counted
//     from activatedAt; a nil activatedAt never expires.
//
// Empty or unparseable validity values, and unknown validity types, are
// treated as "not expired" (parse failures are logged).
func (s *Service) isOneTimeCommissionExpired(config *model.OneTimeCommissionConfig, activatedAt *time.Time) bool {
	if config == nil {
		return true
	}

	now := time.Now()

	switch config.ValidityType {
	case model.OneTimeCommissionValidityPermanent:
		return false

	case model.OneTimeCommissionValidityFixedDate:
		if config.ValidityValue == "" {
			return false
		}
		// Parse in now's location: time.Parse would yield UTC midnight and
		// shift the cut-off by the server's UTC offset.
		expiryDay, err := time.ParseInLocation("2006-01-02", config.ValidityValue, now.Location())
		if err != nil {
			s.logger.Warn("解析一次性佣金到期日期失败",
				zap.String("validity_value", config.ValidityValue),
				zap.Error(err))
			return false
		}
		// Expired once the following calendar day has begun; AddDate keeps
		// the boundary on midnight and leaves no sub-second gap.
		return !now.Before(expiryDay.AddDate(0, 0, 1))

	case model.OneTimeCommissionValidityRelative:
		if activatedAt == nil {
			return false
		}
		if config.ValidityValue == "" {
			return false
		}
		months := 0
		if _, err := fmt.Sscanf(config.ValidityValue, "%d", &months); err != nil || months <= 0 {
			s.logger.Warn("解析一次性佣金相对时长失败",
				zap.String("validity_value", config.ValidityValue),
				zap.Error(err))
			return false
		}
		return now.After(activatedAt.AddDate(0, months, 0))

	default:
		return false
	}
}
// calculateChainOneTimeCommission builds (but does not persist) the one-time
// commission records for a shop chain, starting at bottomShopID and climbing
// through parent shops.
//
// At each level the shop's amount is either its series allocation's
// OneTimeCommissionAmount or, when the config is "tiered" with at least one
// tier, the amount matched by matchOneTimeCommissionTier (falling back to the
// allocation amount if matching fails). The shop earns that amount minus the
// amount of the level below it; only positive differences produce a record.
//
// cardID / deviceID are copied onto every record; a non-nil deviceID also
// selects the device remark text.
//
// An empty result with nil error is returned when the series cannot be
// loaded, its config is missing/disabled, or the bottom shop has no
// allocation for the series. The climb stops (after logging) at the first
// parent without an allocation or that cannot be loaded. An error is returned
// only when the bottom shop itself cannot be loaded.
func (s *Service) calculateChainOneTimeCommission(ctx context.Context, bottomShopID uint, seriesID uint, order *model.Order, cardID *uint, deviceID *uint) ([]*model.CommissionRecord, error) {
	var records []*model.CommissionRecord

	series, err := s.packageSeriesStore.GetByID(ctx, seriesID)
	if err != nil {
		s.logger.Warn("获取套餐系列失败,跳过一次性佣金", zap.Uint("series_id", seriesID), zap.Error(err))
		return records, nil
	}
	config, err := series.GetOneTimeCommissionConfig()
	if err != nil || config == nil || !config.Enable {
		return records, nil
	}

	allocation, err := s.shopSeriesAllocationStore.GetByShopAndSeries(ctx, bottomShopID, seriesID)
	if err != nil {
		s.logger.Warn("底层店铺未分配该系列,跳过一次性佣金", zap.Uint("shop_id", bottomShopID), zap.Uint("series_id", seriesID))
		return records, nil
	}
	shop, err := s.shopStore.GetByID(ctx, bottomShopID)
	if err != nil {
		return nil, errors.Wrap(errors.CodeDatabaseError, err, "获取店铺信息失败")
	}

	// Neither of these changes while climbing the chain.
	remark := "一次性佣金"
	if deviceID != nil {
		remark = "一次性佣金(设备)"
	}
	useTiers := config.CommissionType == "tiered" && len(config.Tiers) > 0

	shopID := bottomShopID
	var paidBelow int64 // amount assigned to the level below the current one
	for {
		levelAmount := allocation.OneTimeCommissionAmount
		if useTiers {
			tiered, tierErr := s.matchOneTimeCommissionTier(ctx, shopID, seriesID, allocation.ID, config.Tiers)
			if tierErr != nil {
				s.logger.Warn("匹配梯度佣金失败,使用固定金额", zap.Uint("shop_id", shopID), zap.Error(tierErr))
			} else {
				levelAmount = tiered
			}
		}

		if earned := levelAmount - paidBelow; earned > 0 {
			records = append(records, &model.CommissionRecord{
				BaseModel: model.BaseModel{
					Creator: order.Creator,
					Updater: order.Updater,
				},
				ShopID:           shopID,
				OrderID:          order.ID,
				IotCardID:        cardID,
				DeviceID:         deviceID,
				CommissionSource: model.CommissionSourceOneTime,
				Amount:           earned,
				Status:           model.CommissionStatusReleased,
				Remark:           remark,
			})
		}

		// Stop at the top of the hierarchy.
		if shop.ParentID == nil || *shop.ParentID == 0 {
			break
		}
		parentID := *shop.ParentID

		parentAllocation, err := s.shopSeriesAllocationStore.GetByShopAndSeries(ctx, parentID, seriesID)
		if err != nil {
			s.logger.Warn("上级店铺未分配该系列,停止链式计算",
				zap.Uint("parent_shop_id", parentID),
				zap.Uint("series_id", seriesID))
			break
		}
		parentShop, err := s.shopStore.GetByID(ctx, parentID)
		if err != nil {
			s.logger.Error("获取上级店铺失败", zap.Uint("shop_id", parentID), zap.Error(err))
			break
		}

		paidBelow = levelAmount
		shopID, shop, allocation = parentID, parentShop, parentAllocation
	}

	return records, nil
}
// matchOneTimeCommissionTier returns the largest tier.Amount among the tiers
// whose threshold is met by the shop's current "monthly" sales statistics, or
// 0 when no tier matches (including when tiers is empty).
//
// Each tier chooses which statistics it is compared against:
//   - tier.StatScope: "self and subordinates" aggregates over the allocations
//     of the shop IDs returned by GetSubordinateShopIDs (falling back to just
//     shopID if that lookup fails); any other scope uses allocationID alone.
//   - tier.Dimension: sales count, or otherwise sales amount.
//
// The statistics depend only on the scope, not on the individual tier, so they
// are fetched at most once per scope and reused for every tier sharing it.
//
// A failure to resolve subordinate allocation IDs aborts with an error. A
// failure to load statistics is logged and the affected tiers are skipped.
func (s *Service) matchOneTimeCommissionTier(ctx context.Context, shopID uint, seriesID uint, allocationID uint, tiers []model.OneTimeCommissionTier) (int64, error) {
	now := time.Now()

	// scopeStats memoises the statistics lookup for one stat scope.
	type scopeStats struct {
		loaded        bool
		count, amount int64
		err           error
	}
	var selfStats, selfAndSubStats scopeStats

	var matchedAmount int64
	for _, tier := range tiers {
		var stats *scopeStats

		if tier.StatScope == model.OneTimeCommissionStatScopeSelfAndSub {
			stats = &selfAndSubStats
			if !stats.loaded {
				subordinateIDs, subErr := s.shopStore.GetSubordinateShopIDs(ctx, shopID)
				if subErr != nil {
					s.logger.Warn("获取下级店铺失败", zap.Uint("shop_id", shopID), zap.Error(subErr))
					subordinateIDs = []uint{shopID}
				}
				allocationIDs, allocErr := s.shopSeriesAllocationStore.GetIDsByShopIDsAndSeries(ctx, subordinateIDs, seriesID)
				if allocErr != nil {
					return 0, errors.Wrap(errors.CodeDatabaseError, allocErr, "获取下级分配ID失败")
				}
				stats.count, stats.amount, stats.err = s.commissionStatsStore.GetAggregatedStats(ctx, allocationIDs, "monthly", now)
				stats.loaded = true
			}
		} else {
			stats = &selfStats
			if !stats.loaded {
				stats.count, stats.amount, stats.err = s.commissionStatsStore.GetAggregatedStats(ctx, []uint{allocationID}, "monthly", now)
				stats.loaded = true
			}
		}

		if stats.err != nil {
			s.logger.Warn("获取销售统计失败", zap.Uint("allocation_id", allocationID), zap.Error(stats.err))
			continue
		}

		currentValue := stats.amount
		if tier.Dimension == model.TierTypeSalesCount {
			currentValue = stats.count
		}

		if currentValue >= tier.Threshold && tier.Amount > matchedAmount {
			matchedAmount = tier.Amount
		}
	}

	return matchedAmount, nil
}
// creditCommissionInTx pays a commission record into the owning shop's
// commission wallet, all within the supplied transaction:
//
//  1. look up the shop's agent wallet with wallet_type "commission";
//  2. add record.Amount to it via AgentWalletStore.AddBalanceWithTx;
//  3. update the record's balance_after, status (released) and released_at;
//  4. insert an AgentWalletTransaction describing the credit.
//
// NOTE(review): the before/after balances written in steps 3 and 4 are derived
// from the wallet row as read in step 1, which is not locked here — confirm
// that AddBalanceWithTx (or the caller) prevents concurrent credits from
// making these snapshots stale.
func (s *Service) creditCommissionInTx(ctx context.Context, tx *gorm.DB, record *model.CommissionRecord) error {
	// Locate the shop's commission wallet.
	var commissionWallet model.AgentWallet
	lookup := tx.Where("shop_id = ? AND wallet_type = ?", record.ShopID, "commission").First(&commissionWallet)
	if lookup.Error != nil {
		return errors.Wrap(errors.CodeDatabaseError, lookup.Error, "获取店铺分佣钱包失败")
	}

	balanceBefore := commissionWallet.Balance
	balanceAfter := balanceBefore + record.Amount

	// Increase the wallet balance through the store, inside the transaction.
	if err := s.agentWalletStore.AddBalanceWithTx(ctx, tx, commissionWallet.ID, record.Amount); err != nil {
		return errors.Wrap(errors.CodeInternalError, err, "更新钱包余额失败")
	}

	// Stamp the commission record as released.
	releasedAt := time.Now()
	recordChanges := map[string]any{
		"balance_after": balanceAfter,
		"status":        model.CommissionStatusReleased,
		"released_at":   releasedAt,
	}
	if err := tx.Model(record).Updates(recordChanges).Error; err != nil {
		return errors.Wrap(errors.CodeDatabaseError, err, "更新佣金记录失败")
	}

	// Log the movement in the agent wallet transaction table.
	remark := "佣金入账"
	walletTx := &model.AgentWalletTransaction{
		AgentWalletID:   commissionWallet.ID,
		ShopID:          record.ShopID,
		UserID:          record.Creator,
		TransactionType: "commission",
		Amount:          record.Amount,
		BalanceBefore:   balanceBefore,
		BalanceAfter:    balanceAfter,
		Status:          1,
		ReferenceType:   stringPtr("commission"),
		ReferenceID:     &record.ID,
		Remark:          &remark,
		Creator:         record.Creator,
		ShopIDTag:       record.ShopID,
	}
	if err := s.agentWalletTransactionStore.CreateWithTx(ctx, tx, walletTx); err != nil {
		return errors.Wrap(errors.CodeDatabaseError, err, "创建钱包交易记录失败")
	}

	return nil
}
// CreditCommission credits a single commission record to its shop's
// commission wallet in a transaction of its own.
func (s *Service) CreditCommission(ctx context.Context, record *model.CommissionRecord) error {
	credit := func(tx *gorm.DB) error {
		return s.creditCommissionInTx(ctx, tx, record)
	}
	return s.db.Transaction(credit)
}
// stringPtr returns a pointer to a fresh copy of s, for populating optional
// (*string) fields from literals.
func stringPtr(s string) *string {
	p := new(string)
	*p = s
	return p
}