| |
|
| | import httpx
|
| | import asyncio
|
| | import json
|
| |
|
| |
|
| |
|
async def test_pool_service(base_url: str = "http://localhost:8019") -> None:
    """Smoke-test the pool service's HTTP endpoints and print the outcome.

    Three probes are sent in sequence through one shared
    ``httpx.AsyncClient``:

    1. ``GET`` on the root path (5 second timeout).
    2. ``GET`` on ``/api/status`` (5 second timeout).
    3. ``POST`` on ``/api/accounts/allocate`` with the JSON payload
       ``{"count": 1, "session_duration": 1800}`` (10 second timeout).

    Each probe is wrapped in its own ``try``/``except Exception`` so that a
    failing probe is reported on stdout and the remaining probes still run.
    Results are only printed; nothing is returned.

    Args:
        base_url: Base URL of the service under test. Defaults to the
            address the script originally had hard-coded.
    """
    # Tolerate a trailing slash on a caller-supplied URL so the joined
    # endpoint paths below never contain a double slash.
    api_base = base_url.rstrip("/")

    async with httpx.AsyncClient() as client:
        # Probe 1: root path.
        try:
            resp = await client.get(base_url, timeout=5)
            print(f"根路径测试: {resp.status_code}")
            print(f"响应: {resp.json()}")
        except Exception as e:
            # Deliberately broad: this is a best-effort diagnostic, and a
            # connection error or non-JSON body must not stop later probes.
            print(f"根路径测试失败: {e}")

        # Probe 2: status endpoint.
        try:
            resp = await client.get(f"{api_base}/api/status", timeout=5)
            print(f"\n状态接口测试: {resp.status_code}")
            # ensure_ascii=False keeps non-ASCII text in the payload readable
            # instead of printing \uXXXX escape sequences.
            status_text = json.dumps(resp.json(), indent=2, ensure_ascii=False)
            print(f"响应: {status_text}")
        except Exception as e:
            print(f"状态接口测试失败: {e}")

        # Probe 3: account allocation.
        try:
            resp = await client.post(
                f"{api_base}/api/accounts/allocate",
                json={"count": 1, "session_duration": 1800},
                timeout=10,
            )
            print(f"\n分配账号测试: {resp.status_code}")
            if resp.status_code == 200:
                data = resp.json()
                print(f"成功分配,会话ID: {data.get('session_id')}")
                # `or []` also covers an explicit null "accounts" value, which
                # would otherwise make len() raise and be misreported as a
                # failed probe.
                print(f"账号数量: {len(data.get('accounts') or [])}")
            else:
                print(f"分配失败: {resp.text}")
        except Exception as e:
            print(f"分配账号测试失败: {e}")
|
| |
|
| |
|
async def main():
    """Run the pool-service smoke test as the script's async entry point."""
    await test_pool_service()
|
| |
|
| |
|
# Run the checks only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())
|
| |
|