Spaces:
Paused
Paused
| from typing import Optional | |
| from fastapi.websockets import WebSocket | |
| from websockets import ConnectionClosedError | |
class Accelerator:
    """Request/response relay over a single accelerator WebSocket.

    At most one socket is held at a time in ``ws``. ``accelerate`` sends one
    text message and waits for one text reply; any failure drops the socket
    so that ``connected()`` reports False until ``connect`` is called again.
    """

    # Currently attached socket, or None when nothing is connected.
    ws: "Optional[WebSocket]" = None

    def __del__(self):
        """Best-effort close of a still-attached socket on garbage collection."""
        ws = self.ws
        if ws is None:
            # Nothing attached (the default state, or after a failed exchange).
            return
        try:
            # Local import keeps this block self-contained; WebSocket.close()
            # is a coroutine, so it has to be scheduled on a running loop --
            # merely calling it would create a coroutine that never executes.
            import asyncio

            loop = asyncio.get_running_loop()
            loop.create_task(self._close_quietly(ws))
        except Exception:
            # No running loop in this thread, or the interpreter is shutting
            # down: there is no way to close asynchronously, and a finalizer
            # must never raise.
            pass

    @staticmethod
    async def _close_quietly(ws):
        """Close ``ws``, ignoring errors from an already-dead connection."""
        try:
            await ws.close()
        except Exception:
            pass

    def connected(self):
        """Return True when a socket is currently attached."""
        return self.ws is not None

    async def connect(self, ws: "WebSocket"):
        """Accept the incoming ``ws`` handshake and make it the active socket.

        The socket is stored only after ``accept()`` succeeds; a previously
        attached socket is replaced without being closed.
        """
        await ws.accept()
        self.ws = ws

    async def accelerate(self, input):
        """Send ``input`` as text and return the next text message received.

        Returns None when no socket is attached or when the exchange fails;
        on failure the socket is dropped so ``connected()`` becomes False.
        Task cancellation is not swallowed: ``asyncio.CancelledError`` (a
        ``BaseException``) propagates to the caller.
        """
        ws = self.ws
        if ws is None:
            return None
        try:
            await ws.send_text(input)
            return await ws.receive_text()
        except Exception:
            # Treat any send/receive error as a lost connection.
            self.ws = None
            return None