package task import ( "context" "strconv" "strings" "time" "github.com/hibiken/asynq" "github.com/redis/go-redis/v9" "go.uber.org/zap" "gorm.io/gorm" "github.com/break/junhong_cmp_fiber/internal/model" "github.com/break/junhong_cmp_fiber/internal/store/postgres" "github.com/break/junhong_cmp_fiber/pkg/constants" pkggorm "github.com/break/junhong_cmp_fiber/pkg/gorm" ) type CommissionStatsSyncHandler struct { db *gorm.DB redis *redis.Client statsStore *postgres.ShopSeriesCommissionStatsStore logger *zap.Logger } func NewCommissionStatsSyncHandler( db *gorm.DB, redis *redis.Client, statsStore *postgres.ShopSeriesCommissionStatsStore, logger *zap.Logger, ) *CommissionStatsSyncHandler { return &CommissionStatsSyncHandler{ db: db, redis: redis, statsStore: statsStore, logger: logger, } } func (h *CommissionStatsSyncHandler) HandleCommissionStatsSync(ctx context.Context, task *asynq.Task) error { ctx = pkggorm.SkipDataPermission(ctx) lockKey := constants.RedisCommissionStatsLockKey() locked, err := h.redis.SetNX(ctx, lockKey, "1", 5*time.Minute).Result() if err != nil { h.logger.Error("获取同步锁失败", zap.Error(err)) return err } if !locked { h.logger.Info("同步任务已在执行,跳过本次") return nil } defer h.redis.Del(ctx, lockKey) pattern := "commission:stats:*" var cursor uint64 syncCount := 0 for { keys, nextCursor, err := h.redis.Scan(ctx, cursor, pattern, 100).Result() if err != nil { h.logger.Error("扫描 Redis keys 失败", zap.Error(err)) return err } for _, key := range keys { if err := h.syncStatsFromRedis(ctx, key); err != nil { h.logger.Error("同步统计失败", zap.String("key", key), zap.Error(err), ) continue } syncCount++ } cursor = nextCursor if cursor == 0 { break } } h.logger.Info("统计同步完成", zap.Int("sync_count", syncCount)) return nil } func (h *CommissionStatsSyncHandler) syncStatsFromRedis(ctx context.Context, redisKey string) error { parts := strings.Split(redisKey, ":") if len(parts) != 4 { return nil } allocationID, err := strconv.ParseUint(parts[2], 10, 32) if err != nil { return err } period := parts[3] data, err := h.redis.HGetAll(ctx, redisKey).Result() if err != nil { return err } if len(data) == 0 { return nil } totalCount, _ := strconv.ParseInt(data["total_count"], 10, 64) totalAmount, _ := strconv.ParseInt(data["total_amount"], 10, 64) periodStart, periodEnd := parsePeriod(period) return h.db.Transaction(func(tx *gorm.DB) error { var stats model.ShopSeriesCommissionStats err := tx.Where("allocation_id = ? AND period_start = ? AND period_end = ?", allocationID, periodStart, periodEnd). First(&stats).Error if err == gorm.ErrRecordNotFound { stats = model.ShopSeriesCommissionStats{ AllocationID: uint(allocationID), PeriodType: "monthly", PeriodStart: periodStart, PeriodEnd: periodEnd, TotalSalesCount: totalCount, TotalSalesAmount: totalAmount, Status: "active", LastUpdatedAt: time.Now(), } return tx.Create(&stats).Error } if err != nil { return err } return tx.Model(&stats). Where("version = ?", stats.Version). Updates(map[string]interface{}{ "total_sales_count": totalCount, "total_sales_amount": totalAmount, "last_updated_at": time.Now(), "version": gorm.Expr("version + 1"), }).Error }) } func parsePeriod(period string) (time.Time, time.Time) { t, _ := time.Parse("2006-01", period) periodStart := time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, time.UTC) periodEnd := periodStart.AddDate(0, 1, 0).Add(-time.Second) return periodStart, periodEnd }