package polling import ( "context" "sync" "time" "go.uber.org/zap" "github.com/break/junhong_cmp_fiber/internal/model" "github.com/break/junhong_cmp_fiber/internal/store/postgres" "github.com/break/junhong_cmp_fiber/pkg/errors" ) // CleanupService 数据清理服务 type CleanupService struct { configStore *postgres.DataCleanupConfigStore logStore *postgres.DataCleanupLogStore logger *zap.Logger mu sync.Mutex // 防止并发清理 isRunning bool } // NewCleanupService 创建数据清理服务实例 func NewCleanupService( configStore *postgres.DataCleanupConfigStore, logStore *postgres.DataCleanupLogStore, logger *zap.Logger, ) *CleanupService { return &CleanupService{ configStore: configStore, logStore: logStore, logger: logger, } } // CreateConfig 创建清理配置 func (s *CleanupService) CreateConfig(ctx context.Context, config *model.DataCleanupConfig) error { if config.TargetTable == "" { return errors.New(errors.CodeInvalidParam, "表名不能为空") } if config.RetentionDays < 7 { return errors.New(errors.CodeInvalidParam, "保留天数不能少于7天") } if config.BatchSize <= 0 { config.BatchSize = 10000 // 默认每批1万条 } config.Enabled = 1 // 默认启用 return s.configStore.Create(ctx, config) } // GetConfig 获取清理配置 func (s *CleanupService) GetConfig(ctx context.Context, id uint) (*model.DataCleanupConfig, error) { config, err := s.configStore.GetByID(ctx, id) if err != nil { return nil, errors.Wrap(errors.CodeNotFound, err, "清理配置不存在") } return config, nil } // ListConfigs 获取所有清理配置 func (s *CleanupService) ListConfigs(ctx context.Context) ([]*model.DataCleanupConfig, error) { return s.configStore.List(ctx) } // UpdateConfig 更新清理配置 func (s *CleanupService) UpdateConfig(ctx context.Context, id uint, updates map[string]any) error { config, err := s.configStore.GetByID(ctx, id) if err != nil { return errors.Wrap(errors.CodeNotFound, err, "清理配置不存在") } if retentionDays, ok := updates["retention_days"].(int); ok { if retentionDays < 7 { return errors.New(errors.CodeInvalidParam, "保留天数不能少于7天") } config.RetentionDays = retentionDays } if batchSize, ok := updates["batch_size"].(int); ok { if batchSize > 0 { config.BatchSize = batchSize } } if enabled, ok := updates["enabled"].(int); ok { config.Enabled = int16(enabled) } if desc, ok := updates["description"].(string); ok { config.Description = desc } if updatedBy, ok := updates["updated_by"].(uint); ok { config.UpdatedBy = &updatedBy } return s.configStore.Update(ctx, config) } // DeleteConfig 删除清理配置 func (s *CleanupService) DeleteConfig(ctx context.Context, id uint) error { _, err := s.configStore.GetByID(ctx, id) if err != nil { return errors.Wrap(errors.CodeNotFound, err, "清理配置不存在") } return s.configStore.Delete(ctx, id) } // ListLogs 获取清理日志列表 func (s *CleanupService) ListLogs(ctx context.Context, page, pageSize int, tableName string) ([]*model.DataCleanupLog, int64, error) { if page < 1 { page = 1 } if pageSize < 1 || pageSize > 100 { pageSize = 20 } return s.logStore.List(ctx, page, pageSize, tableName) } // CleanupPreview 清理预览 type CleanupPreview struct { TableName string `json:"table_name"` RetentionDays int `json:"retention_days"` RecordCount int64 `json:"record_count"` Description string `json:"description"` } // Preview 预览待清理数据 func (s *CleanupService) Preview(ctx context.Context) ([]*CleanupPreview, error) { configs, err := s.configStore.ListEnabled(ctx) if err != nil { return nil, err } previews := make([]*CleanupPreview, 0, len(configs)) for _, config := range configs { count, err := s.logStore.CountOldRecords(ctx, config.TargetTable, config.RetentionDays) if err != nil { s.logger.Warn("预览清理数据失败", zap.String("table", config.TargetTable), zap.Error(err)) continue } previews = append(previews, &CleanupPreview{ TableName: config.TargetTable, RetentionDays: config.RetentionDays, RecordCount: count, Description: config.Description, }) } return previews, nil } // CleanupProgress 清理进度 type CleanupProgress struct { IsRunning bool `json:"is_running"` CurrentTable string `json:"current_table,omitempty"` TotalTables int `json:"total_tables"` ProcessedTables int `json:"processed_tables"` TotalDeleted int64 `json:"total_deleted"` StartedAt *time.Time `json:"started_at,omitempty"` LastLog *model.DataCleanupLog `json:"last_log,omitempty"` } // GetProgress 获取清理进度 func (s *CleanupService) GetProgress(ctx context.Context) (*CleanupProgress, error) { s.mu.Lock() isRunning := s.isRunning s.mu.Unlock() // 获取最近的清理日志 logs, _, err := s.logStore.List(ctx, 1, 1, "") if err != nil { return nil, err } progress := &CleanupProgress{ IsRunning: isRunning, } if len(logs) > 0 { progress.LastLog = logs[0] if logs[0].Status == "running" { progress.CurrentTable = logs[0].TargetTable progress.StartedAt = &logs[0].StartedAt } } return progress, nil } // TriggerCleanup 手动触发清理 func (s *CleanupService) TriggerCleanup(ctx context.Context, tableName string, triggeredBy uint) error { s.mu.Lock() if s.isRunning { s.mu.Unlock() return errors.New(errors.CodeInvalidParam, "清理任务正在运行中") } s.isRunning = true s.mu.Unlock() defer func() { s.mu.Lock() s.isRunning = false s.mu.Unlock() }() var configs []*model.DataCleanupConfig var err error if tableName != "" { // 清理指定表 config, err := s.configStore.GetByTableName(ctx, tableName) if err != nil { return errors.Wrap(errors.CodeNotFound, err, "清理配置不存在") } configs = []*model.DataCleanupConfig{config} } else { // 清理所有启用的表 configs, err = s.configStore.ListEnabled(ctx) if err != nil { return err } } for _, config := range configs { if err := s.cleanupTable(ctx, config, "manual", &triggeredBy); err != nil { s.logger.Error("清理表失败", zap.String("table", config.TargetTable), zap.Error(err)) // 继续处理其他表 } } return nil } // RunScheduledCleanup 运行定时清理任务 func (s *CleanupService) RunScheduledCleanup(ctx context.Context) error { s.mu.Lock() if s.isRunning { s.mu.Unlock() s.logger.Info("清理任务正在运行中,跳过本次调度") return nil } s.isRunning = true s.mu.Unlock() defer func() { s.mu.Lock() s.isRunning = false s.mu.Unlock() }() configs, err := s.configStore.ListEnabled(ctx) if err != nil { return err } s.logger.Info("开始定时清理任务", zap.Int("config_count", len(configs))) for _, config := range configs { if err := s.cleanupTable(ctx, config, "scheduled", nil); err != nil { s.logger.Error("定时清理表失败", zap.String("table", config.TargetTable), zap.Error(err)) // 继续处理其他表 } } s.logger.Info("定时清理任务完成") return nil } // cleanupTable 清理指定表 func (s *CleanupService) cleanupTable(ctx context.Context, config *model.DataCleanupConfig, cleanupType string, triggeredBy *uint) error { startTime := time.Now() // 创建清理日志 log := &model.DataCleanupLog{ TargetTable: config.TargetTable, CleanupType: cleanupType, RetentionDays: config.RetentionDays, Status: "running", StartedAt: startTime, TriggeredBy: triggeredBy, } if err := s.logStore.Create(ctx, log); err != nil { return err } var totalDeleted int64 var lastErr error // 分批删除 cleanupLoop: for { deleted, err := s.logStore.DeleteOldRecords(ctx, config.TargetTable, config.RetentionDays, config.BatchSize) if err != nil { lastErr = err break } totalDeleted += deleted s.logger.Debug("清理进度", zap.String("table", config.TargetTable), zap.Int64("batch_deleted", deleted), zap.Int64("total_deleted", totalDeleted)) if deleted < int64(config.BatchSize) { // 没有更多数据需要删除 break } // 检查 context 是否已取消 select { case <-ctx.Done(): lastErr = ctx.Err() break cleanupLoop default: } } // 更新清理日志 endTime := time.Now() log.CompletedAt = &endTime log.DeletedCount = totalDeleted log.DurationMs = endTime.Sub(startTime).Milliseconds() if lastErr != nil { log.Status = "failed" log.ErrorMessage = lastErr.Error() } else { log.Status = "success" } if err := s.logStore.Update(ctx, log); err != nil { s.logger.Error("更新清理日志失败", zap.Error(err)) } s.logger.Info("清理表完成", zap.String("table", config.TargetTable), zap.Int64("deleted_count", totalDeleted), zap.Int64("duration_ms", log.DurationMs), zap.String("status", log.Status)) return lastErr }