重构: 店铺套餐分配系统从加价模式改为返佣模式
All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 5m18s
All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 5m18s
主要变更: - 重构分配模型:从加价模式(pricing_mode/pricing_value)改为返佣模式(base_commission + tier_commission) - 删除独立的 my_package 接口,统一到 /api/admin/packages(通过数据权限自动过滤) - 新增批量分配和批量调价功能,支持事务和性能优化 - 新增配置版本管理,订单创建时锁定返佣配置 - 新增成本价历史记录,支持审计和纠纷处理 - 新增统计缓存系统(Redis + 异步任务),优化梯度返佣计算性能 - 删除冗余的梯度佣金独立 CRUD 接口(合并到分配配置中) - 归档 3 个已完成的 OpenSpec changes 并同步 8 个新 capabilities 到 main specs 技术细节: - 数据库迁移:000026_refactor_shop_package_allocation - 新增 Store:AllocationConfigStore, PriceHistoryStore, CommissionStatsStore - 新增 Service:BatchAllocationService, BatchPricingService, CommissionStatsService - 新增异步任务:统计更新、定时同步、周期归档 - 测试覆盖:批量操作集成测试、梯度佣金 CRUD 清理验证 影响: - API 变更:删除 4 个梯度 CRUD 接口(POST/GET/PUT/DELETE /:id/tiers) - API 新增:批量分配、批量调价接口 - 数据模型:重构 shop_series_allocation 表结构 - 性能优化:批量操作使用 CreateInBatches,统计使用 Redis 缓存 相关文档: - openspec/changes/archive/2026-01-28-refactor-shop-package-allocation/ - openspec/specs/agent-available-packages/ - openspec/specs/allocation-config-versioning/ - 等 8 个新 capability specs
This commit is contained in:
120
internal/task/commission_stats_archive.go
Normal file
120
internal/task/commission_stats_archive.go
Normal file
@@ -0,0 +1,120 @@
|
||||
package task
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"go.uber.org/zap"
|
||||
"gorm.io/gorm"
|
||||
|
||||
"github.com/break/junhong_cmp_fiber/internal/model"
|
||||
"github.com/break/junhong_cmp_fiber/internal/store/postgres"
|
||||
"github.com/break/junhong_cmp_fiber/pkg/constants"
|
||||
pkggorm "github.com/break/junhong_cmp_fiber/pkg/gorm"
|
||||
)
|
||||
|
||||
type CommissionStatsArchiveHandler struct {
|
||||
db *gorm.DB
|
||||
redis *redis.Client
|
||||
statsStore *postgres.ShopSeriesCommissionStatsStore
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
func NewCommissionStatsArchiveHandler(
|
||||
db *gorm.DB,
|
||||
redis *redis.Client,
|
||||
statsStore *postgres.ShopSeriesCommissionStatsStore,
|
||||
logger *zap.Logger,
|
||||
) *CommissionStatsArchiveHandler {
|
||||
return &CommissionStatsArchiveHandler{
|
||||
db: db,
|
||||
redis: redis,
|
||||
statsStore: statsStore,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *CommissionStatsArchiveHandler) HandleCommissionStatsArchive(ctx context.Context, task *asynq.Task) error {
|
||||
ctx = pkggorm.SkipDataPermission(ctx)
|
||||
|
||||
now := time.Now()
|
||||
lastMonthStart := now.AddDate(0, -1, 0)
|
||||
lastMonthStart = time.Date(lastMonthStart.Year(), lastMonthStart.Month(), 1, 0, 0, 0, 0, time.UTC)
|
||||
lastMonthEnd := now.AddDate(0, 0, -now.Day()).Add(24*time.Hour - time.Second)
|
||||
|
||||
var stats []model.ShopSeriesCommissionStats
|
||||
err := h.db.Where("period_start >= ? AND period_end <= ? AND status = ?",
|
||||
lastMonthStart, lastMonthEnd, model.StatsStatusActive).
|
||||
Find(&stats).Error
|
||||
|
||||
if err != nil {
|
||||
h.logger.Error("查询需要归档的统计记录失败", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
if len(stats) == 0 {
|
||||
h.logger.Info("没有需要归档的统计记录")
|
||||
return nil
|
||||
}
|
||||
|
||||
archivedCount := 0
|
||||
for _, stat := range stats {
|
||||
period := stat.PeriodStart.Format("2006-01")
|
||||
redisKey := constants.RedisCommissionStatsKey(stat.AllocationID, period)
|
||||
|
||||
if err := h.archiveStats(ctx, &stat, redisKey); err != nil {
|
||||
h.logger.Error("归档统计失败",
|
||||
zap.Uint("allocation_id", stat.AllocationID),
|
||||
zap.String("period", period),
|
||||
zap.Error(err),
|
||||
)
|
||||
continue
|
||||
}
|
||||
archivedCount++
|
||||
}
|
||||
|
||||
h.logger.Info("统计归档完成",
|
||||
zap.Int("total", len(stats)),
|
||||
zap.Int("archived", archivedCount),
|
||||
)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *CommissionStatsArchiveHandler) archiveStats(ctx context.Context, stats *model.ShopSeriesCommissionStats, redisKey string) error {
|
||||
return h.db.Transaction(func(tx *gorm.DB) error {
|
||||
exists, err := h.redis.Exists(ctx, redisKey).Result()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if exists > 0 {
|
||||
data, err := h.redis.HGetAll(ctx, redisKey).Result()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(data) > 0 {
|
||||
if err := h.redis.Del(ctx, redisKey).Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
err = tx.Model(stats).
|
||||
Where("id = ? AND version = ?", stats.ID, stats.Version).
|
||||
Updates(map[string]interface{}{
|
||||
"status": model.StatsStatusCompleted,
|
||||
"last_updated_at": time.Now(),
|
||||
"version": gorm.Expr("version + 1"),
|
||||
}).Error
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
155
internal/task/commission_stats_sync.go
Normal file
155
internal/task/commission_stats_sync.go
Normal file
@@ -0,0 +1,155 @@
|
||||
package task
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"go.uber.org/zap"
|
||||
"gorm.io/gorm"
|
||||
|
||||
"github.com/break/junhong_cmp_fiber/internal/model"
|
||||
"github.com/break/junhong_cmp_fiber/internal/store/postgres"
|
||||
"github.com/break/junhong_cmp_fiber/pkg/constants"
|
||||
pkggorm "github.com/break/junhong_cmp_fiber/pkg/gorm"
|
||||
)
|
||||
|
||||
type CommissionStatsSyncHandler struct {
|
||||
db *gorm.DB
|
||||
redis *redis.Client
|
||||
statsStore *postgres.ShopSeriesCommissionStatsStore
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
func NewCommissionStatsSyncHandler(
|
||||
db *gorm.DB,
|
||||
redis *redis.Client,
|
||||
statsStore *postgres.ShopSeriesCommissionStatsStore,
|
||||
logger *zap.Logger,
|
||||
) *CommissionStatsSyncHandler {
|
||||
return &CommissionStatsSyncHandler{
|
||||
db: db,
|
||||
redis: redis,
|
||||
statsStore: statsStore,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *CommissionStatsSyncHandler) HandleCommissionStatsSync(ctx context.Context, task *asynq.Task) error {
|
||||
ctx = pkggorm.SkipDataPermission(ctx)
|
||||
|
||||
lockKey := constants.RedisCommissionStatsLockKey()
|
||||
locked, err := h.redis.SetNX(ctx, lockKey, "1", 5*time.Minute).Result()
|
||||
if err != nil {
|
||||
h.logger.Error("获取同步锁失败", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
if !locked {
|
||||
h.logger.Info("同步任务已在执行,跳过本次")
|
||||
return nil
|
||||
}
|
||||
defer h.redis.Del(ctx, lockKey)
|
||||
|
||||
pattern := "commission:stats:*"
|
||||
var cursor uint64
|
||||
syncCount := 0
|
||||
|
||||
for {
|
||||
keys, nextCursor, err := h.redis.Scan(ctx, cursor, pattern, 100).Result()
|
||||
if err != nil {
|
||||
h.logger.Error("扫描 Redis keys 失败", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
for _, key := range keys {
|
||||
if err := h.syncStatsFromRedis(ctx, key); err != nil {
|
||||
h.logger.Error("同步统计失败",
|
||||
zap.String("key", key),
|
||||
zap.Error(err),
|
||||
)
|
||||
continue
|
||||
}
|
||||
syncCount++
|
||||
}
|
||||
|
||||
cursor = nextCursor
|
||||
if cursor == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
h.logger.Info("统计同步完成", zap.Int("sync_count", syncCount))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *CommissionStatsSyncHandler) syncStatsFromRedis(ctx context.Context, redisKey string) error {
|
||||
parts := strings.Split(redisKey, ":")
|
||||
if len(parts) != 4 {
|
||||
return nil
|
||||
}
|
||||
|
||||
allocationID, err := strconv.ParseUint(parts[2], 10, 32)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
period := parts[3]
|
||||
|
||||
data, err := h.redis.HGetAll(ctx, redisKey).Result()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(data) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
totalCount, _ := strconv.ParseInt(data["total_count"], 10, 64)
|
||||
totalAmount, _ := strconv.ParseInt(data["total_amount"], 10, 64)
|
||||
|
||||
periodStart, periodEnd := parsePeriod(period)
|
||||
|
||||
return h.db.Transaction(func(tx *gorm.DB) error {
|
||||
var stats model.ShopSeriesCommissionStats
|
||||
err := tx.Where("allocation_id = ? AND period_start = ? AND period_end = ?",
|
||||
allocationID, periodStart, periodEnd).
|
||||
First(&stats).Error
|
||||
|
||||
if err == gorm.ErrRecordNotFound {
|
||||
stats = model.ShopSeriesCommissionStats{
|
||||
AllocationID: uint(allocationID),
|
||||
PeriodType: "monthly",
|
||||
PeriodStart: periodStart,
|
||||
PeriodEnd: periodEnd,
|
||||
TotalSalesCount: totalCount,
|
||||
TotalSalesAmount: totalAmount,
|
||||
Status: "active",
|
||||
LastUpdatedAt: time.Now(),
|
||||
}
|
||||
return tx.Create(&stats).Error
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return tx.Model(&stats).
|
||||
Where("version = ?", stats.Version).
|
||||
Updates(map[string]interface{}{
|
||||
"total_sales_count": totalCount,
|
||||
"total_sales_amount": totalAmount,
|
||||
"last_updated_at": time.Now(),
|
||||
"version": gorm.Expr("version + 1"),
|
||||
}).Error
|
||||
})
|
||||
}
|
||||
|
||||
func parsePeriod(period string) (time.Time, time.Time) {
|
||||
t, _ := time.Parse("2006-01", period)
|
||||
periodStart := time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, time.UTC)
|
||||
periodEnd := periodStart.AddDate(0, 1, 0).Add(-time.Second)
|
||||
return periodStart, periodEnd
|
||||
}
|
||||
111
internal/task/commission_stats_update.go
Normal file
111
internal/task/commission_stats_update.go
Normal file
@@ -0,0 +1,111 @@
|
||||
package task
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/bytedance/sonic"
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/break/junhong_cmp_fiber/internal/store/postgres"
|
||||
"github.com/break/junhong_cmp_fiber/pkg/constants"
|
||||
pkggorm "github.com/break/junhong_cmp_fiber/pkg/gorm"
|
||||
)
|
||||
|
||||
type CommissionStatsUpdatePayload struct {
|
||||
AllocationID uint `json:"allocation_id"`
|
||||
SalesCount int64 `json:"sales_count"`
|
||||
SalesAmount int64 `json:"sales_amount"`
|
||||
}
|
||||
|
||||
type CommissionStatsUpdateHandler struct {
|
||||
redis *redis.Client
|
||||
statsStore *postgres.ShopSeriesCommissionStatsStore
|
||||
allocationStore *postgres.ShopSeriesAllocationStore
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
func NewCommissionStatsUpdateHandler(
|
||||
redis *redis.Client,
|
||||
statsStore *postgres.ShopSeriesCommissionStatsStore,
|
||||
allocationStore *postgres.ShopSeriesAllocationStore,
|
||||
logger *zap.Logger,
|
||||
) *CommissionStatsUpdateHandler {
|
||||
return &CommissionStatsUpdateHandler{
|
||||
redis: redis,
|
||||
statsStore: statsStore,
|
||||
allocationStore: allocationStore,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *CommissionStatsUpdateHandler) HandleCommissionStatsUpdate(ctx context.Context, task *asynq.Task) error {
|
||||
ctx = pkggorm.SkipDataPermission(ctx)
|
||||
|
||||
var payload CommissionStatsUpdatePayload
|
||||
if err := sonic.Unmarshal(task.Payload(), &payload); err != nil {
|
||||
h.logger.Error("解析统计更新任务载荷失败",
|
||||
zap.Error(err),
|
||||
zap.String("task_id", task.ResultWriter().TaskID()),
|
||||
)
|
||||
return asynq.SkipRetry
|
||||
}
|
||||
|
||||
allocation, err := h.allocationStore.GetByID(ctx, payload.AllocationID)
|
||||
if err != nil {
|
||||
h.logger.Error("获取分配记录失败",
|
||||
zap.Uint("allocation_id", payload.AllocationID),
|
||||
zap.Error(err),
|
||||
)
|
||||
return asynq.SkipRetry
|
||||
}
|
||||
|
||||
if !allocation.EnableTierCommission {
|
||||
h.logger.Info("分配未启用梯度返佣,跳过统计更新",
|
||||
zap.Uint("allocation_id", payload.AllocationID),
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
period := getCurrentPeriod(now)
|
||||
redisKey := constants.RedisCommissionStatsKey(payload.AllocationID, period)
|
||||
|
||||
pipe := h.redis.Pipeline()
|
||||
pipe.HIncrBy(ctx, redisKey, "total_count", payload.SalesCount)
|
||||
pipe.HIncrBy(ctx, redisKey, "total_amount", payload.SalesAmount)
|
||||
|
||||
periodEnd := getPeriodEnd(now)
|
||||
expireAt := periodEnd.AddDate(0, 0, 7)
|
||||
pipe.ExpireAt(ctx, redisKey, expireAt)
|
||||
|
||||
if _, err := pipe.Exec(ctx); err != nil {
|
||||
h.logger.Error("更新 Redis 统计失败",
|
||||
zap.Uint("allocation_id", payload.AllocationID),
|
||||
zap.String("period", period),
|
||||
zap.Error(err),
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
h.logger.Info("统计更新成功",
|
||||
zap.Uint("allocation_id", payload.AllocationID),
|
||||
zap.String("period", period),
|
||||
zap.Int64("sales_count", payload.SalesCount),
|
||||
zap.Int64("sales_amount", payload.SalesAmount),
|
||||
)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func getCurrentPeriod(t time.Time) string {
|
||||
return t.Format("2006-01")
|
||||
}
|
||||
|
||||
func getPeriodEnd(t time.Time) time.Time {
|
||||
year, month, _ := t.Date()
|
||||
nextMonth := time.Date(year, month+1, 1, 0, 0, 0, 0, t.Location())
|
||||
return nextMonth.Add(-time.Second)
|
||||
}
|
||||
Reference in New Issue
Block a user