"""Provision Liveblocks rooms and mirror them in a local SQLite database.

Importing this module creates ``rooms.db`` from ``schema.sql`` when the
database file does not exist yet, and exposes a small FastAPI app with
three GET endpoints:

* ``/``             -- list every row of the ``rooms`` table.
* ``/create-rooms`` -- create the predefined rooms remotely and locally.
* ``/sync-rooms``   -- refresh ``users_count`` for every stored room.
"""
import json
import os
import sqlite3
from contextlib import closing
from pathlib import Path
from typing import Iterator

import requests
import uvicorn
from fastapi import Depends, FastAPI

app = FastAPI()

# Read from the environment; when unset this is None and the Authorization
# header below becomes the literal "Bearer None".
LIVEBLOCKS_SECRET = os.environ.get("LIVEBLOCKS_SECRET")
DB_PATH = Path("rooms.db")
SCHEMA_PATH = Path("schema.sql")

# Seconds to wait on any outbound HTTP call. Without a timeout, requests
# waits indefinitely, which would stall the handler that issued the call.
REQUEST_TIMEOUT = 10

if not DB_PATH.exists():
    print("Creating database")
    print("DB_PATH", DB_PATH)
    # Read the schema BEFORE connecting: sqlite3.connect() creates the file,
    # so a missing schema.sql must fail first, otherwise an empty rooms.db
    # would be left behind and every later start would skip initialisation.
    schema_sql = SCHEMA_PATH.read_text()
    # closing() guarantees the connection is released even if the script fails.
    with closing(sqlite3.connect(DB_PATH)) as db:
        db.executescript(schema_sql)
        db.commit()


def get_db() -> Iterator[sqlite3.Connection]:
    """Yield a SQLite connection for one request and always close it.

    Rows are returned as ``sqlite3.Row`` so columns can be read by name.
    If the code using the connection raises, pending changes are rolled
    back and the exception is re-raised so the framework can report it.
    """
    # check_same_thread=False allows use from a thread other than the one
    # that opened the connection.
    # NOTE(review): presumably needed because the dependency and the async
    # endpoint may run on different threads -- confirm.
    db = sqlite3.connect(DB_PATH, check_same_thread=False)
    print("Connected to database")
    db.row_factory = sqlite3.Row
    try:
        yield db
    except Exception:
        db.rollback()
        # Re-raise: swallowing here would hide the failure from FastAPI.
        raise
    finally:
        db.close()


# Identifiers of the rooms to provision: "room-0" .. "room-40".
rooms = [f"room-{i}" for i in range(41)]


@app.get("/")
async def read_root(db: sqlite3.Connection = Depends(get_db)):
    """Return every row of the ``rooms`` table."""
    out = db.execute("SELECT * FROM rooms").fetchall()
    print(out)
    return out


@app.get("/create-rooms")
async def create_room(db: sqlite3.Connection = Depends(get_db)):
    """Create every predefined room, then return all rows of ``rooms``."""
    for room_id in rooms:
        print(room_id)
        createRoom(room_id, db)
    return db.execute("SELECT * FROM rooms").fetchall()


def createRoom(room_id, db):
    """Create ``room_id`` on Liveblocks and insert it into the local table.

    Args:
        room_id: Identifier of the room to create.
        db: Open SQLite connection; the insert is committed before returning.

    Returns:
        Always ``True``.
    """
    payload = {"id": room_id, "defaultAccesses": ["room:write"]}
    response = requests.post(
        "https://api.liveblocks.io/v2/rooms",
        headers={"Authorization": f"Bearer {LIVEBLOCKS_SECRET}"},
        json=payload,
        timeout=REQUEST_TIMEOUT,
    )
    # NOTE: the HTTP status is not checked; an existing room is recognised
    # only through the error code in the JSON payload.
    data = response.json()
    print(data)
    if "error" in data and data["error"] == "ROOM_ALREADY_EXISTS":
        print("Room already exists")

    # The local row is inserted whether or not the remote room already
    # existed. NOTE(review): repeated calls may therefore add duplicate rows
    # or fail, depending on constraints in schema.sql -- confirm.
    cursor = db.cursor()
    cursor.execute("INSERT INTO rooms (room_id) VALUES (?)", (room_id,))
    db.commit()
    print("Room created")
    print("Created room", room_id)
    return True


def generateAuthToken():
    """Request an auth token from Liveblocks using the secret key.

    Returns:
        The ``token`` field of the JSON response.

    Raises:
        Exception: with ``(status_code, response_text)`` when the response
            status is not 200.
    """
    response = requests.get(
        "https://liveblocks.io/api/authorize",
        headers={"Authorization": f"Bearer {LIVEBLOCKS_SECRET}"},
        timeout=REQUEST_TIMEOUT,
    )
    if response.status_code != 200:
        raise Exception(response.status_code, response.text)
    return response.json()["token"]


def get_room_count(room_id: str, jwtToken: str = ''):
    """Return the number of entries under ``data`` for a room's users.

    Args:
        room_id: Identifier of the room to query.
        jwtToken: Bearer token sent in the Authorization header.

    Returns:
        ``len(response["data"])``, or 0 when the response has no ``data``.

    Raises:
        Exception: when the response status is not 200.
    """
    print("Getting room count" + room_id)
    response = requests.get(
        f"https://liveblocks.net/api/v1/room/{room_id}/users",
        headers={
            "Authorization": f"Bearer {jwtToken}",
            "Content-Type": "application/json",
        },
        timeout=REQUEST_TIMEOUT,
    )
    if response.status_code == 200:
        res = response.json()
        if "data" in res:
            return len(res["data"])
        return 0
    raise Exception("Error getting room count")


@app.get("/sync-rooms")
async def sync_rooms(db: sqlite3.Connection = Depends(get_db)):
    """Refresh ``users_count`` for every stored room and return all rows.

    Any failure is printed and reported as ``{"error": <message>}`` instead
    of propagating. Updates are committed per room, so rows processed before
    a failure keep their new count.
    """
    try:
        jwtToken = generateAuthToken()
        # Named `stored_rooms` so the module-level `rooms` list is not shadowed.
        stored_rooms = db.execute("SELECT * FROM rooms").fetchall()
        for row in stored_rooms:
            room_id = row["room_id"]
            users_count = get_room_count(room_id, jwtToken)
            print("Updating room", room_id, "with", users_count, "users")
            cursor = db.cursor()
            cursor.execute(
                "UPDATE rooms SET users_count = ? WHERE room_id = ?",
                (users_count, room_id),
            )
            db.commit()
        return db.execute("SELECT * FROM rooms").fetchall()
    except Exception as e:
        print(e)
        return {"error": str(e)}


if __name__ == "__main__":
    uvicorn.run("createRooms:app", host="0.0.0.0",
                log_level="debug", reload=True)