# Jon Solow
# Get a client per session to help with expiry
# 050fd9f
import json
import os
import pandas as pd
import re
import streamlit as st
from streamlit_gsheets import GSheetsConnection
from streamlit.runtime.secrets import AttrDict, secrets_singleton
from data_storage import get_user_id_if_email_exists, get_user, add_new_user, create_new_token_for_user
# Feature flag for new-user registration, read from the NEW_USER_ENABLED environment
# variable as an integer (defaults to 1, i.e. enabled, when the variable is unset).
NEW_USER_ENABLED = bool(int(os.environ.get("NEW_USER_ENABLED", 1)))
class HFFriendlyGSheetsConnection(GSheetsConnection):
    """GSheets connection that builds its secrets from the top level of the secrets.

    HF doesn't currently support nested secrets as in ``secrets.toml``, so this
    class overrides the logic for where to find the secrets and instead creates
    them from the top-level entries.
    """

    @property
    def _secrets(self) -> AttrDict:
        """Get the secrets for this connection from the top level of the loaded secrets.

        We expect this property to be used primarily by connection authors when they
        are implementing their class' ``_connect`` method. User scripts should, for the
        most part, have no reason to use this property.

        Returns:
            A copy of all top-level secrets as an ``AttrDict``, or an empty
            ``AttrDict`` when ``secrets_singleton.load_if_toml_exists()`` is falsy.
            A string ``private_key`` that parses as JSON is replaced by the parsed
            value; otherwise it is passed through unchanged.
        """
        if not secrets_singleton.load_if_toml_exists():
            return AttrDict({})

        secrets_copy = AttrDict(secrets_singleton).to_dict()

        # In the HF env the private key may need json.loads. The key is optional here:
        # a configuration without one must not fail with KeyError.
        raw_private_key = secrets_copy.get("private_key")
        if isinstance(raw_private_key, str):
            try:
                secrets_copy["private_key"] = json.loads(raw_private_key)
            except ValueError:
                # Not JSON-encoded (json.JSONDecodeError is a ValueError): keep it as is.
                pass

        return AttrDict(secrets_copy)
# Module-level "gsheets" connection created with the HF-friendly subclass; used below
# to update the "token-requests" worksheet and to read the "stats-overrides" worksheet.
conn = st.connection("gsheets", type=HFFriendlyGSheetsConnection)
# Regular expression for validating an email address.
# The top-level-domain class is ``[A-Za-z]``: inside a character class ``|`` is a
# literal pipe rather than alternation, so it must not appear there (otherwise an
# address such as ``user@example.c|m`` would validate).
regex_email = r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,7}\b"


def check_email(email: str) -> bool:
    """Return whether *email* matches ``regex_email`` in its entirety.

    Args:
        email: The candidate address to validate.

    Returns:
        ``True`` when the whole string is a match, ``False`` otherwise
        (including for the empty string).
    """
    return re.fullmatch(regex_email, email) is not None
def new_user_form():
    """Render the login-help buttons and, once one is clicked, the matching form.

    The "Existing User" button is always drawn; clicking it shows the login-link
    request form. Otherwise, when ``NEW_USER_ENABLED`` is set, the "New User
    Registration" button is drawn and clicking it shows the registration form.
    That form stores its inputs under the ``new_user_email`` and ``new_user_name``
    session keys and runs ``new_user_submitted`` as the submit button's callback.
    """
    if st.button("Existing User - resend login token email"):
        existing_user_new_token()
    # Short-circuit keeps the registration button hidden while registration is disabled.
    elif NEW_USER_ENABLED and st.button("New User Registration"):
        with st.form("New User Registration"):
            st.header("New User Registration")
            st.write("After submitting, a url to login will be emailed to you.")
            st.text_input("Email Address", key="new_user_email")
            st.text_input("Name", key="new_user_name")
            st.form_submit_button("Submit", on_click=new_user_submitted)
def existing_user_new_token():
    """Render the form through which an existing user requests a new login link.

    The entered address is stored under the ``reset_token_user_email`` session key,
    and ``generate_new_token_request`` runs as the submit button's ``on_click`` callback.
    """
    request_form = st.form("Existing User New URL")
    with request_form:
        st.header("Existing User Login Link Request")
        st.write("After submitting, a url to login will be emailed to you if a user with the email exists.")
        st.text_input("Email Address", key="reset_token_user_email")
        st.form_submit_button("Submit", on_click=generate_new_token_request)
def generate_new_token_request():
    """Handle submission of the existing-user login-link request form.

    Reads the address from the ``reset_token_user_email`` session key. An invalid
    address produces a warning and nothing else. For a valid address, a new token
    is created and written to the token-request sheet only when a user with that
    email exists; the confirmation message and the clearing of the form field are
    the same either way, so the response does not reveal whether the address is
    registered (matching the form's "if a user with the email exists" wording).
    """
    email = st.session_state["reset_token_user_email"]
    if not check_email(email):
        st.warning("Sorry email is invalid. Please try a valid email address.")
        return

    db_client = st.session_state["db_client"]
    user_id = get_user_id_if_email_exists(email, db_client)
    if user_id:
        token = create_new_token_for_user(user_id, db_client, existing_user=True)
        insert_new_token_to_sheet(email, token)

    # Always acknowledge the request instead of returning silently for unknown emails.
    st.info("Request submitted. Please check your email for a login url within a few minutes.")
    del st.session_state["reset_token_user_email"]
def insert_new_token_to_sheet(email: str, token: str):
    """Send a one-row email/token table to the ``token-requests`` worksheet.

    Args:
        email: Address the login token belongs to.
        token: The token value to record next to the address.
    """
    # Single-row frame with columns "email" and "token" and index [0].
    request_row = pd.DataFrame({"email": email, "token": token}, index=[0])
    conn.update(worksheet="token-requests", data=request_row)
def create_new_user_request(email: str, name: str):
    """Create a user, issue a login token for them and queue it on the token sheet.

    Args:
        email: Email address of the new user.
        name: Display name of the new user.

    Raises:
        RuntimeError: If the user cannot be looked up by email right after being added.
    """
    db_client = st.session_state["db_client"]
    add_new_user(email, name, db_client)

    user_id = get_user_id_if_email_exists(email, db_client)
    if not user_id:
        # Explicit check rather than ``assert``: asserts are stripped under ``python -O``,
        # which would let a missing id flow into token creation.
        raise RuntimeError("New user could not be found after creation.")

    token = create_new_token_for_user(user_id, db_client)
    insert_new_token_to_sheet(email, token)
    st.info("New user request submitted. Please check your email for a login url within a few minutes.")
def new_user_submitted():
    """Validate the registration form values and create the user if they pass.

    Reads ``new_user_email`` and ``new_user_name`` from the session state. A warning
    is shown, and nothing is created, when the email is invalid, the name is empty,
    or a user with that email already exists. On success the new-user request is
    created and both form keys are removed from the session state.
    """
    email = st.session_state["new_user_email"]
    if not check_email(email):
        st.warning("Sorry email is invalid. Please try a valid email address.")
        return

    name = st.session_state["new_user_name"]
    if not name:
        st.warning("No name entered. Please enter your name for display.")
        return

    already_registered = get_user_id_if_email_exists(email, st.session_state["db_client"])
    if already_registered:
        # NOTE(review): the message mentions a "New Login URL" button, while the button
        # rendered by new_user_form is labelled "Existing User - resend login token email".
        # Confirm which wording is intended.
        st.warning(
            "User with that email already exists. If you would like a new login url, please click the New Login URL button."
        )
        return

    create_new_user_request(email, name)

    # Drop the submitted values from the session state.
    for form_key in ("new_user_email", "new_user_name"):
        del st.session_state[form_key]
def check_password():
    """Report whether a user is logged in, rendering the matching UI.

    Returns:
        ``True`` after writing the logged-in user's name when the ``logged_in_user``
        session key is set; ``False`` after rendering the new-user/login-help form
        otherwise.
    """
    if not st.session_state.get("logged_in_user"):
        # Not logged in: offer new user registration.
        new_user_form()
        return False

    _, name = get_logged_in_user_name_email()
    st.write(f"Logged in as: {name}")
    return True
def get_logged_in_user_name_email() -> tuple[str | None, str | None]:
    """Look up the email and name of the currently logged-in user.

    Returns:
        An ``(email, name)`` tuple taken from the record returned by ``get_user``,
        or ``(None, None)`` when the ``logged_in_user`` session key is unset or falsy.
    """
    user_id = st.session_state.get("logged_in_user")
    if not user_id:
        # Nobody is logged in.
        return None, None

    user_record = get_user(user_id, st.session_state["db_client"])
    return user_record.get("email"), user_record.get("name")
# hack - data overrides for stats allowing for quick manual edits
# NOTE(review): the result is cached by st.cache_data with ttl=60 * 10, so the shorter
# ttl="1m" passed to conn.read appears to be masked by the outer cache - confirm intended.
@st.cache_data(ttl=60 * 10)
def get_stat_overrides() -> dict[int, dict[str, dict[str, float]]]:
    """Load manual stat overrides from the ``stats-overrides`` worksheet.

    Reads the first four columns of the worksheet and uses its ``week``,
    ``player_id``, ``stat_key`` and ``stat_value`` columns.

    Returns:
        A nested mapping ``week -> player_id -> stat_key -> stat_value``. Every
        non-blank week in the sheet gets an entry, even when none of its rows is
        complete; only rows without missing values and with a string ``stat_key``
        contribute an override. When a week/player/stat_key combination repeats,
        the last row wins.
    """
    df = conn.read(
        worksheet="stats-overrides",
        ttl="1m",
        usecols=[0, 1, 2, 3],
    )

    # Seed an entry per week found in the sheet. Blank week cells are skipped:
    # NaN never equals itself, so each blank row would otherwise add its own junk key.
    stat_overrides_map: dict[int, dict[str, dict[str, float]]] = {
        week: {} for week in df["week"].dropna().values
    }

    # Only fully populated rows can contribute an override.
    for week, df_week in df.dropna().groupby("week"):
        week_overrides = stat_overrides_map[week]
        for player_id, df_player in df_week.groupby("player_id"):
            player_overrides = week_overrides.setdefault(player_id, {})
            for row in df_player.itertuples():
                if isinstance(row.stat_key, str):
                    player_overrides[row.stat_key] = float(row.stat_value)

    return stat_overrides_map