feat: 钱包系统分离 - 代理钱包与卡钱包完全隔离
All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 7m17s
All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 7m17s
## 变更概述 将统一钱包系统拆分为代理钱包和卡钱包两个独立系统,实现数据表和代码层面的完全隔离。 ## 数据库变更 - 新增 6 张表:tb_agent_wallet、tb_agent_wallet_transaction、tb_agent_recharge_record、tb_card_wallet、tb_card_wallet_transaction、tb_card_recharge_record - 删除 3 张旧表:tb_wallet、tb_wallet_transaction、tb_recharge_record - 代理钱包:按 (shop_id, wallet_type) 唯一标识,支持主钱包和分佣钱包 - 卡钱包:按 (resource_type, resource_id) 唯一标识,支持物联网卡和设备 ## 代码变更 - Model 层:新增 AgentWallet、AgentWalletTransaction、AgentRechargeRecord、CardWallet、CardWalletTransaction、CardRechargeRecord 模型 - Store 层:新增 6 个独立 Store,支持事务、乐观锁、Redis 缓存 - Service 层:重构 commission_calculation、commission_withdrawal、order、recharge 等 8 个服务 - Bootstrap 层:更新 Store 和 Service 依赖注入 - 常量层:按钱包类型重新组织常量和 Redis Key 生成函数 ## 技术特性 - 乐观锁:使用 version 字段防止并发冲突 - 多租户:支持 shop_id_tag 和 enterprise_id_tag 过滤 - 事务管理:所有余额变动使用事务保证 ACID - 缓存策略:Cache-Aside 模式,余额变动后删除缓存 ## 业务影响 - 代理钱包和卡钱包业务完全隔离,互不影响 - 为独立监控、优化、扩展打下基础 - 提升代理钱包的稳定性和独立性 Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -19,18 +19,18 @@ import (
|
||||
// PackageActivationHandler 套餐激活检查处理器
|
||||
// 任务 19: 处理主套餐过期、加油包级联失效、待生效主套餐激活
|
||||
type PackageActivationHandler struct {
|
||||
db *gorm.DB
|
||||
redis *redis.Client
|
||||
queueClient *asynq.Client
|
||||
packageUsageStore *postgres.PackageUsageStore
|
||||
activationService *packagepkg.ActivationService
|
||||
logger *zap.Logger
|
||||
db *gorm.DB
|
||||
redis *redis.Client
|
||||
queueClient *asynq.Client
|
||||
packageUsageStore *postgres.PackageUsageStore
|
||||
activationService *packagepkg.ActivationService
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
// PackageActivationPayload 套餐激活任务载荷
|
||||
type PackageActivationPayload struct {
|
||||
PackageUsageID uint `json:"package_usage_id"`
|
||||
CarrierType string `json:"carrier_type"` // "iot_card" 或 "device"
|
||||
CarrierType string `json:"carrier_type"` // "iot_card" 或 "device"
|
||||
CarrierID uint `json:"carrier_id"`
|
||||
ActivationType string `json:"activation_type"` // "queue" 或 "realname"
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
@@ -102,7 +102,7 @@ func (h *PackageActivationHandler) findExpiredMainPackages(ctx context.Context)
|
||||
Where("status = ?", constants.PackageUsageStatusActive).
|
||||
Where("expires_at <= ?", now).
|
||||
Where("master_usage_id IS NULL"). // 主套餐没有 master_usage_id
|
||||
Limit(1000). // 每次最多处理 1000 个,避免长事务
|
||||
Limit(1000). // 每次最多处理 1000 个,避免长事务
|
||||
Find(&packages).Error
|
||||
|
||||
return packages, err
|
||||
@@ -353,9 +353,9 @@ func (h *PackageActivationHandler) HandlePackageFirstActivation(ctx context.Cont
|
||||
// ActivationService 未注入,直接更新状态(备用逻辑)
|
||||
now := time.Now()
|
||||
if err := h.db.Model(&pkg).Updates(map[string]any{
|
||||
"status": constants.PackageUsageStatusActive,
|
||||
"activated_at": now,
|
||||
"pending_realname_activation": false,
|
||||
"status": constants.PackageUsageStatusActive,
|
||||
"activated_at": now,
|
||||
"pending_realname_activation": false,
|
||||
}).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -20,13 +20,13 @@ import (
|
||||
// Scheduler 轮询调度器
|
||||
// 负责管理 IoT 卡的定期检查任务(实名、流量、套餐)
|
||||
type Scheduler struct {
|
||||
db *gorm.DB
|
||||
redis *redis.Client
|
||||
queueClient *asynq.Client
|
||||
logger *zap.Logger
|
||||
configStore *postgres.PollingConfigStore
|
||||
iotCardStore *postgres.IotCardStore
|
||||
concurrencyStore *postgres.PollingConcurrencyConfigStore
|
||||
db *gorm.DB
|
||||
redis *redis.Client
|
||||
queueClient *asynq.Client
|
||||
logger *zap.Logger
|
||||
configStore *postgres.PollingConfigStore
|
||||
iotCardStore *postgres.IotCardStore
|
||||
concurrencyStore *postgres.PollingConcurrencyConfigStore
|
||||
|
||||
// 任务 19: 套餐激活检查处理器
|
||||
packageActivationHandler *PackageActivationHandler
|
||||
@@ -50,12 +50,12 @@ type Scheduler struct {
|
||||
// InitProgress 初始化进度
|
||||
type InitProgress struct {
|
||||
mu sync.RWMutex
|
||||
TotalCards int64 `json:"total_cards"` // 总卡数
|
||||
LoadedCards int64 `json:"loaded_cards"` // 已加载卡数
|
||||
StartTime time.Time `json:"start_time"` // 开始时间
|
||||
TotalCards int64 `json:"total_cards"` // 总卡数
|
||||
LoadedCards int64 `json:"loaded_cards"` // 已加载卡数
|
||||
StartTime time.Time `json:"start_time"` // 开始时间
|
||||
LastBatchTime time.Time `json:"last_batch_time"` // 最后一批处理时间
|
||||
Status string `json:"status"` // 状态: pending, running, completed, failed
|
||||
ErrorMessage string `json:"error_message"` // 错误信息
|
||||
Status string `json:"status"` // 状态: pending, running, completed, failed
|
||||
ErrorMessage string `json:"error_message"` // 错误信息
|
||||
}
|
||||
|
||||
// SchedulerConfig 调度器配置
|
||||
@@ -74,13 +74,13 @@ type SchedulerConfig struct {
|
||||
// 单 Worker 设计吞吐:50000 张/秒,支持多 Worker 水平扩展
|
||||
func DefaultSchedulerConfig() *SchedulerConfig {
|
||||
return &SchedulerConfig{
|
||||
ScheduleInterval: 1 * time.Second, // 1秒调度一次,提高响应速度
|
||||
InitBatchSize: 100000, // 10万张/批初始化
|
||||
ScheduleInterval: 1 * time.Second, // 1秒调度一次,提高响应速度
|
||||
InitBatchSize: 100000, // 10万张/批初始化
|
||||
InitBatchSleepDuration: 500 * time.Millisecond, // 500ms 间隔,加快初始化
|
||||
ConfigCacheTTL: 5 * time.Minute,
|
||||
CardCacheTTL: 7 * 24 * time.Hour,
|
||||
ScheduleBatchSize: 50000, // 每次取 5 万张,每秒可调度 5 万张
|
||||
MaxManualBatchSize: 1000, // 手动触发每次处理 1000 张
|
||||
ScheduleBatchSize: 50000, // 每次取 5 万张,每秒可调度 5 万张
|
||||
MaxManualBatchSize: 1000, // 手动触发每次处理 1000 张
|
||||
}
|
||||
}
|
||||
|
||||
@@ -383,8 +383,8 @@ func (s *Scheduler) enqueueTask(ctx context.Context, taskType, cardID string, is
|
||||
}
|
||||
|
||||
task := asynq.NewTask(taskType, mustMarshal(payload),
|
||||
asynq.MaxRetry(0), // 不重试,失败后重新入队
|
||||
asynq.Timeout(30*time.Second), // 30秒超时
|
||||
asynq.MaxRetry(0), // 不重试,失败后重新入队
|
||||
asynq.Timeout(30*time.Second), // 30秒超时
|
||||
asynq.Queue(constants.QueueDefault),
|
||||
)
|
||||
|
||||
@@ -681,13 +681,13 @@ func (s *Scheduler) cacheCardInfo(ctx context.Context, card *model.IotCard, cach
|
||||
config := DefaultSchedulerConfig()
|
||||
|
||||
data := map[string]interface{}{
|
||||
"id": card.ID,
|
||||
"iccid": card.ICCID,
|
||||
"card_category": card.CardCategory,
|
||||
"real_name_status": card.RealNameStatus,
|
||||
"network_status": card.NetworkStatus,
|
||||
"carrier_id": card.CarrierID,
|
||||
"cached_at": cachedAt.Unix(),
|
||||
"id": card.ID,
|
||||
"iccid": card.ICCID,
|
||||
"card_category": card.CardCategory,
|
||||
"real_name_status": card.RealNameStatus,
|
||||
"network_status": card.NetworkStatus,
|
||||
"carrier_id": card.CarrierID,
|
||||
"cached_at": cachedAt.Unix(),
|
||||
}
|
||||
|
||||
pipe := s.redis.Pipeline()
|
||||
|
||||
Reference in New Issue
Block a user