from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.middleware.cors import CORSMiddleware import uvicorn from collections import defaultdict, deque import json import os import asyncio import logging from typing import Dict, List, Optional import time # Setup logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = FastAPI() # CORS for frontend app.add_middleware( CORSMiddleware, allow_origins=["https://webrtc-call-chi.vercel.app/", "http://localhost:5173", "http://localhost:3000"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # In-memory state active_users: Dict[str, WebSocket] = {} user_status: Dict[str, str] = {} # idle, calling, in_call call_sessions: Dict[str, Dict] = {} # Track active calls visitors_total = 0 recent_latencies: deque = deque(maxlen=100) # Config ice_servers = [ {"urls": "stun:stun.l.google.com:19302"}, {"urls": "stun:stun1.l.google.com:19302"} ] @app.get("/") async def root(): return { "status": "FastAPI WebRTC backend running", "version": "2.0", "endpoints": ["/stats", "/config", "/ws"] } @app.get("/stats") async def stats(): avg_latency = sum(recent_latencies) / len(recent_latencies) if recent_latencies else 0 in_call_count = sum(1 for status in user_status.values() if status == "in_call") response = { "activeCount": len(active_users), "visitorCount": visitors_total, "latency": int(avg_latency), "inCallCount": in_call_count, "activeCalls": len(call_sessions) } logger.info(f"Stats requested: {response}") return response @app.get("/config") async def config(): return {"iceServers": ice_servers} @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): global visitors_total visitors_total += 1 await websocket.accept() user = None try: while True: data = await websocket.receive_text() msg = json.loads(data) msg_type = msg.get('type') logger.info(f"Received message: {msg_type} from {user}") if msg_type == 'join': username = msg.get('username', '').strip() # Validate username if not username or len(username) > 20: await websocket.send_json({ "type": "join_error", "reason": "Invalid username (1-20 characters required)" }) continue if username in active_users: await websocket.send_json({ "type": "join_error", "reason": "Username already taken" }) continue # Join successful user = username active_users[username] = websocket user_status[username] = "idle" await websocket.send_json({ "type": "join_ok", "username": username, "users": list(active_users.keys()) }) await broadcast_user_list() logger.info(f"User {username} joined successfully") elif user is None: await websocket.send_json({ "type": "error", "reason": "Must join with username first" }) continue elif msg_type == 'call_request': target = msg.get('to') await handle_call_request(user, target, websocket) elif msg_type == 'call_response': await handle_call_response(msg, user) elif msg_type in ['offer', 'answer', 'ice']: await handle_webrtc_signal(msg, user) elif msg_type == 'call_end': await handle_call_end(user, msg.get('to')) elif msg_type == 'ping': timestamp = msg.get('ts', time.time() * 1000) await websocket.send_json({ "type": "pong", "ts": timestamp }) # Calculate latency if 'ts' in msg: latency = int(time.time() * 1000) - msg['ts'] recent_latencies.append(latency) else: await websocket.send_json({ "type": "error", "reason": f"Unknown message type: {msg_type}" }) except WebSocketDisconnect: await handle_user_disconnect(user) except Exception as e: logger.error(f"WebSocket error for user {user}: {e}") await handle_user_disconnect(user) async def handle_call_request(caller: str, target: str, caller_ws: WebSocket): """Handle incoming call request""" if not target or target not in active_users: await caller_ws.send_json({ "type": "call_error", "reason": "User not found or offline" }) return if target == caller: await caller_ws.send_json({ "type": "call_error", "reason": "Cannot call yourself" }) return caller_status = user_status.get(caller, "idle") target_status = user_status.get(target, "idle") if caller_status != "idle": await caller_ws.send_json({ "type": "call_error", "reason": "You are already in a call" }) return if target_status != "idle": await caller_ws.send_json({ "type": "call_error", "reason": f"{target} is busy (already in a call)" }) return # Update status user_status[caller] = "calling" user_status[target] = "calling" # Send call notification to target target_ws = active_users[target] await target_ws.send_json({ "type": "incoming_call", "from": caller, "message": f"{caller} wants to connect with you" }) # Notify caller that request was sent await caller_ws.send_json({ "type": "call_sent", "to": target, "message": f"Calling {target}..." }) logger.info(f"Call request: {caller} -> {target}") async def handle_call_response(msg: dict, responder: str): """Handle call accept/reject response""" caller = msg.get('caller') accepted = msg.get('accepted', False) if not caller or caller not in active_users: return caller_ws = active_users[caller] responder_ws = active_users[responder] if accepted: # Create call session call_id = f"{caller}_{responder}_{int(time.time())}" call_sessions[call_id] = { "participants": [caller, responder], "started_at": time.time() } user_status[caller] = "in_call" user_status[responder] = "in_call" # Notify both parties await caller_ws.send_json({ "type": "call_accepted", "by": responder, "call_id": call_id }) await responder_ws.send_json({ "type": "call_started", "with": caller, "call_id": call_id }) logger.info(f"Call accepted: {caller} <-> {responder}") else: # Call rejected user_status[caller] = "idle" user_status[responder] = "idle" await caller_ws.send_json({ "type": "call_rejected", "by": responder, "reason": msg.get('reason', 'Call declined') }) logger.info(f"Call rejected: {caller} -> {responder}") await broadcast_user_list() async def handle_webrtc_signal(msg: dict, sender: str): """Forward WebRTC signals between peers""" target = msg.get('to') if target and target in active_users: target_ws = active_users[target] await target_ws.send_json({ **msg, "from": sender }) logger.debug(f"Forwarded {msg['type']} signal: {sender} -> {target}") async def handle_call_end(user: str, target: str = None): """Handle call termination""" # Find the call session call_to_end = None for call_id, session in call_sessions.items(): if user in session["participants"]: call_to_end = call_id if not target: # Find the other participant target = next((p for p in session["participants"] if p != user), None) break if call_to_end: participants = call_sessions[call_to_end]["participants"] del call_sessions[call_to_end] # Reset status for all participants for participant in participants: if participant in user_status: user_status[participant] = "idle" # Notify other participant(s) for participant in participants: if participant != user and participant in active_users: await active_users[participant].send_json({ "type": "call_ended", "by": user, "message": f"{user} has left the call" }) logger.info(f"Call ended by {user}, participants: {participants}") await broadcast_user_list() async def handle_user_disconnect(user: str): """Handle user disconnection""" if user and user in active_users: # End any active calls await handle_call_end(user) # Remove user del active_users[user] if user in user_status: del user_status[user] logger.info(f"User {user} disconnected") await broadcast_user_list() async def broadcast_user_list(): """Broadcast updated user list to all connected users""" if not active_users: return user_list_data = [] for user, status in user_status.items(): user_list_data.append({ "username": user, "status": status }) message = { "type": "user_list_update", "users": user_list_data, "total": len(active_users) } disconnected_users = [] for username, websocket in active_users.items(): try: await websocket.send_json(message) except Exception as e: logger.error(f"Failed to send to {username}: {e}") disconnected_users.append(username) # Clean up disconnected users for username in disconnected_users: await handle_user_disconnect(username) if __name__ == "__main__": port = int(os.environ.get("PORT", 7860)) logger.info(f"Starting server on port {port}") uvicorn.run(app, host="0.0.0.0", port=port, log_level="info")