|
import json |
|
import os |
|
import pandas as pd |
|
import re |
|
import streamlit as st |
|
from streamlit_gsheets import GSheetsConnection |
|
from streamlit.runtime.secrets import AttrDict, secrets_singleton |
|
from data_storage import get_user_id_if_email_exists, get_user, add_new_user, create_new_token_for_user |
|
|
|
def _env_flag(name: str, default: bool) -> bool:
    """Read a boolean feature flag from the environment variable *name*.

    Integer text keeps its previous meaning (``"0"`` is off, any other integer
    is on).  The words true/t/yes/y/on and false/f/no/n/off are accepted as
    well (case-insensitive).  An unset, empty, or unrecognised value yields
    *default* instead of raising at import time.
    """
    raw = os.getenv(name)
    if raw is None:
        return default

    text = raw.strip().lower()
    try:
        return bool(int(text))
    except ValueError:
        # Not numeric; fall through to the word forms below.
        pass

    if text in {"true", "t", "yes", "y", "on"}:
        return True
    if text in {"false", "f", "no", "n", "off"}:
        return False
    return default


# New-user registration is offered unless NEW_USER_ENABLED is set to a false value.
NEW_USER_ENABLED = _env_flag("NEW_USER_ENABLED", default=True)
|
|
|
|
|
class HFFriendlyGSheetsConnection(GSheetsConnection):
    """GSheets connection whose secrets come from the top level of ``st.secrets``.

    The ``_secrets`` override wraps the whole secrets mapping rather than a
    per-connection sub-section, and accepts a ``private_key`` value that was
    stored as a JSON-encoded string.
    """

    @property
    def _secrets(self) -> AttrDict:
        """Return the secrets for this connection.

        Returns:
            An empty ``AttrDict`` when no secrets could be loaded; otherwise a
            copy of the full secrets mapping.  If ``private_key`` is present
            and is a JSON string literal, it is replaced by the decoded text;
            any other value is passed through untouched.
        """
        if not secrets_singleton.load_if_toml_exists():
            return AttrDict({})

        secrets_copy = AttrDict(secrets_singleton).to_dict()

        # A missing private_key is tolerated here so that configurations
        # without one do not fail with a KeyError inside this property.
        raw_key = secrets_copy.get("private_key")
        if isinstance(raw_key, str):
            try:
                decoded_key = json.loads(raw_key)
            except ValueError:
                # json.JSONDecodeError is a ValueError: the key is not JSON
                # encoded, so it is used exactly as stored.
                decoded_key = raw_key
            # Only accept a decoded *string*; anything else (number, list, ...)
            # cannot be a key, so the stored text is kept.
            if isinstance(decoded_key, str):
                secrets_copy["private_key"] = decoded_key

        return AttrDict(secrets_copy)


# Module-wide connection object used by the sheet read/write helpers below.
conn = st.connection("gsheets", type=HFFriendlyGSheetsConnection)
|
|
|
|
|
|
|
|
|
# Shape of an acceptable e-mail address: local part, "@", domain, ".", and a
# 2-7 letter top-level domain.  The TLD class is letters only; the earlier
# ``[A-Z|a-z]`` form also admitted a literal "|" character.
regex_email = r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,7}\b"


def check_email(email: str) -> bool:
    """Return True when the whole of *email* matches ``regex_email``.

    Args:
        email: The address text to validate.

    Returns:
        True for a full match, False otherwise (including the empty string).
    """
    return bool(re.fullmatch(regex_email, email))
|
|
|
|
|
def new_user_form():
    """Render the entry point shown to visitors who are not logged in.

    A button lets an existing user request a fresh login link; when it is
    pressed the link-request form is shown instead of the registration form.
    Otherwise, and only when ``NEW_USER_ENABLED`` is true, the new-user
    registration form is rendered.  Submission is handled by the
    ``new_user_submitted`` callback.
    """
    if st.button("Existing User get Login link"):
        existing_user_new_token()
        return

    if not NEW_USER_ENABLED:
        return

    with st.form("New User Registration"):
        st.header("New User Registration")
        st.write("After submitting, a url to login will be emailed to you.")
        # Values are read back from session state by key in the callback.
        st.text_input("Email Address", key="new_user_email")
        st.text_input("Name", key="new_user_name")
        st.form_submit_button("Submit", on_click=new_user_submitted)
|
|
|
|
|
def existing_user_new_token():
    """Render the form with which an existing user asks for a new login link.

    The entered address is stored in session state under
    ``reset_token_user_email``; submitting runs ``generate_new_token_request``.
    """
    link_request_form = st.form("Existing User New URL")
    with link_request_form:
        st.header("Existing User Login Link Request")
        st.write(
            "After submitting, a url to login will be emailed to you if a user with the email exists."
        )
        st.text_input("Email Address", key="reset_token_user_email")
        st.form_submit_button("Submit", on_click=generate_new_token_request)
|
|
|
|
|
def generate_new_token_request():
    """Form callback: issue a new login token for the submitted email address.

    Reads the address from ``st.session_state["reset_token_user_email"]``.
    An address that fails ``check_email`` produces a warning and nothing else.
    For a well-formed address a token is created and written to the sheet only
    when a matching user exists, but the same confirmation is shown either
    way: the form promises an email "if a user with the email exists", and
    identical feedback avoids both a silent no-op and revealing which
    addresses are registered.  The stored address is cleared afterwards.
    """
    email = st.session_state["reset_token_user_email"]
    if not check_email(email):
        st.warning("Sorry email is invalid. Please try a valid email address.")
        return

    user_id = get_user_id_if_email_exists(email)
    if user_id:
        token = create_new_token_for_user(user_id, existing_user=True)
        insert_new_token_to_sheet(email, token)

    st.info("Request submitted. Please check your email for a login url within a few minutes.")
    del st.session_state["reset_token_user_email"]
|
|
|
|
|
def insert_new_token_to_sheet(email: str, token: str):
    """Send one ``email``/``token`` row to the ``token-requests`` worksheet.

    Args:
        email: Address the login link is meant for.
        token: Login token generated for that user.
    """
    # NOTE(review): this goes through conn.update, which presumably replaces
    # the worksheet contents rather than appending a row -- confirm intended.
    token_row = pd.DataFrame(
        {
            "email": [email],
            "token": [token],
        },
        index=[0],
    )
    conn.update(worksheet="token-requests", data=token_row)
|
|
|
|
|
def create_new_user_request(email: str, name: str):
    """Create a user, issue a login token for them, and queue it in the sheet.

    Args:
        email: Address of the user being registered.
        name: Display name of the user being registered.

    Raises:
        RuntimeError: If the user cannot be found by email right after being
            added.  This is raised explicitly rather than asserted so the
            check still runs when Python is started with ``-O``.
    """
    add_new_user(email, name)

    user_id = get_user_id_if_email_exists(email)
    if not user_id:
        raise RuntimeError("Newly added user could not be found by email.")

    token = create_new_token_for_user(user_id)
    insert_new_token_to_sheet(email, token)
    st.info("New user request submitted. Please check your email for a login url within a few minutes.")
|
|
|
|
|
def new_user_submitted():
    """Form callback: validate the registration form and create the user.

    Reads ``new_user_email`` and ``new_user_name`` from session state.  A
    warning is shown, and nothing is created, when the address fails
    ``check_email``, when the name is empty, or when a user with that address
    already exists.  On success the request is created and both session-state
    entries are removed.
    """
    email = st.session_state["new_user_email"]
    if not check_email(email):
        st.warning("Sorry email is invalid. Please try a valid email address.")
        return

    name = st.session_state["new_user_name"]
    if not name:
        st.warning("No name entered. Please enter your name for display.")
        return

    if get_user_id_if_email_exists(email):
        # The button named here must match the label rendered by new_user_form.
        st.warning(
            "User with that email already exists. If you would like a new login url, "
            'please click the "Existing User get Login link" button.'
        )
        return

    create_new_user_request(email, name)
    del st.session_state["new_user_email"]
    del st.session_state["new_user_name"]
|
|
|
|
|
def check_password():
    """Report whether a user is logged in, rendering the matching UI.

    Returns:
        True after writing a "Logged in as" line when the session holds a
        ``logged_in_user``; False after rendering the new-user/login-link
        form otherwise.
    """
    if not st.session_state.get("logged_in_user"):
        new_user_form()
        return False

    _email, display_name = get_logged_in_user_name_email()
    st.write(f"Logged in as: {display_name}")
    return True
|
|
|
|
|
def get_logged_in_user_name_email() -> tuple[str | None, str | None]:
    """Return ``(email, name)`` for the user stored in the session.

    Returns:
        ``(None, None)`` when the session has no ``logged_in_user``; otherwise
        the ``email`` and ``name`` entries of the record returned by
        ``get_user`` (either may be None if the record lacks that entry).
    """
    user_id = st.session_state.get("logged_in_user")
    if not user_id:
        return (None, None)

    profile = get_user(user_id)
    return profile.get("email"), profile.get("name")
|
|
|
|
|
|
|
@st.cache_data(ttl=60 * 10)
def get_stat_overrides() -> dict[int, dict[str, dict[str, float]]]:
    """Load stat overrides from the ``stats-overrides`` worksheet.

    Returns:
        A mapping ``week -> player_id -> stat_key -> stat_value``.  Every week
        value present in the sheet gets an entry, even if none of its rows is
        complete; only rows without missing cells and with a string
        ``stat_key`` contribute stat values.
    """
    sheet = conn.read(worksheet="stats-overrides", ttl="1m", usecols=[0, 1, 2, 3])

    # Seed the weeks from the unfiltered sheet, before incomplete rows are dropped.
    overrides: dict[int, dict[str, dict[str, float]]] = {week: {} for week in sheet.week.values}

    complete_rows = sheet.dropna()
    for week, week_rows in complete_rows.groupby("week"):
        players_for_week = overrides[week]
        for player_id, player_rows in week_rows.groupby("player_id"):
            player_stats = players_for_week.setdefault(player_id, {})
            for entry in player_rows.itertuples():
                stat_key = entry.stat_key
                if not isinstance(stat_key, str):
                    continue
                player_stats[stat_key] = float(entry.stat_value)

    return overrides
|
|