Spaces:
Runtime error
Runtime error
import datetime | |
import os | |
import streamlit as st | |
import numpy as np | |
import math | |
import fastf1 | |
import pandas as pd | |
from fastapi import FastAPI | |
from fastapi.middleware.cors import CORSMiddleware | |
from fastapi.responses import FileResponse, HTMLResponse | |
from pydantic import BaseModel | |
import functools | |
import available_data | |
app = FastAPI() | |
app.add_middleware( | |
CORSMiddleware, | |
allow_origins=["*"], | |
allow_credentials=True, | |
allow_methods=["*"], | |
allow_headers=["*"], | |
) | |
import math | |
import numpy as np | |
def smooth_derivative(t_in, v_in): | |
# | |
# Function to compute a smooth estimation of a derivative. | |
# [REF: http://holoborodko.com/pavel/numerical-methods/numerical-derivative/smooth-low-noise-differentiators/] | |
# | |
# Configuration | |
# | |
# Derivative method: two options: 'smooth' or 'centered'. Smooth is more conservative | |
# but helps to supress the very noisy signals. 'centered' is more agressive but more noisy | |
method = "smooth" | |
t = t_in.copy() | |
v = v_in.copy() | |
# (0) Prepare inputs | |
# (0.1) Time needs to be transformed to seconds | |
try: | |
for i in range(0, t.size): | |
t.iloc[i] = t.iloc[i].total_seconds() | |
except: | |
pass | |
t = np.array(t) | |
v = np.array(v) | |
# (0.1) Assert they have the same size | |
assert t.size == v.size | |
# (0.2) Initialize output | |
dvdt = np.zeros(t.size) | |
# (1) Manually compute points out of the stencil | |
# (1.1) First point | |
dvdt[0] = (v[1] - v[0]) / (t[1] - t[0]) | |
# (1.2) Second point | |
dvdt[1] = (v[2] - v[0]) / (t[2] - t[0]) | |
# (1.3) Third point | |
dvdt[2] = (v[3] - v[1]) / (t[3] - t[1]) | |
# (1.4) Last points | |
n = t.size | |
dvdt[n - 1] = (v[n - 1] - v[n - 2]) / (t[n - 1] - t[n - 2]) | |
dvdt[n - 2] = (v[n - 1] - v[n - 3]) / (t[n - 1] - t[n - 3]) | |
dvdt[n - 3] = (v[n - 2] - v[n - 4]) / (t[n - 2] - t[n - 4]) | |
# (2) Compute the rest of the points | |
if method == "smooth": | |
c = [5.0 / 32.0, 4.0 / 32.0, 1.0 / 32.0] | |
for i in range(3, t.size - 3): | |
for j in range(1, 4): | |
dvdt[i] += ( | |
2 * j * c[j - 1] * (v[i + j] - v[i - j]) / | |
(t[i + j] - t[i - j]) | |
) | |
elif method == "centered": | |
for i in range(3, t.size - 2): | |
for j in range(1, 4): | |
dvdt[i] = (v[i + 1] - v[i - 1]) / (t[i + 1] - t[i - 1]) | |
return dvdt | |
def truncated_remainder(dividend, divisor): | |
divided_number = dividend / divisor | |
divided_number = ( | |
-int(-divided_number) if divided_number < 0 else int(divided_number) | |
) | |
remainder = dividend - divisor * divided_number | |
return remainder | |
def transform_to_pipi(input_angle): | |
pi = math.pi | |
revolutions = int((input_angle + np.sign(input_angle) * pi) / (2 * pi)) | |
p1 = truncated_remainder(input_angle + np.sign(input_angle) * pi, 2 * pi) | |
p2 = ( | |
np.sign( | |
np.sign(input_angle) | |
+ 2 | |
* ( | |
np.sign( | |
math.fabs( | |
(truncated_remainder(input_angle + pi, 2 * pi)) / (2 * pi) | |
) | |
) | |
- 1 | |
) | |
) | |
) * pi | |
output_angle = p1 - p2 | |
return output_angle, revolutions | |
def remove_acceleration_outliers(acc): | |
acc_threshold_g = 7.5 | |
if math.fabs(acc[0]) > acc_threshold_g: | |
acc[0] = 0.0 | |
for i in range(1, acc.size - 1): | |
if math.fabs(acc[i]) > acc_threshold_g: | |
acc[i] = acc[i - 1] | |
if math.fabs(acc[-1]) > acc_threshold_g: | |
acc[-1] = acc[-2] | |
return acc | |
def compute_accelerations(telemetry): | |
v = np.array(telemetry["Speed"]) / 3.6 | |
lon_acc = smooth_derivative(telemetry["Time"], v) / 9.81 | |
dx = smooth_derivative(telemetry["Distance"], telemetry["X"]) | |
dy = smooth_derivative(telemetry["Distance"], telemetry["Y"]) | |
theta = np.zeros(dx.size) | |
theta[0] = math.atan2(dy[0], dx[0]) | |
for i in range(0, dx.size): | |
theta[i] = ( | |
theta[i - 1] + | |
transform_to_pipi(math.atan2(dy[i], dx[i]) - theta[i - 1])[0] | |
) | |
kappa = smooth_derivative(telemetry["Distance"], theta) | |
lat_acc = v * v * kappa / 9.81 | |
# Remove outliers | |
lon_acc = remove_acceleration_outliers(lon_acc) | |
lat_acc = remove_acceleration_outliers(lat_acc) | |
return np.round(lon_acc,2), np.round(lat_acc,2) | |
# @st.cache_data | |
def driver_standings() -> any: | |
YEAR = 2023 #datetime.datetime.now().year | |
df = pd.DataFrame( | |
pd.read_html(f"https://www.formula1.com/en/results.html/{YEAR}/drivers.html")[0] | |
) | |
df = df[["Driver", "PTS", "Car"]] | |
# reverse the order | |
df = df.sort_values(by="PTS", ascending=True) | |
# in Driver column only keep the last 3 characters | |
df["Driver"] = df["Driver"].str[:-5] | |
# add colors to the dataframe | |
car_colors = available_data.team_colors(YEAR) | |
df["fill"] = df["Car"].map(car_colors) | |
# remove rows where points is 0 | |
df = df[df["PTS"] != 0] | |
df.reset_index(inplace=True, drop=True) | |
df.rename(columns={"PTS": "Points"}, inplace=True) | |
return {"WDC":df.to_dict("records")} | |
# @st.cache_data | |
async def root(): | |
return HTMLResponse( | |
content="""<iframe src="https://tracinginsights-f1-analysis.hf.space" frameborder="0" style="width:100%; height:100%;" scrolling="yes" allowfullscreen:"yes"></iframe>""", | |
status_code=200) | |
# @st.cache_data | |
def years_available() -> any: | |
# make a list from 2018 to current year | |
current_year = datetime.datetime.now().year | |
years = list(range(2018, current_year+1)) | |
# reverse the list to get the latest year first | |
years.reverse() | |
years = [{"label": str(year), "value": year} for year in years] | |
return {"years": years} | |
# format for events {"events":[{"label":"Saudi Arabian Grand Prix","value":2},{"label":"Bahrain Grand Prix","value":1},{"label":"Pre-Season Testing","value":"t1"}]} | |
# @st.cache_data | |
def events_available(year: int) -> any: | |
# get events available for a given year | |
data = available_data.LatestData(year) | |
events = data.get_events() | |
events = [{"label": event, "value": event} for i, event in enumerate(events)] | |
events.reverse() | |
return {"events": events} | |
# format for sessions {"sessions":[{"label":"FP1","value":"FP1"},{"label":"FP2","value":"FP2"},{"label":"FP3","value":"FP3"},{"label":"Qualifying","value":"Q"},{"label":"Race","value":"R"}]} | |
# @st.cache_data | |
def sessions_available(year: int, event: str | int) -> any: | |
# get sessions available for a given year and event | |
data = available_data.LatestData(year) | |
sessions = data.get_sessions(event) | |
sessions = [{"label": session, "value": session} for session in sessions] | |
return {"sessions": sessions} | |
# format for drivers {"drivers":[{"color":"#fff500","label":"RIC","value":"RIC"},{"color":"#ff8700","label":"NOR","value":"NOR"},{"color":"#c00000","label":"VET","value":"VET"},{"color":"#0082fa","label":"LAT","value":"LAT"},{"color":"#787878","label":"GRO","value":"GRO"},{"color":"#ffffff","label":"GAS","value":"GAS"},{"color":"#f596c8","label":"STR","value":"STR"},{"color":"#787878","label":"MAG","value":"MAG"},{"color":"#0600ef","label":"ALB","value":"ALB"},{"color":"#ffffff","label":"KVY","value":"KVY"},{"color":"#fff500","label":"OCO","value":"OCO"},{"color":"#0600ef","label":"VER","value":"VER"},{"color":"#00d2be","label":"HAM","value":"HAM"},{"color":"#ff8700","label":"SAI","value":"SAI"},{"color":"#00d2be","label":"BOT","value":"BOT"},{"color":"#960000","label":"GIO","value":"GIO"}]} | |
# @st.cache_data | |
def session_drivers(year: int, event: str | int, session: str) -> any: | |
# fastf1.Cache.enable_cache('cache') | |
# get drivers available for a given year, event and session | |
f1session = fastf1.get_session(year, event, session) | |
api_path = f1session.api_path | |
drivers_raw = fastf1.api.driver_info(api_path) | |
drivers = [] | |
for driver in drivers_raw.items(): | |
drivers.append({ | |
"color": available_data.team_colors(year)[driver[1]['TeamName']], | |
"label": driver[1]['Tla'], | |
"value": driver[1]['Tla']}) | |
return {"drivers": drivers} | |
# format for chartData {"chartData":[{"lapnumber":1},{ | |
# "VER":91.564, | |
# "VER_compound":"SOFT", | |
# "VER_compound_color":"#FF5733", | |
# "lapnumber":2 | |
# },{"lapnumber":3},{"VER":90.494,"VER_compound":"SOFT","VER_compound_color":"#FF5733","lapnumber":4},{"lapnumber":5},{"VER":90.062,"VER_compound":"SOFT","VER_compound_color":"#FF5733","lapnumber":6},{"lapnumber":7},{"VER":89.815,"VER_compound":"SOFT","VER_compound_color":"#FF5733","lapnumber":8},{"VER":105.248,"VER_compound":"SOFT","VER_compound_color":"#FF5733","lapnumber":9},{"lapnumber":10},{"VER":89.79,"VER_compound":"SOFT","VER_compound_color":"#FF5733","lapnumber":11},{"VER":145.101,"VER_compound":"SOFT","VER_compound_color":"#FF5733","lapnumber":12},{"lapnumber":13},{"VER":89.662,"VER_compound":"SOFT","VER_compound_color":"#FF5733","lapnumber":14},{"lapnumber":15},{"VER":89.617,"VER_compound":"SOFT","VER_compound_color":"#FF5733","lapnumber":16},{"lapnumber":17},{"VER":140.717,"VER_compound":"SOFT","VER_compound_color":"#FF5733","lapnumber":18}]} | |
# @st.cache_data | |
def laps_data(year: int, event: str | int, session: str, driver: str) -> any: | |
# fastf1.Cache.enable_cache('cache') | |
# get drivers available for a given year, event and session | |
f1session = fastf1.get_session(year, event, session) | |
f1session.load(telemetry=False, weather=False, messages=False) | |
laps = f1session.laps | |
team_colors = available_data.team_colors(year) | |
# add team_colors dict to laps on Team column | |
drivers = laps.Driver.unique() | |
# for each driver in drivers, get the Team column from laps and get the color from team_colors dict | |
drivers = [{"color": team_colors[laps[laps.Driver == | |
driver].Team.iloc[0]], "label": driver, "value": driver} for driver in drivers] | |
driver_laps = laps.pick_driver(driver) | |
driver_laps['LapTime'] = driver_laps['LapTime'].dt.total_seconds() | |
compound_colors = { | |
"SOFT": "#FF0000", | |
"MEDIUM": "#FFFF00", | |
"HARD": "#FFFFFF", | |
"INTERMEDIATE": "#00FF00", | |
"WET": "#088cd0", | |
} | |
driver_laps_data = [] | |
for _, row in driver_laps.iterrows(): | |
if row['LapTime'] > 0: | |
lap = {f"{driver}": row['LapTime'], | |
f"{driver}_compound": row['Compound'], | |
f"{driver}_compound_color": compound_colors[row['Compound']], | |
"lapnumber": row['LapNumber']} | |
else: | |
lap = {"lapnumber": row['LapNumber']} | |
driver_laps_data.append(lap) | |
return {"chartData": driver_laps_data} | |
# @st.cache_data | |
def telemetry_data(year: int, event: str | int, session: str, driver: str, lap_number: int) -> any: | |
# fastf1.Cache.enable_cache('cache') | |
f1session = fastf1.get_session(year, event, session) | |
f1session.load(telemetry=True, weather=False, messages=False) | |
laps = f1session.laps | |
driver_laps = laps.pick_driver(driver) | |
driver_laps['LapTime'] = driver_laps['LapTime'].dt.total_seconds() | |
# get the telemetry for lap_number | |
selected_lap = driver_laps[driver_laps.LapNumber == lap_number] | |
telemetry = selected_lap.get_telemetry() | |
lon_acc, lat_acc = compute_accelerations(telemetry) | |
telemetry["lon_acc"] = lon_acc | |
telemetry["lat_acc"] = lat_acc | |
telemetry['Time'] = telemetry['Time'].dt.total_seconds() | |
laptime = selected_lap.LapTime.values[0] | |
data_key = f"{driver} - Lap {int(lap_number)} - {year} {session} [{int(laptime//60)}:{laptime%60}]" | |
telemetry['DRS'] = telemetry['DRS'].apply(lambda x: 1 if x in [10,12,14] else 0) | |
brake_tel = [] | |
drs_tel = [] | |
gear_tel = [] | |
rpm_tel = [] | |
speed_tel = [] | |
throttle_tel = [] | |
time_tel = [] | |
track_map = [] | |
lon_acc_tel = [] | |
lat_acc_tel = [] | |
for _, row in telemetry.iterrows(): | |
brake = {"x": row['Distance'], | |
"y": row['Brake'], | |
} | |
brake_tel.append(brake) | |
drs = {"x": row['Distance'], | |
"y": row['DRS'], | |
} | |
drs_tel.append(drs) | |
gear = {"x": row['Distance'], | |
"y": row['nGear'], | |
} | |
gear_tel.append(gear) | |
rpm = {"x": row['Distance'], | |
"y": row['RPM'], | |
} | |
rpm_tel.append(rpm) | |
speed = {"x": row['Distance'], | |
"y": row['Speed'], | |
} | |
speed_tel.append(speed) | |
throttle = {"x": row['Distance'], | |
"y": row['Throttle'], | |
} | |
throttle_tel.append(throttle) | |
time = {"x": row['Distance'], | |
"y": row['Time'], | |
} | |
time_tel.append(time) | |
lon_acc = {"x": row['Distance'], | |
"y": row['lon_acc'], | |
} | |
lon_acc_tel.append(lon_acc) | |
lat_acc = {"x": row['Distance'], | |
"y": row['lat_acc'], | |
} | |
lat_acc_tel.append(lat_acc) | |
track = {"x": row['X'], | |
"y": row['Y'], | |
} | |
track_map.append(track) | |
telemetry_data = { | |
"telemetryData":{ | |
"brake": brake_tel, | |
"dataKey": data_key, | |
"drs": drs_tel, | |
"gear": gear_tel, | |
"rpm": rpm_tel, | |
"speed": speed_tel, | |
"throttle": throttle_tel, | |
"time": time_tel, | |
"lon_acc": lon_acc_tel, | |
"lat_acc": lat_acc_tel, | |
"trackMap": track_map, | |
} | |
} | |
return telemetry_data | |