package task

import (
	"context"
	"fmt"
	"time"

	"github.com/hibiken/asynq"
	"github.com/redis/go-redis/v9"
	"go.uber.org/zap"
	"gorm.io/gorm"

	"github.com/break/junhong_cmp_fiber/internal/model"
	"github.com/break/junhong_cmp_fiber/internal/store/postgres"
	"github.com/break/junhong_cmp_fiber/pkg/constants"
)

// CommissionStatsArchiveHandler archives the previous calendar month's
// ShopSeriesCommissionStats rows: each active row is switched to the
// completed status and its Redis key is removed.
type CommissionStatsArchiveHandler struct {
	db         *gorm.DB
	redis      *redis.Client
	statsStore *postgres.ShopSeriesCommissionStatsStore
	logger     *zap.Logger
}

// NewCommissionStatsArchiveHandler wires the handler with its database,
// Redis client, stats store and logger.
func NewCommissionStatsArchiveHandler(
	db *gorm.DB,
	redis *redis.Client,
	statsStore *postgres.ShopSeriesCommissionStatsStore,
	logger *zap.Logger,
) *CommissionStatsArchiveHandler {
	return &CommissionStatsArchiveHandler{
		db:         db,
		redis:      redis,
		statsStore: statsStore,
		logger:     logger,
	}
}

// previousMonthRange returns the half-open interval [start, end) that covers
// the calendar month preceding the one that contains now.
//
// The year and month are taken from now in its own location, and the
// boundaries are placed at UTC midnight of the first day of each month.
// Anchoring on day 1 before stepping back a month avoids the day overflow of
// now.AddDate(0, -1, 0): on March 31 that call normalises "February 31" to
// early March and would select the wrong month.
func previousMonthRange(now time.Time) (start, end time.Time) {
	end = time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, time.UTC)
	start = end.AddDate(0, -1, 0)
	return start, end
}

// HandleCommissionStatsArchive is the asynq entry point. It loads every active
// stats row whose period lies inside the previous calendar month and archives
// them one by one.
//
// A failure to load the rows is returned to the caller. A failure on an
// individual row is logged and skipped so the remaining rows are still
// processed; in that case the method still returns nil.
func (h *CommissionStatsArchiveHandler) HandleCommissionStatsArchive(ctx context.Context, task *asynq.Task) error {
	periodStart, periodEnd := previousMonthRange(time.Now())

	var stats []model.ShopSeriesCommissionStats
	// The upper bound is exclusive so that a period_end carrying sub-second
	// precision on the last day of the month is still matched.
	err := h.db.WithContext(ctx).
		Where("period_start >= ? AND period_end < ? AND status = ?",
			periodStart, periodEnd, model.StatsStatusActive).
		Find(&stats).Error
	if err != nil {
		h.logger.Error("查询需要归档的统计记录失败", zap.Error(err))
		return err
	}

	if len(stats) == 0 {
		h.logger.Info("没有需要归档的统计记录")
		return nil
	}

	archivedCount := 0
	// Index into the slice instead of ranging by value: this avoids copying
	// each record and never hands out the address of a loop variable.
	for i := range stats {
		stat := &stats[i]
		period := stat.PeriodStart.Format("2006-01")
		redisKey := constants.RedisCommissionStatsKey(stat.AllocationID, period)

		if err := h.archiveStats(ctx, stat, redisKey); err != nil {
			h.logger.Error("归档统计失败",
				zap.Uint("allocation_id", stat.AllocationID),
				zap.String("period", period),
				zap.Error(err),
			)
			continue
		}
		archivedCount++
	}

	h.logger.Info("统计归档完成",
		zap.Int("total", len(stats)),
		zap.Int("archived", archivedCount),
	)
	return nil
}

// archiveStats marks a single stats row as completed and then removes its
// Redis key.
//
// The row is updated under an optimistic lock on the version column. When no
// row matches (another writer bumped the version first) an error is returned
// and the Redis key is left untouched. The Redis key is deleted only after the
// update succeeded; if that deletion fails the error rolls the database
// transaction back, so the row stays active and can be retried later.
func (h *CommissionStatsArchiveHandler) archiveStats(ctx context.Context, stats *model.ShopSeriesCommissionStats, redisKey string) error {
	return h.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
		result := tx.Model(stats).
			Where("id = ? AND version = ?", stats.ID, stats.Version).
			Updates(map[string]any{
				"status":          model.StatsStatusCompleted,
				"last_updated_at": time.Now(),
				"version":         gorm.Expr("version + 1"),
			})
		if result.Error != nil {
			return fmt.Errorf("mark stats %v as completed: %w", stats.ID, result.Error)
		}
		if result.RowsAffected == 0 {
			return fmt.Errorf("stats %v not updated: version %v is stale", stats.ID, stats.Version)
		}

		// DEL on a missing key is a no-op, so no prior EXISTS/HGETALL round
		// trips are needed.
		// NOTE(review): the cached hash is dropped without being written back
		// anywhere — confirm the database row already holds the final totals.
		if err := h.redis.Del(ctx, redisKey).Err(); err != nil {
			return fmt.Errorf("delete redis key %q: %w", redisKey, err)
		}
		return nil
	})
}