File size: 6,248 Bytes
8344de9 116e690 3e6686b a27d436 fac5a9c 24d027c 86b69b9 fac5a9c aea5fd9 116e690 fac5a9c 24d027c 8344de9 5ef0074 97eb4b4 8344de9 97eb4b4 24d027c fac5a9c a27d436 56d0518 116e690 56d0518 86b69b9 56d0518 a27d436 86b69b9 a27d436 86b69b9 a27d436 86b69b9 a27d436 86b69b9 ca6f10b 86b69b9 a27d436 fac5a9c a27d436 fac5a9c 5c97774 a27d436 56d0518 a27d436 56d0518 a27d436 fac5a9c 3e6686b 56d0518 5c97774 d0acf6f 56d0518 eeb8c5f 534f8af eeb8c5f 13842c7 eeb8c5f b4d2f11 eeb8c5f b4d2f11 eeb8c5f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
import json
import os
import pandas as pd
import re
import streamlit as st
from streamlit_gsheets import GSheetsConnection
from streamlit.runtime.secrets import AttrDict, secrets_singleton
from data_storage import get_user_id_if_email_exists, get_user, add_new_user, create_new_token_for_user
NEW_USER_ENABLED = bool(int(os.getenv("NEW_USER_ENABLED", 1)))
class HFFriendlyGSheetsConnection(GSheetsConnection):
# HF doesnt currently support nested secrets as in secrets.toml
# this class overwrites the logic for where to find the secrets and instead creates them from the top level of secrets
@property
def _secrets(self) -> AttrDict:
"""Get the secrets for this connection from the corresponding st.secrets section.
We expect this property to be used primarily by connection authors when they
are implementing their class' ``_connect`` method. User scripts should, for the
most part, have no reason to use this property.
"""
connections_section = None
if secrets_singleton.load_if_toml_exists():
connections_section = AttrDict(secrets_singleton)
if connections_section is None:
return AttrDict({})
connections_copy = connections_section.to_dict()
# in hf env the private key may need json loads
private_key = connections_copy["private_key"]
try:
private_key = json.loads(private_key)
except Exception:
pass
connections_copy["private_key"] = private_key
return AttrDict(connections_copy)
conn = st.connection("gsheets", type=HFFriendlyGSheetsConnection)
# Make a regular expression
# for validating an Email
regex_email = r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,7}\b"
# Define a function for
# for validating an Email
def check_email(email: str) -> bool:
return bool(re.fullmatch(regex_email, email))
def new_user_form():
"""Form to collect new user information and submit"""
if st.button("Existing User get Login link"):
existing_user_new_token()
else:
if NEW_USER_ENABLED:
"""Form to collect new user information and submit"""
with st.form("New User Registration"):
st.header("New User Registration")
st.write("After submitting, a url to login will be emailed to you.")
st.text_input("Email Address", key="new_user_email")
st.text_input("Name", key="new_user_name")
st.form_submit_button("Submit", on_click=new_user_submitted)
def existing_user_new_token():
with st.form("Existing User New URL"):
st.header("Existing User Login Link Request")
st.write("After submitting, a url to login will be emailed to you if a user with the email exists.")
st.text_input("Email Address", key="reset_token_user_email")
st.form_submit_button("Submit", on_click=generate_new_token_request)
def generate_new_token_request():
if not check_email(email := st.session_state["reset_token_user_email"]):
st.warning("Sorry email is invalid. Please try a valid email address.")
return
user_id = get_user_id_if_email_exists(email)
if not user_id:
return
token = create_new_token_for_user(user_id, existing_user=True)
insert_new_token_to_sheet(email, token)
st.info("Request submitted. Please check your email for a login url with a few minutes.")
del st.session_state["reset_token_user_email"]
def insert_new_token_to_sheet(email: str, token: str):
conn.update(
worksheet="token-requests",
data=pd.DataFrame(
{
"email": email,
"token": token,
},
index=[0],
),
)
def create_new_user_request(email: str, name: str):
add_new_user(email, name)
user_id = get_user_id_if_email_exists(email)
assert user_id
token = create_new_token_for_user(user_id)
insert_new_token_to_sheet(email, token)
st.info("New user request submitted. Please check your email for a login url with a few minutes.")
def new_user_submitted():
if not check_email(email := st.session_state["new_user_email"]):
st.warning("Sorry email is invalid. Please try a valid email address.")
return
if not (name := st.session_state["new_user_name"]):
st.warning("No name entered. Please enter your name for display.")
return
if get_user_id_if_email_exists(email):
st.warning(
"User with that email already exists. If you would like a new login url, please click the New Login URL button."
)
return
create_new_user_request(email, name)
del st.session_state["new_user_email"]
del st.session_state["new_user_name"]
def check_password():
# if not logged in offer new user registration
if st.session_state.get("logged_in_user"):
_, name = get_logged_in_user_name_email()
st.write(f"Logged in as: {name}")
return True
else:
new_user_form()
return False
def get_logged_in_user_name_email() -> tuple[str | None, str | None]:
if not (user_id := st.session_state.get("logged_in_user")):
# if not logged
return (None, None)
user_info_map = get_user(user_id)
email = user_info_map.get("email")
name = user_info_map.get("name")
return email, name
# hack - data overrides for stats allowing for quick manual edits
@st.cache_data(ttl=60 * 10)
def get_stat_overrides() -> dict[int, dict[str, dict[str, float]]]:
df = conn.read(
worksheet="stats-overrides",
ttl="1m",
usecols=[0, 1, 2, 3],
)
stat_overrides_map: dict[int, dict[str, dict[str, float]]] = {w: {} for w in df.week.values}
for week, df_week in df.dropna().groupby("week"):
for player_id, df_player in df_week.groupby("player_id"):
if player_id not in stat_overrides_map[week]:
stat_overrides_map[week][player_id] = {}
for row in df_player.itertuples():
if isinstance(row.stat_key, str):
stat_overrides_map[week][player_id][row.stat_key] = float(row.stat_value)
return stat_overrides_map
|