Files
junhong_cmp_fiber/internal/service/package/reset_service.go
huang 18daeae65a
All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 7m17s
feat: 钱包系统分离 - 代理钱包与卡钱包完全隔离
## 变更概述
将统一钱包系统拆分为代理钱包和卡钱包两个独立系统,实现数据表和代码层面的完全隔离。

## 数据库变更
- 新增 6 张表:tb_agent_wallet、tb_agent_wallet_transaction、tb_agent_recharge_record、tb_card_wallet、tb_card_wallet_transaction、tb_card_recharge_record
- 删除 3 张旧表:tb_wallet、tb_wallet_transaction、tb_recharge_record
- 代理钱包:按 (shop_id, wallet_type) 唯一标识,支持主钱包和分佣钱包
- 卡钱包:按 (resource_type, resource_id) 唯一标识,支持物联网卡和设备

## 代码变更
- Model 层:新增 AgentWallet、AgentWalletTransaction、AgentRechargeRecord、CardWallet、CardWalletTransaction、CardRechargeRecord 模型
- Store 层:新增 6 个独立 Store,支持事务、乐观锁、Redis 缓存
- Service 层:重构 commission_calculation、commission_withdrawal、order、recharge 等 8 个服务
- Bootstrap 层:更新 Store 和 Service 依赖注入
- 常量层:按钱包类型重新组织常量和 Redis Key 生成函数

## 技术特性
- 乐观锁:使用 version 字段防止并发冲突
- 多租户:支持 shop_id_tag 和 enterprise_id_tag 过滤
- 事务管理:所有余额变动使用事务保证 ACID
- 缓存策略:Cache-Aside 模式,余额变动后删除缓存

## 业务影响
- 代理钱包和卡钱包业务完全隔离,互不影响
- 为独立监控、优化、扩展打下基础
- 提升代理钱包的稳定性和独立性

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-25 09:51:00 +08:00

224 lines
6.6 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package packagepkg
import (
"context"
"time"
"github.com/break/junhong_cmp_fiber/internal/model"
"github.com/break/junhong_cmp_fiber/internal/store/postgres"
"github.com/break/junhong_cmp_fiber/pkg/constants"
"github.com/break/junhong_cmp_fiber/pkg/errors"
"github.com/redis/go-redis/v9"
"go.uber.org/zap"
"gorm.io/gorm"
)
type ResetService struct {
db *gorm.DB
redis *redis.Client
packageUsageStore *postgres.PackageUsageStore
logger *zap.Logger
}
func NewResetService(
db *gorm.DB,
redis *redis.Client,
packageUsageStore *postgres.PackageUsageStore,
logger *zap.Logger,
) *ResetService {
return &ResetService{
db: db,
redis: redis,
packageUsageStore: packageUsageStore,
logger: logger,
}
}
// ResetDailyUsage 任务 11.2-11.3: 重置日流量
func (s *ResetService) ResetDailyUsage(ctx context.Context) error {
return s.resetDailyUsageWithDB(ctx, s.db)
}
// resetDailyUsageWithDB 内部方法,支持传入 DB/TX
func (s *ResetService) resetDailyUsageWithDB(ctx context.Context, db *gorm.DB) error {
now := time.Now()
return db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
// 查询需要重置的套餐
var packages []*model.PackageUsage
err := tx.Where("data_reset_cycle = ?", constants.PackageDataResetDaily).
Where("next_reset_at <= ?", now).
Where("status IN ?", []int{constants.PackageUsageStatusActive, constants.PackageUsageStatusDepleted}).
Find(&packages).Error
if err != nil {
return errors.Wrap(errors.CodeDatabaseError, err, "查询待重置套餐失败")
}
if len(packages) == 0 {
s.logger.Info("没有需要重置的日流量套餐")
return nil
}
// 批量重置
packageIDs := make([]uint, len(packages))
for i, pkg := range packages {
packageIDs[i] = pkg.ID
}
// 计算下次重置时间(明天 00:00:00
nextReset := time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, now.Location())
// 批量更新
updates := map[string]interface{}{
"data_usage_mb": 0,
"last_reset_at": now,
"next_reset_at": nextReset,
"status": constants.PackageUsageStatusActive, // 重置后恢复为生效中
}
if err := tx.Model(&model.PackageUsage{}).
Where("id IN ?", packageIDs).
Updates(updates).Error; err != nil {
return errors.Wrap(errors.CodeDatabaseError, err, "批量重置日流量失败")
}
s.logger.Info("日流量重置完成",
zap.Int("count", len(packages)),
zap.Time("next_reset_at", nextReset))
return nil
})
}
// ResetMonthlyUsage 任务 11.4-11.5: 重置月流量
func (s *ResetService) ResetMonthlyUsage(ctx context.Context) error {
return s.resetMonthlyUsageWithDB(ctx, s.db)
}
// resetMonthlyUsageWithDB 内部方法,支持传入 DB/TX
func (s *ResetService) resetMonthlyUsageWithDB(ctx context.Context, db *gorm.DB) error {
now := time.Now()
return db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
// 查询需要重置的套餐
var packages []*model.PackageUsage
err := tx.Where("data_reset_cycle = ?", constants.PackageDataResetMonthly).
Where("next_reset_at <= ?", now).
Where("status IN ?", []int{constants.PackageUsageStatusActive, constants.PackageUsageStatusDepleted}).
Find(&packages).Error
if err != nil {
return errors.Wrap(errors.CodeDatabaseError, err, "查询待重置套餐失败")
}
if len(packages) == 0 {
s.logger.Info("没有需要重置的月流量套餐")
return nil
}
// 按套餐分组处理(根据套餐周期类型计算下次重置时间)
for _, usage := range packages {
// 查询套餐信息,获取 calendar_type
var pkg model.Package
if err := tx.First(&pkg, usage.PackageID).Error; err != nil {
s.logger.Error("查询套餐信息失败",
zap.Uint("usage_id", usage.ID),
zap.Uint("package_id", usage.PackageID),
zap.Error(err))
continue
}
// 计算下次重置时间(基于套餐周期类型)
// 自然月套餐每月1号重置
// 按天套餐每30天重置
activatedAt := usage.ActivatedAt
if activatedAt.IsZero() {
activatedAt = now // 兜底处理
}
nextResetAt := CalculateNextResetTime(constants.PackageDataResetMonthly, pkg.CalendarType, now, activatedAt)
if nextResetAt == nil {
s.logger.Warn("计算下次重置时间失败",
zap.Uint("usage_id", usage.ID))
continue
}
// 更新套餐
updates := map[string]interface{}{
"data_usage_mb": 0,
"last_reset_at": now,
"next_reset_at": *nextResetAt,
"status": constants.PackageUsageStatusActive, // 重置后恢复为生效中
}
if err := tx.Model(usage).Updates(updates).Error; err != nil {
return errors.Wrap(errors.CodeDatabaseError, err, "重置月流量失败")
}
s.logger.Info("月流量已重置",
zap.Uint("usage_id", usage.ID),
zap.String("calendar_type", pkg.CalendarType),
zap.Time("next_reset_at", *nextResetAt))
}
return nil
})
}
// ResetYearlyUsage 任务 11.6-11.7: 重置年流量
func (s *ResetService) ResetYearlyUsage(ctx context.Context) error {
return s.resetYearlyUsageWithDB(ctx, s.db)
}
// resetYearlyUsageWithDB 内部方法,支持传入 DB/TX
func (s *ResetService) resetYearlyUsageWithDB(ctx context.Context, db *gorm.DB) error {
now := time.Now()
return db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
// 查询需要重置的套餐
var packages []*model.PackageUsage
err := tx.Where("data_reset_cycle = ?", constants.PackageDataResetYearly).
Where("next_reset_at <= ?", now).
Where("status IN ?", []int{constants.PackageUsageStatusActive, constants.PackageUsageStatusDepleted}).
Find(&packages).Error
if err != nil {
return errors.Wrap(errors.CodeDatabaseError, err, "查询待重置套餐失败")
}
if len(packages) == 0 {
s.logger.Info("没有需要重置的年流量套餐")
return nil
}
// 批量重置
packageIDs := make([]uint, len(packages))
for i, pkg := range packages {
packageIDs[i] = pkg.ID
}
// 计算下次重置时间(明年 1月1日 00:00:00
nextReset := time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, now.Location())
// 批量更新
updates := map[string]interface{}{
"data_usage_mb": 0,
"last_reset_at": now,
"next_reset_at": nextReset,
"status": constants.PackageUsageStatusActive, // 重置后恢复为生效中
}
if err := tx.Model(&model.PackageUsage{}).
Where("id IN ?", packageIDs).
Updates(updates).Error; err != nil {
return errors.Wrap(errors.CodeDatabaseError, err, "批量重置年流量失败")
}
s.logger.Info("年流量重置完成",
zap.Int("count", len(packages)),
zap.Time("next_reset_at", nextReset))
return nil
})
}