import base64
import os
import httpx
import streamlit as st
import webbrowser
from dotenv import load_dotenv
from datetime import datetime
load_dotenv()
APP_URL = os.environ["APP_URL"]
STRAVA_CLIENT_ID = os.environ["STRAVA_CLIENT_ID"]
STRAVA_CLIENT_SECRET = os.environ["STRAVA_CLIENT_SECRET"]
STRAVA_AUTHORIZATION_URL = "https://www.strava.com/oauth/authorize"
STRAVA_API_BASE_URL = "https://www.strava.com/api/v3"
SCOPE = "activity:read_all,profile:read_all,activity:write"
DEFAULT_ACTIVITY_LABEL = "NO_ACTIVITY_SELECTED"
STRAVA_ORANGE = "#fc4c02"
@st.cache_data
def load_image_as_base64(image_path):
with open(image_path, "rb") as f:
contents = f.read()
return base64.b64encode(contents).decode("utf-8")
def powered_by_strava_logo():
base64_image = load_image_as_base64("static/api_logo_pwrd_by_strava.png")
st.markdown(
f'',
unsafe_allow_html=True,
)
@st.cache_data
def authorization_url():
request = httpx.Request(
method="GET",
url=STRAVA_AUTHORIZATION_URL,
params={
"client_id": STRAVA_CLIENT_ID,
"redirect_uri": APP_URL,
"response_type": "code",
"approval_prompt": "auto",
"scope": SCOPE
}
)
return request.url
def login_header(header=None):
strava_authorization_url = authorization_url()
if header is None:
base = st
else:
col1, _, _, button = header
base = button
with col1:
powered_by_strava_logo()
base64_image = load_image_as_base64("./static/btn_strava_connect.png")
base.markdown(
(
f""
f" "
f""
),
unsafe_allow_html=True,
)
def logout_header(header=None):
if header is None:
base = st
else:
_, col2, _, button = header
base = button
with col2:
powered_by_strava_logo()
if base.button("Log out"):
webbrowser.open(APP_URL, new=0)
def logged_in_title(strava_auth, header=None):
if header is None:
base = st
else:
col, _, _, _ = header
base = col
first_name = strava_auth["athlete"]["firstname"]
last_name = strava_auth["athlete"]["lastname"]
col.markdown(f"*Welcome, {first_name} {last_name}!*")
@st.cache_data
def exchange_authorization_code(authorization_code):
response = httpx.post(
url="https://www.strava.com/oauth/token",
json={
"client_id": STRAVA_CLIENT_ID,
"client_secret": STRAVA_CLIENT_SECRET,
"code": authorization_code,
"grant_type": "authorization_code",
}
)
try:
response.raise_for_status()
except httpx.HTTPStatusError:
st.error("Something went wrong while authenticating with Strava. Please reload and try again")
st.experimental_set_query_params()
st.stop()
return
strava_auth = response.json()
return strava_auth
def authenticate(header=None, stop_if_unauthenticated=True):
query_params = st.experimental_get_query_params()
authorization_code = query_params.get("code", [None])[0]
if authorization_code is None:
authorization_code = query_params.get("session", [None])[0]
if authorization_code is None:
login_header(header=header)
if stop_if_unauthenticated:
st.stop()
return
else:
logout_header(header=header)
strava_auth = exchange_authorization_code(authorization_code)
logged_in_title(strava_auth, header)
st.experimental_set_query_params(session=authorization_code)
return strava_auth
def header():
col1, col2, col3 = st.columns(3)
with col3:
strava_button = st.empty()
return col1, col2, col3, strava_button
def catch_strava_api_error(response):
if response.status_code == 200:
return
st.write(response.status_code)
if response.status_code == 401:
st.error("You are not authorized to access this resource. Please relog yourself.")
st.stop()
return
else:
st.error(response)
if response["errors"]:
st.error(f"Something went wrong while fetching data from Strava. Please reload and try again (error message: {response['message']})")
st.stop()
return
def strava_call(auth, uri_params, params = None):
access_token = auth["access_token"]
response = httpx.get(
url=f"{STRAVA_API_BASE_URL}/{uri_params}",
params=params,
headers={
"Authorization": f"Bearer {access_token}",
},
)
catch_strava_api_error(response)
return response.json()
@st.cache_data
def get_athlete_detail(auth, page=1):
return strava_call(auth, "athlete")
@st.cache_data
def get_shoes(athlete):
return athlete["shoes"]
@st.cache_data
def get_activity(activity_id, auth):
return strava_call(auth, f"activities/{activity_id}")
@st.cache_data
def get_activities_on_period(auth, activities, start_date, end_date, page):
response = get_activities(auth, page)
number_added = 0
for activity in response:
strava_start_date = datetime.strptime(activity["start_date"], '%Y-%m-%dT%H:%M:%SZ').date()
if strava_start_date >= start_date and strava_start_date <= end_date:
activities.append(activity)
number_added += 1
if number_added == 0:
return activities
else:
return get_activities_on_period(auth, activities, start_date, end_date, page + 1)
@st.cache_data
def get_activities(auth, page=1):
return strava_call(auth, f"athlete/activities", params={"page": page})
@st.cache_data
def get_activity_zones(auth, activity_id):
return strava_call(auth, f"activities/{activity_id}/zones")
@st.cache_data
def get_athlete_zones(auth):
return strava_call(auth, f"athlete/zones")
# @st.cache_data
# def get_all_activities(auth):
# page = 1
# activities = []
# while True:
# new_activities = strava_call(auth, f"athlete/activities", params={
# "page": page,
# "per_page": 200
# })
# activities.append(new_activities)
# if len(new_activities) == 0:
# break
# page += 1
#
# return activities
# def export_all_activities(auth):
# activities = get_all_activities(auth=auth)
# st.write(f"You got {len(activities)} activities")
# with open('activities.json', 'w') as f:
# json.dump(activities, f)