Files
junhong_cmp_fiber/internal/task/commission_stats_sync.go
huang 03a0960c4d
All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 7m2s
refactor: 数据权限过滤从 GORM Callback 改为 Store 层显式调用
- 移除 RegisterDataPermissionCallback 和 SkipDataPermission 机制
- 在 Auth 中间件预计算 SubordinateShopIDs 并注入 Context
- 新增 ApplyShopFilter/ApplyEnterpriseFilter/ApplyOwnerShopFilter 等 Helper 函数
- 所有 Store 层查询方法显式调用数据权限过滤函数
- 权限检查函数 CanManageShop/CanManageEnterprise 改为从 Context 获取数据

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-26 16:38:52 +08:00

153 lines
3.6 KiB
Go

package task
import (
"context"
"strconv"
"strings"
"time"
"github.com/hibiken/asynq"
"github.com/redis/go-redis/v9"
"go.uber.org/zap"
"gorm.io/gorm"
"github.com/break/junhong_cmp_fiber/internal/model"
"github.com/break/junhong_cmp_fiber/internal/store/postgres"
"github.com/break/junhong_cmp_fiber/pkg/constants"
)
type CommissionStatsSyncHandler struct {
db *gorm.DB
redis *redis.Client
statsStore *postgres.ShopSeriesCommissionStatsStore
logger *zap.Logger
}
func NewCommissionStatsSyncHandler(
db *gorm.DB,
redis *redis.Client,
statsStore *postgres.ShopSeriesCommissionStatsStore,
logger *zap.Logger,
) *CommissionStatsSyncHandler {
return &CommissionStatsSyncHandler{
db: db,
redis: redis,
statsStore: statsStore,
logger: logger,
}
}
func (h *CommissionStatsSyncHandler) HandleCommissionStatsSync(ctx context.Context, task *asynq.Task) error {
lockKey := constants.RedisCommissionStatsLockKey()
locked, err := h.redis.SetNX(ctx, lockKey, "1", 5*time.Minute).Result()
if err != nil {
h.logger.Error("获取同步锁失败", zap.Error(err))
return err
}
if !locked {
h.logger.Info("同步任务已在执行,跳过本次")
return nil
}
defer h.redis.Del(ctx, lockKey)
pattern := "commission:stats:*"
var cursor uint64
syncCount := 0
for {
keys, nextCursor, err := h.redis.Scan(ctx, cursor, pattern, 100).Result()
if err != nil {
h.logger.Error("扫描 Redis keys 失败", zap.Error(err))
return err
}
for _, key := range keys {
if err := h.syncStatsFromRedis(ctx, key); err != nil {
h.logger.Error("同步统计失败",
zap.String("key", key),
zap.Error(err),
)
continue
}
syncCount++
}
cursor = nextCursor
if cursor == 0 {
break
}
}
h.logger.Info("统计同步完成", zap.Int("sync_count", syncCount))
return nil
}
func (h *CommissionStatsSyncHandler) syncStatsFromRedis(ctx context.Context, redisKey string) error {
parts := strings.Split(redisKey, ":")
if len(parts) != 4 {
return nil
}
allocationID, err := strconv.ParseUint(parts[2], 10, 32)
if err != nil {
return err
}
period := parts[3]
data, err := h.redis.HGetAll(ctx, redisKey).Result()
if err != nil {
return err
}
if len(data) == 0 {
return nil
}
totalCount, _ := strconv.ParseInt(data["total_count"], 10, 64)
totalAmount, _ := strconv.ParseInt(data["total_amount"], 10, 64)
periodStart, periodEnd := parsePeriod(period)
return h.db.Transaction(func(tx *gorm.DB) error {
var stats model.ShopSeriesCommissionStats
err := tx.Where("allocation_id = ? AND period_start = ? AND period_end = ?",
allocationID, periodStart, periodEnd).
First(&stats).Error
if err == gorm.ErrRecordNotFound {
stats = model.ShopSeriesCommissionStats{
AllocationID: uint(allocationID),
PeriodType: "monthly",
PeriodStart: periodStart,
PeriodEnd: periodEnd,
TotalSalesCount: totalCount,
TotalSalesAmount: totalAmount,
Status: "active",
LastUpdatedAt: time.Now(),
}
return tx.Create(&stats).Error
}
if err != nil {
return err
}
return tx.Model(&stats).
Where("version = ?", stats.Version).
Updates(map[string]interface{}{
"total_sales_count": totalCount,
"total_sales_amount": totalAmount,
"last_updated_at": time.Now(),
"version": gorm.Expr("version + 1"),
}).Error
})
}
func parsePeriod(period string) (time.Time, time.Time) {
t, _ := time.Parse("2006-01", period)
periodStart := time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, time.UTC)
periodEnd := periodStart.AddDate(0, 1, 0).Add(-time.Second)
return periodStart, periodEnd
}