Spaces:
Runtime error
Runtime error
File size: 1,814 Bytes
3ec4ff0 bc0521a 3ec4ff0 bc0521a d875f25 3ec4ff0 d875f25 3ec4ff0 bc0521a 3ec4ff0 d875f25 3ec4ff0 d875f25 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
"""FastAPI application factory."""
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from .routes import v1_router
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
def create_app() -> FastAPI:
"""Create and configure the FastAPI application."""
app = FastAPI()
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include routers
app.include_router(v1_router)
@app.on_event("startup")
async def startup_event():
"""Run startup events."""
pass
@app.on_event("shutdown")
async def shutdown_event():
"""Run shutdown events."""
pass
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
print(f"Received: {data}")
await manager.broadcast(f"Message from client: {data}")
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast("A client disconnected")
return app
# Create default app instance
app = create_app()
|