from datetime import datetime, timedelta import pandas as pd import numpy as np import os import srtm elevation_data = srtm.get_data() import json import geopy from geopy import distance from beaufort_scale.beaufort_scale import beaufort_scale_kmh import requests import requests_cache import openmeteo_requests from retry_requests import retry from dash import Dash, dcc, html, Input, Output, callback, no_update from dash_extensions import Purify import plotly.graph_objects as go ### UPDATE PEAK LIST ### lat = 49.610755 lon = 6.13268 ele = 310 dist = 100 overpass_url = 'https://overpass.private.coffee/api/interpreter' def add_ele(row): if str(int(round(row['altitude'], 0))).isnumeric(): row['altitude'] = row['altitude'] else: row['altitude'] = elevation_data.get_elevation(row['latitude'], row['longitude'], 0) return row def eukarney(lat1, lon1): p1 = (lat1, lon1) p2 = (lat, lon) karney = distance.distance(p1, p2).m return karney def compute_bbox(lat, lon, dist): bearings = [225, 45] origin = geopy.Point(lat, lon) l = [] for bearing in bearings: destination = distance.distance(dist).destination(origin, bearing) coords = destination.latitude, destination.longitude l.extend(coords) return l bbox = compute_bbox(lat, lon, dist) bbox = ','.join(str(x) for x in compute_bbox(lat, lon, dist)) peak_list = 'peak_list.csv' def update_peaks(): overpass_query = '[out:json];(nwr[natural=peak](' + bbox + ');nwr[natural=hill](' + bbox + '););out body;' response = requests.get(overpass_url, params={'data': overpass_query}) response = response.json() peak_dict = {'name': [], 'latitude': [], 'longitude': [], 'altitude': []} for e in response['elements']: peak_dict['latitude'].append(float(e['lat'])) peak_dict['longitude'].append(float(e['lon'])) if 'name' in e['tags'].keys(): peak_dict['name'].append(e['tags']['name']) else: peak_dict['name'].append('Unnamed hill') if 'ele' in e['tags'].keys(): peak_dict['altitude'].append(float(e['tags']['ele'])) else: peak_dict['altitude'].append(elevation_data.get_elevation(e['lat'], e['lon'], 0)) df = pd.DataFrame.from_dict(peak_dict) df = df.apply(lambda x: add_ele(x), axis=1) df['distances'] = df.apply(lambda x: eukarney(x['latitude'], x['longitude']), axis=1).fillna(0) df['altitude'] = df['altitude'].round(0).astype(int) df.to_csv(peak_list, index=False) return df ### WEATHER FORECAST ### cache_session = requests_cache.CachedSession('.cache', expire_after = 3600) retry_session = retry(cache_session, retries = 5, backoff_factor = 0.2) openmeteo = openmeteo_requests.Client(session = retry_session) # Open Meteo weather forecast API url = 'https://api.open-meteo.com/v1/forecast' params = { 'timezone': 'auto', 'hourly': ['temperature_2m', 'is_day', 'rain', 'weather_code', 'wind_speed_10m', 'snow_depth'] } # Load the JSON files mapping weather codes to descriptions and icons with open('weather_icons_custom.json', 'r') as file: icons = json.load(file) # Weather icons URL icon_url = 'https://raw.githubusercontent.com/basmilius/weather-icons/refs/heads/dev/production/fill/svg/' def map_icons(df): code = df['weather_code'] if df['is_day'] == 1: icon = icons[str(code)]['day']['icon'] description = icons[str(code)]['day']['description'] elif df['is_day'] == 0: icon = icons[str(code)]['night']['icon'] description = icons[str(code)]['night']['description'] df['Weather'] = icon_url + icon df['Weather outline'] = description return df # Quantitative pluviometry to natural language def rain_intensity(precipt): if precipt >= 50: rain = 'Extreme rain' elif 50 < precipt <= 16: rain = 'Very heavy rain' elif 4 <= precipt < 16: rain = 'Heavy rain' elif 1 <= precipt < 4: rain = 'Moderate rain' elif 0.25 <= precipt < 1: rain = 'Light rain' elif 0 < precipt < 0.25: rain = 'Light drizzle' else: rain = 'No rain / No info' return rain # Obtain the weather forecast for each waypoint at each specific time def get_weather(df): params['latitude'] = df['latitude'] params['longitude'] = df['longitude'] params['elevation'] = df['altitude'] now = datetime.now() start_period = (now - timedelta(seconds=3600)).strftime('%Y-%m-%dT%H:%M') end_period = now.strftime('%Y-%m-%dT%H:%M') params['start_hour'] = start_period params['end_hour'] = end_period responses = openmeteo.weather_api(url, params=params) # Process first location. Add a for-loop for multiple locations or weather models response = responses[0] # Process hourly data. The order of variables needs to be the same as requested. # currently = response.Current() hourly = response.Hourly() minutely_temperature_2m = hourly.Variables(0).ValuesAsNumpy()[0] is_day = hourly.Variables(1).ValuesAsNumpy()[0] rain = hourly.Variables(2).ValuesAsNumpy()[0] weather_code = hourly.Variables(3).ValuesAsNumpy()[0] minutely_wind_speed_10m = hourly.Variables(4).ValuesAsNumpy()[0] snow_depth = hourly.Variables(5).ValuesAsNumpy()[0] df['Temp (°C)'] = minutely_temperature_2m df['weather_code'] = weather_code df['is_day'] = is_day v_rain_intensity = np.vectorize(rain_intensity) df['Rain level'] = v_rain_intensity(rain) v_beaufort_scale_kmh = np.vectorize(beaufort_scale_kmh) df['Wind level'] = v_beaufort_scale_kmh(minutely_wind_speed_10m, language='en') df['Rain (mm/h)'] = rain df['Wind (km/h)'] = minutely_wind_speed_10m df['Snow depth (cm)'] = (snow_depth * 100).round(1) return df def format_peaks(): if not os.path.isfile(peak_list): update_peaks() today = datetime.today() modified_date = datetime.fromtimestamp(os.path.getmtime(peak_list)) peak_age = today - modified_date if peak_age.days > 30: update_peaks() df = pd.read_csv(peak_list) df = df[df['altitude']>=df['altitude'].quantile(3/4)].copy() df = df.sort_values(by='distances',ascending=True).reset_index(drop=True) df = df.head(600).copy() df = df.apply(lambda x: get_weather(x), axis=1) df['Temp (°C)'] = df['Temp (°C)'].round(0).astype(int).astype(str) + '°C' df['Wind (km/h)'] = df['Wind (km/h)'].round(1).astype(str).replace('0.0', '') df['Rain (mm/h)'] = df['Rain (mm/h)'].round(1).astype(str).replace('0.0', '') df['distances'] = (df['distances'] / 1000).round(1).astype(str) + ' km' df['Snow depth (cm)'] = df['Snow depth (cm)'].astype(str) + ' cm' df['altitude'] = df['altitude'].astype(str) + ' m' df['is_day'] = df['is_day'].astype(int) df['weather_code'] = df['weather_code'].astype(int) df = df.apply(map_icons, axis=1) df['Rain level'] = df['Rain level'].astype(str) df['Wind level'] = df['Wind level'].astype(str) df = df.rename(columns={'distances': 'Distance (km)'}) df['dist_read'] = ('
' +
df['name'] + '
' +
df['altitude'] + ' | ' + df['Distance (km)'] + '
' +
'Snow: ' + df['Snow depth (cm)'] + '
' +
'' + df['Weather outline'] + '
' +
df['Temp (°C)'] + '
' +
df['Rain level'] + '
' +
df['Wind level'])
df = df[(df['Snow depth (cm)'] != '0.0 cm') | (df['Weather outline'].str.lower().str.contains('snow'))].copy()
return df
def snow_color(row):
if row['Snow depth (cm)'] == '0.0 cm':
row['snow_colour'] = 'goldenrod'
else:
row['snow_colour'] = 'aqua'
return row
def plot_fig():
global df
lat_centre = 49.8464
lon_centre = 6.0992
df = format_peaks()
df['snow_colour'] = ''
df = df.apply(lambda row: snow_color(row), axis=1)
fig = go.Figure()
fig.add_trace(go.Scattermap(lon=df['longitude'],
lat=df['latitude'],
mode='markers', marker=dict(size=24, color=df['snow_colour'], opacity=0.8, symbol='circle'),
name='circles'))
fig.add_trace(go.Scattermap(lon=df['longitude'],
lat=df['latitude'],
mode='markers', marker=dict(size=8, opacity=1, symbol='mountain'),
name='peaks'))
fig.update_layout(map_style='open-street-map',
map=dict(center=dict(lat=lat_centre, lon=lon_centre), zoom=8))
fig.update_traces(showlegend=False, hoverinfo='none', hovertemplate=None, selector=({'name': 'circles'}))
fig.update_traces(showlegend=False, hoverinfo='none', hovertemplate=None, selector=({'name': 'peaks'}))
return fig
app = Dash(__name__)
server = app.server
fig = plot_fig()
def serve_layout():
layout = html.Div([
html.Div([dcc.Graph(id='base-figure', figure=fig, clear_on_unhover=True, style={'height': '99vh'})], id='base-figure-div'),
dcc.Tooltip(id='figure-tooltip'),
dcc.Interval(
id='interval-component',
interval=60 * 60 * 1000,
n_intervals=0),
], id='layout-content')
return layout
app.layout = serve_layout
@callback(Output('layout-content', 'children'),
Output('base-figure', 'figure'),
Input('interval-component', 'n_intervals'))
def refresh_layout(n):
global fig
fig = plot_fig()
layout = serve_layout()
return fig, layout
@callback(Output('figure-tooltip', 'show'),
Output('figure-tooltip', 'bbox'),
Output('figure-tooltip', 'children'),
Input('base-figure', 'hoverData'))
def display_hover(hoverData):
if hoverData is None:
return False, no_update, no_update
pt = hoverData['points'][0]
bbox = pt['bbox']
num = pt['pointNumber']
df_row = df.iloc[num].copy()
img_src = df_row['Weather']
txt_src = df_row['dist_read']
children = [html.Div([html.Img(src=img_src, style={'width': '100%'}), Purify(txt_src),],
style={'width': '96px', 'white-space': 'normal'})]
return True, bbox, children
if __name__ == '__main__':
app.run(debug=False, host='0.0.0.0', port=7860)