# https://redis.io/docs/clients/python/ from redis import Redis from dotenv import load_dotenv from uuid import uuid4 load_dotenv() import os import json from redis.commands.json.path import Path from typing import TypedDict, List, Tuple, Optional class User(TypedDict): username: str uid: str class Chat(TypedDict): patient: str messages: List[Tuple[str, str]] def get_client() -> Redis: client = Redis( host=os.environ["REDIS_HOST"], port=os.environ["REDIS_PORT"], password=os.environ["REDIS_PASSWORD"], decode_responses=True, ) return client def create_user(client: Redis, user: User): maybe_user = get_user_by_username(client, user["username"]) if not maybe_user: uid = uuid4() user["uid"] = str(uid) client.json().set(f"user:by-uid:{uid}", "$", user, nx=True) client.set(f"user:uuid:by-username:{user['username']}", str(uid), nx=True) client.json().set(f"chats:by-user-uid:{uid}", "$", []) else: print(f"User already existed with username={user['username']}") def get_user_by_username(client: Redis, username: str) -> Optional[User]: uid = client.get(f"user:uuid:by-username:{username}") user = client.json().get(f"user:by-uid:{uid}") return user def get_user_chat_by_uid(client: Redis, uid: str) -> List[Chat]: chats = client.json().get(f"chats:by-user-uid:{uid}") return chats def add_chat_by_uid(client: Redis, chat: Chat, uid: str): client.json().arrappend(f"chats:by-user-uid:{uid}", "$", chat) if __name__ == "__main__": client = get_client() user = User(username="foo", uid="afa03747-a9ce-4200-964c-7816ec381fa9") chat = Chat(patient="foo", messages=[("a", "b"), ("c", "d")]) create_user(client, user) user = get_user_by_username(client, user['username']) add_chat_by_uid(client, chat, user['uid']) chats = get_user_chat_by_uid(client, user['uid']) print(chats)