This commit is contained in:
2026-02-03 10:19:39 +08:00
parent 5a90caa619
commit ad6d43e0cd
19 changed files with 0 additions and 17535 deletions

View File

@@ -1,647 +0,0 @@
# 流程测试规范文档
> 本文档定义了业务流程测试的编写规范。AI 助手根据用户描述的业务流程自动生成测试脚本。
## 快速开始
### 环境准备
```bash
cd flow_tests
python3 -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
pip install -r requirements.txt
```
### 运行测试
```bash
# 运行所有测试
pytest
# 运行指定模块
pytest tests/test_account_flow.py -v
# 运行指定流程
pytest tests/test_account_flow.py::test_create_agent_flow -v
# 查看详细输出
pytest -v --tb=short
```
---
## 用户如何描述流程
### 描述格式(自然语言即可)
用户只需用自然语言描述业务流程AI 会自动:
1. 理解流程意图
2. 找到对应的后端接口
3. 生成测试脚本
4. 处理数据清理
### 描述示例
**示例 1简单流程**
> "测试创建代理商:平台管理员创建一个店铺,然后给这个店铺创建管理员账号,验证这个账号能登录"
**示例 2带条件的流程**
> "测试套餐分配:代理商只有被分配了套餐才能卖货。先创建店铺,不分配套餐时应该看不到任何可售套餐,分配后才能看到"
**示例 3异步流程**
> "测试设备导入:上传 CSV 文件导入设备,这是异步任务,需要等待任务完成后验证设备确实入库了"
**示例 4涉及第三方**
> "测试充值流程:用户下单充值,支付成功后卡片流量应该增加。支付回调可以直接模拟"
### 需要说明的要素
| 要素 | 必须 | 说明 |
|------|------|------|
| 操作角色 | 是 | 谁在操作(平台管理员/代理商/企业用户/普通用户) |
| 操作步骤 | 是 | 按顺序描述要做什么 |
| 预期结果 | 是 | 每步操作后应该发生什么 |
| 前置条件 | 否 | 如果有特殊前置条件需要说明 |
| 异步等待 | 否 | 如果涉及异步任务需要说明 |
---
## 测试框架结构
```
flow_tests/
├── AGENTS.md # 本规范文档
├── openapi.yaml # OpenAPI 接口文档AI 助手必读)
├── requirements.txt # Python 依赖
├── pytest.ini # pytest 配置
├── config/
│ ├── settings.py # 配置加载
│ ├── local.yaml # 本地环境配置
│ └── remote.yaml # 远程环境配置
├── core/
│ ├── __init__.py
│ ├── client.py # HTTP 客户端封装
│ ├── auth.py # 认证管理
│ ├── database.py # 数据库直连(验证用)
│ ├── cleanup.py # 数据清理追踪器
│ ├── mock.py # 第三方服务 Mock
│ └── wait.py # 异步任务等待器
├── fixtures/
│ ├── __init__.py
│ └── common.py # 通用 pytest fixtures
└── tests/
├── __init__.py
├── test_account_flow.py # 账号管理流程
├── test_shop_flow.py # 店铺管理流程
└── ... # 其他模块流程
```
---
## 核心组件说明
### 1. HTTP 客户端 (core/client.py)
```python
from core.client import APIClient
# 创建客户端
client = APIClient()
# 登录获取 token
client.login("admin", "password")
# 发起请求(自动带 token
resp = client.post("/api/admin/accounts", json={...})
resp = client.get("/api/admin/accounts/1")
# 断言响应
assert resp.ok() # code == 0
assert resp.code == 0
assert resp.data["id"] == 1
```
### 2. 数据清理追踪器 (core/cleanup.py)
**核心原则:只删除测试创建的数据,不影响原有数据**
```python
from core.cleanup import CleanupTracker
# 在 fixture 中初始化
tracker = CleanupTracker(db_connection)
# 记录创建的数据
account_id = create_account(...)
tracker.track("admin_accounts", account_id)
shop_id = create_shop(...)
tracker.track("shops", shop_id)
# 测试结束后自动清理(逆序删除,处理依赖)
tracker.cleanup() # 先删 account再删 shop
```
### 3. 认证管理 (core/auth.py)
```python
from core.auth import AuthManager
auth = AuthManager(client)
# 预置角色快速登录
auth.as_super_admin() # 超级管理员
auth.as_platform_admin() # 平台管理员
auth.as_agent(shop_id) # 代理商(需指定店铺)
auth.as_enterprise(ent_id) # 企业用户
# 自定义账号登录
auth.login("custom_account", "password")
```
### 4. 异步任务等待器 (core/wait.py)
```python
from core.wait import wait_for_task, wait_for_condition
# 等待异步任务完成
result = wait_for_task(
task_type="device_import",
task_id=task_id,
timeout=60,
poll_interval=2
)
# 等待条件满足
wait_for_condition(
condition=lambda: db.query("SELECT count(*) FROM devices WHERE batch_id = %s", batch_id) > 0,
timeout=30,
message="等待设备入库"
)
```
### 5. Mock 服务 (core/mock.py)
```python
from core.mock import MockService
mock = MockService(db_connection)
# 模拟支付回调
mock.payment_success(order_id, amount=100)
# 模拟短信验证码(直接写入数据库/Redis
mock.sms_code(phone="13800138000", code="123456")
# 模拟第三方 API 响应(如果后端支持 mock 模式)
mock.external_api("carrier_recharge", response={"success": True})
```
---
## 测试编写规范
### 基本结构
```python
"""
账号管理流程测试
测试场景:
1. 创建代理商账号流程
2. 账号权限验证流程
...
"""
import pytest
from core.client import APIClient
from core.auth import AuthManager
from core.cleanup import CleanupTracker
class TestAccountFlow:
"""账号管理流程"""
def test_create_agent_account_flow(self, client, auth, tracker, db):
"""
流程:创建代理商账号
步骤:
1. 平台管理员创建店铺
2. 给店铺创建管理员账号
3. 验证新账号能登录
4. 验证只能看到自己店铺的数据
"""
# === 1. 平台管理员创建店铺 ===
auth.as_platform_admin()
resp = client.post("/api/admin/shops", json={
"name": "测试代理商",
"contact": "张三",
"phone": "13800138000"
})
assert resp.ok(), f"创建店铺失败: {resp.msg}"
shop_id = resp.data["id"]
tracker.track("shops", shop_id)
# === 2. 创建店铺管理员账号 ===
resp = client.post("/api/admin/accounts", json={
"username": "test_agent_admin",
"password": "Test123456",
"shop_id": shop_id,
"role_ids": [2] # 假设 2 是店铺管理员角色
})
assert resp.ok(), f"创建账号失败: {resp.msg}"
account_id = resp.data["id"]
tracker.track("admin_accounts", account_id)
# === 3. 验证新账号能登录 ===
auth.login("test_agent_admin", "Test123456")
resp = client.get("/api/admin/auth/me")
assert resp.ok()
assert resp.data["shop_id"] == shop_id
# === 4. 验证只能看到自己店铺数据 ===
resp = client.get("/api/admin/shops")
assert resp.ok()
# 代理商只能看到自己的店铺
assert len(resp.data["list"]) == 1
assert resp.data["list"][0]["id"] == shop_id
```
### 命名规范
| 类型 | 规范 | 示例 |
|------|------|------|
| 文件名 | `test_{模块}_flow.py` | `test_account_flow.py` |
| 类名 | `Test{模块}Flow` | `TestAccountFlow` |
| 方法名 | `test_{流程描述}` | `test_create_agent_account_flow` |
| 方法文档 | 必须包含流程步骤 | 见上方示例 |
### Fixtures 使用
```python
# fixtures/common.py 提供以下通用 fixtures
@pytest.fixture
def client():
"""HTTP 客户端"""
return APIClient()
@pytest.fixture
def auth(client):
"""认证管理器"""
return AuthManager(client)
@pytest.fixture
def db():
"""数据库连接(只读验证用)"""
return get_db_connection()
@pytest.fixture
def tracker(db):
"""数据清理追踪器"""
t = CleanupTracker(db)
yield t
t.cleanup() # 测试结束自动清理
@pytest.fixture
def mock(db):
"""Mock 服务"""
return MockService(db)
```
---
## 异步任务测试规范
### 导入类任务设备导入、IoT卡导入
```python
def test_device_import_flow(self, client, auth, tracker, db):
"""
流程:设备批量导入
步骤:
1. 上传 CSV 文件
2. 创建导入任务
3. 等待任务完成
4. 验证设备入库
"""
auth.as_platform_admin()
# 1. 上传文件
with open("fixtures/devices.csv", "rb") as f:
resp = client.upload("/api/admin/storage/upload", file=f)
file_url = resp.data["url"]
# 2. 创建导入任务
resp = client.post("/api/admin/device-imports", json={
"file_url": file_url,
"carrier_id": 1
})
assert resp.ok()
task_id = resp.data["task_id"]
# 3. 等待任务完成
from core.wait import wait_for_task
result = wait_for_task("device_import", task_id, timeout=60)
assert result["status"] == "completed"
# 4. 验证设备入库
imported_count = db.scalar(
"SELECT count(*) FROM devices WHERE import_task_id = %s",
task_id
)
assert imported_count == 10 # CSV 中有 10 条
# 追踪清理
tracker.track_by_query("devices", f"import_task_id = {task_id}")
```
### 支付回调类任务
```python
def test_recharge_flow(self, client, auth, tracker, db, mock):
"""
流程:充值支付
步骤:
1. 用户创建充值订单
2. 模拟支付成功回调
3. 验证卡片流量增加
"""
auth.as_enterprise(enterprise_id=1)
# 1. 创建充值订单
resp = client.post("/api/h5/recharge/orders", json={
"card_id": 123,
"package_id": 456
})
assert resp.ok()
order_id = resp.data["order_id"]
tracker.track("orders", order_id)
# 获取支付前流量
before_data = db.scalar(
"SELECT data_balance FROM iot_cards WHERE id = 123"
)
# 2. 模拟支付回调
mock.payment_success(order_id, amount=50.00)
# 3. 验证流量增加
from core.wait import wait_for_condition
wait_for_condition(
condition=lambda: db.scalar(
"SELECT data_balance FROM iot_cards WHERE id = 123"
) > before_data,
timeout=10,
message="等待流量到账"
)
```
---
## 角色权限说明
测试时需要了解系统角色体系:
| 角色 | 说明 | 数据范围 |
|------|------|----------|
| 超级管理员 | 系统最高权限 | 全部数据 |
| 平台管理员 | 平台运营人员 | 全部数据(受权限配置限制) |
| 代理商管理员 | 店铺管理者 | 本店铺 + 下级店铺 |
| 代理商员工 | 店铺普通员工 | 本店铺 |
| 企业管理员 | 企业用户 | 本企业数据 |
---
## 环境配置
### config/local.yaml
```yaml
# 本地开发环境
api:
base_url: "http://localhost:3000"
timeout: 30
database:
host: "localhost"
port: 5432
name: "junhong_dev"
user: "postgres"
password: "postgres"
redis:
host: "localhost"
port: 6379
db: 0
# 预置测试账号
accounts:
super_admin:
username: "superadmin"
password: "Admin123456"
platform_admin:
username: "platform"
password: "Admin123456"
```
### config/remote.yaml
```yaml
# 远程测试环境
api:
base_url: "https://test-api.example.com"
timeout: 30
database:
host: "test-db.example.com"
port: 5432
name: "junhong_test"
user: "test_user"
password: "test_password"
redis:
host: "test-redis.example.com"
port: 6379
db: 0
```
### 切换环境
```bash
# 使用本地环境(默认)
pytest
# 使用远程环境
TEST_ENV=remote pytest
```
---
## 数据清理规则
### 清理原则
1. **只删除测试创建的数据** - 通过 tracker 追踪
2. **逆序删除** - 先删依赖方,再删被依赖方
3. **软删除优先** - 如果表支持软删除,使用软删除
4. **级联处理** - 自动处理关联数据
### 追踪方式
```python
# 方式1追踪单条记录
tracker.track("table_name", record_id)
# 方式2追踪多条记录
tracker.track_many("table_name", [id1, id2, id3])
# 方式3按条件追踪用于批量导入等场景
tracker.track_by_query("devices", "import_task_id = 123")
# 方式4追踪关联数据自动级联
tracker.track_with_relations("shops", shop_id, relations=[
("admin_accounts", "shop_id"),
("shop_packages", "shop_id")
])
```
### 清理顺序配置
```python
# core/cleanup.py 中定义表的依赖关系
TABLE_DEPENDENCIES = {
"admin_accounts": ["shops"], # accounts 依赖 shops
"shop_packages": ["shops", "packages"], # shop_packages 依赖 shops 和 packages
"orders": ["iot_cards", "packages"],
# ... 根据实际表结构配置
}
```
---
## 常见问题
### Q: 如何处理需要真实第三方服务的测试?
A: 两种方式:
1. 使用 Mock 模式(推荐):直接模拟回调或写入预期数据
2. 如果必须真实调用:在配置中标记为 `skip_in_mock_mode`,仅在集成环境运行
### Q: 测试数据影响了其他人怎么办?
A:
1. 本地环境:数据隔离,不影响他人
2. 远程环境:使用唯一标识(如 UUID创建数据确保清理
### Q: 异步任务超时怎么办?
A:
1. 检查任务是否真的启动了
2. 检查 worker 是否在运行
3. 增加超时时间(最后手段)
4. 查看任务日志定位问题
---
## 接口文档OpenAPI
**重要**生成测试时AI 助手必须首先查阅 `flow_tests/openapi.yaml` 获取准确的接口信息。
### 文件位置
```
flow_tests/openapi.yaml # OpenAPI 3.0 规范文档
```
### 文档结构
```yaml
# openapi.yaml 结构
components:
schemas: # 数据模型定义DTO
DtoCreateShopRequest:
properties:
shop_name: { type: string }
shop_code: { type: string }
# ...
required: [shop_name, shop_code]
paths: # API 路径定义
/api/admin/shops:
post:
summary: 创建店铺
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/DtoCreateShopRequest'
responses:
'200':
content:
application/json:
schema:
$ref: '#/components/schemas/DtoShopResponse'
```
### AI 助手使用方式
1. **查找接口**:根据用户描述的流程,在 `paths` 中找到对应的 API 路径
2. **获取请求参数**:从 `requestBody.schema` 获取请求体结构
3. **获取响应格式**:从 `responses.200.schema` 获取响应体结构
4. **理解字段含义**:从 `components.schemas` 中查看字段的 `description`
### 示例:查找创建店铺接口
用户说:"创建一个店铺"
AI 助手应该:
1. 读取 `openapi.yaml`
2. 搜索 `shops` 相关路径 → 找到 `POST /api/admin/shops`
3. 查看 `DtoCreateShopRequest` 了解必填字段:`shop_name`, `shop_code`, `init_username`, `init_phone`, `init_password`
4. 生成正确的测试代码
```python
resp = client.post("/api/admin/shops", json={
"shop_name": "测试店铺",
"shop_code": "TEST001",
"init_username": "test_admin",
"init_phone": "13800138000",
"init_password": "Test123456",
})
```
---
## AI 助手工作流程
当用户描述业务流程后AI 助手按以下步骤工作:
1. **理解流程**:分析用户描述,提取操作角色、步骤、预期结果
2. **查阅接口文档**:读取 `flow_tests/openapi.yaml` 获取准确的接口路径、请求参数、响应格式
3. **生成测试**:按照本规范生成测试代码,使用 OpenAPI 文档中的字段定义
4. **补充清理**:添加数据追踪和清理逻辑
5. **运行验证**:执行测试确保通过
6. **报告结果**:告知用户测试结果,如发现问题则报告
### 接口查找优先级
| 优先级 | 来源 | 说明 |
|--------|------|------|
| 1 | `flow_tests/openapi.yaml` | **首选**,最准确的接口定义 |
| 2 | 项目代码 `internal/handler/` | OpenAPI 未覆盖时查找源码 |
| 3 | 项目代码 `internal/router/` | 确认路由注册 |
---
## 版本记录
| 版本 | 日期 | 说明 |
|------|------|------|
| 1.0 | 2026-02-02 | 初始版本 |
| 1.1 | 2026-02-02 | 增加 OpenAPI 接口文档规范 |

View File

@@ -1,37 +0,0 @@
# 本地开发环境配置
# 使用方式: TEST_ENV=local pytest (默认)
api:
base_url: "http://localhost:3000"
timeout: 30
database:
host: "localhost"
port: 5432
name: "junhong_dev"
user: "postgres"
password: "postgres"
redis:
host: "localhost"
port: 6379
db: 0
# 预置测试账号
# 根据实际系统配置修改
accounts:
super_admin:
username: "superadmin"
password: "Admin123456"
platform_admin:
username: "platform"
password: "Admin123456"
# Mock 配置
mock:
# 是否启用 Mock 模式(跳过真实第三方调用)
enabled: true
# 支付回调 Mock
payment:
auto_success: true
delay_seconds: 1

View File

@@ -1,34 +0,0 @@
# 远程测试环境配置
# 使用方式: TEST_ENV=remote pytest
api:
base_url: "https://test-api.example.com"
timeout: 30
database:
host: "test-db.example.com"
port: 5432
name: "junhong_test"
user: "test_user"
password: "test_password"
redis:
host: "test-redis.example.com"
port: 6379
db: 0
# 预置测试账号
accounts:
super_admin:
username: "superadmin"
password: "Admin123456"
platform_admin:
username: "platform"
password: "Admin123456"
# Mock 配置
mock:
enabled: true
payment:
auto_success: true
delay_seconds: 1

View File

@@ -1,95 +0,0 @@
"""
配置管理模块
支持多环境配置切换:
- TEST_ENV=local 使用本地配置(默认)
- TEST_ENV=remote 使用远程配置
"""
import os
from pathlib import Path
from typing import Any, Optional
import yaml
class Settings:
"""配置管理器"""
_instance: Optional['Settings'] = None
_config: dict = {}
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._load_config()
return cls._instance
def _load_config(self):
"""加载配置文件"""
env = os.getenv("TEST_ENV", "local")
config_dir = Path(__file__).parent
config_file = config_dir / f"{env}.yaml"
if not config_file.exists():
raise FileNotFoundError(f"配置文件不存在: {config_file}")
with open(config_file, "r", encoding="utf-8") as f:
self._config = yaml.safe_load(f)
print(f"[配置] 已加载 {env} 环境配置")
def get(self, key: str, default: Any = None) -> Any:
"""
获取配置值,支持点号分隔的路径
示例:
settings.get("api.base_url")
settings.get("database.host")
"""
keys = key.split(".")
value = self._config
for k in keys:
if isinstance(value, dict):
value = value.get(k)
else:
return default
if value is None:
return default
return value
@property
def api_base_url(self) -> str:
return self.get("api.base_url", "http://localhost:3000")
@property
def api_timeout(self) -> int:
return self.get("api.timeout", 30)
@property
def db_config(self) -> dict:
return {
"host": self.get("database.host", "localhost"),
"port": self.get("database.port", 5432),
"database": self.get("database.name", "junhong_dev"),
"user": self.get("database.user", "postgres"),
"password": self.get("database.password", "postgres"),
}
@property
def redis_config(self) -> dict:
return {
"host": self.get("redis.host", "localhost"),
"port": self.get("redis.port", 6379),
"db": self.get("redis.db", 0),
}
def get_account(self, role: str) -> dict:
"""获取预置账号信息"""
return self.get(f"accounts.{role}", {})
# 全局配置实例
settings = Settings()

View File

@@ -1,6 +0,0 @@
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
pytest_plugins = ["fixtures.common"]

View File

@@ -1,17 +0,0 @@
from .client import APIClient, APIResponse
from .auth import AuthManager
from .database import Database
from .cleanup import CleanupTracker
from .mock import MockService
from .wait import wait_for_task, wait_for_condition
__all__ = [
"APIClient",
"APIResponse",
"AuthManager",
"Database",
"CleanupTracker",
"MockService",
"wait_for_task",
"wait_for_condition",
]

View File

@@ -1,71 +0,0 @@
import logging
from typing import Optional
from config.settings import settings
from .client import APIClient
logger = logging.getLogger(__name__)
class AuthManager:
def __init__(self, client: APIClient):
self.client = client
self._current_role: Optional[str] = None
@property
def current_role(self) -> Optional[str]:
return self._current_role
def login(self, username: str, password: str) -> bool:
resp = self.client.login(username, password)
if resp.ok():
self._current_role = "custom"
return True
logger.error(f"登录失败: {resp.msg}")
return False
def logout(self):
self.client.clear_token()
self._current_role = None
def _login_preset_account(self, role: str) -> bool:
account = settings.get_account(role)
if not account:
raise ValueError(f"未配置 {role} 账号,请检查配置文件")
if self.login(account["username"], account["password"]):
self._current_role = role
return True
return False
def as_super_admin(self) -> 'AuthManager':
self._login_preset_account("super_admin")
return self
def as_platform_admin(self) -> 'AuthManager':
self._login_preset_account("platform_admin")
return self
def as_agent(self, shop_id: int, username: Optional[str] = None, password: Optional[str] = None) -> 'AuthManager':
if username and password:
self.login(username, password)
else:
account = settings.get_account(f"agent_{shop_id}")
if account:
self.login(account["username"], account["password"])
else:
raise ValueError(f"未配置 agent_{shop_id} 账号,请提供用户名密码或在配置文件中添加")
self._current_role = f"agent_{shop_id}"
return self
def as_enterprise(self, enterprise_id: int, username: Optional[str] = None, password: Optional[str] = None) -> 'AuthManager':
if username and password:
self.login(username, password)
else:
account = settings.get_account(f"enterprise_{enterprise_id}")
if account:
self.login(account["username"], account["password"])
else:
raise ValueError(f"未配置 enterprise_{enterprise_id} 账号,请提供用户名密码或在配置文件中添加")
self._current_role = f"enterprise_{enterprise_id}"
return self

View File

@@ -1,113 +0,0 @@
import logging
from collections import defaultdict
from typing import Dict, List, Optional, Tuple
from .database import Database
logger = logging.getLogger(__name__)
TABLE_DEPENDENCIES: Dict[str, List[str]] = {
"tb_account": ["tb_shop", "tb_enterprise"],
"tb_account_role": ["tb_account", "tb_role"],
"tb_shop": [],
"tb_enterprise": ["tb_shop"],
"tb_role": [],
"tb_permission": [],
"tb_role_permission": ["tb_role", "tb_permission"],
"tb_device": ["tb_shop"],
"tb_iot_card": ["tb_shop", "tb_device"],
"tb_package": [],
"tb_package_series": [],
"tb_order": ["tb_iot_card", "tb_package"],
"tb_shop_package": ["tb_shop", "tb_package"],
"tb_shop_package_series": ["tb_shop", "tb_package_series"],
}
SOFT_DELETE_TABLES = {
"tb_account", "tb_shop", "tb_enterprise", "tb_role",
"tb_device", "tb_iot_card", "tb_package", "tb_order",
}
class CleanupTracker:
def __init__(self, db: Optional[Database] = None):
self.db = db or Database()
self._tracked: Dict[str, List[int]] = defaultdict(list)
self._tracked_queries: List[Tuple[str, str]] = []
def track(self, table: str, record_id: int):
self._tracked[table].append(record_id)
logger.debug(f"追踪: {table}#{record_id}")
def track_many(self, table: str, record_ids: List[int]):
self._tracked[table].extend(record_ids)
logger.debug(f"追踪: {table}#{record_ids}")
def track_by_query(self, table: str, where_clause: str):
self._tracked_queries.append((table, where_clause))
logger.debug(f"追踪查询: {table} WHERE {where_clause}")
def track_with_relations(self, table: str, record_id: int, relations: List[Tuple[str, str]]):
self.track(table, record_id)
for rel_table, fk_column in relations:
ids = self.db.query(
f"SELECT id FROM {rel_table} WHERE {fk_column} = %s",
(record_id,)
)
for row in ids:
self.track(rel_table, row["id"])
def cleanup(self):
logger.info("开始清理测试数据...")
for table, where_clause in reversed(self._tracked_queries):
self._delete_by_query(table, where_clause)
sorted_tables = self._sort_by_dependency()
for table in sorted_tables:
ids = self._tracked.get(table, [])
if ids:
self._delete_records(table, ids)
logger.info("测试数据清理完成")
def _sort_by_dependency(self) -> List[str]:
tables = list(self._tracked.keys())
def get_order(t: str) -> int:
deps = TABLE_DEPENDENCIES.get(t, [])
if not deps:
return 0
return max(get_order(d) for d in deps if d in tables) + 1 if any(d in tables for d in deps) else 0
return sorted(tables, key=get_order, reverse=True)
def _delete_records(self, table: str, ids: List[int]):
if not ids:
return
placeholders = ",".join(["%s"] * len(ids))
if table in SOFT_DELETE_TABLES:
sql = f"UPDATE {table} SET deleted_at = NOW() WHERE id IN ({placeholders}) AND deleted_at IS NULL"
else:
sql = f"DELETE FROM {table} WHERE id IN ({placeholders})"
try:
count = self.db.execute(sql, tuple(ids))
logger.info(f"清理 {table}: {count} 条记录")
except Exception as e:
logger.error(f"清理 {table} 失败: {e}")
def _delete_by_query(self, table: str, where_clause: str):
if table in SOFT_DELETE_TABLES:
sql = f"UPDATE {table} SET deleted_at = NOW() WHERE {where_clause} AND deleted_at IS NULL"
else:
sql = f"DELETE FROM {table} WHERE {where_clause}"
try:
count = self.db.execute(sql)
logger.info(f"清理 {table} (查询): {count} 条记录")
except Exception as e:
logger.error(f"清理 {table} (查询) 失败: {e}")

View File

@@ -1,100 +0,0 @@
import logging
from dataclasses import dataclass
from typing import Any, Optional
import requests
from config.settings import settings
logger = logging.getLogger(__name__)
@dataclass
class APIResponse:
status_code: int
code: int
msg: str
data: Any
raw: dict
def ok(self) -> bool:
return self.code == 0
def __bool__(self) -> bool:
return self.ok()
class APIClient:
def __init__(self, base_url: Optional[str] = None):
self.base_url = base_url or settings.api_base_url
self.timeout = settings.api_timeout
self.token: Optional[str] = None
self.session = requests.Session()
def set_token(self, token: str):
self.token = token
self.session.headers["Authorization"] = f"Bearer {token}"
def clear_token(self):
self.token = None
self.session.headers.pop("Authorization", None)
def _request(self, method: str, path: str, **kwargs) -> APIResponse:
url = f"{self.base_url}{path}"
kwargs.setdefault("timeout", self.timeout)
logger.info(f"{method} {path}")
try:
resp = self.session.request(method, url, **kwargs)
except requests.exceptions.RequestException as e:
logger.error(f"请求失败: {e}")
return APIResponse(status_code=0, code=-1, msg=str(e), data=None, raw={})
try:
raw = resp.json()
except ValueError:
return APIResponse(
status_code=resp.status_code, code=-1,
msg="响应不是有效的 JSON", data=None, raw={}
)
return APIResponse(
status_code=resp.status_code,
code=raw.get("code", -1),
msg=raw.get("msg", ""),
data=raw.get("data"),
raw=raw,
)
def get(self, path: str, params: Optional[dict] = None, **kwargs) -> APIResponse:
return self._request("GET", path, params=params, **kwargs)
def post(self, path: str, json: Optional[dict] = None, **kwargs) -> APIResponse:
return self._request("POST", path, json=json, **kwargs)
def put(self, path: str, json: Optional[dict] = None, **kwargs) -> APIResponse:
return self._request("PUT", path, json=json, **kwargs)
def delete(self, path: str, **kwargs) -> APIResponse:
return self._request("DELETE", path, **kwargs)
def patch(self, path: str, json: Optional[dict] = None, **kwargs) -> APIResponse:
return self._request("PATCH", path, json=json, **kwargs)
def upload(self, path: str, file, field_name: str = "file", **kwargs) -> APIResponse:
files = {field_name: file}
return self._request("POST", path, files=files, **kwargs)
def login(self, username: str, password: str, login_path: str = "/api/admin/auth/login") -> APIResponse:
resp = self.post(login_path, json={
"username": username,
"password": password,
})
if resp.ok() and resp.data:
token = resp.data.get("token") or resp.data.get("access_token")
if token:
self.set_token(token)
logger.info(f"登录成功: {username}")
return resp

View File

@@ -1,69 +0,0 @@
import logging
from typing import Any, List, Optional
import psycopg2
from psycopg2.extras import RealDictCursor
from config.settings import settings
logger = logging.getLogger(__name__)
class Database:
_instance: Optional['Database'] = None
_conn = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._connect()
return cls._instance
def _connect(self):
config = settings.db_config
self._conn = psycopg2.connect(
host=config["host"],
port=config["port"],
database=config["database"],
user=config["user"],
password=config["password"],
cursor_factory=RealDictCursor,
)
self._conn.autocommit = True
logger.info(f"数据库连接成功: {config['host']}:{config['port']}/{config['database']}")
def query(self, sql: str, params: tuple = ()) -> List[dict]:
with self._conn.cursor() as cur:
cur.execute(sql, params)
return cur.fetchall()
def query_one(self, sql: str, params: tuple = ()) -> Optional[dict]:
rows = self.query(sql, params)
return rows[0] if rows else None
def scalar(self, sql: str, params: tuple = ()) -> Any:
with self._conn.cursor() as cur:
cur.execute(sql, params)
row = cur.fetchone()
if row:
return list(row.values())[0]
return None
def execute(self, sql: str, params: tuple = ()) -> int:
with self._conn.cursor() as cur:
cur.execute(sql, params)
return cur.rowcount
def execute_many(self, sql: str, params_list: List[tuple]) -> int:
with self._conn.cursor() as cur:
cur.executemany(sql, params_list)
return cur.rowcount
def close(self):
if self._conn:
self._conn.close()
logger.info("数据库连接已关闭")
def get_db() -> Database:
return Database()

View File

@@ -1,74 +0,0 @@
import logging
import time
from typing import Any, Optional
import redis
from config.settings import settings
from .database import Database
logger = logging.getLogger(__name__)
class MockService:
def __init__(self, db: Optional[Database] = None):
self.db = db or Database()
self._init_redis()
def _init_redis(self):
config = settings.redis_config
self.redis = redis.Redis(
host=config["host"],
port=config["port"],
db=config["db"],
decode_responses=True,
)
def payment_success(self, order_id: int, amount: float, delay: float = 0):
if delay > 0:
time.sleep(delay)
self.db.execute(
"UPDATE tb_order SET status = %s, paid_at = NOW(), paid_amount = %s WHERE id = %s",
("paid", int(amount * 100), order_id)
)
logger.info(f"模拟支付成功: order_id={order_id}, amount={amount}")
def payment_failed(self, order_id: int, reason: str = "支付失败"):
self.db.execute(
"UPDATE tb_order SET status = %s, fail_reason = %s WHERE id = %s",
("failed", reason, order_id)
)
logger.info(f"模拟支付失败: order_id={order_id}, reason={reason}")
def sms_code(self, phone: str, code: str, expire_seconds: int = 300):
key = f"sms:code:{phone}"
self.redis.setex(key, expire_seconds, code)
logger.info(f"模拟短信验证码: phone={phone}, code={code}")
def task_complete(self, task_type: str, task_id: int, result: Any = None):
self.db.execute(
"UPDATE tb_async_task SET status = %s, result = %s, completed_at = NOW() WHERE task_type = %s AND id = %s",
("completed", str(result) if result else None, task_type, task_id)
)
logger.info(f"模拟任务完成: {task_type}#{task_id}")
def task_failed(self, task_type: str, task_id: int, error: str):
self.db.execute(
"UPDATE tb_async_task SET status = %s, error = %s, completed_at = NOW() WHERE task_type = %s AND id = %s",
("failed", error, task_type, task_id)
)
logger.info(f"模拟任务失败: {task_type}#{task_id}, error={error}")
def card_data_balance(self, card_id: int, balance_mb: int):
self.db.execute(
"UPDATE tb_iot_card SET data_balance = %s WHERE id = %s",
(balance_mb, card_id)
)
logger.info(f"模拟卡片流量: card_id={card_id}, balance={balance_mb}MB")
def external_api_response(self, api_name: str, response: dict):
key = f"mock:api:{api_name}"
import json
self.redis.setex(key, 300, json.dumps(response))
logger.info(f"模拟外部 API: {api_name}")

View File

@@ -1,99 +0,0 @@
import logging
import time
from typing import Any, Callable, Optional
from .database import Database
logger = logging.getLogger(__name__)
class TimeoutError(Exception):
pass
def wait_for_condition(
condition: Callable[[], bool],
timeout: float = 30,
poll_interval: float = 1,
message: str = "等待条件满足",
) -> bool:
start = time.time()
while time.time() - start < timeout:
try:
if condition():
logger.info(f"{message}: 成功 (耗时 {time.time() - start:.1f}s)")
return True
except Exception as e:
logger.debug(f"{message}: 检查失败 - {e}")
time.sleep(poll_interval)
raise TimeoutError(f"{message}: 超时 ({timeout}s)")
def wait_for_task(
task_type: str,
task_id: int,
timeout: float = 60,
poll_interval: float = 2,
db: Optional[Database] = None,
) -> dict:
db = db or Database()
start = time.time()
while time.time() - start < timeout:
row = db.query_one(
"SELECT status, result, error FROM tb_async_task WHERE task_type = %s AND id = %s",
(task_type, task_id)
)
if not row:
raise ValueError(f"任务不存在: {task_type}#{task_id}")
if row["status"] in ("completed", "failed"):
logger.info(f"任务完成: {task_type}#{task_id}, status={row['status']}, 耗时 {time.time() - start:.1f}s")
return dict(row)
logger.debug(f"等待任务: {task_type}#{task_id}, 当前状态={row['status']}")
time.sleep(poll_interval)
raise TimeoutError(f"任务超时: {task_type}#{task_id} ({timeout}s)")
def wait_for_db_condition(
sql: str,
params: tuple = (),
expected: Any = True,
timeout: float = 30,
poll_interval: float = 1,
db: Optional[Database] = None,
) -> Any:
db = db or Database()
def check():
result = db.scalar(sql, params)
if callable(expected):
return expected(result)
return result == expected
wait_for_condition(check, timeout, poll_interval, f"等待 SQL 条件: {sql[:50]}...")
return db.scalar(sql, params)
def wait_for_record_count(
table: str,
where_clause: str,
expected_count: int,
timeout: float = 30,
db: Optional[Database] = None,
) -> int:
db = db or Database()
sql = f"SELECT COUNT(*) FROM {table} WHERE {where_clause}"
def check():
count = db.scalar(sql)
return count >= expected_count
wait_for_condition(check, timeout, 1, f"等待 {table} 记录数 >= {expected_count}")
return db.scalar(sql)

View File

@@ -1,3 +0,0 @@
from .common import client, auth, db, tracker, mock
__all__ = ["client", "auth", "db", "tracker", "mock"]

View File

@@ -1,38 +0,0 @@
import pytest
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from core.client import APIClient
from core.auth import AuthManager
from core.database import Database
from core.cleanup import CleanupTracker
from core.mock import MockService
@pytest.fixture(scope="function")
def client():
return APIClient()
@pytest.fixture(scope="function")
def auth(client):
return AuthManager(client)
@pytest.fixture(scope="module")
def db():
return Database()
@pytest.fixture(scope="function")
def tracker(db):
t = CleanupTracker(db)
yield t
t.cleanup()
@pytest.fixture(scope="function")
def mock(db):
return MockService(db)

File diff suppressed because it is too large Load Diff

View File

@@ -1,30 +0,0 @@
[pytest]
# 测试目录
testpaths = tests
# Python 文件匹配
python_files = test_*.py
python_classes = Test*
python_functions = test_*
# 默认参数
addopts =
-v
--tb=short
--strict-markers
# 标记定义
markers =
slow: 标记为慢速测试
mock: 需要 Mock 服务的测试
integration: 集成测试(需要完整环境)
smoke: 冒烟测试
# 日志配置
log_cli = true
log_cli_level = INFO
log_cli_format = %(asctime)s [%(levelname)s] %(message)s
log_cli_date_format = %H:%M:%S
# 超时设置(需要 pytest-timeout 插件)
# timeout = 60

View File

@@ -1,22 +0,0 @@
# 流程测试依赖
# 测试框架
pytest>=7.4.0
pytest-html>=4.1.0 # HTML 报告
pytest-ordering>=0.6 # 测试排序
# HTTP 客户端
requests>=2.31.0
urllib3>=2.0.0
# 数据库
psycopg2-binary>=2.9.9 # PostgreSQL
redis>=5.0.0 # Redis
# 配置管理
pyyaml>=6.0.1
# 工具
python-dotenv>=1.0.0 # 环境变量
tabulate>=0.9.0 # 表格输出
colorama>=0.4.6 # 彩色输出

View File

@@ -1,61 +0,0 @@
"""
示例流程测试
本文件展示如何编写流程测试,供参考。
删除本文件后不影响测试框架运行。
"""
import pytest
class TestExampleFlow:
"""示例:账号登录流程"""
def test_admin_login_flow(self, client, auth):
"""
流程:超级管理员登录
步骤:
1. 使用超级管理员账号登录
2. 验证能获取用户信息
"""
# === 1. 登录 ===
auth.as_super_admin()
# === 2. 获取用户信息 ===
resp = client.get("/api/admin/auth/me")
# 如果接口存在,验证返回
if resp.status_code == 200:
assert resp.ok(), f"获取用户信息失败: {resp.msg}"
assert resp.data is not None
else:
pytest.skip("接口不存在,跳过测试")
@pytest.mark.skip(reason="示例测试,实际使用时删除此标记")
def test_create_shop_flow(self, client, auth, tracker):
"""
流程:创建店铺
步骤:
1. 平台管理员登录
2. 创建店铺
3. 验证店铺创建成功
"""
# === 1. 登录 ===
auth.as_platform_admin()
# === 2. 创建店铺 ===
resp = client.post("/api/admin/shops", json={
"shop_name": "测试店铺_流程测试",
"contact_name": "测试联系人",
"contact_phone": "13800138000",
})
assert resp.ok(), f"创建店铺失败: {resp.msg}"
shop_id = resp.data["id"]
tracker.track("tb_shop", shop_id)
# === 3. 验证店铺存在 ===
resp = client.get(f"/api/admin/shops/{shop_id}")
assert resp.ok()
assert resp.data["shop_name"] == "测试店铺_流程测试"