# Source path: tsunami/src/tsuwave/api/websocket.py
# Uploaded by: Gitdeeper4
# Commit message (translated from Arabic): "Upload all TSU-WAVE files with YAML"
# Commit: 12834b7
"""WebSocket real-time communication"""
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, WebSocketException
from typing import Set, Dict, Any
import json
import asyncio
import logging
from datetime import datetime
# Logger named after this module.
logger = logging.getLogger(__name__)

# Router that the WebSocket route and the startup hook in this module attach to.
router = APIRouter()

# Module-level connection registries.
# NOTE(review): in the code visible here nothing reads or writes these two
# containers (ConnectionManager keeps its own) -- presumably retained for
# external importers; confirm before removing.
connection_info: Dict[WebSocket, dict] = {}
active_connections: Set[WebSocket] = set()
class ConnectionManager:
    """Track accepted WebSocket connections and their topic subscriptions.

    Two containers are kept in step with each other:

    * ``active_connections`` -- the set of accepted sockets.
    * ``connection_info`` -- per-socket metadata with the keys
      ``connected_at`` (``datetime.utcnow().isoformat()`` string),
      ``client_info`` (the dict passed to :meth:`connect`) and
      ``subscriptions`` (a set of topic names).
    """

    def __init__(self):
        self.active_connections: Set[WebSocket] = set()
        self.connection_info: Dict[WebSocket, dict] = {}

    async def connect(self, websocket: WebSocket, client_info: dict):
        """Accept *websocket* and register it with no subscriptions.

        Args:
            websocket: The incoming connection; ``accept()`` is awaited on it.
            client_info: Arbitrary client metadata stored alongside the socket.
        """
        await websocket.accept()
        self.active_connections.add(websocket)
        self.connection_info[websocket] = {
            "connected_at": datetime.utcnow().isoformat(),
            "client_info": client_info,
            "subscriptions": set(),
        }
        logger.info("WebSocket connected: %s", client_info)

    def disconnect(self, websocket: WebSocket):
        """Unregister *websocket* and drop its metadata.

        The call is idempotent: a socket that was never registered, or that
        was already removed (for example by a failed send), is ignored
        instead of raising ``KeyError``. The socket itself is not closed here.
        """
        was_registered = websocket in self.active_connections
        self.active_connections.discard(websocket)
        self.connection_info.pop(websocket, None)
        if was_registered:
            logger.info("WebSocket disconnected")

    async def send_personal_message(self, message: dict, websocket: WebSocket):
        """Send *message* as JSON to one client.

        A send that raises is treated as a dead connection and the socket is
        unregistered; the error is not propagated to the caller.
        """
        try:
            await websocket.send_json(message)
        except Exception:
            # ``Exception`` rather than a bare ``except`` so that
            # ``asyncio.CancelledError`` still propagates to the caller.
            self.disconnect(websocket)

    async def broadcast(self, message: dict):
        """Send *message* as JSON to every registered client.

        Clients whose send raises are unregistered once the pass is complete.
        """
        dead = []
        # Iterate over a snapshot: other coroutines may connect or disconnect
        # while a send is awaited, and mutating a set during iteration raises
        # ``RuntimeError``.
        for connection in tuple(self.active_connections):
            try:
                await connection.send_json(message)
            except Exception:
                dead.append(connection)
        for connection in dead:
            self.disconnect(connection)

    def subscribe(self, websocket: WebSocket, topic: str):
        """Add *topic* to the subscriptions of a registered client.

        Unregistered sockets are ignored.
        """
        info = self.connection_info.get(websocket)
        if info is not None:
            info["subscriptions"].add(topic)

    def unsubscribe(self, websocket: WebSocket, topic: str):
        """Remove *topic* from a registered client's subscriptions.

        Unregistered sockets and topics that were never subscribed are ignored.
        """
        info = self.connection_info.get(websocket)
        if info is not None:
            info["subscriptions"].discard(topic)

    def get_subscribers(self, topic: str) -> Set[WebSocket]:
        """Return a new set containing every client subscribed to *topic*."""
        return {
            ws
            for ws, info in self.connection_info.items()
            if topic in info["subscriptions"]
        }
# Shared ConnectionManager instance used by the endpoint, the message
# handlers and the periodic broadcast task in this module.
manager = ConnectionManager()
@router.websocket("/v1/realtime")
async def websocket_endpoint(websocket: WebSocket):
    """Serve the real-time stream: register the client, then process its messages.

    After the connection is accepted a confirmation message is sent, and every
    text frame received afterwards is parsed as JSON and passed to
    ``handle_message``. Invalid JSON and handler errors are reported back to
    the client as ``{"type": "error", ...}`` messages without closing the
    connection.

    Args:
        websocket: The incoming WebSocket connection.
    """
    # Client metadata comes from the handshake headers and the query string.
    client_info = {
        "user_agent": websocket.headers.get("user-agent", "unknown"),
        "client_id": websocket.query_params.get("client_id", "anonymous"),
        "protocol": websocket.headers.get("sec-websocket-protocol", "none"),
    }
    await manager.connect(websocket, client_info)
    try:
        # Send initial connection confirmation
        await manager.send_personal_message({
            "type": "connection",
            "status": "connected",
            "timestamp": datetime.utcnow().isoformat(),
            "message": "Connected to TSU-WAVE real-time stream"
        }, websocket)
        # Handle incoming messages until the client goes away
        while True:
            data = await websocket.receive_text()
            try:
                message = json.loads(data)
                await handle_message(websocket, message)
            except json.JSONDecodeError:
                await manager.send_personal_message({
                    "type": "error",
                    "message": "Invalid JSON format"
                }, websocket)
            except Exception as e:
                await manager.send_personal_message({
                    "type": "error",
                    "message": str(e)
                }, websocket)
    except WebSocketDisconnect:
        # Normal client departure; cleanup happens in ``finally``.
        pass
    except Exception as e:
        logger.error(f"WebSocket error: {e}")
    finally:
        # Single cleanup point, also reached when the handler is cancelled.
        # A failed send may already have unregistered the socket, so only
        # disconnect while it is still registered.
        if websocket in manager.active_connections:
            manager.disconnect(websocket)
def _normalize_topics(raw):
    """Return *raw* as a list of topic names, or ``None`` when it is malformed.

    A single string is treated as one topic rather than being iterated
    character by character; a list or tuple is accepted only when every item
    is a string.
    """
    if isinstance(raw, str):
        return [raw]
    if isinstance(raw, (list, tuple)) and all(isinstance(t, str) for t in raw):
        return list(raw)
    return None


async def handle_message(websocket: WebSocket, message: dict):
    """Dispatch one decoded client message according to its ``type`` field.

    Supported types:

    * ``subscribe`` / ``unsubscribe`` -- apply the change to every name in
      ``topics`` and acknowledge it.
    * ``ping`` -- reply with a ``pong`` carrying the current timestamp.
    * ``request`` -- send the data selected by ``data_type`` (``parameters``,
      ``chi`` or ``event``).

    Anything else, including a payload that is not a JSON object or a
    malformed ``topics`` value, is answered with an ``error`` message.

    Args:
        websocket: The client the message came from; replies go to it.
        message: The decoded JSON payload.
    """
    if not isinstance(message, dict):
        # json.loads() can yield lists, strings, numbers, ... as well.
        await manager.send_personal_message({
            "type": "error",
            "message": "Message must be a JSON object"
        }, websocket)
        return

    msg_type = message.get("type", "")

    if msg_type in ("subscribe", "unsubscribe"):
        topics = _normalize_topics(message.get("topics", []))
        if topics is None:
            await manager.send_personal_message({
                "type": "error",
                "message": "Invalid topics: expected a list of strings"
            }, websocket)
            return
        if msg_type == "subscribe":
            for topic in topics:
                manager.subscribe(websocket, topic)
            await manager.send_personal_message({
                "type": "subscribed",
                "topics": topics,
                "timestamp": datetime.utcnow().isoformat()
            }, websocket)
        else:
            for topic in topics:
                manager.unsubscribe(websocket, topic)
            await manager.send_personal_message({
                "type": "unsubscribed",
                "topics": topics
            }, websocket)
    elif msg_type == "ping":
        # Ping-pong for connection testing
        await manager.send_personal_message({
            "type": "pong",
            "timestamp": datetime.utcnow().isoformat()
        }, websocket)
    elif msg_type == "request":
        # Request specific data
        data_type = message.get("data_type")
        if data_type == "parameters":
            await send_parameters(websocket, message.get("zone", "global"))
        elif data_type == "chi":
            await send_chi(websocket, message.get("zone", "global"))
        elif data_type == "event":
            await send_event(websocket, message.get("event_id"))
        else:
            await manager.send_personal_message({
                "type": "error",
                "message": f"Unknown data type: {data_type}"
            }, websocket)
    else:
        await manager.send_personal_message({
            "type": "error",
            "message": f"Unknown message type: {msg_type}"
        }, websocket)
async def send_parameters(websocket: WebSocket, zone: str):
    """Reply to *websocket* with a ``parameters`` message for *zone*.

    The readings are hard-coded placeholder values; *zone* is only echoed
    back in the payload.
    """
    readings = {
        "wcc": 1.31,
        "kpr": 1.44,
        "hfsi": 0.63,
        "becf": 4.80,
        "sdb": 1.20,
        "sbsp": 0.67,
        "smvi": 0.29,
    }
    payload = {
        "type": "parameters",
        "zone": zone,
        "timestamp": datetime.utcnow().isoformat(),
        "data": readings,
    }
    await manager.send_personal_message(payload, websocket)
async def send_chi(websocket: WebSocket, zone: str):
    """Reply to *websocket* with a ``chi`` message for *zone*.

    The value, status and per-parameter contributions are fixed literals;
    *zone* is only echoed back in the payload.
    """
    contributions = {
        "wcc": 0.12,
        "kpr": 0.19,
        "hfsi": 0.24,
        "becf": 0.21,
        "sdb": 0.08,
        "sbsp": 0.11,
        "smvi": 0.05,
    }
    chi_data = {
        "value": 0.72,
        "status": "HIGH",
        "contributions": contributions,
    }
    payload = {
        "type": "chi",
        "zone": zone,
        "timestamp": datetime.utcnow().isoformat(),
        "data": chi_data,
    }
    await manager.send_personal_message(payload, websocket)
async def send_event(websocket: WebSocket, event_id: str):
    """Reply to *websocket* with an ``event`` message for *event_id*.

    The event record is a fixed literal; *event_id* is echoed back unchanged
    and is not used to look anything up.
    """
    record = {
        "name": "Tōhoku 2011",
        "magnitude": 9.0,
        "runup": 40.5,
        "status": "archived",
    }
    payload = {
        "type": "event",
        "event_id": event_id,
        "timestamp": datetime.utcnow().isoformat(),
        "data": record,
    }
    await manager.send_personal_message(payload, websocket)
# Background task for broadcasting updates
async def _send_update(subscribers, update):
    """Send *update* to each socket in *subscribers*, dropping dead ones.

    A socket whose send raises is unregistered from ``manager`` so it is not
    retried on every later cycle.
    """
    for ws in subscribers:
        try:
            await ws.send_json(update)
        except Exception:
            # ``Exception`` rather than a bare ``except`` so that cancelling
            # the broadcast task is not swallowed here.
            logger.debug("Broadcast send failed; dropping connection", exc_info=True)
            if ws in manager.active_connections:
                manager.disconnect(ws)


async def broadcast_updates():
    """Push periodic updates to topic subscribers, forever.

    Every 5 seconds, clients subscribed to ``"chi"`` receive a ``chi_update``
    message and clients subscribed to ``"parameters"`` receive a
    ``parameters_update`` message. The payload values are hard-coded
    literals. The loop only ends when the task is cancelled.
    """
    while True:
        await asyncio.sleep(5)  # Update every 5 seconds
        if not manager.active_connections:
            continue

        chi_subscribers = manager.get_subscribers("chi")
        if chi_subscribers:
            await _send_update(chi_subscribers, {
                "type": "chi_update",
                "timestamp": datetime.utcnow().isoformat(),
                "data": {
                    "global": 0.72,
                    "hilo_bay": 0.65,
                    "miyako": 0.91
                }
            })

        param_subscribers = manager.get_subscribers("parameters")
        if param_subscribers:
            await _send_update(param_subscribers, {
                "type": "parameters_update",
                "timestamp": datetime.utcnow().isoformat(),
                "data": {
                    "wcc": 1.31 + 0.01,
                    "kpr": 1.44 + 0.02,
                    "hfsi": 0.63 - 0.01
                }
            })
# Start background task on router initialization
# Strong reference to the broadcaster task. The event loop only keeps weak
# references to tasks, so a task whose result is discarded can be
# garbage-collected before it finishes.
_broadcast_task = None


@router.on_event("startup")
async def startup_event():
    """Start the periodic broadcast loop when the startup hook fires."""
    global _broadcast_task
    # Do not spawn a second loop if the hook fires again while one is running.
    if _broadcast_task is None or _broadcast_task.done():
        _broadcast_task = asyncio.create_task(broadcast_updates())