Quasar-Executo / test_websocket.py
KarlQuant's picture
Upload 2 files
6578cff verified
#!/usr/bin/env python3
"""
WebSocket Test Client β€” Monitor what the hub is broadcasting
No external dependencies beyond websockets (already installed)
Usage:
python3 test_websocket.py [--subscribe|--publish] [--space ASSET_NAME]
"""
import asyncio
import json
import sys
import websockets
from datetime import datetime
async def test_subscribe(hub_url: str):
"""Listen to what the hub is broadcasting."""
print(f"[*] Connecting to hub subscriber at {hub_url}/ws/subscribe")
try:
async with websockets.connect(f"{hub_url}/ws/subscribe") as ws:
print(f"[βœ“] Connected! Listening for metrics updates...\n")
count = 0
while True:
try:
msg = await asyncio.wait_for(ws.recv(), timeout=10.0)
count += 1
data = json.loads(msg)
ts = datetime.now().strftime("%H:%M:%S")
print(f"[{ts}] Message #{count}:")
print(f" {json.dumps(data, indent=2)}\n")
except asyncio.TimeoutError:
print("[!] No messages received for 10 seconds...")
print(" β†’ Asset spaces may not be connected yet")
sys.exit(1)
except Exception as e:
print(f"[βœ—] Connection failed: {e}")
print(f" Make sure hub is running and accessible at {hub_url}")
sys.exit(1)
async def test_publish(hub_url: str, space_name: str):
"""Send a test metric to the hub."""
print(f"[*] Connecting to hub publisher for space: {space_name}")
test_message = {
"training": {
"training_steps": 9999,
"actor_loss": 0.123,
"critic_loss": 0.456,
"avn_loss": 0.789,
"avn_accuracy": 0.95,
},
"voting": {
"dominant_signal": "BUY",
"buy_count": 42,
"sell_count": 18,
}
}
try:
uri = f"{hub_url}/ws/publish/{space_name}"
print(f"[*] Connecting to {uri}")
async with websockets.connect(uri) as ws:
print(f"[βœ“] Connected! Sending test message...")
await ws.send(json.dumps(test_message))
print(f"[βœ“] Sent:\n{json.dumps(test_message, indent=2)}")
# Keep connection open for 5 seconds
print(f"[*] Keeping connection open for 5 seconds...")
await asyncio.sleep(5)
print(f"[βœ“] Done!")
except Exception as e:
print(f"[βœ—] Error: {e}")
sys.exit(1)
async def main():
# Default hub URL (adjust if needed)
hub_url = "ws://127.0.0.1:7860"
if len(sys.argv) > 1:
if "--subscribe" in sys.argv:
print("=" * 60)
print("QUASAR Hub WebSocket Monitor (Subscribe Mode)")
print("=" * 60)
await test_subscribe(hub_url)
elif "--publish" in sys.argv:
space_name = "TEST_ASSET"
if "--space" in sys.argv:
idx = sys.argv.index("--space")
if idx + 1 < len(sys.argv):
space_name = sys.argv[idx + 1]
print("=" * 60)
print(f"QUASAR Hub WebSocket Test (Publish Mode)")
print("=" * 60)
await test_publish(hub_url, space_name)
else:
print_usage()
else:
print_usage()
def print_usage():
print("""
╔════════════════════════════════════════════════════════════════╗
β•‘ QUASAR WebSocket Test Tool v1.0 β•‘
β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
USAGE:
# Monitor what hub is broadcasting (metrics from all spaces)
python3 test_websocket.py --subscribe
# Send a test metric to hub (publish as a space)
python3 test_websocket.py --publish --space TEST_ASSET
# Send test metrics with a different space name
python3 test_websocket.py --publish --space V100_1h
EXAMPLE WORKFLOW:
Terminal 1 (Monitor hub):
$ python3 test_websocket.py --subscribe
[βœ“] Connected! Listening for metrics updates...
Terminal 2 (Send test data):
$ python3 test_websocket.py --publish --space V100_1h
[βœ“] Connected! Sending test message...
[βœ“] Sent:
{
"training": {...},
"voting": {...}
}
Terminal 1 (should see the message):
[12:34:56] Message #1:
{
"space_name": "V100_1h",
"training": {...},
"voting": {...}
}
TROUBLESHOOTING:
"Connection refused" β†’ Hub not running on port 7860
$ curl http://127.0.0.1:7860/api/health
No messages on subscribe β†’ Asset spaces not connected
Check if asset spaces are running and sending data
"Module not found: websockets" β†’ Install it
$ pip install websockets
""")
if __name__ == "__main__":
asyncio.run(main())