package task import ( "context" "time" "github.com/bytedance/sonic" "github.com/hibiken/asynq" "github.com/redis/go-redis/v9" "go.uber.org/zap" "github.com/break/junhong_cmp_fiber/internal/store/postgres" "github.com/break/junhong_cmp_fiber/pkg/constants" pkggorm "github.com/break/junhong_cmp_fiber/pkg/gorm" ) type CommissionStatsUpdatePayload struct { AllocationID uint `json:"allocation_id"` SalesCount int64 `json:"sales_count"` SalesAmount int64 `json:"sales_amount"` } type CommissionStatsUpdateHandler struct { redis *redis.Client statsStore *postgres.ShopSeriesCommissionStatsStore allocationStore *postgres.ShopPackageAllocationStore logger *zap.Logger } func NewCommissionStatsUpdateHandler( redis *redis.Client, statsStore *postgres.ShopSeriesCommissionStatsStore, allocationStore *postgres.ShopPackageAllocationStore, logger *zap.Logger, ) *CommissionStatsUpdateHandler { return &CommissionStatsUpdateHandler{ redis: redis, statsStore: statsStore, allocationStore: allocationStore, logger: logger, } } func (h *CommissionStatsUpdateHandler) HandleCommissionStatsUpdate(ctx context.Context, task *asynq.Task) error { ctx = pkggorm.SkipDataPermission(ctx) var payload CommissionStatsUpdatePayload if err := sonic.Unmarshal(task.Payload(), &payload); err != nil { h.logger.Error("解析统计更新任务载荷失败", zap.Error(err), zap.String("task_id", task.ResultWriter().TaskID()), ) return asynq.SkipRetry } now := time.Now() period := getCurrentPeriod(now) redisKey := constants.RedisCommissionStatsKey(payload.AllocationID, period) pipe := h.redis.Pipeline() pipe.HIncrBy(ctx, redisKey, "total_count", payload.SalesCount) pipe.HIncrBy(ctx, redisKey, "total_amount", payload.SalesAmount) periodEnd := getPeriodEnd(now) expireAt := periodEnd.AddDate(0, 0, 7) pipe.ExpireAt(ctx, redisKey, expireAt) if _, err := pipe.Exec(ctx); err != nil { h.logger.Error("更新 Redis 统计失败", zap.Uint("allocation_id", payload.AllocationID), zap.String("period", period), zap.Error(err), ) return err } h.logger.Info("统计更新成功", zap.Uint("allocation_id", payload.AllocationID), zap.String("period", period), zap.Int64("sales_count", payload.SalesCount), zap.Int64("sales_amount", payload.SalesAmount), ) return nil } func getCurrentPeriod(t time.Time) string { return t.Format("2006-01") } func getPeriodEnd(t time.Time) time.Time { year, month, _ := t.Date() nextMonth := time.Date(year, month+1, 1, 0, 0, 0, 0, t.Location()) return nextMonth.Add(-time.Second) }