Jon Solow
Fix override logic to allow setting value to zero and drop all nans
b4d2f11
raw
history blame
6.25 kB
import json
import os
import pandas as pd
import re
import streamlit as st
from streamlit_gsheets import GSheetsConnection
from streamlit.runtime.secrets import AttrDict, secrets_singleton
from data_storage import get_user_id_if_email_exists, get_user, add_new_user, create_new_token_for_user
NEW_USER_ENABLED = bool(int(os.getenv("NEW_USER_ENABLED", 1)))
class HFFriendlyGSheetsConnection(GSheetsConnection):
# HF doesnt currently support nested secrets as in secrets.toml
# this class overwrites the logic for where to find the secrets and instead creates them from the top level of secrets
@property
def _secrets(self) -> AttrDict:
"""Get the secrets for this connection from the corresponding st.secrets section.
We expect this property to be used primarily by connection authors when they
are implementing their class' ``_connect`` method. User scripts should, for the
most part, have no reason to use this property.
"""
connections_section = None
if secrets_singleton.load_if_toml_exists():
connections_section = AttrDict(secrets_singleton)
if connections_section is None:
return AttrDict({})
connections_copy = connections_section.to_dict()
# in hf env the private key may need json loads
private_key = connections_copy["private_key"]
try:
private_key = json.loads(private_key)
except Exception:
pass
connections_copy["private_key"] = private_key
return AttrDict(connections_copy)
conn = st.connection("gsheets", type=HFFriendlyGSheetsConnection)
# Make a regular expression
# for validating an Email
regex_email = r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,7}\b"
# Define a function for
# for validating an Email
def check_email(email: str) -> bool:
return bool(re.fullmatch(regex_email, email))
def new_user_form():
"""Form to collect new user information and submit"""
if st.button("Existing User get Login link"):
existing_user_new_token()
else:
if NEW_USER_ENABLED:
"""Form to collect new user information and submit"""
with st.form("New User Registration"):
st.header("New User Registration")
st.write("After submitting, a url to login will be emailed to you.")
st.text_input("Email Address", key="new_user_email")
st.text_input("Name", key="new_user_name")
st.form_submit_button("Submit", on_click=new_user_submitted)
def existing_user_new_token():
with st.form("Existing User New URL"):
st.header("Existing User Login Link Request")
st.write("After submitting, a url to login will be emailed to you if a user with the email exists.")
st.text_input("Email Address", key="reset_token_user_email")
st.form_submit_button("Submit", on_click=generate_new_token_request)
def generate_new_token_request():
if not check_email(email := st.session_state["reset_token_user_email"]):
st.warning("Sorry email is invalid. Please try a valid email address.")
return
user_id = get_user_id_if_email_exists(email)
if not user_id:
return
token = create_new_token_for_user(user_id, existing_user=True)
insert_new_token_to_sheet(email, token)
st.info("Request submitted. Please check your email for a login url with a few minutes.")
del st.session_state["reset_token_user_email"]
def insert_new_token_to_sheet(email: str, token: str):
conn.update(
worksheet="token-requests",
data=pd.DataFrame(
{
"email": email,
"token": token,
},
index=[0],
),
)
def create_new_user_request(email: str, name: str):
add_new_user(email, name)
user_id = get_user_id_if_email_exists(email)
assert user_id
token = create_new_token_for_user(user_id)
insert_new_token_to_sheet(email, token)
st.info("New user request submitted. Please check your email for a login url with a few minutes.")
def new_user_submitted():
if not check_email(email := st.session_state["new_user_email"]):
st.warning("Sorry email is invalid. Please try a valid email address.")
return
if not (name := st.session_state["new_user_name"]):
st.warning("No name entered. Please enter your name for display.")
return
if get_user_id_if_email_exists(email):
st.warning(
"User with that email already exists. If you would like a new login url, please click the New Login URL button."
)
return
create_new_user_request(email, name)
del st.session_state["new_user_email"]
del st.session_state["new_user_name"]
def check_password():
# if not logged in offer new user registration
if st.session_state.get("logged_in_user"):
_, name = get_logged_in_user_name_email()
st.write(f"Logged in as: {name}")
return True
else:
new_user_form()
return False
def get_logged_in_user_name_email() -> tuple[str | None, str | None]:
if not (user_id := st.session_state.get("logged_in_user")):
# if not logged
return (None, None)
user_info_map = get_user(user_id)
email = user_info_map.get("email")
name = user_info_map.get("name")
return email, name
# hack - data overrides for stats allowing for quick manual edits
@st.cache_data(ttl=60 * 10)
def get_stat_overrides() -> dict[int, dict[str, dict[str, float]]]:
df = conn.read(
worksheet="stats-overrides",
ttl="1m",
usecols=[0, 1, 2, 3],
)
stat_overrides_map: dict[int, dict[str, dict[str, float]]] = {w: {} for w in df.week.values}
for week, df_week in df.dropna().groupby("week"):
for player_id, df_player in df_week.groupby("player_id"):
if player_id not in stat_overrides_map[week]:
stat_overrides_map[week][player_id] = {}
for row in df_player.itertuples():
if isinstance(row.stat_key, str):
stat_overrides_map[week][player_id][row.stat_key] = float(row.stat_value)
return stat_overrides_map