|
from typing import Dict |
|
from uuid import UUID |
|
import asyncio |
|
from fastapi import WebSocket |
|
from types import SimpleNamespace |
|
from typing import Dict |
|
from typing import Union |
|
|
|
UserDataContent = Dict[UUID, Dict[str, Union[WebSocket, asyncio.Queue]]] |
|
|
|
|
|
class UserData: |
|
def __init__(self): |
|
self.data_content: Dict[UUID, UserDataContent] = {} |
|
|
|
async def create_user(self, user_id: UUID, websocket: WebSocket): |
|
self.data_content[user_id] = { |
|
"websocket": websocket, |
|
"queue": asyncio.Queue(), |
|
} |
|
await asyncio.sleep(1) |
|
|
|
def check_user(self, user_id: UUID) -> bool: |
|
return user_id in self.data_content |
|
|
|
async def update_data(self, user_id: UUID, new_data: SimpleNamespace): |
|
user_session = self.data_content[user_id] |
|
queue = user_session["queue"] |
|
while not queue.empty(): |
|
try: |
|
queue.get_nowait() |
|
except asyncio.QueueEmpty: |
|
continue |
|
await queue.put(new_data) |
|
|
|
async def get_latest_data(self, user_id: UUID) -> SimpleNamespace: |
|
user_session = self.data_content[user_id] |
|
queue = user_session["queue"] |
|
|
|
try: |
|
return await queue.get() |
|
except asyncio.QueueEmpty: |
|
return None |
|
|
|
def delete_user(self, user_id: UUID): |
|
user_session = self.data_content[user_id] |
|
queue = user_session["queue"] |
|
while not queue.empty(): |
|
try: |
|
queue.get_nowait() |
|
except asyncio.QueueEmpty: |
|
continue |
|
if user_id in self.data_content: |
|
del self.data_content[user_id] |
|
|
|
def get_user_count(self) -> int: |
|
return len(self.data_content) |
|
|
|
def get_websocket(self, user_id: UUID) -> WebSocket: |
|
return self.data_content[user_id]["websocket"] |
|
|
|
|
|
user_data = UserData() |
|
|