Jon Solow
Implement admin utility for migrating week
e11a10a
raw
history blame
4.84 kB
import os
from secrets import token_urlsafe
import streamlit as st
import sqlite3
DATA_DIR = "/data"
DB_PATH = os.path.join(DATA_DIR, "data.db")
def get_db_connection():
return sqlite3.connect(DB_PATH)
def initialize_data_storage():
with get_db_connection() as con:
cur = con.cursor()
cur.execute("CREATE TABLE IF NOT EXISTS user_rosters( user_id INTEGER, position_id TEXT, player_id TEXT)")
cur.execute("CREATE TABLE IF NOT EXISTS users( user_id INTEGER PRIMARY KEY ASC, email TEXT, name TEXT)")
cur.execute("CREATE TABLE IF NOT EXISTS tokens( user_id INTEGER PRIMARY KEY, token TEXT)")
def update_selection(user_id: str | int, position_id: str, player_id: str):
with get_db_connection() as con:
cur = con.cursor()
cur.execute(
f"""DELETE FROM user_rosters where user_id = {user_id} and position_id = '{position_id}'
"""
)
cur.execute(
f"""INSERT INTO user_rosters (user_id, position_id, player_id )
VALUES({user_id}, '{position_id}', '{player_id}')
"""
)
def get_user_team(user_id):
with get_db_connection() as con:
cur = con.cursor()
team = cur.execute(f"select * from user_rosters where user_id = {user_id}").fetchall()
if team:
return {x[1]: x[2] for x in team}
else:
return {}
def add_new_user(email: str, name: str):
with get_db_connection() as con:
cur = con.cursor()
cur.execute(
f"""INSERT INTO users (email, name )
VALUES('{email.lower()}', '{name}')
"""
)
def get_user(user_id: int):
with get_db_connection() as con:
cur = con.cursor()
user_data = cur.execute(f"select * from users where user_id = {user_id}").fetchone()
if not user_data:
return {}
return {
"user_id": user_data[0],
"email": user_data[1],
"name": user_data[2],
}
def get_user_id_if_email_exists(email: str) -> int | None:
with get_db_connection() as con:
cur = con.cursor()
query_result = cur.execute(f"select user_id from users where email = '{email.lower()}'").fetchone()
if query_result:
user_id = query_result[0]
else:
user_id = None
return user_id
def is_admin(user_id: int):
# Replace with db data field later
return user_id == 1
def login_by_token(token: str):
# returns true if logged in successfully
with get_db_connection() as con:
cur = con.cursor()
query_result = cur.execute(f"select user_id from tokens where token = '{token}'").fetchone()
if query_result:
user_id = query_result[0]
st.session_state["logged_in_user"] = user_id
else:
user_id = None
return user_id
def create_new_token_for_user(user_id: int, existing_user: bool = False):
# returns true if logged in successfully
token = token_urlsafe(32)
with get_db_connection() as con:
cur = con.cursor()
if existing_user:
cur.execute(
f"""DELETE FROM tokens where user_id = {user_id}
"""
)
cur.execute(
f"""INSERT INTO tokens (user_id, token )
VALUES({user_id}, '{token}')
"""
)
return token
def drop_tables():
with get_db_connection() as con:
cur = con.cursor()
cur.execute("DROP TABLE user_rosters")
cur.execute("DROP TABLE users")
cur.execute("DROP TABLE tokens")
def get_all_users(columns_included: list[str] = ["user_id", "name", "email"]):
columns_as_str = ",".join(columns_included)
with get_db_connection() as con:
cur = con.cursor()
all_users = cur.execute(f"select {columns_as_str} from users").fetchall()
return all_users
def get_all_rosters() -> list[tuple[int, str, str]]:
with get_db_connection() as con:
cur = con.cursor()
all_rosters = cur.execute("select * from user_rosters").fetchall()
return all_rosters
def get_all_rosters_week(week: int) -> list[tuple[int, str, str]]:
with get_db_connection() as con:
cur = con.cursor()
all_rosters = cur.execute(f"select * from user_rosters where position_id like '{week}%'").fetchall()
return all_rosters
def migrate_players_from_week(migrate_from_week: int):
"""
Migrate players from the week = migrate_from_week to the week = migrate_from_week + 1
"""
rosters = get_all_rosters_week(migrate_from_week)
for user_id, position_id, player_id in rosters:
new_position_id = f"""{migrate_from_week + 1}-{position_id.split("-", 1)[1]}"""
update_selection(user_id, new_position_id, player_id)