All checks were successful
构建并部署到测试环境(无 SSH) / build-and-deploy (push) Successful in 6m39s
- 新增店铺角色管理 API 和数据模型 - 实现角色继承和权限检查逻辑 - 添加流程测试框架和集成测试 - 更新权限服务和账号管理逻辑 - 添加数据库迁移脚本 - 归档 OpenSpec 变更文档 Ultraworked with Sisyphus
101 lines
3.2 KiB
Python
101 lines
3.2 KiB
Python
import logging
|
|
from dataclasses import dataclass
|
|
from typing import Any, Optional
|
|
|
|
import requests
|
|
|
|
from config.settings import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class APIResponse:
    """Normalized outcome of a single API call.

    Instances are truthy exactly when :meth:`ok` is true, so callers can
    write ``if resp:`` to test for business-level success.
    """

    # HTTP status of the response; APIClient._request stores 0 here when the
    # request raised before any response was received.
    status_code: int
    # Business result code taken from the response body's "code" key;
    # APIClient._request uses -1 when it is missing or the call failed.
    code: int
    # Message from the body's "msg" key, or the error description on failure.
    msg: str
    # Value of the body's "data" key (None when absent or on failure).
    data: Any
    # Parsed JSON body, or an empty dict when no JSON body was available.
    raw: dict

    def ok(self) -> bool:
        """Return whether the business result code equals 0."""
        succeeded = self.code == 0
        return succeeded

    def __bool__(self) -> bool:
        """Make truthiness mirror :meth:`ok`."""
        return self.ok()
|
|
|
|
|
|
class APIClient:
    """Small JSON API client built on one shared ``requests.Session``.

    Every HTTP helper returns an ``APIResponse`` instead of raising:
    transport errors, non-JSON bodies and non-object JSON bodies are all
    reported as ``code=-1``. A bearer token set via :meth:`set_token` (or a
    successful :meth:`login`) is sent on every later request through the
    session's ``Authorization`` header.

    The client can be used as a context manager to close the session.
    """

    def __init__(self, base_url: Optional[str] = None, timeout: Optional[float] = None):
        """Create a client.

        Args:
            base_url: Prefix for every request path; falls back to
                ``settings.api_base_url`` when omitted or empty.
            timeout: Default per-request timeout; falls back to
                ``settings.api_timeout`` when ``None``.
        """
        self.base_url = base_url or settings.api_base_url
        self.timeout = settings.api_timeout if timeout is None else timeout
        self.token: Optional[str] = None
        self.session = requests.Session()

    def __enter__(self) -> "APIClient":
        """Return the client itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the underlying session when leaving the ``with`` block."""
        self.close()

    def close(self) -> None:
        """Close the underlying ``requests.Session``."""
        self.session.close()

    def set_token(self, token: str) -> None:
        """Remember ``token`` and send it as a Bearer credential from now on."""
        self.token = token
        self.session.headers["Authorization"] = f"Bearer {token}"

    def clear_token(self) -> None:
        """Forget the stored token and drop the ``Authorization`` header."""
        self.token = None
        self.session.headers.pop("Authorization", None)

    def _build_url(self, path: str) -> str:
        """Concatenate ``base_url`` and ``path`` without a doubled slash."""
        base = self.base_url
        # A trailing slash in the configured base plus a leading slash in the
        # path would otherwise yield "host//api/...".
        if isinstance(base, str) and base.endswith("/") and path.startswith("/"):
            base = base.rstrip("/")
        return f"{base}{path}"

    def _request(self, method: str, path: str, **kwargs) -> APIResponse:
        """Send one request and normalize the outcome into an ``APIResponse``.

        Args:
            method: HTTP method name passed to ``Session.request``.
            path: Path appended to ``base_url``.
            **kwargs: Forwarded to ``Session.request``; ``timeout`` defaults
                to the client's timeout when not supplied.

        Returns:
            An ``APIResponse``. ``code`` is -1 (and ``raw`` is ``{}``) when
            the request raised, the body was not JSON, or the JSON was not an
            object; otherwise the fields come from the body's ``code``,
            ``msg`` and ``data`` keys.
        """
        url = self._build_url(path)
        kwargs.setdefault("timeout", self.timeout)
        logger.info("%s %s", method, path)

        try:
            resp = self.session.request(method, url, **kwargs)
        except requests.exceptions.RequestException as e:
            logger.error("请求失败: %s", e)
            return APIResponse(status_code=0, code=-1, msg=str(e), data=None, raw={})

        try:
            raw = resp.json()
        except ValueError:
            return APIResponse(
                status_code=resp.status_code, code=-1,
                msg="响应不是有效的 JSON", data=None, raw={}
            )

        # Valid JSON is not necessarily an object (it may be a list, string,
        # number or null); calling .get() on those would raise.
        if not isinstance(raw, dict):
            return APIResponse(
                status_code=resp.status_code, code=-1,
                msg="响应 JSON 不是对象", data=None, raw={}
            )

        return APIResponse(
            status_code=resp.status_code,
            code=raw.get("code", -1),
            msg=raw.get("msg", ""),
            data=raw.get("data"),
            raw=raw,
        )

    def get(self, path: str, params: Optional[dict] = None, **kwargs) -> APIResponse:
        """Send a GET request with optional query ``params``."""
        return self._request("GET", path, params=params, **kwargs)

    def post(self, path: str, json: Optional[dict] = None, **kwargs) -> APIResponse:
        """Send a POST request with an optional JSON body."""
        return self._request("POST", path, json=json, **kwargs)

    def put(self, path: str, json: Optional[dict] = None, **kwargs) -> APIResponse:
        """Send a PUT request with an optional JSON body."""
        return self._request("PUT", path, json=json, **kwargs)

    def delete(self, path: str, **kwargs) -> APIResponse:
        """Send a DELETE request."""
        return self._request("DELETE", path, **kwargs)

    def patch(self, path: str, json: Optional[dict] = None, **kwargs) -> APIResponse:
        """Send a PATCH request with an optional JSON body."""
        return self._request("PATCH", path, json=json, **kwargs)

    def upload(self, path: str, file, field_name: str = "file", **kwargs) -> APIResponse:
        """POST ``file`` as a multipart upload under the form field ``field_name``."""
        files = {field_name: file}
        return self._request("POST", path, files=files, **kwargs)

    def login(self, username: str, password: str, login_path: str = "/api/admin/auth/login") -> APIResponse:
        """Log in and, on success, store the returned token on the client.

        The token is read from the ``token`` key of the response data, falling
        back to ``access_token``. The response is returned unchanged whether or
        not a token was found.
        """
        resp = self.post(login_path, json={
            "username": username,
            "password": password,
        })

        payload = resp.data
        # Only a dict payload can carry the token keys; a list or string
        # payload must not crash the login call.
        if resp.ok() and isinstance(payload, dict):
            token = payload.get("token") or payload.get("access_token")
            if token:
                self.set_token(token)
                logger.info("登录成功: %s", username)

        return resp
|