|
import os |
|
from secrets import token_urlsafe |
|
import streamlit as st |
|
import sqlite3 |
|
|
|
DATA_DIR = "/data" |
|
|
|
DB_PATH = os.path.join(DATA_DIR, "data.db") |
|
|
|
|
|
def get_db_connection(): |
|
return sqlite3.connect(DB_PATH) |
|
|
|
|
|
def initialize_data_storage(): |
|
with get_db_connection() as con: |
|
cur = con.cursor() |
|
cur.execute("CREATE TABLE IF NOT EXISTS user_rosters( user_id INTEGER, position_id TEXT, player_id TEXT)") |
|
cur.execute("CREATE TABLE IF NOT EXISTS users( user_id INTEGER PRIMARY KEY ASC, email TEXT, name TEXT)") |
|
cur.execute("CREATE TABLE IF NOT EXISTS tokens( user_id INTEGER PRIMARY KEY, token TEXT)") |
|
|
|
|
|
def update_selection(user_id: str | int, position_id: str, player_id: str): |
|
with get_db_connection() as con: |
|
cur = con.cursor() |
|
cur.execute( |
|
f"""DELETE FROM user_rosters where user_id = {user_id} and position_id = '{position_id}' |
|
""" |
|
) |
|
cur.execute( |
|
f"""INSERT INTO user_rosters (user_id, position_id, player_id ) |
|
VALUES({user_id}, '{position_id}', '{player_id}') |
|
""" |
|
) |
|
|
|
|
|
def get_user_team(user_id): |
|
with get_db_connection() as con: |
|
cur = con.cursor() |
|
team = cur.execute(f"select * from user_rosters where user_id = {user_id}").fetchall() |
|
if team: |
|
return {x[1]: x[2] for x in team} |
|
else: |
|
return {} |
|
|
|
|
|
def add_new_user(email: str, name: str): |
|
with get_db_connection() as con: |
|
cur = con.cursor() |
|
cur.execute( |
|
f"""INSERT INTO users (email, name ) |
|
VALUES('{email.lower()}', '{name}') |
|
""" |
|
) |
|
|
|
|
|
def get_user(user_id: int): |
|
with get_db_connection() as con: |
|
cur = con.cursor() |
|
user_data = cur.execute(f"select * from users where user_id = {user_id}").fetchone() |
|
if not user_data: |
|
return {} |
|
return { |
|
"user_id": user_data[0], |
|
"email": user_data[1], |
|
"name": user_data[2], |
|
} |
|
|
|
|
|
def get_user_id_if_email_exists(email: str) -> int | None: |
|
with get_db_connection() as con: |
|
cur = con.cursor() |
|
query_result = cur.execute(f"select user_id from users where email = '{email.lower()}'").fetchone() |
|
if query_result: |
|
user_id = query_result[0] |
|
else: |
|
user_id = None |
|
return user_id |
|
|
|
|
|
def is_admin(user_id: int): |
|
|
|
return user_id == 1 |
|
|
|
|
|
def login_by_token(token: str): |
|
|
|
with get_db_connection() as con: |
|
cur = con.cursor() |
|
query_result = cur.execute(f"select user_id from tokens where token = '{token}'").fetchone() |
|
if query_result: |
|
user_id = query_result[0] |
|
st.session_state["logged_in_user"] = user_id |
|
else: |
|
user_id = None |
|
return user_id |
|
|
|
|
|
def create_new_token_for_user(user_id: int, existing_user: bool = False): |
|
|
|
token = token_urlsafe(32) |
|
with get_db_connection() as con: |
|
cur = con.cursor() |
|
if existing_user: |
|
cur.execute( |
|
f"""DELETE FROM tokens where user_id = {user_id} |
|
""" |
|
) |
|
|
|
cur.execute( |
|
f"""INSERT INTO tokens (user_id, token ) |
|
VALUES({user_id}, '{token}') |
|
""" |
|
) |
|
return token |
|
|
|
|
|
def drop_tables(): |
|
with get_db_connection() as con: |
|
cur = con.cursor() |
|
cur.execute("DROP TABLE user_rosters") |
|
cur.execute("DROP TABLE users") |
|
cur.execute("DROP TABLE tokens") |
|
|
|
|
|
def get_all_users(columns_included: list[str] = ["user_id", "name", "email"]): |
|
columns_as_str = ",".join(columns_included) |
|
with get_db_connection() as con: |
|
cur = con.cursor() |
|
all_users = cur.execute(f"select {columns_as_str} from users").fetchall() |
|
return all_users |
|
|
|
|
|
def get_all_rosters() -> list[tuple[int, str, str]]: |
|
with get_db_connection() as con: |
|
cur = con.cursor() |
|
all_rosters = cur.execute("select * from user_rosters").fetchall() |
|
return all_rosters |
|
|
|
|
|
def get_all_rosters_week(week: int) -> list[tuple[int, str, str]]: |
|
with get_db_connection() as con: |
|
cur = con.cursor() |
|
all_rosters = cur.execute(f"select * from user_rosters where position_id like '{week}%'").fetchall() |
|
return all_rosters |
|
|
|
|
|
def migrate_players_from_week(migrate_from_week: int): |
|
""" |
|
Migrate players from the week = migrate_from_week to the week = migrate_from_week + 1 |
|
""" |
|
rosters = get_all_rosters_week(migrate_from_week) |
|
for user_id, position_id, player_id in rosters: |
|
new_position_id = f"""{migrate_from_week + 1}-{position_id.split("-", 1)[1]}""" |
|
update_selection(user_id, new_position_id, player_id) |
|
|