package polling

import (
	"context"
	"time"

	"github.com/redis/go-redis/v9"

	"github.com/break/junhong_cmp_fiber/internal/model"
	"github.com/break/junhong_cmp_fiber/internal/store/postgres"
	"github.com/break/junhong_cmp_fiber/pkg/constants"
	"github.com/break/junhong_cmp_fiber/pkg/errors"
)

// ConcurrencyService manages per-task-type polling concurrency limits.
// Limits are read from and written to the PostgreSQL-backed store, and are
// mirrored into Redis; the live in-flight counter is read from Redis only.
type ConcurrencyService struct {
	store *postgres.PollingConcurrencyConfigStore
	redis *redis.Client
}

// NewConcurrencyService returns a ConcurrencyService that uses the given
// config store and Redis client.
func NewConcurrencyService(store *postgres.PollingConcurrencyConfigStore, redis *redis.Client) *ConcurrencyService {
	return &ConcurrencyService{
		store: store,
		redis: redis,
	}
}

// ConcurrencyStatus is the configured limit of one task type combined with
// the counter value currently held in Redis.
type ConcurrencyStatus struct {
	TaskType       string  `json:"task_type"`
	TaskTypeName   string  `json:"task_type_name"`  // display name, see getTaskTypeName
	MaxConcurrency int     `json:"max_concurrency"` // configured limit
	Current        int64   `json:"current"`         // counter value read from Redis
	Available      int64   `json:"available"`       // MaxConcurrency - Current, never below 0
	Utilization    float64 `json:"utilization"`     // Current / MaxConcurrency as a percentage
}

// List returns the status of every concurrency config in the store.
//
// A failure to load the configs is returned wrapped with CodeInternalError.
// Redis read failures are not reported; the affected entry shows Current 0.
func (s *ConcurrencyService) List(ctx context.Context) ([]*ConcurrencyStatus, error) {
	configs, err := s.store.List(ctx)
	if err != nil {
		return nil, errors.Wrap(errors.CodeInternalError, err, "获取并发配置列表失败")
	}

	result := make([]*ConcurrencyStatus, 0, len(configs))
	for _, cfg := range configs {
		result = append(result, s.buildStatus(ctx, cfg.TaskType, cfg.MaxConcurrency))
	}
	return result, nil
}

// GetByTaskType returns the status of a single task type.
//
// Any error from the store lookup is returned wrapped with CodeNotFound.
// Redis read failures are not reported; the result shows Current 0.
func (s *ConcurrencyService) GetByTaskType(ctx context.Context, taskType string) (*ConcurrencyStatus, error) {
	cfg, err := s.store.GetByTaskType(ctx, taskType)
	if err != nil {
		return nil, errors.Wrap(errors.CodeNotFound, err, "并发配置不存在")
	}
	return s.buildStatus(ctx, cfg.TaskType, cfg.MaxConcurrency), nil
}

// buildStatus assembles the ConcurrencyStatus for one task type from its
// configured limit and the counter stored in Redis.
func (s *ConcurrencyService) buildStatus(ctx context.Context, taskType string, maxConcurrency int) *ConcurrencyStatus {
	status := &ConcurrencyStatus{
		TaskType:       taskType,
		TaskTypeName:   s.getTaskTypeName(taskType),
		MaxConcurrency: maxConcurrency,
	}

	currentKey := constants.RedisPollingConcurrencyCurrentKey(taskType)
	current, err := s.redis.Get(ctx, currentKey).Int64()
	if err != nil && err != redis.Nil {
		// Int64 can hand back a non-zero value together with a parse error,
		// so force the counter to 0 on any real failure. A missing key
		// (redis.Nil) already yields 0.
		current = 0
	}
	status.Current = current

	status.Available = int64(maxConcurrency) - current
	if status.Available < 0 {
		// The counter may exceed the limit; never report negative capacity.
		status.Available = 0
	}

	// Guard against division by zero for a non-positive limit.
	if maxConcurrency > 0 {
		status.Utilization = float64(current) / float64(maxConcurrency) * 100
	}
	return status
}

// UpdateMaxConcurrency sets the concurrency limit of taskType to
// maxConcurrency, recording updatedBy, and mirrors the new value into Redis.
//
// It returns CodeInvalidParam when maxConcurrency is outside 1..1000,
// CodeNotFound when the store lookup for taskType fails, and
// CodeInternalError when the store update fails.
func (s *ConcurrencyService) UpdateMaxConcurrency(ctx context.Context, taskType string, maxConcurrency int, updatedBy uint) error {
	if maxConcurrency < 1 || maxConcurrency > 1000 {
		return errors.New(errors.CodeInvalidParam, "并发数必须在 1-1000 之间")
	}

	// Make sure the task type exists before attempting the update.
	if _, err := s.store.GetByTaskType(ctx, taskType); err != nil {
		return errors.Wrap(errors.CodeNotFound, err, "任务类型不存在")
	}

	if err := s.store.UpdateMaxConcurrency(ctx, taskType, maxConcurrency, updatedBy); err != nil {
		return errors.Wrap(errors.CodeInternalError, err, "更新并发配置失败")
	}

	// Best effort: the database write already succeeded, so a failed cache
	// write is deliberately ignored.
	// NOTE(review): the original author's comment says readers reload the
	// value from the database on their next read; that code is not in this
	// file, so confirm.
	configKey := constants.RedisPollingConcurrencyConfigKey(taskType)
	_ = s.redis.Set(ctx, configKey, maxConcurrency, 24*time.Hour).Err()

	return nil
}

// ResetConcurrency sets the Redis counter of taskType back to 0. The
// original author describes it as a tool for repairing the semaphore.
//
// It returns CodeNotFound when the store lookup for taskType fails and
// CodeInternalError when the Redis write fails.
func (s *ConcurrencyService) ResetConcurrency(ctx context.Context, taskType string) error {
	// Make sure the task type exists before touching Redis.
	if _, err := s.store.GetByTaskType(ctx, taskType); err != nil {
		return errors.Wrap(errors.CodeNotFound, err, "任务类型不存在")
	}

	currentKey := constants.RedisPollingConcurrencyCurrentKey(taskType)
	if err := s.redis.Set(ctx, currentKey, 0, 24*time.Hour).Err(); err != nil {
		return errors.Wrap(errors.CodeInternalError, err, "重置并发计数失败")
	}
	return nil
}

// InitFromDB writes the limit of every config in the store into Redis.
//
// Only a failure to load the configs is returned (CodeInternalError);
// failures to write individual keys are skipped.
func (s *ConcurrencyService) InitFromDB(ctx context.Context) error {
	configs, err := s.store.List(ctx)
	if err != nil {
		return errors.Wrap(errors.CodeInternalError, err, "获取并发配置失败")
	}

	for _, cfg := range configs {
		configKey := constants.RedisPollingConcurrencyConfigKey(cfg.TaskType)
		// Best effort: one key failing to sync must not stop the rest.
		_ = s.redis.Set(ctx, configKey, cfg.MaxConcurrency, 24*time.Hour).Err()
	}
	return nil
}

// SyncConfigToRedis writes the limit of a single config into Redis and
// returns the Redis error, if any. config must not be nil.
func (s *ConcurrencyService) SyncConfigToRedis(ctx context.Context, config *model.PollingConcurrencyConfig) error {
	configKey := constants.RedisPollingConcurrencyConfigKey(config.TaskType)
	return s.redis.Set(ctx, configKey, config.MaxConcurrency, 24*time.Hour).Err()
}

// getTaskTypeName returns the Chinese display name of a known task type, or
// taskType itself when it is not one of the known constants.
func (s *ConcurrencyService) getTaskTypeName(taskType string) string {
	switch taskType {
	case constants.TaskTypePollingRealname:
		return "实名检查"
	case constants.TaskTypePollingCarddata:
		return "流量检查"
	case constants.TaskTypePollingPackage:
		return "套餐检查"
	default:
		return taskType
	}
}