package polling import ( "context" "encoding/json" "time" "github.com/redis/go-redis/v9" "go.uber.org/zap" "github.com/break/junhong_cmp_fiber/internal/model" "github.com/break/junhong_cmp_fiber/internal/store/postgres" "github.com/break/junhong_cmp_fiber/pkg/constants" "github.com/break/junhong_cmp_fiber/pkg/errors" "github.com/break/junhong_cmp_fiber/pkg/middleware" ) // ManualTriggerService 手动触发服务 type ManualTriggerService struct { logStore *postgres.PollingManualTriggerLogStore iotCardStore *postgres.IotCardStore redis *redis.Client logger *zap.Logger } // NewManualTriggerService 创建手动触发服务实例 func NewManualTriggerService( logStore *postgres.PollingManualTriggerLogStore, iotCardStore *postgres.IotCardStore, redis *redis.Client, logger *zap.Logger, ) *ManualTriggerService { return &ManualTriggerService{ logStore: logStore, iotCardStore: iotCardStore, redis: redis, logger: logger, } } // TriggerSingle 单卡手动触发 func (s *ManualTriggerService) TriggerSingle(ctx context.Context, cardID uint, taskType string, triggeredBy uint) error { // 验证任务类型 if !isValidTaskType(taskType) { return errors.New(errors.CodeInvalidParam, "无效的任务类型") } // 权限验证:检查用户是否有权管理该卡 if err := s.canManageCard(ctx, cardID); err != nil { return err } // 检查每日触发限制 todayCount, err := s.logStore.CountTodayTriggers(ctx, triggeredBy) if err != nil { return err } if todayCount >= 100 { // 每日最多触发100次 return errors.New(errors.CodeInvalidParam, "已达到每日触发次数上限") } // 检查去重 dedupeKey := constants.RedisPollingManualDedupeKey(taskType) added, err := s.redis.SAdd(ctx, dedupeKey, cardID).Result() if err != nil { return err } if added == 0 { return errors.New(errors.CodeInvalidParam, "该卡已在手动触发队列中") } // 设置去重 key 过期时间(1小时) s.redis.Expire(ctx, dedupeKey, time.Hour) // 创建触发日志 cardIDsJSON, _ := json.Marshal([]uint{cardID}) triggerLog := &model.PollingManualTriggerLog{ TaskType: taskType, TriggerType: "single", CardIDs: string(cardIDsJSON), TotalCount: 1, Status: "processing", TriggeredBy: triggeredBy, TriggeredAt: time.Now(), } if err := s.logStore.Create(ctx, triggerLog); err != nil { return err } // 加入手动触发队列(使用 List,优先级高于定时轮询) queueKey := constants.RedisPollingManualQueueKey(taskType) if err := s.redis.LPush(ctx, queueKey, cardID).Err(); err != nil { return err } // 更新日志状态 _ = s.logStore.UpdateProgress(ctx, triggerLog.ID, 1, 1, 0) _ = s.logStore.UpdateStatus(ctx, triggerLog.ID, "completed") s.logger.Info("单卡手动触发成功", zap.Uint("card_id", cardID), zap.String("task_type", taskType), zap.Uint("triggered_by", triggeredBy)) return nil } // TriggerBatch 批量手动触发 func (s *ManualTriggerService) TriggerBatch(ctx context.Context, cardIDs []uint, taskType string, triggeredBy uint) (*model.PollingManualTriggerLog, error) { // 验证任务类型 if !isValidTaskType(taskType) { return nil, errors.New(errors.CodeInvalidParam, "无效的任务类型") } // 单次最多1000张卡 if len(cardIDs) > 1000 { return nil, errors.New(errors.CodeInvalidParam, "单次最多触发1000张卡") } // 权限验证:检查用户是否有权管理所有卡 if err := s.canManageCards(ctx, cardIDs); err != nil { return nil, err } // 检查每日触发限制 todayCount, err := s.logStore.CountTodayTriggers(ctx, triggeredBy) if err != nil { return nil, err } if todayCount >= 100 { return nil, errors.New(errors.CodeInvalidParam, "已达到每日触发次数上限") } // 创建触发日志 cardIDsJSON, _ := json.Marshal(cardIDs) triggerLog := &model.PollingManualTriggerLog{ TaskType: taskType, TriggerType: "batch", CardIDs: string(cardIDsJSON), TotalCount: len(cardIDs), Status: "processing", TriggeredBy: triggeredBy, TriggeredAt: time.Now(), } if err := s.logStore.Create(ctx, triggerLog); err != nil { return nil, err } // 异步处理批量触发 go s.processBatchTrigger(context.Background(), triggerLog.ID, cardIDs, taskType) return triggerLog, nil } // processBatchTrigger 异步处理批量触发 func (s *ManualTriggerService) processBatchTrigger(ctx context.Context, logID uint, cardIDs []uint, taskType string) { dedupeKey := constants.RedisPollingManualDedupeKey(taskType) queueKey := constants.RedisPollingManualQueueKey(taskType) var processedCount, successCount, failedCount int for _, cardID := range cardIDs { // 检查去重 added, err := s.redis.SAdd(ctx, dedupeKey, cardID).Result() if err != nil { failedCount++ processedCount++ continue } if added == 0 { // 已在队列中,跳过 failedCount++ processedCount++ continue } // 加入队列 if err := s.redis.LPush(ctx, queueKey, cardID).Err(); err != nil { failedCount++ } else { successCount++ } processedCount++ // 每处理100条更新一次进度 if processedCount%100 == 0 { _ = s.logStore.UpdateProgress(ctx, logID, processedCount, successCount, failedCount) } } // 设置去重 key 过期时间 s.redis.Expire(ctx, dedupeKey, time.Hour) // 更新最终状态 _ = s.logStore.UpdateProgress(ctx, logID, processedCount, successCount, failedCount) _ = s.logStore.UpdateStatus(ctx, logID, "completed") s.logger.Info("批量手动触发完成", zap.Uint("log_id", logID), zap.Int("total", len(cardIDs)), zap.Int("success", successCount), zap.Int("failed", failedCount)) } // ConditionFilter 条件筛选参数 type ConditionFilter struct { CardStatus string `json:"card_status,omitempty"` // 卡状态 CarrierCode string `json:"carrier_code,omitempty"` // 运营商代码 CardType string `json:"card_type,omitempty"` // 卡类型 ShopID *uint `json:"shop_id,omitempty"` // 店铺ID PackageIDs []uint `json:"package_ids,omitempty"` // 套餐ID列表 EnablePolling *bool `json:"enable_polling,omitempty"` // 是否启用轮询 Limit int `json:"limit,omitempty"` // 限制数量 } // TriggerByCondition 条件筛选触发 func (s *ManualTriggerService) TriggerByCondition(ctx context.Context, filter *ConditionFilter, taskType string, triggeredBy uint) (*model.PollingManualTriggerLog, error) { // 验证任务类型 if !isValidTaskType(taskType) { return nil, errors.New(errors.CodeInvalidParam, "无效的任务类型") } // 设置默认限制 if filter.Limit <= 0 || filter.Limit > 1000 { filter.Limit = 1000 } // 权限验证:代理只能筛选自己管理的店铺的卡 if err := s.applyShopPermissionFilter(ctx, filter); err != nil { return nil, err } // 检查每日触发限制 todayCount, err := s.logStore.CountTodayTriggers(ctx, triggeredBy) if err != nil { return nil, err } if todayCount >= 100 { return nil, errors.New(errors.CodeInvalidParam, "已达到每日触发次数上限") } // 查询符合条件的卡(已应用权限过滤) cardIDs, err := s.queryCardsByCondition(ctx, filter) if err != nil { return nil, err } if len(cardIDs) == 0 { return nil, errors.New(errors.CodeInvalidParam, "没有符合条件的卡") } // 创建触发日志 filterJSON, _ := json.Marshal(filter) cardIDsJSON, _ := json.Marshal(cardIDs) triggerLog := &model.PollingManualTriggerLog{ TaskType: taskType, TriggerType: "by_condition", CardIDs: string(cardIDsJSON), ConditionFilter: string(filterJSON), TotalCount: len(cardIDs), Status: "processing", TriggeredBy: triggeredBy, TriggeredAt: time.Now(), } if err := s.logStore.Create(ctx, triggerLog); err != nil { return nil, err } // 异步处理批量触发 go s.processBatchTrigger(context.Background(), triggerLog.ID, cardIDs, taskType) return triggerLog, nil } // queryCardsByCondition 根据条件查询卡ID func (s *ManualTriggerService) queryCardsByCondition(ctx context.Context, filter *ConditionFilter) ([]uint, error) { // 构建查询条件并查询卡 queryFilter := &postgres.IotCardQueryFilter{ ShopID: filter.ShopID, EnablePolling: filter.EnablePolling, Limit: filter.Limit, } // 映射其他过滤条件 if filter.CardStatus != "" { queryFilter.CardStatus = &filter.CardStatus } if filter.CarrierCode != "" { queryFilter.CarrierCode = &filter.CarrierCode } if filter.CardType != "" { queryFilter.CardType = &filter.CardType } // 调用 IotCardStore 查询 cardIDs, err := s.iotCardStore.QueryIDsByFilter(ctx, queryFilter) if err != nil { return nil, errors.Wrap(errors.CodeInternalError, err, "查询符合条件的卡失败") } return cardIDs, nil } // GetStatus 获取触发状态 func (s *ManualTriggerService) GetStatus(ctx context.Context, logID uint) (*model.PollingManualTriggerLog, error) { return s.logStore.GetByID(ctx, logID) } // ListHistory 获取触发历史 func (s *ManualTriggerService) ListHistory(ctx context.Context, page, pageSize int, taskType string, triggeredBy *uint) ([]*model.PollingManualTriggerLog, int64, error) { if page < 1 { page = 1 } if pageSize < 1 || pageSize > 100 { pageSize = 20 } return s.logStore.List(ctx, page, pageSize, taskType, triggeredBy) } // CancelTrigger 取消触发任务 func (s *ManualTriggerService) CancelTrigger(ctx context.Context, logID uint, triggeredBy uint) error { log, err := s.logStore.GetByID(ctx, logID) if err != nil { return errors.Wrap(errors.CodeNotFound, err, "触发任务不存在") } if log.TriggeredBy != triggeredBy { return errors.New(errors.CodeForbidden, "无权限取消该任务") } if log.Status != "pending" && log.Status != "processing" { return errors.New(errors.CodeInvalidParam, "任务已完成或已取消") } return s.logStore.UpdateStatus(ctx, logID, "cancelled") } // GetRunningTasks 获取正在运行的任务 func (s *ManualTriggerService) GetRunningTasks(ctx context.Context, triggeredBy uint) ([]*model.PollingManualTriggerLog, error) { return s.logStore.GetRunning(ctx, triggeredBy) } // GetQueueSize 获取手动触发队列大小 func (s *ManualTriggerService) GetQueueSize(ctx context.Context, taskType string) (int64, error) { queueKey := constants.RedisPollingManualQueueKey(taskType) return s.redis.LLen(ctx, queueKey).Result() } func isValidTaskType(taskType string) bool { switch taskType { case constants.TaskTypePollingRealname, constants.TaskTypePollingCarddata, constants.TaskTypePollingPackage: return true default: return false } } // canManageCard 检查用户是否有权管理单张卡 func (s *ManualTriggerService) canManageCard(ctx context.Context, cardID uint) error { userType := middleware.GetUserTypeFromContext(ctx) // 超级管理员和平台用户跳过权限检查 if userType == constants.UserTypeSuperAdmin || userType == constants.UserTypePlatform { return nil } // 企业账号禁止手动触发 if userType == constants.UserTypeEnterprise { return errors.New(errors.CodeForbidden, "企业账号无权限手动触发轮询") } // 代理账号只能管理自己店铺及下级店铺的卡 card, err := s.iotCardStore.GetByID(ctx, cardID) if err != nil { return errors.Wrap(errors.CodeForbidden, err, "无权限操作该资源或资源不存在") } // 平台卡(ShopID为nil)代理不能管理 if card.ShopID == nil { return errors.New(errors.CodeForbidden, "无权限操作平台卡") } // 检查代理是否有权管理该店铺 return middleware.CanManageShop(ctx, *card.ShopID) } // canManageCards 检查用户是否有权管理多张卡 func (s *ManualTriggerService) canManageCards(ctx context.Context, cardIDs []uint) error { userType := middleware.GetUserTypeFromContext(ctx) // 超级管理员和平台用户跳过权限检查 if userType == constants.UserTypeSuperAdmin || userType == constants.UserTypePlatform { return nil } // 企业账号禁止手动触发 if userType == constants.UserTypeEnterprise { return errors.New(errors.CodeForbidden, "企业账号无权限手动触发轮询") } // 从 Context 获取预计算的下级店铺 ID 列表 subordinateIDs := middleware.GetSubordinateShopIDs(ctx) if subordinateIDs == nil { // 平台用户/超管不受限制,但这里不应该进入(前面已经检查过用户类型) return errors.New(errors.CodeForbidden, "无权限操作") } // 构建可管理的店铺ID集合 allowedShopIDs := make(map[uint]bool) for _, id := range subordinateIDs { allowedShopIDs[id] = true } // 批量查询卡信息 cards, err := s.iotCardStore.GetByIDs(ctx, cardIDs) if err != nil { return errors.Wrap(errors.CodeForbidden, err, "查询卡信息失败") } // 验证所有卡都在可管理范围内 for _, card := range cards { if card.ShopID == nil { return errors.New(errors.CodeForbidden, "无权限操作平台卡") } if !allowedShopIDs[*card.ShopID] { return errors.New(errors.CodeForbidden, "包含无权限操作的卡") } } return nil } // applyShopPermissionFilter 应用店铺权限过滤(代理只能筛选自己管理的卡) func (s *ManualTriggerService) applyShopPermissionFilter(ctx context.Context, filter *ConditionFilter) error { userType := middleware.GetUserTypeFromContext(ctx) // 超级管理员和平台用户不需要限制 if userType == constants.UserTypeSuperAdmin || userType == constants.UserTypePlatform { return nil } // 企业账号禁止手动触发 if userType == constants.UserTypeEnterprise { return errors.New(errors.CodeForbidden, "企业账号无权限手动触发轮询") } // 代理账号:限制只能查询自己店铺及下级店铺的卡 currentShopID := middleware.GetShopIDFromContext(ctx) if currentShopID == 0 { return errors.New(errors.CodeForbidden, "无权限操作") } // 如果用户指定了 ShopID,验证是否在可管理范围内 if filter.ShopID != nil { if err := middleware.CanManageShop(ctx, *filter.ShopID); err != nil { return err } // 已指定有效的 ShopID,无需修改 return nil } // 用户未指定 ShopID,限制为当前用户的店铺(代理只能查自己店铺的卡) // 注意:这里限制为当前店铺,而不是所有下级店铺,以避免返回过多数据 filter.ShopID = ¤tShopID return nil }