Files
junhong_cmp_fiber/scripts/benchmark/mock_gateway.go
huang 931e140e8e
All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 6m35s
feat: 实现 IoT 卡轮询系统(支持千万级卡规模)
实现功能:
- 实名状态检查轮询(可配置间隔)
- 卡流量检查轮询(支持跨月流量追踪)
- 套餐检查与超额自动停机
- 分布式并发控制(Redis 信号量)
- 手动触发轮询(单卡/批量/条件筛选)
- 数据清理配置与执行
- 告警规则与历史记录
- 实时监控统计(队列/性能/并发)

性能优化:
- Redis 缓存卡信息,减少 DB 查询
- Pipeline 批量写入 Redis
- 异步流量记录写入
- 渐进式初始化(10万卡/批)

压测工具(scripts/benchmark/):
- Mock Gateway 模拟上游服务
- 测试卡生成器
- 配置初始化脚本
- 实时监控脚本

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-05 17:32:44 +08:00

264 lines
7.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// +build ignore
package main
import (
"encoding/json"
"fmt"
"log"
"math/rand"
"net/http"
"os"
"sync/atomic"
"time"
)
// Request counters shared by the HTTP handlers. They are only touched through
// sync/atomic: totalRequests is bumped when a gateway handler is entered, and
// exactly one of successRequests / failedRequests is bumped once the simulated
// outcome has been decided.
var totalRequests, successRequests, failedRequests int64

// startTime is the moment main started; handleStats measures uptime from it.
var startTime time.Time

// fastMode selects the low-latency profile in simulateLatency. It is set once
// in main from the FAST_MODE environment variable.
var fastMode bool
// GatewayResponse is the JSON envelope written by the /flow-card handlers in
// this file to imitate the upstream gateway's response format.
type GatewayResponse struct {
// Code is the result code carried in the body; the handlers here use 200 for
// a success and 500 for a simulated failure.
Code int `json:"code"`
// Msg is a short outcome text such as "success" or "upstream error".
Msg string `json:"msg"`
// TraceID identifies the response; the handlers here build it from the
// current UnixNano timestamp.
TraceID string `json:"traceId"`
// Data holds the endpoint-specific payload as already-encoded JSON. The
// handlers leave it unset on simulated failures.
Data json.RawMessage `json:"data"`
}
// main starts the mock gateway: it records the start time, picks the latency
// profile from the FAST_MODE environment variable, registers the endpoints on
// the default mux, prints a usage banner and serves on :8888 until the
// listener fails (log.Fatal exits the process in that case).
func main() {
	startTime = time.Now()
	// rand.Seed is deprecated since Go 1.20 (the global source seeds itself),
	// but it is kept so the mock still randomizes on older toolchains.
	rand.Seed(time.Now().UnixNano())

	// FAST_MODE=1 or FAST_MODE=true switches simulateLatency to short delays.
	// latencyRange feeds the banner below so it always matches the active mode.
	latencyRange := "200ms - 4s"
	if mode := os.Getenv("FAST_MODE"); mode == "1" || mode == "true" {
		fastMode = true
		latencyRange = "10ms - 50ms"
		fmt.Println("⚡ 快速模式已启用(延迟: 10-50ms)")
	} else {
		fmt.Println("🐢 真实模式(延迟: 200ms-4s)")
		fmt.Println(" 提示: 设置 FAST_MODE=1 可启用快速模式")
	}

	// Paths mirror the ones the gateway client calls.
	http.HandleFunc("/flow-card/realname-status", handleRealnameQuery) // real-name status query
	http.HandleFunc("/flow-card/flow", handleFlowQuery)                // data usage query
	http.HandleFunc("/flow-card/cardStop", handleStopCard)             // suspend a card
	http.HandleFunc("/flow-card/cardStart", handleStartCard)           // resume a card
	http.HandleFunc("/flow-card/status", handleCardStatus)             // card status query
	http.HandleFunc("/stats", handleStats)                             // request statistics

	fmt.Println("=== Mock Gateway 服务器启动 ===")
	fmt.Println("监听端口: 8888")
	fmt.Println("模拟响应时间: " + latencyRange)
	fmt.Println("")
	fmt.Println("接口列表:")
	fmt.Println(" POST /flow-card/realname-status - 实名查询")
	fmt.Println(" POST /flow-card/flow - 流量查询")
	fmt.Println(" POST /flow-card/status - 卡状态查询")
	fmt.Println(" POST /flow-card/cardStop - 停机操作")
	fmt.Println(" POST /flow-card/cardStart - 复机操作")
	fmt.Println(" GET /stats - 查看统计")
	fmt.Println("")
	fmt.Println("按 Ctrl+C 停止服务器")

	log.Fatal(http.ListenAndServe(":8888", nil))
}
// simulateLatency blocks the calling goroutine for a random delay that stands
// in for upstream network latency.
//
// With fastMode set the delay is 10ms plus a random 0-39ms. Otherwise one
// random draw selects a tier and a second draw selects the delay inside it:
//
//	80% of calls: 200ms + [0, 300)ms   (normal)
//	15% of calls: 500ms + [0, 1500)ms  (slow)
//	 5% of calls: 2000ms + [0, 2000)ms (very slow)
func simulateLatency() {
	if fastMode {
		time.Sleep(time.Duration(10+rand.Intn(40)) * time.Millisecond)
		return
	}

	var millis int
	switch tier := rand.Float64(); {
	case tier < 0.80:
		millis = 200 + rand.Intn(300)
	case tier < 0.95:
		millis = 500 + rand.Intn(1500)
	default:
		millis = 2000 + rand.Intn(2000)
	}
	time.Sleep(time.Duration(millis) * time.Millisecond)
}
// handleRealnameQuery serves the mock real-name status endpoint.
//
// Each call increments totalRequests, then sleeps for a simulated upstream
// latency. Roughly 90% of calls are counted in successRequests and answer with
// code 200 and a randomly chosen real-name status; the rest are counted in
// failedRequests and answer with code 500 and no data. No HTTP status is set
// explicitly, so the outcome is carried only by the "code" field of the body.
// The request itself is not inspected.
func handleRealnameQuery(w http.ResponseWriter, r *http.Request) {
	atomic.AddInt64(&totalRequests, 1)
	simulateLatency()

	var resp GatewayResponse
	if rand.Float64() < 0.90 {
		atomic.AddInt64(&successRequests, 1)
		// Possible answers: not verified, verification in progress, verified.
		statuses := []string{"未实名", "实名中", "已实名"}
		status := statuses[rand.Intn(len(statuses))]
		resp = GatewayResponse{
			Code: 200,
			Msg:  "success",
			Data: json.RawMessage(fmt.Sprintf(`{"status": "%s"}`, status)),
		}
	} else {
		atomic.AddInt64(&failedRequests, 1)
		resp = GatewayResponse{Code: 500, Msg: "upstream error"}
	}
	resp.TraceID = fmt.Sprintf("trace-%d", time.Now().UnixNano())

	// Declare the body as JSON; without this net/http sniffs it as text/plain.
	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(resp); err != nil {
		log.Printf("handleRealnameQuery: encode response: %v", err)
	}
}
// handleFlowQuery serves the mock data-usage endpoint.
//
// Each call increments totalRequests, then sleeps for a simulated upstream
// latency. Roughly 90% of calls are counted in successRequests and answer with
// code 200 and a random usedFlow in [0, 10000) with unit "MB"; the rest are
// counted in failedRequests and answer with code 500 and no data. No HTTP
// status is set explicitly, so the outcome is carried only by the "code" field
// of the body. The request itself is not inspected.
func handleFlowQuery(w http.ResponseWriter, r *http.Request) {
	atomic.AddInt64(&totalRequests, 1)
	simulateLatency()

	var resp GatewayResponse
	if rand.Float64() < 0.90 {
		atomic.AddInt64(&successRequests, 1)
		// The payload shape is meant to match the client's FlowUsageResp.
		usedFlow := rand.Intn(10000)
		resp = GatewayResponse{
			Code: 200,
			Msg:  "success",
			Data: json.RawMessage(fmt.Sprintf(`{"usedFlow": %d, "unit": "MB"}`, usedFlow)),
		}
	} else {
		atomic.AddInt64(&failedRequests, 1)
		resp = GatewayResponse{Code: 500, Msg: "upstream error"}
	}
	resp.TraceID = fmt.Sprintf("trace-%d", time.Now().UnixNano())

	// Declare the body as JSON; without this net/http sniffs it as text/plain.
	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(resp); err != nil {
		log.Printf("handleFlowQuery: encode response: %v", err)
	}
}
// handleStopCard serves the mock card-suspend endpoint.
//
// Each call increments totalRequests, then sleeps for a simulated upstream
// latency. Roughly 95% of calls are counted in successRequests and answer with
// code 200 and {"result": "stopped"}; the rest are counted in failedRequests
// and answer with code 500 and no data. No HTTP status is set explicitly, so
// the outcome is carried only by the "code" field of the body. The request
// itself is not inspected and no card state is kept.
func handleStopCard(w http.ResponseWriter, r *http.Request) {
	atomic.AddInt64(&totalRequests, 1)
	simulateLatency()

	var resp GatewayResponse
	if rand.Float64() < 0.95 {
		atomic.AddInt64(&successRequests, 1)
		resp = GatewayResponse{
			Code: 200,
			Msg:  "success",
			Data: json.RawMessage(`{"result": "stopped"}`),
		}
	} else {
		atomic.AddInt64(&failedRequests, 1)
		resp = GatewayResponse{Code: 500, Msg: "stop failed"}
	}
	resp.TraceID = fmt.Sprintf("trace-%d", time.Now().UnixNano())

	// Declare the body as JSON; without this net/http sniffs it as text/plain.
	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(resp); err != nil {
		log.Printf("handleStopCard: encode response: %v", err)
	}
}
// handleStartCard serves the mock card-resume endpoint.
//
// Each call increments totalRequests, then sleeps for a simulated upstream
// latency. Roughly 95% of calls are counted in successRequests and answer with
// code 200 and {"result": "started"}; the rest are counted in failedRequests
// and answer with code 500 and no data. No HTTP status is set explicitly, so
// the outcome is carried only by the "code" field of the body. The request
// itself is not inspected and no card state is kept.
func handleStartCard(w http.ResponseWriter, r *http.Request) {
	atomic.AddInt64(&totalRequests, 1)
	simulateLatency()

	var resp GatewayResponse
	if rand.Float64() < 0.95 {
		atomic.AddInt64(&successRequests, 1)
		resp = GatewayResponse{
			Code: 200,
			Msg:  "success",
			Data: json.RawMessage(`{"result": "started"}`),
		}
	} else {
		atomic.AddInt64(&failedRequests, 1)
		resp = GatewayResponse{Code: 500, Msg: "start failed"}
	}
	resp.TraceID = fmt.Sprintf("trace-%d", time.Now().UnixNano())

	// Declare the body as JSON; without this net/http sniffs it as text/plain.
	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(resp); err != nil {
		log.Printf("handleStartCard: encode response: %v", err)
	}
}
// handleCardStatus serves the mock card-status endpoint.
//
// Each call increments totalRequests, then sleeps for a simulated upstream
// latency. Roughly 90% of calls are counted in successRequests and answer with
// code 200 and a random status (1 = normal, 0 = suspended); the rest are
// counted in failedRequests and answer with code 500 and no data. No HTTP
// status is set explicitly, so the outcome is carried only by the "code" field
// of the body. The request itself is not inspected.
func handleCardStatus(w http.ResponseWriter, r *http.Request) {
	atomic.AddInt64(&totalRequests, 1)
	simulateLatency()

	var resp GatewayResponse
	if rand.Float64() < 0.90 {
		atomic.AddInt64(&successRequests, 1)
		cardStatus := rand.Intn(2)
		resp = GatewayResponse{
			Code: 200,
			Msg:  "success",
			Data: json.RawMessage(fmt.Sprintf(`{"status": %d}`, cardStatus)),
		}
	} else {
		atomic.AddInt64(&failedRequests, 1)
		resp = GatewayResponse{Code: 500, Msg: "query failed"}
	}
	resp.TraceID = fmt.Sprintf("trace-%d", time.Now().UnixNano())

	// Declare the body as JSON; without this net/http sniffs it as text/plain.
	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(resp); err != nil {
		log.Printf("handleCardStatus: encode response: %v", err)
	}
}
// handleStats reports the server's request statistics as a JSON object with
// the keys uptime_seconds, total_requests, success_count, failed_count, qps
// and success_rate (a percentage formatted with two decimals).
//
// The three counters are loaded one after another, not as a snapshot, and
// totalRequests is incremented before a request's outcome is known, so
// success_count + failed_count can be lower than total_requests while
// requests are in flight. Calls to this handler are not counted themselves.
func handleStats(w http.ResponseWriter, r *http.Request) {
	elapsed := time.Since(startTime).Seconds()
	total := atomic.LoadInt64(&totalRequests)
	success := atomic.LoadInt64(&successRequests)
	failed := atomic.LoadInt64(&failedRequests)

	// Guard the division: a zero elapsed time would yield NaN or +Inf, which
	// encoding/json refuses to encode and would leave the response empty.
	var qps float64
	if elapsed > 0 {
		qps = float64(total) / elapsed
	}

	var successRate float64
	if total > 0 {
		successRate = float64(success) * 100 / float64(total)
	}

	stats := map[string]interface{}{
		"uptime_seconds": elapsed,
		"total_requests": total,
		"success_count":  success,
		"failed_count":   failed,
		"qps":            qps,
		"success_rate":   fmt.Sprintf("%.2f%%", successRate),
	}

	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(stats); err != nil {
		log.Printf("handleStats: encode response: %v", err)
	}
}