Files
junhong_cmp_fiber/internal/service/polling/config_service.go
huang 931e140e8e
All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 6m35s
feat: 实现 IoT 卡轮询系统(支持千万级卡规模)
实现功能:
- 实名状态检查轮询(可配置间隔)
- 卡流量检查轮询(支持跨月流量追踪)
- 套餐检查与超额自动停机
- 分布式并发控制(Redis 信号量)
- 手动触发轮询(单卡/批量/条件筛选)
- 数据清理配置与执行
- 告警规则与历史记录
- 实时监控统计(队列/性能/并发)

性能优化:
- Redis 缓存卡信息,减少 DB 查询
- Pipeline 批量写入 Redis
- 异步流量记录写入
- 渐进式初始化(10万卡/批)

压测工具(scripts/benchmark/):
- Mock Gateway 模拟上游服务
- 测试卡生成器
- 配置初始化脚本
- 实时监控脚本

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-05 17:32:44 +08:00

253 lines
7.9 KiB
Go

package polling
import (
"context"
"time"
"gorm.io/gorm"
"github.com/break/junhong_cmp_fiber/internal/model"
"github.com/break/junhong_cmp_fiber/internal/model/dto"
"github.com/break/junhong_cmp_fiber/internal/store"
"github.com/break/junhong_cmp_fiber/internal/store/postgres"
"github.com/break/junhong_cmp_fiber/pkg/constants"
"github.com/break/junhong_cmp_fiber/pkg/errors"
"github.com/break/junhong_cmp_fiber/pkg/middleware"
)
// ConfigService 轮询配置服务
type ConfigService struct {
configStore *postgres.PollingConfigStore
}
// NewConfigService 创建轮询配置服务实例
func NewConfigService(configStore *postgres.PollingConfigStore) *ConfigService {
return &ConfigService{configStore: configStore}
}
// Create 创建轮询配置
func (s *ConfigService) Create(ctx context.Context, req *dto.CreatePollingConfigRequest) (*dto.PollingConfigResponse, error) {
currentUserID := middleware.GetUserIDFromContext(ctx)
if currentUserID == 0 {
return nil, errors.New(errors.CodeUnauthorized, "未授权访问")
}
// 验证配置名称唯一性
existing, _ := s.configStore.GetByName(ctx, req.ConfigName)
if existing != nil {
return nil, errors.New(errors.CodeInvalidParam, "配置名称已存在")
}
// 验证检查间隔(至少一个不为空)
if req.RealnameCheckInterval == nil && req.CarddataCheckInterval == nil && req.PackageCheckInterval == nil {
return nil, errors.New(errors.CodeInvalidParam, "至少需要配置一种检查间隔")
}
config := &model.PollingConfig{
ConfigName: req.ConfigName,
CardCondition: req.CardCondition,
CardCategory: req.CardCategory,
CarrierID: req.CarrierID,
Priority: req.Priority,
RealnameCheckInterval: req.RealnameCheckInterval,
CarddataCheckInterval: req.CarddataCheckInterval,
PackageCheckInterval: req.PackageCheckInterval,
Status: 1, // 默认启用
Description: req.Description,
CreatedBy: &currentUserID,
UpdatedBy: &currentUserID,
}
if err := s.configStore.Create(ctx, config); err != nil {
return nil, errors.Wrap(errors.CodeInternalError, err, "创建轮询配置失败")
}
return s.toResponse(config), nil
}
// Get 获取轮询配置详情
func (s *ConfigService) Get(ctx context.Context, id uint) (*dto.PollingConfigResponse, error) {
config, err := s.configStore.GetByID(ctx, id)
if err != nil {
if err == gorm.ErrRecordNotFound {
return nil, errors.New(errors.CodePollingConfigNotFound, "轮询配置不存在")
}
return nil, errors.Wrap(errors.CodeInternalError, err, "获取轮询配置失败")
}
return s.toResponse(config), nil
}
// Update 更新轮询配置
func (s *ConfigService) Update(ctx context.Context, id uint, req *dto.UpdatePollingConfigRequest) (*dto.PollingConfigResponse, error) {
currentUserID := middleware.GetUserIDFromContext(ctx)
if currentUserID == 0 {
return nil, errors.New(errors.CodeUnauthorized, "未授权访问")
}
config, err := s.configStore.GetByID(ctx, id)
if err != nil {
if err == gorm.ErrRecordNotFound {
return nil, errors.New(errors.CodePollingConfigNotFound, "轮询配置不存在")
}
return nil, errors.Wrap(errors.CodeInternalError, err, "获取轮询配置失败")
}
// 更新字段
if req.ConfigName != nil {
// 检查名称唯一性
existing, _ := s.configStore.GetByName(ctx, *req.ConfigName)
if existing != nil && existing.ID != id {
return nil, errors.New(errors.CodeInvalidParam, "配置名称已存在")
}
config.ConfigName = *req.ConfigName
}
if req.CardCondition != nil {
config.CardCondition = *req.CardCondition
}
if req.CardCategory != nil {
config.CardCategory = *req.CardCategory
}
if req.CarrierID != nil {
config.CarrierID = req.CarrierID
}
if req.Priority != nil {
config.Priority = *req.Priority
}
if req.RealnameCheckInterval != nil {
config.RealnameCheckInterval = req.RealnameCheckInterval
}
if req.CarddataCheckInterval != nil {
config.CarddataCheckInterval = req.CarddataCheckInterval
}
if req.PackageCheckInterval != nil {
config.PackageCheckInterval = req.PackageCheckInterval
}
if req.Description != nil {
config.Description = *req.Description
}
config.UpdatedBy = &currentUserID
if err := s.configStore.Update(ctx, config); err != nil {
return nil, errors.Wrap(errors.CodeInternalError, err, "更新轮询配置失败")
}
return s.toResponse(config), nil
}
// Delete 删除轮询配置
func (s *ConfigService) Delete(ctx context.Context, id uint) error {
_, err := s.configStore.GetByID(ctx, id)
if err != nil {
if err == gorm.ErrRecordNotFound {
return errors.New(errors.CodePollingConfigNotFound, "轮询配置不存在")
}
return errors.Wrap(errors.CodeInternalError, err, "获取轮询配置失败")
}
if err := s.configStore.Delete(ctx, id); err != nil {
return errors.Wrap(errors.CodeInternalError, err, "删除轮询配置失败")
}
return nil
}
// List 列表查询轮询配置
func (s *ConfigService) List(ctx context.Context, req *dto.PollingConfigListRequest) ([]*dto.PollingConfigResponse, int64, error) {
opts := &store.QueryOptions{
Page: req.Page,
PageSize: req.PageSize,
OrderBy: "priority ASC, id DESC",
}
if opts.Page == 0 {
opts.Page = 1
}
if opts.PageSize == 0 {
opts.PageSize = constants.DefaultPageSize
}
filters := make(map[string]interface{})
if req.Status != nil {
filters["status"] = *req.Status
}
if req.CardCondition != nil {
filters["card_condition"] = *req.CardCondition
}
if req.CardCategory != nil {
filters["card_category"] = *req.CardCategory
}
if req.CarrierID != nil {
filters["carrier_id"] = *req.CarrierID
}
if req.ConfigName != nil {
filters["config_name"] = *req.ConfigName
}
configs, total, err := s.configStore.List(ctx, opts, filters)
if err != nil {
return nil, 0, errors.Wrap(errors.CodeInternalError, err, "查询轮询配置列表失败")
}
responses := make([]*dto.PollingConfigResponse, len(configs))
for i, c := range configs {
responses[i] = s.toResponse(c)
}
return responses, total, nil
}
// UpdateStatus 更新配置状态(启用/禁用)
func (s *ConfigService) UpdateStatus(ctx context.Context, id uint, status int16) error {
currentUserID := middleware.GetUserIDFromContext(ctx)
if currentUserID == 0 {
return errors.New(errors.CodeUnauthorized, "未授权访问")
}
_, err := s.configStore.GetByID(ctx, id)
if err != nil {
if err == gorm.ErrRecordNotFound {
return errors.New(errors.CodePollingConfigNotFound, "轮询配置不存在")
}
return errors.Wrap(errors.CodeInternalError, err, "获取轮询配置失败")
}
if err := s.configStore.UpdateStatus(ctx, id, status, currentUserID); err != nil {
return errors.Wrap(errors.CodeInternalError, err, "更新轮询配置状态失败")
}
return nil
}
// ListEnabled 获取所有启用的配置
func (s *ConfigService) ListEnabled(ctx context.Context) ([]*dto.PollingConfigResponse, error) {
configs, err := s.configStore.ListEnabled(ctx)
if err != nil {
return nil, errors.Wrap(errors.CodeInternalError, err, "获取启用配置失败")
}
responses := make([]*dto.PollingConfigResponse, len(configs))
for i, c := range configs {
responses[i] = s.toResponse(c)
}
return responses, nil
}
// toResponse 转换为响应 DTO
func (s *ConfigService) toResponse(c *model.PollingConfig) *dto.PollingConfigResponse {
return &dto.PollingConfigResponse{
ID: c.ID,
ConfigName: c.ConfigName,
CardCondition: c.CardCondition,
CardCategory: c.CardCategory,
CarrierID: c.CarrierID,
Priority: c.Priority,
RealnameCheckInterval: c.RealnameCheckInterval,
CarddataCheckInterval: c.CarddataCheckInterval,
PackageCheckInterval: c.PackageCheckInterval,
Status: c.Status,
Description: c.Description,
CreatedAt: c.CreatedAt.Format(time.RFC3339),
UpdatedAt: c.UpdatedAt.Format(time.RFC3339),
}
}