Files
junhong_cmp_fiber/internal/service/polling/concurrency_service.go
huang 18daeae65a
All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 7m17s
feat: 钱包系统分离 - 代理钱包与卡钱包完全隔离
## 变更概述
将统一钱包系统拆分为代理钱包和卡钱包两个独立系统,实现数据表和代码层面的完全隔离。

## 数据库变更
- 新增 6 张表:tb_agent_wallet、tb_agent_wallet_transaction、tb_agent_recharge_record、tb_card_wallet、tb_card_wallet_transaction、tb_card_recharge_record
- 删除 3 张旧表:tb_wallet、tb_wallet_transaction、tb_recharge_record
- 代理钱包:按 (shop_id, wallet_type) 唯一标识,支持主钱包和分佣钱包
- 卡钱包:按 (resource_type, resource_id) 唯一标识,支持物联网卡和设备

## 代码变更
- Model 层:新增 AgentWallet、AgentWalletTransaction、AgentRechargeRecord、CardWallet、CardWalletTransaction、CardRechargeRecord 模型
- Store 层:新增 6 个独立 Store,支持事务、乐观锁、Redis 缓存
- Service 层:重构 commission_calculation、commission_withdrawal、order、recharge 等 8 个服务
- Bootstrap 层:更新 Store 和 Service 依赖注入
- 常量层:按钱包类型重新组织常量和 Redis Key 生成函数

## 技术特性
- 乐观锁:使用 version 字段防止并发冲突
- 多租户:支持 shop_id_tag 和 enterprise_id_tag 过滤
- 事务管理:所有余额变动使用事务保证 ACID
- 缓存策略:Cache-Aside 模式,余额变动后删除缓存

## 业务影响
- 代理钱包和卡钱包业务完全隔离,互不影响
- 为独立监控、优化、扩展打下基础
- 提升代理钱包的稳定性和独立性

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-25 09:51:00 +08:00

189 lines
5.7 KiB
Go

package polling
import (
"context"
"time"
"github.com/redis/go-redis/v9"
"github.com/break/junhong_cmp_fiber/internal/model"
"github.com/break/junhong_cmp_fiber/internal/store/postgres"
"github.com/break/junhong_cmp_fiber/pkg/constants"
"github.com/break/junhong_cmp_fiber/pkg/errors"
)
// ConcurrencyService 并发控制服务
type ConcurrencyService struct {
store *postgres.PollingConcurrencyConfigStore
redis *redis.Client
}
// NewConcurrencyService 创建并发控制服务实例
func NewConcurrencyService(store *postgres.PollingConcurrencyConfigStore, redis *redis.Client) *ConcurrencyService {
return &ConcurrencyService{
store: store,
redis: redis,
}
}
// ConcurrencyStatus 并发状态
type ConcurrencyStatus struct {
TaskType string `json:"task_type"`
TaskTypeName string `json:"task_type_name"`
MaxConcurrency int `json:"max_concurrency"`
Current int64 `json:"current"`
Available int64 `json:"available"`
Utilization float64 `json:"utilization"`
}
// List 获取所有并发控制配置及当前状态
func (s *ConcurrencyService) List(ctx context.Context) ([]*ConcurrencyStatus, error) {
configs, err := s.store.List(ctx)
if err != nil {
return nil, errors.Wrap(errors.CodeInternalError, err, "获取并发配置列表失败")
}
result := make([]*ConcurrencyStatus, 0, len(configs))
for _, cfg := range configs {
status := &ConcurrencyStatus{
TaskType: cfg.TaskType,
TaskTypeName: s.getTaskTypeName(cfg.TaskType),
MaxConcurrency: cfg.MaxConcurrency,
}
// 从 Redis 获取当前并发数
currentKey := constants.RedisPollingConcurrencyCurrentKey(cfg.TaskType)
current, err := s.redis.Get(ctx, currentKey).Int64()
if err != nil && err != redis.Nil {
current = 0
}
status.Current = current
status.Available = int64(cfg.MaxConcurrency) - current
if status.Available < 0 {
status.Available = 0
}
if cfg.MaxConcurrency > 0 {
status.Utilization = float64(current) / float64(cfg.MaxConcurrency) * 100
}
result = append(result, status)
}
return result, nil
}
// GetByTaskType 根据任务类型获取并发配置及状态
func (s *ConcurrencyService) GetByTaskType(ctx context.Context, taskType string) (*ConcurrencyStatus, error) {
cfg, err := s.store.GetByTaskType(ctx, taskType)
if err != nil {
return nil, errors.Wrap(errors.CodeNotFound, err, "并发配置不存在")
}
status := &ConcurrencyStatus{
TaskType: cfg.TaskType,
TaskTypeName: s.getTaskTypeName(cfg.TaskType),
MaxConcurrency: cfg.MaxConcurrency,
}
// 从 Redis 获取当前并发数
currentKey := constants.RedisPollingConcurrencyCurrentKey(cfg.TaskType)
current, err := s.redis.Get(ctx, currentKey).Int64()
if err != nil && err != redis.Nil {
current = 0
}
status.Current = current
status.Available = int64(cfg.MaxConcurrency) - current
if status.Available < 0 {
status.Available = 0
}
if cfg.MaxConcurrency > 0 {
status.Utilization = float64(current) / float64(cfg.MaxConcurrency) * 100
}
return status, nil
}
// UpdateMaxConcurrency 更新最大并发数
func (s *ConcurrencyService) UpdateMaxConcurrency(ctx context.Context, taskType string, maxConcurrency int, updatedBy uint) error {
// 验证参数
if maxConcurrency < 1 || maxConcurrency > 1000 {
return errors.New(errors.CodeInvalidParam, "并发数必须在 1-1000 之间")
}
// 验证任务类型存在
_, err := s.store.GetByTaskType(ctx, taskType)
if err != nil {
return errors.Wrap(errors.CodeNotFound, err, "任务类型不存在")
}
// 更新数据库
if err := s.store.UpdateMaxConcurrency(ctx, taskType, maxConcurrency, updatedBy); err != nil {
return errors.Wrap(errors.CodeInternalError, err, "更新并发配置失败")
}
// 同步更新 Redis 配置缓存
configKey := constants.RedisPollingConcurrencyConfigKey(taskType)
if err := s.redis.Set(ctx, configKey, maxConcurrency, 24*time.Hour).Err(); err != nil {
// Redis 更新失败不影响主流程,下次读取会从数据库重新加载
}
return nil
}
// ResetConcurrency 重置并发计数(用于信号量修复)
func (s *ConcurrencyService) ResetConcurrency(ctx context.Context, taskType string) error {
// 验证任务类型存在
_, err := s.store.GetByTaskType(ctx, taskType)
if err != nil {
return errors.Wrap(errors.CodeNotFound, err, "任务类型不存在")
}
// 重置 Redis 当前计数为 0
currentKey := constants.RedisPollingConcurrencyCurrentKey(taskType)
if err := s.redis.Set(ctx, currentKey, 0, 24*time.Hour).Err(); err != nil {
return errors.Wrap(errors.CodeInternalError, err, "重置并发计数失败")
}
return nil
}
// InitFromDB 从数据库初始化 Redis 并发配置
func (s *ConcurrencyService) InitFromDB(ctx context.Context) error {
configs, err := s.store.List(ctx)
if err != nil {
return errors.Wrap(errors.CodeInternalError, err, "获取并发配置失败")
}
for _, cfg := range configs {
configKey := constants.RedisPollingConcurrencyConfigKey(cfg.TaskType)
if err := s.redis.Set(ctx, configKey, cfg.MaxConcurrency, 24*time.Hour).Err(); err != nil {
// 忽略单个配置同步失败
continue
}
}
return nil
}
// SyncConfigToRedis 同步单个配置到 Redis
func (s *ConcurrencyService) SyncConfigToRedis(ctx context.Context, config *model.PollingConcurrencyConfig) error {
configKey := constants.RedisPollingConcurrencyConfigKey(config.TaskType)
return s.redis.Set(ctx, configKey, config.MaxConcurrency, 24*time.Hour).Err()
}
// getTaskTypeName 获取任务类型的中文名称
func (s *ConcurrencyService) getTaskTypeName(taskType string) string {
switch taskType {
case constants.TaskTypePollingRealname:
return "实名检查"
case constants.TaskTypePollingCarddata:
return "流量检查"
case constants.TaskTypePollingPackage:
return "套餐检查"
default:
return taskType
}
}