File size: 6,741 Bytes
21d674f
 
 
 
 
 
 
 
 
 
af5db05
21d674f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
import base64
import os

import httpx
import streamlit as st
import webbrowser
from dotenv import load_dotenv
from datetime import datetime
load_dotenv()

# Required environment configuration. Indexing os.environ directly means a
# missing variable raises KeyError as soon as this module is imported.
# APP_URL is sent to Strava as the OAuth redirect_uri and is reopened on logout.
APP_URL = os.environ["APP_URL"]
# Strava API application credentials used for the OAuth code exchange.
STRAVA_CLIENT_ID = os.environ["STRAVA_CLIENT_ID"]
STRAVA_CLIENT_SECRET = os.environ["STRAVA_CLIENT_SECRET"]
# Page the user is sent to in order to grant access.
STRAVA_AUTHORIZATION_URL = "https://www.strava.com/oauth/authorize"
# Prefix for every REST call made through strava_call().
STRAVA_API_BASE_URL = "https://www.strava.com/api/v3"
# Comma-separated OAuth scopes requested in the authorization URL.
SCOPE = "activity:read_all,profile:read_all,activity:write"
# Not referenced in the visible code; presumably consumed by modules that
# import this one -- TODO confirm before removing.
DEFAULT_ACTIVITY_LABEL = "NO_ACTIVITY_SELECTED"
STRAVA_ORANGE = "#fc4c02"



@st.cache_data
def load_image_as_base64(image_path):
    """Return the contents of the file at ``image_path`` as base64 text.

    The file is read in binary mode and the base64 bytes are decoded to a
    ``str`` so the result can be embedded in a ``data:`` URI.
    """
    with open(image_path, "rb") as image_file:
        encoded = base64.b64encode(image_file.read())
    return encoded.decode("utf-8")


def powered_by_strava_logo():
    """Render the "powered by Strava" logo in the current Streamlit container.

    The PNG is inlined as a base64 ``data:`` URI inside a raw ``<img>`` tag,
    which is why ``unsafe_allow_html`` is enabled.
    """
    logo_b64 = load_image_as_base64("static/api_logo_pwrd_by_strava.png")
    img_tag = f'<img src="data:image/png;base64,{logo_b64}" width="100%" alt="powered by strava">'
    st.markdown(img_tag, unsafe_allow_html=True)

@st.cache_data
def authorization_url():
    """Build the Strava OAuth authorization URL for this app.

    Returns:
        The ``url`` attribute of an unsent ``httpx.Request`` whose query
        string carries the client id, redirect URI and requested scopes.
    """
    query = {
        "client_id": STRAVA_CLIENT_ID,
        "redirect_uri": APP_URL,
        "response_type": "code",
        "approval_prompt": "auto",
        "scope": SCOPE,
    }
    # The request is never sent; it is built only so httpx encodes the query.
    unsent_request = httpx.Request(
        method="GET",
        url=STRAVA_AUTHORIZATION_URL,
        params=query,
    )
    return unsent_request.url


def login_header(header=None):
    """Render the logged-out header: Strava logo plus a "connect" link.

    Args:
        header: Optional 4-tuple shaped like the return value of
            ``header()`` (three columns followed by a button placeholder).
            The logo is drawn in the first column and the connect link in
            the placeholder. When ``None``, both are drawn directly with
            ``st``.
    """
    strava_authorization_url = authorization_url()

    if header is None:
        # Without a layout there is no column to target, so the logo goes
        # straight into the main container.
        base = st
        powered_by_strava_logo()
    else:
        col1, _, _, button = header
        base = button
        with col1:
            powered_by_strava_logo()

    base64_image = load_image_as_base64("./static/btn_strava_connect.png")
    # The official connect button image is wrapped in a link to the
    # authorization page; raw HTML is needed for the clickable image.
    base.markdown(
        (
            f"<a href=\"{strava_authorization_url}\">"
            f"  <img alt=\"strava login\" src=\"data:image/png;base64,{base64_image}\" width=\"100%\">"
            f"</a>"
        ),
        unsafe_allow_html=True,
    )


def logout_header(header=None):
    """Render the logged-in header: Strava logo plus a "Log out" button.

    Args:
        header: Optional 4-tuple shaped like the return value of
            ``header()`` (three columns followed by a button placeholder).
            The logo is drawn in the second column and the button in the
            placeholder. When ``None``, both are drawn directly with ``st``.
    """
    if header is None:
        # Without a layout there is no column to target, so the logo goes
        # straight into the main container.
        base = st
        powered_by_strava_logo()
    else:
        _, col2, _, button = header
        base = button
        with col2:
            powered_by_strava_logo()

    if base.button("Log out"):
        # Reopening the bare app URL drops the ?code/?session query params.
        # NOTE(review): webbrowser.open acts on the machine running this
        # Python process, which is only the user's machine when the app is
        # run locally -- confirm this is intended for hosted deployments.
        webbrowser.open(APP_URL, new=0)


def logged_in_title(strava_auth, header=None):
    """Show a welcome line with the athlete's first and last name.

    Args:
        strava_auth: Mapping with an ``"athlete"`` entry that holds
            ``"firstname"`` and ``"lastname"``.
        header: Optional 4-tuple shaped like the return value of
            ``header()``; the message is written to its first element.
            When ``None``, the message is written directly with ``st``.

    Raises:
        KeyError: If the athlete name fields are missing from ``strava_auth``.
    """
    if header is None:
        base = st
    else:
        col, _, _, _ = header
        base = col

    first_name = strava_auth["athlete"]["firstname"]
    last_name = strava_auth["athlete"]["lastname"]
    # Write through ``base`` so the header-less call has a valid target too.
    base.markdown(f"*Welcome, {first_name} {last_name}!*")


@st.cache_data
def exchange_authorization_code(authorization_code):
    """Exchange an OAuth authorization code for a Strava token payload.

    The function is wrapped in ``st.cache_data``, so repeated calls with the
    same code reuse the first successful response instead of posting again.

    Args:
        authorization_code: The ``code`` value Strava appended to the
            redirect URL.

    Returns:
        The decoded JSON body of the token response. On failure an error is
        shown, the query parameters are cleared and the script is stopped.
    """
    try:
        response = httpx.post(
            url="https://www.strava.com/oauth/token",
            json={
                "client_id": STRAVA_CLIENT_ID,
                "client_secret": STRAVA_CLIENT_SECRET,
                "code": authorization_code,
                "grant_type": "authorization_code",
            }
        )
        response.raise_for_status()
    except httpx.HTTPError:
        # httpx.HTTPError is the common base of HTTPStatusError (non-2xx
        # reply) and RequestError (timeouts, connection failures), so a
        # network problem gets the same friendly message as a rejected code.
        st.error("Something went wrong while authenticating with Strava. Please reload and try again")
        # Clear the stale code from the URL so a reload starts a fresh login.
        st.experimental_set_query_params()
        st.stop()
        # Fallback in case st.stop() returns instead of halting the run.
        return None

    return response.json()

def authenticate(header=None, stop_if_unauthenticated=True):
    """Drive the login flow and return the Strava token payload if logged in.

    The authorization code is read from the ``code`` query parameter, falling
    back to ``session``. Without a code the login header is rendered and,
    when ``stop_if_unauthenticated`` is true, the script is stopped.

    Args:
        header: Optional layout tuple forwarded to the header helpers.
        stop_if_unauthenticated: Whether to call ``st.stop()`` when no code
            is present.

    Returns:
        The value returned by ``exchange_authorization_code`` when a code is
        present, otherwise ``None``.
    """
    params = st.experimental_get_query_params()

    # Take the first of "code" / "session" that carries a value.
    code = None
    for key in ("code", "session"):
        code = params.get(key, [None])[0]
        if code is not None:
            break

    if code is None:
        login_header(header=header)
        if stop_if_unauthenticated:
            st.stop()
        return None

    logout_header(header=header)
    token_payload = exchange_authorization_code(code)
    logged_in_title(token_payload, header)
    # Store the code under "session" so later reruns still find it.
    st.experimental_set_query_params(session=code)
    return token_payload


def header():
    """Create the three-column page header layout.

    Returns:
        A 4-tuple ``(col1, col2, col3, placeholder)`` where ``placeholder``
        is an ``st.empty()`` slot created inside the third column.
    """
    left, middle, right = st.columns(3)

    with right:
        button_slot = st.empty()

    return left, middle, right, button_slot

def catch_strava_api_error(response):
    """Stop the script with a visible error unless the response is a 200.

    Args:
        response: An object with a ``status_code`` attribute and a ``json()``
            method, as returned by ``httpx.get``.

    Returns:
        ``None``. For any non-200 status an error is shown and ``st.stop()``
        is called.
    """
    if response.status_code == 200:
        return

    if response.status_code == 401:
        st.error("You are not authorized to access this resource. Please relog yourself.")
        st.stop()
        return

    # The response object is not subscriptable; the error details, if any,
    # are in its decoded JSON body.
    try:
        payload = response.json()
    except ValueError:
        # Body was not valid JSON (e.g. an HTML error page).
        payload = None

    if isinstance(payload, dict) and payload.get("message"):
        detail = payload["message"]
    else:
        detail = f"HTTP {response.status_code}"

    st.error(f"Something went wrong while fetching data from Strava. Please reload and try again (error message: {detail})")
    # Always stop: letting the caller continue would hand it an error body
    # in place of the data it asked for.
    st.stop()
    return

def strava_call(auth, uri_params, params=None):
    """Perform an authenticated GET against the Strava API and decode it.

    Args:
        auth: Mapping holding the bearer token under ``"access_token"``.
        uri_params: Path appended to ``STRAVA_API_BASE_URL``.
        params: Optional query parameters passed through to ``httpx.get``.

    Returns:
        The decoded JSON body of the response, after it has been checked by
        ``catch_strava_api_error``.
    """
    bearer_header = {"Authorization": f"Bearer {auth['access_token']}"}
    endpoint = f"{STRAVA_API_BASE_URL}/{uri_params}"
    response = httpx.get(url=endpoint, params=params, headers=bearer_header)
    catch_strava_api_error(response)
    return response.json()

@st.cache_data
def get_athlete_detail(auth, page=1):
    """Fetch the ``athlete`` endpoint for the authenticated user.

    ``page`` is accepted but not used by the request.
    """
    athlete = strava_call(auth, "athlete")
    return athlete

@st.cache_data
def get_shoes(athlete):
    """Return the ``"shoes"`` entry of an athlete mapping.

    Raises:
        KeyError: If ``athlete`` has no ``"shoes"`` key.
    """
    shoes = athlete["shoes"]
    return shoes

@st.cache_data
def get_activity(activity_id, auth):
    """Fetch a single activity by id from the ``activities/<id>`` endpoint."""
    endpoint = f"activities/{activity_id}"
    return strava_call(auth, endpoint)

@st.cache_data
def get_activities_on_period(auth, activities, start_date, end_date, page):
    """Collect activities whose start date lies in ``[start_date, end_date]``.

    Pages are fetched through ``get_activities`` starting at ``page``. Paging
    ends when a page is empty, or when a page has no match and already
    contains an activity older than ``start_date``. Pages made up only of
    activities newer than ``end_date`` are skipped rather than treated as the
    end of the data, so periods further back than the first page are found.

    Args:
        auth: Token mapping forwarded to ``get_activities``.
        activities: List that matching activities are appended to.
        start_date: First day of the period (``datetime.date``), inclusive.
        end_date: Last day of the period (``datetime.date``), inclusive.
        page: Page number to start from.

    Returns:
        The ``activities`` list with the matches appended.
    """
    while True:
        page_activities = get_activities(auth, page)
        if not page_activities:
            # Ran past the last page.
            return activities

        matched = 0
        oldest = None
        for activity in page_activities:
            started = datetime.strptime(activity["start_date"], '%Y-%m-%dT%H:%M:%SZ').date()
            if oldest is None or started < oldest:
                oldest = started
            if start_date <= started <= end_date:
                activities.append(activity)
                matched += 1

        # NOTE(review): assumes pages arrive newest-first, so once a page
        # reaches back before start_date without a match, later pages cannot
        # match either -- confirm against the Strava API ordering.
        if matched == 0 and oldest < start_date:
            return activities

        page += 1

@st.cache_data
def get_activities(auth, page=1):
    """Fetch one page of the ``athlete/activities`` endpoint.

    Args:
        auth: Token mapping forwarded to ``strava_call``.
        page: Page number sent as the ``page`` query parameter.
    """
    query = {"page": page}
    return strava_call(auth, "athlete/activities", params=query)

@st.cache_data
def get_activity_zones(auth, activity_id):
    """Fetch the ``activities/<id>/zones`` endpoint for one activity."""
    endpoint = f"activities/{activity_id}/zones"
    return strava_call(auth, endpoint)

@st.cache_data
def get_athlete_zones(auth):
    """Fetch the ``athlete/zones`` endpoint for the authenticated user."""
    zones = strava_call(auth, "athlete/zones")
    return zones

# @st.cache_data
# def get_all_activities(auth):
#     page = 1
#     activities = []
#     while True:
#         new_activities = strava_call(auth, f"athlete/activities", params={
#                 "page": page,
#                 "per_page": 200
#             })
#         activities.append(new_activities)
#         if len(new_activities) == 0:
#             break
#         page += 1
#
#     return activities

# def export_all_activities(auth):
#     activities = get_all_activities(auth=auth)
#     st.write(f"You got {len(activities)} activities")
#     with open('activities.json', 'w') as f:
#         json.dump(activities, f)