radames's picture
change init room logic
80e8f4d
raw history blame
No virus
3.89 kB
import os
from fastapi import Depends, FastAPI
import sqlite3
import requests
import uvicorn
from pathlib import Path
import json
app = FastAPI()
LIVEBLOCKS_SECRET = os.environ.get("LIVEBLOCKS_SECRET")
DB_PATH = Path("rooms.db")
if not DB_PATH.exists():
print("Creating database")
print("DB_PATH", DB_PATH)
db = sqlite3.connect(DB_PATH)
with open(Path("schema.sql"), "r") as f:
db.executescript(f.read())
db.commit()
db.close()
def get_db():
db = sqlite3.connect(Path("./rooms.db"), check_same_thread=False)
print("Connected to database")
db.row_factory = sqlite3.Row
try:
yield db
except Exception:
db.rollback()
finally:
db.close()
app = FastAPI()
rooms = ["room-" + str(i) for i in range(0, 41)]
@app.get("/")
async def read_root(db: sqlite3.Connection = Depends(get_db)):
out = db.execute("SELECT * FROM rooms").fetchall()
print(out)
return out
@app.get("/create-rooms")
async def create_room(db: sqlite3.Connection = Depends(get_db)):
for room_id in rooms:
print(room_id)
createRoom(room_id, db)
all = db.execute("SELECT * FROM rooms").fetchall()
return all
@app.get("/create-one-room/{room_id}")
async def create_one_room(room_id: str):
payload = {"id": room_id, "defaultAccesses": ["room:write"]}
response = requests.post(f"https://api.liveblocks.io/v2/rooms",
headers={"Authorization": f"Bearer {LIVEBLOCKS_SECRET}"}, json=payload)
data = response.json()
return data
def createRoom(room_id, db):
payload = {"id": room_id, "defaultAccesses": ["room:write"]}
response = requests.post(f"https://api.liveblocks.io/v2/rooms",
headers={"Authorization": f"Bearer {LIVEBLOCKS_SECRET}"}, json=payload)
# if response.status_code == 200:
data = response.json()
print(data)
if "error" in data and data["error"] == "ROOM_ALREADY_EXISTS":
print("Room already exists")
cursor = db.cursor()
cursor.execute("INSERT INTO rooms (room_id) VALUES (?)", (room_id,))
db.commit()
print("Room created")
print("Created room", room_id)
return True
def generateAuthToken():
response = requests.get(f"https://liveblocks.io/api/authorize",
headers={"Authorization": f"Bearer {LIVEBLOCKS_SECRET}"})
if response.status_code == 200:
data = response.json()
return data["token"]
else:
raise Exception(response.status_code, response.text)
def get_room_count(room_id: str, jwtToken: str = ''):
print("Getting room count" + room_id)
response = requests.get(
f"https://liveblocks.net/api/v1/room/{room_id}/users", headers={"Authorization": f"Bearer {jwtToken}", "Content-Type": "application/json"})
if response.status_code == 200:
res = response.json()
if "data" in res:
return len(res["data"])
else:
return 0
raise Exception("Error getting room count")
@app.get("/sync-rooms")
async def sync_rooms(db: sqlite3.Connection = Depends(get_db)):
try:
jwtToken = generateAuthToken()
rooms = db.execute("SELECT * FROM rooms").fetchall()
for row in rooms:
room_id = row["room_id"]
users_count = get_room_count(room_id, jwtToken)
print("Updating room", room_id, "with", users_count, "users")
cursor = db.cursor()
cursor.execute(
"UPDATE rooms SET users_count = ? WHERE room_id = ?", (users_count, room_id))
db.commit()
data = db.execute("SELECT * FROM rooms").fetchall()
return data
except Exception as e:
print(e)
return {"error": str(e)}
if __name__ == "__main__":
uvicorn.run("createRooms:app", host="0.0.0.0",
log_level="debug", reload=True)