All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 7m2s
- 移除 RegisterDataPermissionCallback 和 SkipDataPermission 机制 - 在 Auth 中间件预计算 SubordinateShopIDs 并注入 Context - 新增 ApplyShopFilter/ApplyEnterpriseFilter/ApplyOwnerShopFilter 等 Helper 函数 - 所有 Store 层查询方法显式调用数据权限过滤函数 - 权限检查函数 CanManageShop/CanManageEnterprise 改为从 Context 获取数据 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
153 lines
3.6 KiB
Go
153 lines
3.6 KiB
Go
package task
|
|
|
|
import (
|
|
"context"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/hibiken/asynq"
|
|
"github.com/redis/go-redis/v9"
|
|
"go.uber.org/zap"
|
|
"gorm.io/gorm"
|
|
|
|
"github.com/break/junhong_cmp_fiber/internal/model"
|
|
"github.com/break/junhong_cmp_fiber/internal/store/postgres"
|
|
"github.com/break/junhong_cmp_fiber/pkg/constants"
|
|
)
|
|
|
|
type CommissionStatsSyncHandler struct {
|
|
db *gorm.DB
|
|
redis *redis.Client
|
|
statsStore *postgres.ShopSeriesCommissionStatsStore
|
|
logger *zap.Logger
|
|
}
|
|
|
|
func NewCommissionStatsSyncHandler(
|
|
db *gorm.DB,
|
|
redis *redis.Client,
|
|
statsStore *postgres.ShopSeriesCommissionStatsStore,
|
|
logger *zap.Logger,
|
|
) *CommissionStatsSyncHandler {
|
|
return &CommissionStatsSyncHandler{
|
|
db: db,
|
|
redis: redis,
|
|
statsStore: statsStore,
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
func (h *CommissionStatsSyncHandler) HandleCommissionStatsSync(ctx context.Context, task *asynq.Task) error {
|
|
lockKey := constants.RedisCommissionStatsLockKey()
|
|
locked, err := h.redis.SetNX(ctx, lockKey, "1", 5*time.Minute).Result()
|
|
if err != nil {
|
|
h.logger.Error("获取同步锁失败", zap.Error(err))
|
|
return err
|
|
}
|
|
if !locked {
|
|
h.logger.Info("同步任务已在执行,跳过本次")
|
|
return nil
|
|
}
|
|
defer h.redis.Del(ctx, lockKey)
|
|
|
|
pattern := "commission:stats:*"
|
|
var cursor uint64
|
|
syncCount := 0
|
|
|
|
for {
|
|
keys, nextCursor, err := h.redis.Scan(ctx, cursor, pattern, 100).Result()
|
|
if err != nil {
|
|
h.logger.Error("扫描 Redis keys 失败", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
for _, key := range keys {
|
|
if err := h.syncStatsFromRedis(ctx, key); err != nil {
|
|
h.logger.Error("同步统计失败",
|
|
zap.String("key", key),
|
|
zap.Error(err),
|
|
)
|
|
continue
|
|
}
|
|
syncCount++
|
|
}
|
|
|
|
cursor = nextCursor
|
|
if cursor == 0 {
|
|
break
|
|
}
|
|
}
|
|
|
|
h.logger.Info("统计同步完成", zap.Int("sync_count", syncCount))
|
|
return nil
|
|
}
|
|
|
|
func (h *CommissionStatsSyncHandler) syncStatsFromRedis(ctx context.Context, redisKey string) error {
|
|
parts := strings.Split(redisKey, ":")
|
|
if len(parts) != 4 {
|
|
return nil
|
|
}
|
|
|
|
allocationID, err := strconv.ParseUint(parts[2], 10, 32)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
period := parts[3]
|
|
|
|
data, err := h.redis.HGetAll(ctx, redisKey).Result()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(data) == 0 {
|
|
return nil
|
|
}
|
|
|
|
totalCount, _ := strconv.ParseInt(data["total_count"], 10, 64)
|
|
totalAmount, _ := strconv.ParseInt(data["total_amount"], 10, 64)
|
|
|
|
periodStart, periodEnd := parsePeriod(period)
|
|
|
|
return h.db.Transaction(func(tx *gorm.DB) error {
|
|
var stats model.ShopSeriesCommissionStats
|
|
err := tx.Where("allocation_id = ? AND period_start = ? AND period_end = ?",
|
|
allocationID, periodStart, periodEnd).
|
|
First(&stats).Error
|
|
|
|
if err == gorm.ErrRecordNotFound {
|
|
stats = model.ShopSeriesCommissionStats{
|
|
AllocationID: uint(allocationID),
|
|
PeriodType: "monthly",
|
|
PeriodStart: periodStart,
|
|
PeriodEnd: periodEnd,
|
|
TotalSalesCount: totalCount,
|
|
TotalSalesAmount: totalAmount,
|
|
Status: "active",
|
|
LastUpdatedAt: time.Now(),
|
|
}
|
|
return tx.Create(&stats).Error
|
|
}
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return tx.Model(&stats).
|
|
Where("version = ?", stats.Version).
|
|
Updates(map[string]interface{}{
|
|
"total_sales_count": totalCount,
|
|
"total_sales_amount": totalAmount,
|
|
"last_updated_at": time.Now(),
|
|
"version": gorm.Expr("version + 1"),
|
|
}).Error
|
|
})
|
|
}
|
|
|
|
// parsePeriod converts a "YYYY-MM" period string into the first and last
// second of that month in UTC.
//
// The parse error is deliberately discarded: for input that does not match the
// layout, time.Parse yields the zero time, so the result is the bounds of
// January of year 1.
func parsePeriod(period string) (time.Time, time.Time) {
	parsed, _ := time.Parse("2006-01", period)
	year, month, _ := parsed.Date()

	firstSecond := time.Date(year, month, 1, 0, 0, 0, 0, time.UTC)
	// time.Date normalises month+1 past December into January of the next year.
	nextMonth := time.Date(year, month+1, 1, 0, 0, 0, 0, time.UTC)
	lastSecond := nextMonth.Add(-time.Second)

	return firstSecond, lastSecond
}
|