package polling

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"net/http"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/bytedance/sonic"
	"github.com/redis/go-redis/v9"
	"go.uber.org/zap"

	"github.com/break/junhong_cmp_fiber/internal/model"
	"github.com/break/junhong_cmp_fiber/internal/store/postgres"
	"github.com/break/junhong_cmp_fiber/pkg/constants"
	"github.com/break/junhong_cmp_fiber/pkg/errors"
)

const (
	// alertDefaultCooldownMinutes is applied by CreateRule when the rule
	// does not specify a cooldown.
	alertDefaultCooldownMinutes = 5
	// alertDefaultOperator is applied by CreateRule when the rule does not
	// specify a comparison operator.
	alertDefaultOperator = ">"
	// alertWebhookTimeout bounds a single webhook delivery.
	alertWebhookTimeout = 10 * time.Second
	// alertWebhookMaxDrainBytes caps how much of a webhook response body is
	// read before closing it.
	alertWebhookMaxDrainBytes = 64 << 10
)

// alertWebhookClient is shared by all webhook deliveries so the underlying
// transport can reuse connections instead of building a client per alert.
var alertWebhookClient = &http.Client{Timeout: alertWebhookTimeout}

// AlertService manages polling alert rules, evaluates them against metrics
// stored in Redis, records alert history and dispatches notifications.
type AlertService struct {
	ruleStore    *postgres.PollingAlertRuleStore
	historyStore *postgres.PollingAlertHistoryStore
	redis        *redis.Client
	logger       *zap.Logger
}

// NewAlertService creates an AlertService from its dependencies.
func NewAlertService(
	ruleStore *postgres.PollingAlertRuleStore,
	historyStore *postgres.PollingAlertHistoryStore,
	redis *redis.Client,
	logger *zap.Logger,
) *AlertService {
	return &AlertService{
		ruleStore:    ruleStore,
		historyStore: historyStore,
		redis:        redis,
		logger:       logger,
	}
}

// CreateRule validates and persists a new alert rule.
//
// RuleName, MetricType and TaskType are required. The rule is always stored
// with Status 1 (enabled); a zero CooldownMinutes becomes 5 and an empty
// Operator becomes ">". A nil rule or a missing required field yields a
// CodeInvalidParam error.
func (s *AlertService) CreateRule(ctx context.Context, rule *model.PollingAlertRule) error {
	if rule == nil {
		return errors.New(errors.CodeInvalidParam, "告警规则不能为空")
	}
	if rule.RuleName == "" {
		return errors.New(errors.CodeInvalidParam, "规则名称不能为空")
	}
	if rule.MetricType == "" {
		return errors.New(errors.CodeInvalidParam, "指标类型不能为空")
	}
	if rule.TaskType == "" {
		return errors.New(errors.CodeInvalidParam, "任务类型不能为空")
	}

	rule.Status = 1 // enabled by default
	if rule.CooldownMinutes == 0 {
		rule.CooldownMinutes = alertDefaultCooldownMinutes
	}
	if rule.Operator == "" {
		rule.Operator = alertDefaultOperator
	}

	return s.ruleStore.Create(ctx, rule)
}

// GetRule returns the alert rule with the given ID. Any store error is
// wrapped as CodeNotFound.
func (s *AlertService) GetRule(ctx context.Context, id uint) (*model.PollingAlertRule, error) {
	rule, err := s.ruleStore.GetByID(ctx, id)
	if err != nil {
		return nil, errors.Wrap(errors.CodeNotFound, err, "告警规则不存在")
	}
	return rule, nil
}

// ListRules returns the alert rules provided by the rule store.
func (s *AlertService) ListRules(ctx context.Context) ([]*model.PollingAlertRule, error) {
	return s.ruleStore.List(ctx)
}

// UpdateRule applies a partial update to the rule with the given ID.
//
// Recognised keys in updates: "rule_name" (non-empty string), "threshold"
// (number), "alert_level" (string), "status" (integer), "cooldown_minutes"
// (integer) and "notification_channels" (string). Keys that are absent or
// carry a value of an unexpected type are ignored. Numeric values may be any
// Go integer or float kind, so maps produced by JSON decoding (float64) and
// maps built in Go code (int) are both honoured.
func (s *AlertService) UpdateRule(ctx context.Context, id uint, updates map[string]any) error {
	rule, err := s.ruleStore.GetByID(ctx, id)
	if err != nil {
		return errors.Wrap(errors.CodeNotFound, err, "告警规则不存在")
	}

	if name, ok := updates["rule_name"].(string); ok && name != "" {
		rule.RuleName = name
	}
	if threshold, ok := alertFloatFromAny(updates["threshold"]); ok {
		rule.Threshold = threshold
	}
	if level, ok := updates["alert_level"].(string); ok {
		rule.AlertLevel = level
	}
	if status, ok := alertIntFromAny(updates["status"]); ok {
		rule.Status = int16(status)
	}
	if cooldown, ok := alertIntFromAny(updates["cooldown_minutes"]); ok {
		rule.CooldownMinutes = cooldown
	}
	if channels, ok := updates["notification_channels"].(string); ok {
		rule.NotificationChannels = channels
	}

	return s.ruleStore.Update(ctx, rule)
}

// alertIntFromAny converts a dynamically typed numeric value to int.
// Floats are accepted only when they hold an exact integer value.
func alertIntFromAny(v any) (int, bool) {
	switch n := v.(type) {
	case int:
		return n, true
	case int8:
		return int(n), true
	case int16:
		return int(n), true
	case int32:
		return int(n), true
	case int64:
		return int(n), true
	case float32:
		return alertIntFromAny(float64(n))
	case float64:
		// Rejects fractions, NaN, infinities and out-of-range values: none
		// of them survive the round trip through int unchanged.
		if n != float64(int(n)) {
			return 0, false
		}
		return int(n), true
	default:
		return 0, false
	}
}

// alertFloatFromAny converts a dynamically typed numeric value to float64.
func alertFloatFromAny(v any) (float64, bool) {
	switch n := v.(type) {
	case float64:
		return n, true
	case float32:
		return float64(n), true
	case int:
		return float64(n), true
	case int8:
		return float64(n), true
	case int16:
		return float64(n), true
	case int32:
		return float64(n), true
	case int64:
		return float64(n), true
	default:
		return 0, false
	}
}

// DeleteRule deletes the rule with the given ID after confirming it can be
// loaded; a lookup failure is wrapped as CodeNotFound.
func (s *AlertService) DeleteRule(ctx context.Context, id uint) error {
	if _, err := s.ruleStore.GetByID(ctx, id); err != nil {
		return errors.Wrap(errors.CodeNotFound, err, "告警规则不存在")
	}
	return s.ruleStore.Delete(ctx, id)
}

// ListHistory returns one page of alert history together with the count
// reported by the history store, optionally filtered by ruleID.
//
// page values below 1 become 1; pageSize values outside 1..100 become 20.
func (s *AlertService) ListHistory(ctx context.Context, page, pageSize int, ruleID *uint) ([]*model.PollingAlertHistory, int64, error) {
	if page < 1 {
		page = 1
	}
	if pageSize < 1 || pageSize > 100 {
		pageSize = 20
	}
	return s.historyStore.List(ctx, page, pageSize, ruleID)
}

// CheckAlerts evaluates every enabled rule; it is intended to be called
// periodically. A failure on a single rule is logged and does not stop the
// remaining rules. Only a failure to list the rules is returned.
func (s *AlertService) CheckAlerts(ctx context.Context) error {
	rules, err := s.ruleStore.ListEnabled(ctx)
	if err != nil {
		return err
	}

	for _, rule := range rules {
		if err := s.checkRule(ctx, rule); err != nil {
			s.logger.Warn("检查告警规则失败",
				zap.Uint("rule_id", rule.ID),
				zap.String("rule_name", rule.RuleName),
				zap.Error(err))
		}
	}
	return nil
}

// checkRule evaluates one rule: it skips rules still in cooldown, reads the
// current metric value and triggers an alert when the comparison holds.
func (s *AlertService) checkRule(ctx context.Context, rule *model.PollingAlertRule) error {
	if s.isInCooldown(ctx, rule) {
		return nil
	}

	currentValue, err := s.getMetricValue(ctx, rule.TaskType, rule.MetricType)
	if err != nil {
		return err
	}

	var triggered bool
	switch rule.Operator {
	case ">=":
		triggered = currentValue >= rule.Threshold
	case "<":
		triggered = currentValue < rule.Threshold
	case "<=":
		triggered = currentValue <= rule.Threshold
	case "==":
		triggered = currentValue == rule.Threshold
	default:
		// ">" and any unrecognised operator are treated as "greater than".
		triggered = currentValue > rule.Threshold
	}

	if !triggered {
		return nil
	}
	return s.triggerAlert(ctx, rule, currentValue)
}

// isInCooldown reports whether the most recent alert for the rule is newer
// than the rule's cooldown window. A rule with a non-positive cooldown, or
// whose latest history entry cannot be loaded, is not in cooldown.
func (s *AlertService) isInCooldown(ctx context.Context, rule *model.PollingAlertRule) bool {
	if rule.CooldownMinutes <= 0 {
		return false
	}

	history, err := s.historyStore.GetLatestByRuleID(ctx, rule.ID)
	if err != nil || history == nil {
		// No usable history record: treat as not in cooldown.
		return false
	}

	cooldownEnd := history.CreatedAt.Add(time.Duration(rule.CooldownMinutes) * time.Minute)
	return time.Now().Before(cooldownEnd)
}

// getMetricValue reads the current value of metricType for taskType from
// Redis.
//
// Supported metric types:
//   - "queue_size":   cardinality of the task type's polling queue
//   - "success_rate": success_count_1h / (success_count_1h + failure_count_1h) * 100;
//     100 when both counters are zero
//   - "avg_duration": total_duration_1h / (success_count_1h + failure_count_1h);
//     0 when both counters are zero
//   - "concurrency":  the task type's current-concurrency counter; a missing
//     key counts as 0
//
// Redis errors are returned rather than being reported as a value of 0, so
// an outage cannot masquerade as a real measurement. An unknown metric type,
// or "queue_size" for a task type without a known queue, yields
// CodeInvalidParam.
func (s *AlertService) getMetricValue(ctx context.Context, taskType, metricType string) (float64, error) {
	switch metricType {
	case "queue_size":
		var queueKey string
		switch taskType {
		case constants.TaskTypePollingRealname:
			queueKey = constants.RedisPollingQueueRealnameKey()
		case constants.TaskTypePollingCarddata:
			queueKey = constants.RedisPollingQueueCarddataKey()
		case constants.TaskTypePollingPackage:
			queueKey = constants.RedisPollingQueuePackageKey()
		default:
			// Without this guard the lookup would silently hit the empty key.
			return 0, errors.New(errors.CodeInvalidParam, "未知的任务类型")
		}
		size, err := s.redis.ZCard(ctx, queueKey).Result()
		if err != nil {
			return 0, err
		}
		return float64(size), nil

	case "success_rate":
		data, err := s.loadPollingStats(ctx, taskType)
		if err != nil {
			return 0, err
		}
		success := parseInt64(data["success_count_1h"])
		failure := parseInt64(data["failure_count_1h"])
		total := success + failure
		if total == 0 {
			return 100, nil // no samples: treat the success rate as 100%
		}
		return float64(success) / float64(total) * 100, nil

	case "avg_duration":
		data, err := s.loadPollingStats(ctx, taskType)
		if err != nil {
			return 0, err
		}
		total := parseInt64(data["success_count_1h"]) + parseInt64(data["failure_count_1h"])
		if total == 0 {
			return 0, nil
		}
		duration := parseInt64(data["total_duration_1h"])
		return float64(duration) / float64(total), nil

	case "concurrency":
		currentKey := constants.RedisPollingConcurrencyCurrentKey(taskType)
		current, err := s.redis.Get(ctx, currentKey).Int64()
		if err != nil && err != redis.Nil {
			return 0, err
		}
		// On redis.Nil (key absent) current is 0.
		return float64(current), nil

	default:
		return 0, errors.New(errors.CodeInvalidParam, "未知的指标类型")
	}
}

// loadPollingStats returns the stats hash stored in Redis for taskType.
func (s *AlertService) loadPollingStats(ctx context.Context, taskType string) (map[string]string, error) {
	return s.redis.HGetAll(ctx, constants.RedisPollingStatsKey(taskType)).Result()
}

// triggerAlert records an alert history entry with notification status
// "pending", logs the alert and then sends notifications through the rule's
// configured channels. Only a failure to store the history entry is returned.
func (s *AlertService) triggerAlert(ctx context.Context, rule *model.PollingAlertRule, currentValue float64) error {
	alertMessage := s.buildAlertMessage(rule, currentValue)

	history := &model.PollingAlertHistory{
		RuleID:               rule.ID,
		TaskType:             rule.TaskType,
		MetricType:           rule.MetricType,
		AlertLevel:           rule.AlertLevel,
		Threshold:            rule.Threshold,
		CurrentValue:         currentValue,
		AlertMessage:         alertMessage,
		NotificationChannels: rule.NotificationChannels,
		NotificationStatus:   "pending",
	}
	if err := s.historyStore.Create(ctx, history); err != nil {
		return err
	}

	s.logger.Warn("触发告警",
		zap.Uint("rule_id", rule.ID),
		zap.String("rule_name", rule.RuleName),
		zap.String("task_type", rule.TaskType),
		zap.String("metric_type", rule.MetricType),
		zap.String("level", rule.AlertLevel),
		zap.Float64("threshold", rule.Threshold),
		zap.Float64("current_value", currentValue))

	// Deliver notifications (email, SMS, webhook, ...).
	s.sendNotifications(ctx, rule, history, alertMessage)
	return nil
}

// sendNotifications delivers the alert concurrently to every channel
// configured on the rule, waits for all deliveries to finish and stores the
// aggregate outcome on the history entry: "sent" when every recognised
// channel succeeded, "partial" when some succeeded and some failed, and
// "failed" otherwise. Unrecognised channel names are logged and not counted.
// When no channels are configured nothing is sent and the status is left
// untouched.
func (s *AlertService) sendNotifications(ctx context.Context, rule *model.PollingAlertRule, history *model.PollingAlertHistory, message string) {
	channels := parseNotificationChannels(rule.NotificationChannels)
	if len(channels) == 0 {
		s.logger.Debug("未配置通知渠道,跳过通知发送", zap.Uint("rule_id", rule.ID))
		return
	}

	var (
		wg                      sync.WaitGroup
		mu                      sync.Mutex // guards successCount and failCount
		successCount, failCount int
	)

	for _, channel := range channels {
		wg.Add(1)
		go func(ch string) {
			defer wg.Done()

			var err error
			switch ch {
			case "email":
				err = s.sendEmailNotification(ctx, rule, message)
			case "sms":
				err = s.sendSMSNotification(ctx, rule, message)
			case "webhook":
				err = s.sendWebhookNotification(ctx, rule, history)
			default:
				s.logger.Warn("未知的通知渠道", zap.String("channel", ch))
				return
			}

			mu.Lock()
			if err != nil {
				failCount++
			} else {
				successCount++
			}
			mu.Unlock()

			if err != nil {
				s.logger.Error("发送通知失败",
					zap.String("channel", ch),
					zap.Uint("rule_id", rule.ID),
					zap.Error(err))
				return
			}
			s.logger.Info("发送通知成功",
				zap.String("channel", ch),
				zap.Uint("rule_id", rule.ID))
		}(channel)
	}
	wg.Wait()

	// All goroutines have finished, so the counters can be read directly.
	var status string
	switch {
	case successCount > 0 && failCount == 0:
		status = "sent"
	case successCount > 0:
		status = "partial"
	default:
		status = "failed"
	}

	if err := s.historyStore.UpdateNotificationStatus(ctx, history.ID, status); err != nil {
		s.logger.Warn("更新通知状态失败",
			zap.Uint("history_id", history.ID),
			zap.Error(err))
	}
}

// parseNotificationChannels parses the channel configuration, which is
// either a JSON array of strings (`["email","sms"]`) or a comma-separated
// list ("email,sms,webhook"). In both forms entries are trimmed of
// surrounding whitespace and empty entries are dropped. An empty input
// returns nil.
func parseNotificationChannels(channels string) []string {
	if channels == "" {
		return nil
	}

	// Try the JSON array form first; fall back to comma-separated.
	var parts []string
	if err := sonic.UnmarshalString(channels, &parts); err != nil {
		parts = strings.Split(channels, ",")
	}

	result := make([]string, 0, len(parts))
	for _, p := range parts {
		if p = strings.TrimSpace(p); p != "" {
			result = append(result, p)
		}
	}
	return result
}

// getWebhookURLFromConfig extracts the webhook URL from a JSON configuration
// of the form {"webhook_url": "https://example.com/webhook"}. It returns ""
// when the config is empty, is not valid JSON, or has no string
// "webhook_url" entry.
func getWebhookURLFromConfig(config string) string {
	if config == "" {
		return ""
	}

	var cfg map[string]any
	if err := sonic.UnmarshalString(config, &cfg); err != nil {
		return ""
	}

	url, _ := cfg["webhook_url"].(string) // "" when absent or not a string
	return url
}

// sendEmailNotification is a placeholder for email delivery.
//
// TODO: integrate a mail service. For now the message is only logged and the
// call always succeeds; real delivery requires an SMTP service to be
// configured.
func (s *AlertService) sendEmailNotification(_ context.Context, rule *model.PollingAlertRule, message string) error {
	s.logger.Info("邮件通知(待实现)",
		zap.Uint("rule_id", rule.ID),
		zap.String("message", message))
	return nil
}

// sendSMSNotification is a placeholder for SMS delivery.
//
// TODO: integrate an SMS service. For now the message is only logged and the
// call always succeeds; real delivery requires an SMS gateway to be
// configured.
func (s *AlertService) sendSMSNotification(_ context.Context, rule *model.PollingAlertRule, message string) error {
	s.logger.Info("短信通知(待实现)",
		zap.Uint("rule_id", rule.ID),
		zap.String("message", message))
	return nil
}

// sendWebhookNotification POSTs the alert as JSON to the webhook URL found in
// the rule's NotificationConfig. When no URL is configured the call is a
// no-op that returns nil. A response status outside 2xx is reported as an
// error.
func (s *AlertService) sendWebhookNotification(ctx context.Context, rule *model.PollingAlertRule, history *model.PollingAlertHistory) error {
	webhookURL := getWebhookURLFromConfig(rule.NotificationConfig)
	if webhookURL == "" {
		s.logger.Debug("未配置 Webhook URL,跳过发送", zap.Uint("rule_id", rule.ID))
		return nil
	}

	payload := map[string]any{
		"rule_id":       rule.ID,
		"rule_name":     rule.RuleName,
		"task_type":     rule.TaskType,
		"metric_type":   rule.MetricType,
		"alert_level":   rule.AlertLevel,
		"threshold":     rule.Threshold,
		"current_value": history.CurrentValue,
		"message":       history.AlertMessage,
		"triggered_at":  time.Now().Format(time.RFC3339),
	}
	jsonData, err := sonic.Marshal(payload)
	if err != nil {
		return fmt.Errorf("序列化告警数据失败: %w", err)
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, webhookURL, bytes.NewReader(jsonData))
	if err != nil {
		return fmt.Errorf("创建请求失败: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := alertWebhookClient.Do(req)
	if err != nil {
		return fmt.Errorf("发送请求失败: %w", err)
	}
	defer func() {
		// Drain a bounded amount of the body so the transport can reuse the
		// connection. Errors here are not actionable: the outcome is decided
		// by the status code alone.
		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, alertWebhookMaxDrainBytes))
		_ = resp.Body.Close()
	}()

	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return fmt.Errorf("Webhook 返回错误状态码: %d", resp.StatusCode)
	}

	s.logger.Info("Webhook 通知发送成功",
		zap.Uint("rule_id", rule.ID),
		zap.String("url", webhookURL),
		zap.Int("status_code", resp.StatusCode))
	return nil
}

// buildAlertMessage builds the human-readable alert text from the display
// names of the rule's task and metric types, the current value and the
// threshold (both formatted with two decimals).
func (s *AlertService) buildAlertMessage(rule *model.PollingAlertRule, currentValue float64) string {
	taskTypeName := s.getTaskTypeName(rule.TaskType)
	metricTypeName := s.getMetricTypeName(rule.MetricType)

	return taskTypeName + "的" + metricTypeName + "已触发告警: " +
		"当前值 " + formatFloat(currentValue) +
		", 阈值 " + formatFloat(rule.Threshold)
}

// getTaskTypeName returns the display name for a task type, or the raw task
// type when it is not one of the known polling task types.
func (s *AlertService) getTaskTypeName(taskType string) string {
	switch taskType {
	case constants.TaskTypePollingRealname:
		return "实名检查"
	case constants.TaskTypePollingCarddata:
		return "流量检查"
	case constants.TaskTypePollingPackage:
		return "套餐检查"
	default:
		return taskType
	}
}

// getMetricTypeName returns the display name for a metric type, or the raw
// metric type when it is not recognised.
func (s *AlertService) getMetricTypeName(metricType string) string {
	switch metricType {
	case "queue_size":
		return "队列积压"
	case "success_rate":
		return "成功率"
	case "avg_duration":
		return "平均耗时"
	case "concurrency":
		return "并发数"
	default:
		return metricType
	}
}

// formatFloat renders f in decimal notation with exactly two digits after
// the decimal point (for example 12.5 becomes "12.50").
func formatFloat(f float64) string {
	return strconv.FormatFloat(f, 'f', 2, 64)
}