customeragent-api / server /tests /test_websocket.py
anasraza526's picture
Clean deploy to Hugging Face
ac90985
#!/usr/bin/env python3
import asyncio
import json
from app.websocket import manager
async def test_send_message():
"""Test sending a message to all connected widgets"""
print("Active connections:", list(manager.active_connections.keys()))
test_message = {
"type": "admin_response",
"message": "Test message from server",
"sender": "Support Team"
}
# Send to all active connections
sent_count = 0
for conn_id, websocket in manager.active_connections.items():
try:
await websocket.send_text(json.dumps(test_message))
print(f"Sent test message to {conn_id}")
sent_count += 1
except Exception as e:
print(f"Failed to send to {conn_id}: {e}")
print(f"Sent test message to {sent_count} connections")
if __name__ == "__main__":
asyncio.run(test_send_message())