import json
import os
import re

import pandas as pd
import streamlit as st
from streamlit_gsheets import GSheetsConnection
from streamlit.runtime.secrets import AttrDict, secrets_singleton

from data_storage import get_user_id_if_email_exists, get_user, add_new_user, create_new_token_for_user

# Feature flag: set the NEW_USER_ENABLED env var to "0" to hide the registration form.
NEW_USER_ENABLED = bool(int(os.getenv("NEW_USER_ENABLED", "1")))


class HFFriendlyGSheetsConnection(GSheetsConnection):
    """GSheets connection that reads its credentials from top-level secrets."""

    # HF doesn't currently support nested secrets as in secrets.toml.
    # This class overrides where the secrets are looked up and instead builds
    # them from the top level of the secrets store.

    @property
    def _secrets(self) -> AttrDict:
        """Return the connection secrets built from the top-level secrets.

        Returns an empty ``AttrDict`` when no secrets could be loaded.
        Otherwise returns a copy of all top-level secrets in which
        ``private_key`` has been JSON-decoded when it is valid JSON.

        Raises:
            KeyError: if secrets were loaded but contain no ``private_key``.
        """
        if not secrets_singleton.load_if_toml_exists():
            return AttrDict({})

        secrets_copy = AttrDict(secrets_singleton).to_dict()

        # In the HF environment the private key may be stored JSON-encoded;
        # decode it when possible, otherwise keep the raw value untouched.
        raw_key = secrets_copy["private_key"]
        try:
            secrets_copy["private_key"] = json.loads(raw_key)
        except (TypeError, ValueError):
            # Not JSON (json.JSONDecodeError is a ValueError) or not a
            # str/bytes value: use the key exactly as stored.
            secrets_copy["private_key"] = raw_key

        return AttrDict(secrets_copy)


conn = st.connection("gsheets", type=HFFriendlyGSheetsConnection)

# Regular expression for validating an email address.
# The TLD class is [A-Za-z]; the previous [A-Z|a-z] also matched a literal "|".
regex_email = r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,7}\b"


def check_email(email: str) -> bool:
    """Return True when *email* matches ``regex_email`` in its entirety."""
    return bool(re.fullmatch(regex_email, email))


def new_user_form():
    """Render the login-link button and, when enabled, the registration form.

    Clicking the button shows the existing-user login-link form instead of
    the registration form. The registration form is only rendered when
    ``NEW_USER_ENABLED`` is true.
    """
    if st.button("Existing User get Login link"):
        existing_user_new_token()
    elif NEW_USER_ENABLED:
        with st.form("New User Registration"):
            st.header("New User Registration")
            st.write("After submitting, a url to login will be emailed to you.")
            st.text_input("Email Address", key="new_user_email")
            st.text_input("Name", key="new_user_name")
            st.form_submit_button("Submit", on_click=new_user_submitted)


def existing_user_new_token():
    """Render the form an existing user submits to request a new login link."""
    with st.form("Existing User New URL"):
        st.header("Existing User Login Link Request")
        st.write("After submitting, a url to login will be emailed to you if a user with the email exists.")
        st.text_input("Email Address", key="reset_token_user_email")
        st.form_submit_button("Submit", on_click=generate_new_token_request)


def generate_new_token_request():
    """Form callback: create a new login token for an existing user.

    Reads the email from ``st.session_state["reset_token_user_email"]``.
    Warns and returns on an invalid email, and returns without any message
    when no user has that email. Otherwise a token is created, written to
    the token-requests worksheet, and the session-state entry is removed.
    """
    if not check_email(email := st.session_state["reset_token_user_email"]):
        st.warning("Sorry email is invalid. Please try a valid email address.")
        return

    user_id = get_user_id_if_email_exists(email)
    if not user_id:
        # NOTE(review): returning silently here while showing a confirmation
        # on success lets a caller tell whether an email is registered.
        # Consider showing the same confirmation in both cases.
        return

    token = create_new_token_for_user(user_id, existing_user=True)
    insert_new_token_to_sheet(email, token)
    st.info("Request submitted. Please check your email for a login url with a few minutes.")
    del st.session_state["reset_token_user_email"]


def insert_new_token_to_sheet(email: str, token: str):
    """Write a single email/token row to the ``token-requests`` worksheet."""
    # NOTE(review): conn.update presumably replaces the worksheet contents
    # rather than appending - confirm against the gsheets connection docs.
    token_row = pd.DataFrame(
        {
            "email": email,
            "token": token,
        },
        index=[0],
    )
    conn.update(worksheet="token-requests", data=token_row)


def create_new_user_request(email: str, name: str):
    """Create a user, issue a login token and record it in the sheet.

    Raises:
        RuntimeError: if the user cannot be looked up by email right after
            being added.
    """
    add_new_user(email, name)
    user_id = get_user_id_if_email_exists(email)
    if not user_id:
        # Explicit check instead of `assert`, which is stripped under `-O`.
        raise RuntimeError("New user could not be found after being added.")

    token = create_new_token_for_user(user_id)
    insert_new_token_to_sheet(email, token)
    st.info("New user request submitted. Please check your email for a login url with a few minutes.")


def new_user_submitted():
    """Form callback: validate the registration inputs and create the user.

    Reads ``new_user_email`` and ``new_user_name`` from the session state.
    Warns and returns when the email is invalid, the name is empty, or the
    email is already registered. On success both session-state entries are
    removed.
    """
    if not check_email(email := st.session_state["new_user_email"]):
        st.warning("Sorry email is invalid. Please try a valid email address.")
        return

    if not (name := st.session_state["new_user_name"]):
        st.warning("No name entered. Please enter your name for display.")
        return

    if get_user_id_if_email_exists(email):
        # The message names the button rendered by new_user_form; it used to
        # refer to a "New Login URL" button that this file does not render.
        st.warning(
            "User with that email already exists. If you would like a new login url, "
            "please click the Existing User get Login link button."
        )
        return

    create_new_user_request(email, name)
    del st.session_state["new_user_email"]
    del st.session_state["new_user_name"]


def check_password():
    """Return True when a user is logged in; otherwise show the sign-up UI.

    When ``st.session_state["logged_in_user"]`` is set, the user's name is
    written to the page. Otherwise ``new_user_form`` is rendered and False
    is returned.
    """
    if st.session_state.get("logged_in_user"):
        _, name = get_logged_in_user_name_email()
        st.write(f"Logged in as: {name}")
        return True

    # Not logged in: offer new user registration / login-link request.
    new_user_form()
    return False


def get_logged_in_user_name_email() -> tuple[str | None, str | None]:
    """Return ``(email, name)`` of the logged-in user, or ``(None, None)``."""
    if not (user_id := st.session_state.get("logged_in_user")):
        # Nobody is logged in.
        return (None, None)

    user_info_map = get_user(user_id)
    return user_info_map.get("email"), user_info_map.get("name")


# hack - data overrides for stats allowing for quick manual edits
@st.cache_data(ttl=60 * 10)
def get_stat_overrides() -> dict[int, dict[str, dict[str, float]]]:
    """Load manual stat overrides from the ``stats-overrides`` worksheet.

    Reads the first four columns of the worksheet and uses its ``week``,
    ``player_id``, ``stat_key`` and ``stat_value`` columns.

    Returns:
        A mapping ``week -> player_id -> stat_key -> value``. Every week
        value present in the sheet gets an entry, even when all of its rows
        are incomplete. Rows containing any missing value, or whose
        ``stat_key`` is not a string, are ignored.
    """
    df = conn.read(
        worksheet="stats-overrides",
        ttl="1m",
        usecols=[0, 1, 2, 3],
    )

    # Seed from the unfiltered frame so weeks with only incomplete rows
    # still map to an empty dict.
    stat_overrides_map: dict[int, dict[str, dict[str, float]]] = {w: {} for w in df.week.values}

    for week, df_week in df.dropna().groupby("week"):
        week_overrides = stat_overrides_map[week]
        for player_id, df_player in df_week.groupby("player_id"):
            player_overrides = week_overrides.setdefault(player_id, {})
            for row in df_player.itertuples():
                if isinstance(row.stat_key, str):
                    player_overrides[row.stat_key] = float(row.stat_value)

    return stat_overrides_map