Spaces:
Running
on
A10G
Running
on
A10G
from typing import Dict | |
from uuid import UUID | |
import asyncio | |
from fastapi import WebSocket | |
from types import SimpleNamespace | |
from typing import Dict | |
from typing import Union | |
# Maps each user id to that user's session dict: the connection is stored
# under "websocket" and the pending-data asyncio.Queue under "queue".
UserDataContent = Dict[UUID, Dict[str, Union[WebSocket, asyncio.Queue]]]
class UserData:
    """In-memory registry of connected users.

    Each entry maps a user id to a session dict holding that user's
    connection under ``"websocket"`` and an ``asyncio.Queue`` under
    ``"queue"``. Writers empty the queue before enqueueing, so a reader
    only ever receives the most recently submitted item.
    """

    def __init__(self):
        # UserDataContent is itself the user-id -> session mapping, so it is
        # used directly rather than nested inside another Dict[UUID, ...].
        self.data_content: UserDataContent = {}

    @staticmethod
    def _drain(queue: asyncio.Queue) -> None:
        """Discard every item currently buffered in *queue* without blocking."""
        while True:
            try:
                queue.get_nowait()
            except asyncio.QueueEmpty:
                return

    async def create_user(self, user_id: UUID, websocket: WebSocket):
        """Register *user_id* with its websocket and a fresh, empty queue.

        An existing entry for the same id is replaced.
        """
        self.data_content[user_id] = {
            "websocket": websocket,
            "queue": asyncio.Queue(),
        }
        # NOTE(review): the one-second pause is preserved from the original;
        # its purpose is not visible in this file. Confirm before removing.
        await asyncio.sleep(1)

    def check_user(self, user_id: UUID) -> bool:
        """Return True if *user_id* is currently registered."""
        return user_id in self.data_content

    async def update_data(self, user_id: UUID, new_data: SimpleNamespace):
        """Replace whatever is pending for *user_id* with *new_data*.

        Raises:
            KeyError: if *user_id* is not registered.
        """
        queue = self.data_content[user_id]["queue"]
        # Drop stale items first so the consumer only sees the newest value.
        self._drain(queue)
        await queue.put(new_data)

    async def get_latest_data(self, user_id: UUID) -> SimpleNamespace:
        """Wait for and return the next item queued for *user_id*.

        ``Queue.get()`` suspends until an item is available and never raises
        ``asyncio.QueueEmpty``, so no empty-queue fallback is needed here.

        Raises:
            KeyError: if *user_id* is not registered.
        """
        queue = self.data_content[user_id]["queue"]
        return await queue.get()

    def delete_user(self, user_id: UUID):
        """Remove *user_id* and discard anything left in its queue.

        Deleting an id that is not registered is a no-op.
        """
        # pop() with a default avoids the KeyError the up-front lookup used
        # to raise for unknown ids, which defeated the membership guard.
        session = self.data_content.pop(user_id, None)
        if session is None:
            return
        self._drain(session["queue"])

    def get_user_count(self) -> int:
        """Return the number of registered users."""
        return len(self.data_content)

    def get_websocket(self, user_id: UUID) -> WebSocket:
        """Return the websocket stored for *user_id*.

        Raises:
            KeyError: if *user_id* is not registered.
        """
        return self.data_content[user_id]["websocket"]
# Module-level UserData instance, created at import time.
# NOTE(review): presumably the shared registry other modules import; confirm
# against callers.
user_data = UserData()