Files
junhong_cmp_fiber/internal/service/polling/manual_trigger_service.go
huang 03a0960c4d
All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 7m2s
refactor: 数据权限过滤从 GORM Callback 改为 Store 层显式调用
- 移除 RegisterDataPermissionCallback 和 SkipDataPermission 机制
- 在 Auth 中间件预计算 SubordinateShopIDs 并注入 Context
- 新增 ApplyShopFilter/ApplyEnterpriseFilter/ApplyOwnerShopFilter 等 Helper 函数
- 所有 Store 层查询方法显式调用数据权限过滤函数
- 权限检查函数 CanManageShop/CanManageEnterprise 改为从 Context 获取数据

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-26 16:38:52 +08:00

470 lines
14 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package polling
import (
"context"
"encoding/json"
"time"
"github.com/redis/go-redis/v9"
"go.uber.org/zap"
"github.com/break/junhong_cmp_fiber/internal/model"
"github.com/break/junhong_cmp_fiber/internal/store/postgres"
"github.com/break/junhong_cmp_fiber/pkg/constants"
"github.com/break/junhong_cmp_fiber/pkg/errors"
"github.com/break/junhong_cmp_fiber/pkg/middleware"
)
// ManualTriggerService 手动触发服务
type ManualTriggerService struct {
logStore *postgres.PollingManualTriggerLogStore
iotCardStore *postgres.IotCardStore
redis *redis.Client
logger *zap.Logger
}
// NewManualTriggerService 创建手动触发服务实例
func NewManualTriggerService(
logStore *postgres.PollingManualTriggerLogStore,
iotCardStore *postgres.IotCardStore,
redis *redis.Client,
logger *zap.Logger,
) *ManualTriggerService {
return &ManualTriggerService{
logStore: logStore,
iotCardStore: iotCardStore,
redis: redis,
logger: logger,
}
}
// TriggerSingle 单卡手动触发
func (s *ManualTriggerService) TriggerSingle(ctx context.Context, cardID uint, taskType string, triggeredBy uint) error {
// 验证任务类型
if !isValidTaskType(taskType) {
return errors.New(errors.CodeInvalidParam, "无效的任务类型")
}
// 权限验证:检查用户是否有权管理该卡
if err := s.canManageCard(ctx, cardID); err != nil {
return err
}
// 检查每日触发限制
todayCount, err := s.logStore.CountTodayTriggers(ctx, triggeredBy)
if err != nil {
return err
}
if todayCount >= 100 { // 每日最多触发100次
return errors.New(errors.CodeInvalidParam, "已达到每日触发次数上限")
}
// 检查去重
dedupeKey := constants.RedisPollingManualDedupeKey(taskType)
added, err := s.redis.SAdd(ctx, dedupeKey, cardID).Result()
if err != nil {
return err
}
if added == 0 {
return errors.New(errors.CodeInvalidParam, "该卡已在手动触发队列中")
}
// 设置去重 key 过期时间1小时
s.redis.Expire(ctx, dedupeKey, time.Hour)
// 创建触发日志
cardIDsJSON, _ := json.Marshal([]uint{cardID})
triggerLog := &model.PollingManualTriggerLog{
TaskType: taskType,
TriggerType: "single",
CardIDs: string(cardIDsJSON),
TotalCount: 1,
Status: "processing",
TriggeredBy: triggeredBy,
TriggeredAt: time.Now(),
}
if err := s.logStore.Create(ctx, triggerLog); err != nil {
return err
}
// 加入手动触发队列(使用 List优先级高于定时轮询
queueKey := constants.RedisPollingManualQueueKey(taskType)
if err := s.redis.LPush(ctx, queueKey, cardID).Err(); err != nil {
return err
}
// 更新日志状态
_ = s.logStore.UpdateProgress(ctx, triggerLog.ID, 1, 1, 0)
_ = s.logStore.UpdateStatus(ctx, triggerLog.ID, "completed")
s.logger.Info("单卡手动触发成功",
zap.Uint("card_id", cardID),
zap.String("task_type", taskType),
zap.Uint("triggered_by", triggeredBy))
return nil
}
// TriggerBatch 批量手动触发
func (s *ManualTriggerService) TriggerBatch(ctx context.Context, cardIDs []uint, taskType string, triggeredBy uint) (*model.PollingManualTriggerLog, error) {
// 验证任务类型
if !isValidTaskType(taskType) {
return nil, errors.New(errors.CodeInvalidParam, "无效的任务类型")
}
// 单次最多1000张卡
if len(cardIDs) > 1000 {
return nil, errors.New(errors.CodeInvalidParam, "单次最多触发1000张卡")
}
// 权限验证:检查用户是否有权管理所有卡
if err := s.canManageCards(ctx, cardIDs); err != nil {
return nil, err
}
// 检查每日触发限制
todayCount, err := s.logStore.CountTodayTriggers(ctx, triggeredBy)
if err != nil {
return nil, err
}
if todayCount >= 100 {
return nil, errors.New(errors.CodeInvalidParam, "已达到每日触发次数上限")
}
// 创建触发日志
cardIDsJSON, _ := json.Marshal(cardIDs)
triggerLog := &model.PollingManualTriggerLog{
TaskType: taskType,
TriggerType: "batch",
CardIDs: string(cardIDsJSON),
TotalCount: len(cardIDs),
Status: "processing",
TriggeredBy: triggeredBy,
TriggeredAt: time.Now(),
}
if err := s.logStore.Create(ctx, triggerLog); err != nil {
return nil, err
}
// 异步处理批量触发
go s.processBatchTrigger(context.Background(), triggerLog.ID, cardIDs, taskType)
return triggerLog, nil
}
// processBatchTrigger 异步处理批量触发
func (s *ManualTriggerService) processBatchTrigger(ctx context.Context, logID uint, cardIDs []uint, taskType string) {
dedupeKey := constants.RedisPollingManualDedupeKey(taskType)
queueKey := constants.RedisPollingManualQueueKey(taskType)
var processedCount, successCount, failedCount int
for _, cardID := range cardIDs {
// 检查去重
added, err := s.redis.SAdd(ctx, dedupeKey, cardID).Result()
if err != nil {
failedCount++
processedCount++
continue
}
if added == 0 {
// 已在队列中,跳过
failedCount++
processedCount++
continue
}
// 加入队列
if err := s.redis.LPush(ctx, queueKey, cardID).Err(); err != nil {
failedCount++
} else {
successCount++
}
processedCount++
// 每处理100条更新一次进度
if processedCount%100 == 0 {
_ = s.logStore.UpdateProgress(ctx, logID, processedCount, successCount, failedCount)
}
}
// 设置去重 key 过期时间
s.redis.Expire(ctx, dedupeKey, time.Hour)
// 更新最终状态
_ = s.logStore.UpdateProgress(ctx, logID, processedCount, successCount, failedCount)
_ = s.logStore.UpdateStatus(ctx, logID, "completed")
s.logger.Info("批量手动触发完成",
zap.Uint("log_id", logID),
zap.Int("total", len(cardIDs)),
zap.Int("success", successCount),
zap.Int("failed", failedCount))
}
// ConditionFilter 条件筛选参数
type ConditionFilter struct {
CardStatus string `json:"card_status,omitempty"` // 卡状态
CarrierCode string `json:"carrier_code,omitempty"` // 运营商代码
CardType string `json:"card_type,omitempty"` // 卡类型
ShopID *uint `json:"shop_id,omitempty"` // 店铺ID
PackageIDs []uint `json:"package_ids,omitempty"` // 套餐ID列表
EnablePolling *bool `json:"enable_polling,omitempty"` // 是否启用轮询
Limit int `json:"limit,omitempty"` // 限制数量
}
// TriggerByCondition 条件筛选触发
func (s *ManualTriggerService) TriggerByCondition(ctx context.Context, filter *ConditionFilter, taskType string, triggeredBy uint) (*model.PollingManualTriggerLog, error) {
// 验证任务类型
if !isValidTaskType(taskType) {
return nil, errors.New(errors.CodeInvalidParam, "无效的任务类型")
}
// 设置默认限制
if filter.Limit <= 0 || filter.Limit > 1000 {
filter.Limit = 1000
}
// 权限验证:代理只能筛选自己管理的店铺的卡
if err := s.applyShopPermissionFilter(ctx, filter); err != nil {
return nil, err
}
// 检查每日触发限制
todayCount, err := s.logStore.CountTodayTriggers(ctx, triggeredBy)
if err != nil {
return nil, err
}
if todayCount >= 100 {
return nil, errors.New(errors.CodeInvalidParam, "已达到每日触发次数上限")
}
// 查询符合条件的卡(已应用权限过滤)
cardIDs, err := s.queryCardsByCondition(ctx, filter)
if err != nil {
return nil, err
}
if len(cardIDs) == 0 {
return nil, errors.New(errors.CodeInvalidParam, "没有符合条件的卡")
}
// 创建触发日志
filterJSON, _ := json.Marshal(filter)
cardIDsJSON, _ := json.Marshal(cardIDs)
triggerLog := &model.PollingManualTriggerLog{
TaskType: taskType,
TriggerType: "by_condition",
CardIDs: string(cardIDsJSON),
ConditionFilter: string(filterJSON),
TotalCount: len(cardIDs),
Status: "processing",
TriggeredBy: triggeredBy,
TriggeredAt: time.Now(),
}
if err := s.logStore.Create(ctx, triggerLog); err != nil {
return nil, err
}
// 异步处理批量触发
go s.processBatchTrigger(context.Background(), triggerLog.ID, cardIDs, taskType)
return triggerLog, nil
}
// queryCardsByCondition 根据条件查询卡ID
func (s *ManualTriggerService) queryCardsByCondition(ctx context.Context, filter *ConditionFilter) ([]uint, error) {
// 构建查询条件并查询卡
queryFilter := &postgres.IotCardQueryFilter{
ShopID: filter.ShopID,
EnablePolling: filter.EnablePolling,
Limit: filter.Limit,
}
// 映射其他过滤条件
if filter.CardStatus != "" {
queryFilter.CardStatus = &filter.CardStatus
}
if filter.CarrierCode != "" {
queryFilter.CarrierCode = &filter.CarrierCode
}
if filter.CardType != "" {
queryFilter.CardType = &filter.CardType
}
// 调用 IotCardStore 查询
cardIDs, err := s.iotCardStore.QueryIDsByFilter(ctx, queryFilter)
if err != nil {
return nil, errors.Wrap(errors.CodeInternalError, err, "查询符合条件的卡失败")
}
return cardIDs, nil
}
// GetStatus 获取触发状态
func (s *ManualTriggerService) GetStatus(ctx context.Context, logID uint) (*model.PollingManualTriggerLog, error) {
return s.logStore.GetByID(ctx, logID)
}
// ListHistory 获取触发历史
func (s *ManualTriggerService) ListHistory(ctx context.Context, page, pageSize int, taskType string, triggeredBy *uint) ([]*model.PollingManualTriggerLog, int64, error) {
if page < 1 {
page = 1
}
if pageSize < 1 || pageSize > 100 {
pageSize = 20
}
return s.logStore.List(ctx, page, pageSize, taskType, triggeredBy)
}
// CancelTrigger 取消触发任务
func (s *ManualTriggerService) CancelTrigger(ctx context.Context, logID uint, triggeredBy uint) error {
log, err := s.logStore.GetByID(ctx, logID)
if err != nil {
return errors.Wrap(errors.CodeNotFound, err, "触发任务不存在")
}
if log.TriggeredBy != triggeredBy {
return errors.New(errors.CodeForbidden, "无权限取消该任务")
}
if log.Status != "pending" && log.Status != "processing" {
return errors.New(errors.CodeInvalidParam, "任务已完成或已取消")
}
return s.logStore.UpdateStatus(ctx, logID, "cancelled")
}
// GetRunningTasks 获取正在运行的任务
func (s *ManualTriggerService) GetRunningTasks(ctx context.Context, triggeredBy uint) ([]*model.PollingManualTriggerLog, error) {
return s.logStore.GetRunning(ctx, triggeredBy)
}
// GetQueueSize 获取手动触发队列大小
func (s *ManualTriggerService) GetQueueSize(ctx context.Context, taskType string) (int64, error) {
queueKey := constants.RedisPollingManualQueueKey(taskType)
return s.redis.LLen(ctx, queueKey).Result()
}
func isValidTaskType(taskType string) bool {
switch taskType {
case constants.TaskTypePollingRealname,
constants.TaskTypePollingCarddata,
constants.TaskTypePollingPackage:
return true
default:
return false
}
}
// canManageCard 检查用户是否有权管理单张卡
func (s *ManualTriggerService) canManageCard(ctx context.Context, cardID uint) error {
userType := middleware.GetUserTypeFromContext(ctx)
// 超级管理员和平台用户跳过权限检查
if userType == constants.UserTypeSuperAdmin || userType == constants.UserTypePlatform {
return nil
}
// 企业账号禁止手动触发
if userType == constants.UserTypeEnterprise {
return errors.New(errors.CodeForbidden, "企业账号无权限手动触发轮询")
}
// 代理账号只能管理自己店铺及下级店铺的卡
card, err := s.iotCardStore.GetByID(ctx, cardID)
if err != nil {
return errors.Wrap(errors.CodeForbidden, err, "无权限操作该资源或资源不存在")
}
// 平台卡ShopID为nil代理不能管理
if card.ShopID == nil {
return errors.New(errors.CodeForbidden, "无权限操作平台卡")
}
// 检查代理是否有权管理该店铺
return middleware.CanManageShop(ctx, *card.ShopID)
}
// canManageCards 检查用户是否有权管理多张卡
func (s *ManualTriggerService) canManageCards(ctx context.Context, cardIDs []uint) error {
userType := middleware.GetUserTypeFromContext(ctx)
// 超级管理员和平台用户跳过权限检查
if userType == constants.UserTypeSuperAdmin || userType == constants.UserTypePlatform {
return nil
}
// 企业账号禁止手动触发
if userType == constants.UserTypeEnterprise {
return errors.New(errors.CodeForbidden, "企业账号无权限手动触发轮询")
}
// 从 Context 获取预计算的下级店铺 ID 列表
subordinateIDs := middleware.GetSubordinateShopIDs(ctx)
if subordinateIDs == nil {
// 平台用户/超管不受限制,但这里不应该进入(前面已经检查过用户类型)
return errors.New(errors.CodeForbidden, "无权限操作")
}
// 构建可管理的店铺ID集合
allowedShopIDs := make(map[uint]bool)
for _, id := range subordinateIDs {
allowedShopIDs[id] = true
}
// 批量查询卡信息
cards, err := s.iotCardStore.GetByIDs(ctx, cardIDs)
if err != nil {
return errors.Wrap(errors.CodeForbidden, err, "查询卡信息失败")
}
// 验证所有卡都在可管理范围内
for _, card := range cards {
if card.ShopID == nil {
return errors.New(errors.CodeForbidden, "无权限操作平台卡")
}
if !allowedShopIDs[*card.ShopID] {
return errors.New(errors.CodeForbidden, "包含无权限操作的卡")
}
}
return nil
}
// applyShopPermissionFilter 应用店铺权限过滤(代理只能筛选自己管理的卡)
func (s *ManualTriggerService) applyShopPermissionFilter(ctx context.Context, filter *ConditionFilter) error {
userType := middleware.GetUserTypeFromContext(ctx)
// 超级管理员和平台用户不需要限制
if userType == constants.UserTypeSuperAdmin || userType == constants.UserTypePlatform {
return nil
}
// 企业账号禁止手动触发
if userType == constants.UserTypeEnterprise {
return errors.New(errors.CodeForbidden, "企业账号无权限手动触发轮询")
}
// 代理账号:限制只能查询自己店铺及下级店铺的卡
currentShopID := middleware.GetShopIDFromContext(ctx)
if currentShopID == 0 {
return errors.New(errors.CodeForbidden, "无权限操作")
}
// 如果用户指定了 ShopID验证是否在可管理范围内
if filter.ShopID != nil {
if err := middleware.CanManageShop(ctx, *filter.ShopID); err != nil {
return err
}
// 已指定有效的 ShopID无需修改
return nil
}
// 用户未指定 ShopID限制为当前用户的店铺代理只能查自己店铺的卡
// 注意:这里限制为当前店铺,而不是所有下级店铺,以避免返回过多数据
filter.ShopID = &currentShopID
return nil
}