Files
junhong_cmp_fiber/internal/service/polling/cleanup_service.go
huang 18daeae65a
All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 7m17s
feat: 钱包系统分离 - 代理钱包与卡钱包完全隔离
## 变更概述
将统一钱包系统拆分为代理钱包和卡钱包两个独立系统,实现数据表和代码层面的完全隔离。

## 数据库变更
- 新增 6 张表:tb_agent_wallet、tb_agent_wallet_transaction、tb_agent_recharge_record、tb_card_wallet、tb_card_wallet_transaction、tb_card_recharge_record
- 删除 3 张旧表:tb_wallet、tb_wallet_transaction、tb_recharge_record
- 代理钱包:按 (shop_id, wallet_type) 唯一标识,支持主钱包和分佣钱包
- 卡钱包:按 (resource_type, resource_id) 唯一标识,支持物联网卡和设备

## 代码变更
- Model 层:新增 AgentWallet、AgentWalletTransaction、AgentRechargeRecord、CardWallet、CardWalletTransaction、CardRechargeRecord 模型
- Store 层:新增 6 个独立 Store,支持事务、乐观锁、Redis 缓存
- Service 层:重构 commission_calculation、commission_withdrawal、order、recharge 等 8 个服务
- Bootstrap 层:更新 Store 和 Service 依赖注入
- 常量层:按钱包类型重新组织常量和 Redis Key 生成函数

## 技术特性
- 乐观锁:使用 version 字段防止并发冲突
- 多租户:支持 shop_id_tag 和 enterprise_id_tag 过滤
- 事务管理:所有余额变动使用事务保证 ACID
- 缓存策略:Cache-Aside 模式,余额变动后删除缓存

## 业务影响
- 代理钱包和卡钱包业务完全隔离,互不影响
- 为独立监控、优化、扩展打下基础
- 提升代理钱包的稳定性和独立性

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-25 09:51:00 +08:00

348 lines
8.8 KiB
Go

package polling
import (
"context"
"sync"
"time"
"go.uber.org/zap"
"github.com/break/junhong_cmp_fiber/internal/model"
"github.com/break/junhong_cmp_fiber/internal/store/postgres"
"github.com/break/junhong_cmp_fiber/pkg/errors"
)
// CleanupService is the data cleanup service. It manages cleanup
// configurations, exposes cleanup logs, and runs batched deletions of old
// records, allowing only one cleanup run at a time per service instance.
type CleanupService struct {
// configStore persists and looks up cleanup configurations.
configStore *postgres.DataCleanupConfigStore
// logStore persists cleanup logs and is also the component that counts and
// deletes old records in the target tables.
logStore *postgres.DataCleanupLogStore
// logger receives progress, warning and error messages.
logger *zap.Logger
mu sync.Mutex // guards isRunning to prevent concurrent cleanup runs
// isRunning is true while TriggerCleanup or RunScheduledCleanup is executing.
isRunning bool
}
// NewCleanupService builds a CleanupService wired to the given configuration
// store, log store and logger. The returned service starts in the idle
// (not running) state.
func NewCleanupService(
	configStore *postgres.DataCleanupConfigStore,
	logStore *postgres.DataCleanupLogStore,
	logger *zap.Logger,
) *CleanupService {
	svc := new(CleanupService)
	svc.configStore = configStore
	svc.logStore = logStore
	svc.logger = logger
	return svc
}
// CreateConfig validates and stores a new cleanup configuration.
//
// It returns a CodeInvalidParam error when the target table name is empty or
// the retention period is shorter than 7 days. A non-positive batch size is
// replaced by 10000, and the configuration is always stored as enabled
// (Enabled = 1), overwriting whatever the caller supplied. The config argument
// is modified in place.
func (s *CleanupService) CreateConfig(ctx context.Context, config *model.DataCleanupConfig) error {
	switch {
	case config.TargetTable == "":
		return errors.New(errors.CodeInvalidParam, "表名不能为空")
	case config.RetentionDays < 7:
		return errors.New(errors.CodeInvalidParam, "保留天数不能少于7天")
	}

	// Fall back to 10,000 rows per batch when no positive size was given.
	if config.BatchSize < 1 {
		config.BatchSize = 10000
	}

	// New configurations are enabled by default.
	config.Enabled = 1

	return s.configStore.Create(ctx, config)
}
// GetConfig returns the cleanup configuration with the given id.
// Any error reported by the store lookup is wrapped as CodeNotFound.
func (s *CleanupService) GetConfig(ctx context.Context, id uint) (*model.DataCleanupConfig, error) {
	found, lookupErr := s.configStore.GetByID(ctx, id)
	if lookupErr == nil {
		return found, nil
	}
	return nil, errors.Wrap(errors.CodeNotFound, lookupErr, "清理配置不存在")
}
// ListConfigs returns every cleanup configuration known to the store,
// passing the store's result and error through unchanged.
func (s *CleanupService) ListConfigs(ctx context.Context) ([]*model.DataCleanupConfig, error) {
	all, err := s.configStore.List(ctx)
	return all, err
}
// UpdateConfig applies a partial update to the cleanup configuration with the
// given id and persists the result.
//
// Recognised keys in updates, each honoured only when its value has exactly
// the Go type listed (values of any other type are silently ignored):
//   - "retention_days" (int): must be at least 7, otherwise CodeInvalidParam is returned
//   - "batch_size" (int): applied only when positive
//   - "enabled" (int): converted to int16
//   - "description" (string)
//   - "updated_by" (uint)
//
// Any error from the initial lookup is wrapped as CodeNotFound.
func (s *CleanupService) UpdateConfig(ctx context.Context, id uint, updates map[string]any) error {
	target, lookupErr := s.configStore.GetByID(ctx, id)
	if lookupErr != nil {
		return errors.Wrap(errors.CodeNotFound, lookupErr, "清理配置不存在")
	}

	if days, isInt := updates["retention_days"].(int); isInt {
		if days < 7 {
			return errors.New(errors.CodeInvalidParam, "保留天数不能少于7天")
		}
		target.RetentionDays = days
	}

	// Non-positive batch sizes are ignored rather than rejected.
	if size, isInt := updates["batch_size"].(int); isInt && size > 0 {
		target.BatchSize = size
	}

	if flag, isInt := updates["enabled"].(int); isInt {
		target.Enabled = int16(flag)
	}

	if text, isString := updates["description"].(string); isString {
		target.Description = text
	}

	if editor, isUint := updates["updated_by"].(uint); isUint {
		target.UpdatedBy = &editor
	}

	return s.configStore.Update(ctx, target)
}
// DeleteConfig removes the cleanup configuration with the given id.
// The configuration is looked up first; any lookup error is wrapped as
// CodeNotFound and nothing is deleted.
func (s *CleanupService) DeleteConfig(ctx context.Context, id uint) error {
	if _, lookupErr := s.configStore.GetByID(ctx, id); lookupErr != nil {
		return errors.Wrap(errors.CodeNotFound, lookupErr, "清理配置不存在")
	}
	return s.configStore.Delete(ctx, id)
}
// ListLogs returns one page of cleanup logs together with the count reported
// by the log store, filtered by tableName as interpreted by the store.
//
// A page below 1 is treated as page 1. A pageSize outside the range 1..100 is
// replaced by 20.
func (s *CleanupService) ListLogs(ctx context.Context, page, pageSize int, tableName string) ([]*model.DataCleanupLog, int64, error) {
	const (
		firstPage       = 1
		defaultPageSize = 20
		maxPageSize     = 100
	)

	if page < firstPage {
		page = firstPage
	}
	if !(pageSize >= 1 && pageSize <= maxPageSize) {
		pageSize = defaultPageSize
	}

	return s.logStore.List(ctx, page, pageSize, tableName)
}
// CleanupPreview describes, for one enabled cleanup configuration, how many
// records the log store counted as old for that configuration's retention period.
type CleanupPreview struct {
// TableName is the configuration's target table.
TableName string `json:"table_name"`
// RetentionDays is the configured retention period in days.
RetentionDays int `json:"retention_days"`
// RecordCount is the value returned by the log store's CountOldRecords.
RecordCount int64 `json:"record_count"`
// Description is copied from the configuration.
Description string `json:"description"`
}
// Preview reports, for every enabled cleanup configuration, how many old
// records the log store counts for that configuration.
//
// A failure to list the enabled configurations is returned to the caller.
// A failure to count records for an individual table is logged as a warning
// and that table is left out of the result.
func (s *CleanupService) Preview(ctx context.Context) ([]*CleanupPreview, error) {
	enabled, err := s.configStore.ListEnabled(ctx)
	if err != nil {
		return nil, err
	}

	result := make([]*CleanupPreview, 0, len(enabled))
	for i := range enabled {
		cfg := enabled[i]

		n, countErr := s.logStore.CountOldRecords(ctx, cfg.TargetTable, cfg.RetentionDays)
		if countErr != nil {
			// Best effort: skip this table and keep previewing the rest.
			s.logger.Warn("预览清理数据失败",
				zap.String("table", cfg.TargetTable),
				zap.Error(countErr))
			continue
		}

		item := &CleanupPreview{
			TableName:     cfg.TargetTable,
			RetentionDays: cfg.RetentionDays,
			RecordCount:   n,
			Description:   cfg.Description,
		}
		result = append(result, item)
	}

	return result, nil
}
// CleanupProgress is the cleanup status snapshot returned by GetProgress.
type CleanupProgress struct {
// IsRunning mirrors the service's in-memory running flag.
IsRunning bool `json:"is_running"`
// CurrentTable is the target table of the most recent log when that log's
// status is "running"; otherwise it is empty.
CurrentTable string `json:"current_table,omitempty"`
// NOTE(review): TotalTables, ProcessedTables and TotalDeleted are not set by
// GetProgress in this file and are therefore serialized as zero — confirm
// whether another component fills them in.
TotalTables int `json:"total_tables"`
ProcessedTables int `json:"processed_tables"`
TotalDeleted int64 `json:"total_deleted"`
// StartedAt is the start time of the most recent log when that log's status
// is "running"; otherwise it is nil.
StartedAt *time.Time `json:"started_at,omitempty"`
// LastLog is the most recent cleanup log returned by the log store, if any.
LastLog *model.DataCleanupLog `json:"last_log,omitempty"`
}
// GetProgress returns a snapshot of the cleanup state: the in-memory running
// flag plus the first log of a one-item page requested from the log store.
//
// CurrentTable and StartedAt are filled in only when that log's status is
// "running". An error from the log store is returned as is.
func (s *CleanupService) GetProgress(ctx context.Context) (*CleanupProgress, error) {
	// Snapshot the flag under the lock; the store query below runs outside it.
	s.mu.Lock()
	running := s.isRunning
	s.mu.Unlock()

	// Page 1 with a page size of 1 and no table filter.
	recent, _, err := s.logStore.List(ctx, 1, 1, "")
	if err != nil {
		return nil, err
	}

	result := &CleanupProgress{IsRunning: running}
	if len(recent) == 0 {
		return result, nil
	}

	latest := recent[0]
	result.LastLog = latest
	if latest.Status == "running" {
		result.CurrentTable = latest.TargetTable
		result.StartedAt = &latest.StartedAt
	}
	return result, nil
}
// TriggerCleanup runs a manually requested cleanup on behalf of triggeredBy.
//
// With a non-empty tableName only the configuration for that table is
// processed; a failed lookup is wrapped as CodeNotFound. With an empty
// tableName every enabled configuration is processed.
//
// If a cleanup is already in progress a CodeInvalidParam error is returned.
// Failures of individual tables are logged and do not stop the run, so the
// method returns nil once all selected tables have been attempted.
func (s *CleanupService) TriggerCleanup(ctx context.Context, tableName string, triggeredBy uint) error {
	// Claim the running flag, or detect that someone else already holds it.
	s.mu.Lock()
	alreadyRunning := s.isRunning
	if !alreadyRunning {
		s.isRunning = true
	}
	s.mu.Unlock()

	if alreadyRunning {
		return errors.New(errors.CodeInvalidParam, "清理任务正在运行中")
	}

	// Release the flag however this function exits.
	defer func() {
		s.mu.Lock()
		s.isRunning = false
		s.mu.Unlock()
	}()

	var (
		targets []*model.DataCleanupConfig
		err     error
	)
	switch tableName {
	case "":
		// No table given: clean every enabled table.
		targets, err = s.configStore.ListEnabled(ctx)
		if err != nil {
			return err
		}
	default:
		// Clean only the requested table.
		single, lookupErr := s.configStore.GetByTableName(ctx, tableName)
		if lookupErr != nil {
			return errors.Wrap(errors.CodeNotFound, lookupErr, "清理配置不存在")
		}
		targets = []*model.DataCleanupConfig{single}
	}

	for _, cfg := range targets {
		cleanErr := s.cleanupTable(ctx, cfg, "manual", &triggeredBy)
		if cleanErr == nil {
			continue
		}
		// Log and move on to the remaining tables.
		s.logger.Error("清理表失败",
			zap.String("table", cfg.TargetTable),
			zap.Error(cleanErr))
	}

	return nil
}
// RunScheduledCleanup runs the scheduled cleanup over every enabled
// configuration.
//
// If a cleanup is already in progress the call is skipped: an info message is
// logged and nil is returned. An error listing the enabled configurations is
// returned to the caller. Failures of individual tables are logged and do not
// stop the run.
func (s *CleanupService) RunScheduledCleanup(ctx context.Context) error {
	// Read the previous state and mark the service as running in one step;
	// writing true over an already-true flag leaves it unchanged.
	s.mu.Lock()
	busy := s.isRunning
	s.isRunning = true
	s.mu.Unlock()

	if busy {
		s.logger.Info("清理任务正在运行中,跳过本次调度")
		return nil
	}

	// Release the flag however this function exits.
	defer func() {
		s.mu.Lock()
		s.isRunning = false
		s.mu.Unlock()
	}()

	targets, err := s.configStore.ListEnabled(ctx)
	if err != nil {
		return err
	}

	s.logger.Info("开始定时清理任务", zap.Int("config_count", len(targets)))

	for _, cfg := range targets {
		cleanErr := s.cleanupTable(ctx, cfg, "scheduled", nil)
		if cleanErr == nil {
			continue
		}
		// Log and move on to the remaining tables.
		s.logger.Error("定时清理表失败",
			zap.String("table", cfg.TargetTable),
			zap.Error(cleanErr))
	}

	s.logger.Info("定时清理任务完成")
	return nil
}
// cleanupTable deletes old records from config.TargetTable in batches and
// records the run in the cleanup log.
//
// A log entry with status "running" is created first; if that fails the error
// is returned and nothing is deleted. Records are then deleted batch by batch
// until a batch comes back smaller than the batch size, the store reports an
// error, or ctx is done. Finally the log entry is updated with the completion
// time, deleted count, duration and a status of "success" or "failed".
//
// cleanupType is stored in the log entry as given (callers in this file pass
// "manual" or "scheduled"); triggeredBy may be nil.
//
// The returned error is the one that stopped the deletion, or nil. A failure
// to update the log entry is only logged.
func (s *CleanupService) cleanupTable(ctx context.Context, config *model.DataCleanupConfig, cleanupType string, triggeredBy *uint) error {
	const (
		// defaultBatchSize mirrors the default applied by CreateConfig.
		defaultBatchSize = 10000
		// finalUpdateTimeout bounds the detached write of the final log status.
		finalUpdateTimeout = 10 * time.Second
	)

	startTime := time.Now()

	// Create the cleanup log entry.
	entry := &model.DataCleanupLog{
		TargetTable:   config.TargetTable,
		CleanupType:   cleanupType,
		RetentionDays: config.RetentionDays,
		Status:        "running",
		StartedAt:     startTime,
		TriggeredBy:   triggeredBy,
	}
	if err := s.logStore.Create(ctx, entry); err != nil {
		return err
	}

	// A non-positive batch size would make the "short batch" exit test below
	// unsatisfiable (a count is never below zero, and 0 < 0 is false), so the
	// loop would spin until an error or cancellation. Fall back to the default.
	batchSize := config.BatchSize
	if batchSize <= 0 {
		batchSize = defaultBatchSize
	}

	var totalDeleted int64
	var lastErr error

	// Delete in batches.
cleanupLoop:
	for {
		deleted, err := s.logStore.DeleteOldRecords(ctx, config.TargetTable, config.RetentionDays, batchSize)
		if err != nil {
			lastErr = err
			break
		}
		totalDeleted += deleted

		s.logger.Debug("清理进度",
			zap.String("table", config.TargetTable),
			zap.Int64("batch_deleted", deleted),
			zap.Int64("total_deleted", totalDeleted))

		if deleted < int64(batchSize) {
			// A short batch means there is nothing left to delete.
			break
		}

		// Stop between batches once the context has been cancelled.
		select {
		case <-ctx.Done():
			lastErr = ctx.Err()
			break cleanupLoop
		default:
		}
	}

	// Finalize the cleanup log entry.
	endTime := time.Now()
	entry.CompletedAt = &endTime
	entry.DeletedCount = totalDeleted
	entry.DurationMs = endTime.Sub(startTime).Milliseconds()
	if lastErr != nil {
		entry.Status = "failed"
		entry.ErrorMessage = lastErr.Error()
	} else {
		entry.Status = "success"
	}

	// Write the final status through a context that keeps ctx's values but not
	// its cancellation. Otherwise a cancelled run could not record its outcome
	// and the entry would stay at "running", which GetProgress would keep
	// reporting as the current table.
	updateCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), finalUpdateTimeout)
	defer cancel()
	if err := s.logStore.Update(updateCtx, entry); err != nil {
		s.logger.Error("更新清理日志失败", zap.Error(err))
	}

	s.logger.Info("清理表完成",
		zap.String("table", config.TargetTable),
		zap.Int64("deleted_count", totalDeleted),
		zap.Int64("duration_ms", entry.DurationMs),
		zap.String("status", entry.Status))

	return lastErr
}