import gradio as gr
import plotly.graph_objects as go
import plotly.express as px
import pickle
import tropycal.tracks as tracks
import pandas as pd
import numpy as np
import os
import csv
import tempfile
import requests
from datetime import datetime, timedelta, date
from scipy.interpolate import interp1d
from sklearn.cluster import KMeans
from collections import defaultdict
import warnings

warnings.filterwarnings('ignore')
# Constants
DATA_PATH = os.getcwd()
ONI_DATA_PATH = os.path.join(DATA_PATH, 'oni_data.csv')
TYPHOON_DATA_PATH = os.path.join(DATA_PATH, 'processed_typhoon_data.csv')
LOCAL_iBtrace_PATH = os.path.join(DATA_PATH, 'ibtracs.WP.list.v04r00.csv')
iBtrace_uri = 'https://www.ncei.noaa.gov/data/international-best-track-archive-for-climate-stewardship-ibtracs/v04r00/access/csv/ibtracs.WP.list.v04r00.csv'
CACHE_FILE = 'ibtracs_cache.pkl'
CACHE_EXPIRY_DAYS = 1
# Color mappings
COLOR_MAP = {
    'C5 Super Typhoon': 'rgb(255, 0, 0)',
    'C4 Very Strong Typhoon': 'rgb(255, 63, 0)',
    'C3 Strong Typhoon': 'rgb(255, 127, 0)',
    'C2 Typhoon': 'rgb(255, 191, 0)',
    'C1 Typhoon': 'rgb(255, 255, 0)',
    'Tropical Storm': 'rgb(0, 255, 255)',
    'Tropical Depression': 'rgb(173, 216, 230)'
}
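
# categorize_typhoon() below returns exactly these category names (plus
# 'Unknown' for missing winds, which Plotly then colors with its default
# palette), so the two must stay in sync when either is edited.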

class TyphoonAnalyzer:
    def __init__(self):
        self.last_oni_update = None
        self.ensure_data_files_exist()
        self.load_initial_data()
    def ensure_data_files_exist(self):
        """Ensure all required data files exist before loading"""
        print("Checking and downloading required data files...")

        # Create the data directory if it doesn't exist
        os.makedirs(DATA_PATH, exist_ok=True)

        # Download ONI data if it doesn't exist
        if not os.path.exists(ONI_DATA_PATH):
            print("Downloading ONI data...")
            url = "https://www.cpc.ncep.noaa.gov/data/indices/oni.ascii.txt"
            temp_file = os.path.join(DATA_PATH, "temp_oni.ascii.txt")
            try:
                response = requests.get(url)
                response.raise_for_status()
                with open(temp_file, 'wb') as f:
                    f.write(response.content)
                self.convert_oni_ascii_to_csv(temp_file, ONI_DATA_PATH)
                print("ONI data downloaded and converted successfully")
            except Exception as e:
                print(f"Error downloading ONI data: {e}")
                raise
            finally:
                if os.path.exists(temp_file):
                    os.remove(temp_file)

        # Download IBTrACS data if it doesn't exist
        if not os.path.exists(LOCAL_iBtrace_PATH):
            print("Downloading IBTrACS data...")
            try:
                response = requests.get(iBtrace_uri)
                response.raise_for_status()
                with open(LOCAL_iBtrace_PATH, 'w') as f:
                    f.write(response.text)
                print("IBTrACS data downloaded successfully")
            except Exception as e:
                print(f"Error downloading IBTrACS data: {e}")
                raise

        # Create processed typhoon data if it doesn't exist
        if not os.path.exists(TYPHOON_DATA_PATH):
            print("Processing typhoon data...")
            try:
                self.convert_typhoondata(LOCAL_iBtrace_PATH, TYPHOON_DATA_PATH)
                print("Typhoon data processed successfully")
            except Exception as e:
                print(f"Error processing typhoon data: {e}")
                raise

        print("All required data files are ready")
    def load_initial_data(self):
        """Initialize all required data"""
        print("Loading initial data...")
        self.update_oni_data()
        self.oni_df = self.fetch_oni_data_from_csv()
        self.ibtracs = self.load_ibtracs_data()
        self.update_typhoon_data()
        self.oni_data, self.typhoon_data = self.load_data()
        self.oni_long = self.process_oni_data(self.oni_data)
        self.typhoon_max = self.process_typhoon_data(self.typhoon_data)
        self.merged_data = self.merge_data()
        print("Initial data loading complete")
    def convert_typhoondata(self, input_file, output_file):
        """Convert the raw IBTrACS CSV into the processed per-storm format"""
        print(f"Converting typhoon data from {input_file} to {output_file}")
        with open(input_file, 'r') as infile:
            # Skip the two header lines (column names and units)
            next(infile)
            next(infile)
            reader = csv.reader(infile)
            sid_data = defaultdict(list)
            for row in reader:
                if not row:  # Skip blank lines
                    continue
                sid = row[0]
                iso_time = row[6]
                sid_data[sid].append((row, iso_time))

        with open(output_file, 'w', newline='') as outfile:
            fieldnames = ['SID', 'ISO_TIME', 'LAT', 'LON', 'SEASON', 'NAME',
                          'WMO_WIND', 'WMO_PRES', 'USA_WIND', 'USA_PRES',
                          'START_DATE', 'END_DATE']
            writer = csv.DictWriter(outfile, fieldnames=fieldnames)
            writer.writeheader()
            for sid, data in sid_data.items():
                # First and last observation time for each storm
                start_date = min(data, key=lambda x: x[1])[1]
                end_date = max(data, key=lambda x: x[1])[1]
                for row, iso_time in data:
                    writer.writerow({
                        'SID': row[0],
                        'ISO_TIME': iso_time,
                        'LAT': row[8],
                        'LON': row[9],
                        'SEASON': row[1],
                        'NAME': row[5],
                        'WMO_WIND': row[10].strip() or ' ',
                        'WMO_PRES': row[11].strip() or ' ',
                        'USA_WIND': row[23].strip() or ' ',
                        'USA_PRES': row[24].strip() or ' ',
                        'START_DATE': start_date,
                        'END_DATE': end_date
                    })
    def fetch_oni_data_from_csv(self):
        """Load ONI data from CSV into a date-indexed long format"""
        df = pd.read_csv(ONI_DATA_PATH)
        df = df.melt(id_vars=['Year'], var_name='Month', value_name='ONI')

        # Convert zero-padded month numbers to month names
        month_map = {
            '01': 'Jan', '02': 'Feb', '03': 'Mar', '04': 'Apr',
            '05': 'May', '06': 'Jun', '07': 'Jul', '08': 'Aug',
            '09': 'Sep', '10': 'Oct', '11': 'Nov', '12': 'Dec'
        }
        df['Month'] = df['Month'].map(month_map)

        # Build a proper datetime index from year and month name
        df['Date'] = pd.to_datetime(df['Year'].astype(str) + df['Month'], format='%Y%b')
        return df.set_index('Date')
    def update_oni_data(self):
        """Update ONI data from NOAA"""
        if not self._should_update_oni():
            return

        url = "https://www.cpc.ncep.noaa.gov/data/indices/oni.ascii.txt"
        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
            try:
                response = requests.get(url)
                response.raise_for_status()
                temp_file.write(response.content)
                temp_file.flush()  # ensure the bytes are on disk before re-reading
                self.convert_oni_ascii_to_csv(temp_file.name, ONI_DATA_PATH)
                self.last_oni_update = date.today()
            except Exception as e:
                print(f"Error updating ONI data: {e}")
            finally:
                if os.path.exists(temp_file.name):
                    os.remove(temp_file.name)
    def _should_update_oni(self):
        """Update on the 1st, the 15th, and the last day of each month"""
        today = datetime.now()
        last_day = (today.replace(day=1, month=today.month % 12 + 1) - timedelta(days=1)).day
        return today.day in [1, 15] or today.day == last_day
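
    # Worked example of the rule above: on 2024-03-31, replacing day/month gives
    # 2024-04-01, and subtracting one day lands back on 2024-03-31, so
    # last_day == 31 and an update runs. In December the arithmetic lands in
    # January of the same year, but only .day is compared, so the last-day
    # check still comes out right.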
    def convert_oni_ascii_to_csv(self, input_file, output_file):
        """Convert ONI ASCII data to CSV format"""
        data = defaultdict(lambda: [''] * 12)
        # Each three-month season is filed under its first month
        season_to_month = {
            'DJF': 12, 'JFM': 1, 'FMA': 2, 'MAM': 3, 'AMJ': 4, 'MJJ': 5,
            'JJA': 6, 'JAS': 7, 'ASO': 8, 'SON': 9, 'OND': 10, 'NDJ': 11
        }

        with open(input_file, 'r') as f:
            next(f)  # Skip header
            for line in f:
                parts = line.split()
                if len(parts) >= 4:
                    season, year, anom = parts[0], parts[1], parts[-1]
                    if season in season_to_month:
                        month = season_to_month[season]
                        if season == 'DJF':
                            # DJF starts in December of the previous year
                            year = str(int(year) - 1)
                        data[year][month - 1] = anom

        with open(output_file, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(['Year'] + [f"{m:02d}" for m in range(1, 13)])
            for year in sorted(data.keys()):
                writer.writerow([year] + data[year])
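
    # Example of the season handling above: NOAA's 'DJF 1950' row covers
    # Dec 1949 - Feb 1950, so its anomaly is stored under December of 1949;
    # 'JFM 1950' is stored under January 1950.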
    def load_ibtracs_data(self):
        """Load the tropycal IBTrACS dataset, with a pickle cache"""
        if os.path.exists(CACHE_FILE):
            cache_time = datetime.fromtimestamp(os.path.getmtime(CACHE_FILE))
            if datetime.now() - cache_time < timedelta(days=CACHE_EXPIRY_DAYS):
                with open(CACHE_FILE, 'rb') as f:
                    return pickle.load(f)

        # Make sure the local IBTrACS file exists, then build the dataset from it
        if not os.path.exists(LOCAL_iBtrace_PATH):
            response = requests.get(iBtrace_uri)
            response.raise_for_status()
            with open(LOCAL_iBtrace_PATH, 'w') as f:
                f.write(response.text)

        ibtracs = tracks.TrackDataset(basin='west_pacific', source='ibtracs',
                                      ibtracs_url=LOCAL_iBtrace_PATH)

        with open(CACHE_FILE, 'wb') as f:
            pickle.dump(ibtracs, f)
        return ibtracs
    def update_typhoon_data(self):
        """Re-download IBTrACS data if the remote copy is newer"""
        try:
            response = requests.head(iBtrace_uri)
            remote_modified = datetime.strptime(response.headers['Last-Modified'],
                                                '%a, %d %b %Y %H:%M:%S GMT')
            local_modified = (datetime.fromtimestamp(os.path.getmtime(LOCAL_iBtrace_PATH))
                              if os.path.exists(LOCAL_iBtrace_PATH) else datetime.min)

            if remote_modified > local_modified:
                response = requests.get(iBtrace_uri)
                response.raise_for_status()
                with open(LOCAL_iBtrace_PATH, 'w') as f:
                    f.write(response.text)
                print("Typhoon data updated successfully")
        except Exception as e:
            print(f"Error updating typhoon data: {e}")
    def load_data(self):
        """Load ONI and processed typhoon data from disk"""
        oni_data = pd.read_csv(ONI_DATA_PATH)
        typhoon_data = pd.read_csv(TYPHOON_DATA_PATH, low_memory=False)
        typhoon_data['ISO_TIME'] = pd.to_datetime(typhoon_data['ISO_TIME'])
        return oni_data, typhoon_data
    def process_oni_data(self, oni_data):
        """Melt ONI data into long format with numeric months"""
        oni_long = oni_data.melt(id_vars=['Year'], var_name='Month', value_name='ONI')
        # Column names are zero-padded month strings ('01'..'12'); convert to ints
        oni_long['Month'] = oni_long['Month'].astype(int)
        return oni_long
    def process_typhoon_data(self, typhoon_data):
        """Reduce typhoon data to one row per storm per month at peak intensity"""
        typhoon_data['USA_WIND'] = pd.to_numeric(typhoon_data['USA_WIND'], errors='coerce')
        typhoon_data['WMO_PRES'] = pd.to_numeric(typhoon_data['WMO_PRES'], errors='coerce')
        typhoon_data['ISO_TIME'] = pd.to_datetime(typhoon_data['ISO_TIME'])
        typhoon_data['Year'] = typhoon_data['ISO_TIME'].dt.year
        typhoon_data['Month'] = typhoon_data['ISO_TIME'].dt.month

        typhoon_max = typhoon_data.groupby(['SID', 'Year', 'Month']).agg({
            'USA_WIND': 'max',
            'WMO_PRES': 'min',
            'NAME': 'first',
            'LAT': 'first',
            'LON': 'first',
            'ISO_TIME': 'first'
        }).reset_index()

        typhoon_max['Category'] = typhoon_max['USA_WIND'].apply(self.categorize_typhoon)
        return typhoon_max
    def merge_data(self):
        """Attach each per-storm monthly record to the ONI value for that Year/Month"""
        return pd.merge(self.typhoon_max, self.oni_long, on=['Year', 'Month'])
    def categorize_typhoon(self, wind_speed):
        """Categorize a storm by peak wind in knots (Saffir-Simpson thresholds)"""
        if pd.isna(wind_speed):
            return 'Unknown'
        if wind_speed >= 137:
            return 'C5 Super Typhoon'
        elif wind_speed >= 113:
            return 'C4 Very Strong Typhoon'
        elif wind_speed >= 96:
            return 'C3 Strong Typhoon'
        elif wind_speed >= 83:
            return 'C2 Typhoon'
        elif wind_speed >= 64:
            return 'C1 Typhoon'
        elif wind_speed >= 34:
            return 'Tropical Storm'
        else:
            return 'Tropical Depression'
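
    # Example: categorize_typhoon(120.0) -> 'C4 Very Strong Typhoon' (113-136 kt);
    # categorize_typhoon(float('nan')) -> 'Unknown'.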
    def analyze_typhoon(self, start_year, start_month, end_year, end_month, enso_value='all'):
        """Main analysis function"""
        start_date = datetime(start_year, start_month, 1)
        # Exclusive upper bound: first day of the month after the selected end
        # month, so the entire end month (including days 29-31) is covered
        if end_month == 12:
            end_date = datetime(end_year + 1, 1, 1)
        else:
            end_date = datetime(end_year, end_month + 1, 1)

        filtered_data = self.merged_data[
            (self.merged_data['ISO_TIME'] >= start_date) &
            (self.merged_data['ISO_TIME'] < end_date)
        ]

        # ENSO phase filter: El Niño at ONI >= 0.5, La Niña at ONI <= -0.5,
        # neutral in between
        if enso_value == 'el_nino':
            filtered_data = filtered_data[filtered_data['ONI'] >= 0.5]
        elif enso_value == 'la_nina':
            filtered_data = filtered_data[filtered_data['ONI'] <= -0.5]
        elif enso_value == 'neutral':
            filtered_data = filtered_data[(filtered_data['ONI'] > -0.5) &
                                          (filtered_data['ONI'] < 0.5)]

        return {
            'tracks': self.create_tracks_plot(filtered_data),
            'wind': self.create_wind_analysis(filtered_data),
            'pressure': self.create_pressure_analysis(filtered_data),
            'stats': self.generate_statistics(filtered_data)
        }
    def create_tracks_plot(self, data):
        """Create typhoon tracks visualization"""
        fig = go.Figure()

        fig.update_layout(
            title={
                'text': 'Typhoon Tracks',
                'y': 0.95,
                'x': 0.5,
                'xanchor': 'center',
                'yanchor': 'top'
            },
            showlegend=True,
            legend=dict(
                yanchor="top",
                y=0.99,
                xanchor="left",
                x=0.01,
                bgcolor='rgba(255, 255, 255, 0.8)'
            ),
            geo=dict(
                projection_type='mercator',
                showland=True,
                showcoastlines=True,
                landcolor='rgb(243, 243, 243)',
                countrycolor='rgb(204, 204, 204)',
                coastlinecolor='rgb(214, 214, 214)',
                showocean=True,
                oceancolor='rgb(230, 250, 255)',
                showlakes=True,
                lakecolor='rgb(230, 250, 255)',
                lataxis=dict(range=[0, 50]),
                lonaxis=dict(range=[100, 180]),
                center=dict(lat=20, lon=140),
                bgcolor='rgba(255, 255, 255, 0.5)'
            ),
            paper_bgcolor='rgba(255, 255, 255, 0.5)',
            plot_bgcolor='rgba(255, 255, 255, 0.5)'
        )

        for category in COLOR_MAP:
            category_data = data[data['Category'] == category]
            for _, storm in category_data.groupby('SID'):
                # Pull the storm's full track from the raw data and draw it in
                # the color of its peak category
                track_data = self.typhoon_data[self.typhoon_data['SID'] == storm['SID'].iloc[0]]
                track_data = track_data.sort_values('ISO_TIME')

                fig.add_trace(go.Scattergeo(
                    lon=track_data['LON'],
                    lat=track_data['LAT'],
                    mode='lines',
                    line=dict(width=2, color=COLOR_MAP[category]),
                    name=category,
                    legendgroup=category,
                    # Show each category in the legend only once
                    showlegend=bool(storm.iloc[0]['SID'] == category_data.iloc[0]['SID']),
                    hovertemplate=(
                        f"Name: {storm['NAME'].iloc[0]}<br>" +
                        f"Category: {category}<br>" +
                        f"Wind Speed: {storm['USA_WIND'].iloc[0]:.1f} kt<br>" +
                        f"Pressure: {storm['WMO_PRES'].iloc[0]:.1f} hPa<br>" +
                        f"Date: {track_data['ISO_TIME'].dt.strftime('%Y-%m-%d %H:%M').iloc[0]}<br>" +
                        f"Lat: {track_data['LAT'].iloc[0]:.2f}°N<br>" +
                        f"Lon: {track_data['LON'].iloc[0]:.2f}°E<br>" +
                        "<extra></extra>"
                    )
                ))

        return fig
    def create_wind_analysis(self, data):
        """Create wind speed analysis plot"""
        fig = px.scatter(data,
                         x='ONI',
                         y='USA_WIND',
                         color='Category',
                         color_discrete_map=COLOR_MAP,
                         title='Wind Speed vs ONI Index',
                         labels={
                             'ONI': 'Oceanic Niño Index',
                             'USA_WIND': 'Maximum Wind Speed (kt)'
                         },
                         hover_data=['NAME', 'ISO_TIME', 'Category'])

        # Add regression line (drop NaN pairs first; np.polyfit cannot handle them)
        valid = data[['ONI', 'USA_WIND']].dropna()
        if len(valid) > 1:
            slope, intercept = np.polyfit(valid['ONI'], valid['USA_WIND'], 1)
            fig.add_trace(
                go.Scatter(
                    x=valid['ONI'],
                    y=slope * valid['ONI'] + intercept,
                    mode='lines',
                    name=f'Regression (slope={slope:.2f})',
                    line=dict(color='black', dash='dash')
                )
            )
        return fig
    def create_pressure_analysis(self, data):
        """Create pressure analysis plot"""
        fig = px.scatter(data,
                         x='ONI',
                         y='WMO_PRES',
                         color='Category',
                         color_discrete_map=COLOR_MAP,
                         title='Pressure vs ONI Index',
                         labels={
                             'ONI': 'Oceanic Niño Index',
                             'WMO_PRES': 'Minimum Pressure (hPa)'
                         },
                         hover_data=['NAME', 'ISO_TIME', 'Category'])

        # Add regression line (drop NaN pairs first; np.polyfit cannot handle them)
        valid = data[['ONI', 'WMO_PRES']].dropna()
        if len(valid) > 1:
            slope, intercept = np.polyfit(valid['ONI'], valid['WMO_PRES'], 1)
            fig.add_trace(
                go.Scatter(
                    x=valid['ONI'],
                    y=slope * valid['ONI'] + intercept,
                    mode='lines',
                    name=f'Regression (slope={slope:.2f})',
                    line=dict(color='black', dash='dash')
                )
            )
        return fig
    def generate_statistics(self, data):
        """Generate a Markdown statistical summary"""
        stats = {
            'total_typhoons': data['SID'].nunique(),
            'avg_wind': data['USA_WIND'].mean(),
            'max_wind': data['USA_WIND'].max(),
            'avg_pressure': data['WMO_PRES'].mean(),
            'min_pressure': data['WMO_PRES'].min(),
            'oni_correlation_wind': data['ONI'].corr(data['USA_WIND']),
            'oni_correlation_pressure': data['ONI'].corr(data['WMO_PRES']),
            'category_counts': data['Category'].value_counts().to_dict()
        }

        category_lines = '\n'.join(
            f"- {cat}: {count}" for cat, count in stats['category_counts'].items()
        )
        # Left-aligned so Markdown doesn't render the body as a code block
        return f"""
### Statistical Summary

- Total Typhoons: {stats['total_typhoons']}
- Average Wind Speed: {stats['avg_wind']:.2f} kt
- Maximum Wind Speed: {stats['max_wind']:.2f} kt
- Average Pressure: {stats['avg_pressure']:.2f} hPa
- Minimum Pressure: {stats['min_pressure']:.2f} hPa
- ONI-Wind Speed Correlation: {stats['oni_correlation_wind']:.3f}
- ONI-Pressure Correlation: {stats['oni_correlation_pressure']:.3f}

### Category Distribution
{category_lines}
"""
    def analyze_clusters(self, year, n_clusters):
        """Analyze typhoon route clusters for a specific year"""
        year_data = self.typhoon_data[self.typhoon_data['SEASON'] == year]
        if year_data.empty:
            return go.Figure(), "No data available for selected year"

        # Prepare data for clustering
        routes = []
        for _, storm in year_data.groupby('SID'):
            if len(storm) > 1:
                # Standardize route length
                t = np.linspace(0, 1, len(storm))
                t_new = np.linspace(0, 1, 100)
                lon_interp = interp1d(t, storm['LON'], kind='linear')(t_new)
                lat_interp = interp1d(t, storm['LAT'], kind='linear')(t_new)
                routes.append(np.column_stack((lon_interp, lat_interp)))

        if len(routes) < n_clusters:
            return go.Figure(), f"Not enough typhoons ({len(routes)}) for {n_clusters} clusters"

        # Perform clustering on the flattened (lon, lat) sequences
        routes_array = np.array(routes)
        routes_reshaped = routes_array.reshape(routes_array.shape[0], -1)
        kmeans = KMeans(n_clusters=n_clusters, random_state=42)
        clusters = kmeans.fit_predict(routes_reshaped)

        # Create visualization
        fig = go.Figure()
        fig.update_layout(
            title=f'Typhoon Route Clusters ({year})',
            showlegend=True,
            geo=dict(
                projection_type='mercator',
                showland=True,
                showcoastlines=True,
                landcolor='rgb(243, 243, 243)',
                countrycolor='rgb(204, 204, 204)',
                coastlinecolor='rgb(214, 214, 214)',
                showocean=True,
                oceancolor='rgb(230, 250, 255)',
                lataxis=dict(range=[0, 50]),
                lonaxis=dict(range=[100, 180]),
                center=dict(lat=20, lon=140)
            )
        )

        # Plot routes colored by cluster
        for route, cluster_id in zip(routes, clusters):
            fig.add_trace(go.Scattergeo(
                lon=route[:, 0],
                lat=route[:, 1],
                mode='lines',
                line=dict(
                    width=1,
                    color=f'hsl({cluster_id * 360 / n_clusters}, 50%, 50%)'
                ),
                name=f'Cluster {cluster_id + 1}',
                showlegend=False
            ))

        # Plot cluster centers (mean routes)
        for i in range(n_clusters):
            center = kmeans.cluster_centers_[i].reshape(-1, 2)
            fig.add_trace(go.Scattergeo(
                lon=center[:, 0],
                lat=center[:, 1],
                mode='lines',
                name=f'Cluster {i + 1} Center',
                line=dict(
                    width=3,
                    color=f'hsl({i * 360 / n_clusters}, 100%, 50%)'
                )
            ))

        # Generate statistics text
        stats_text = "### Clustering Results\n\n"
        cluster_counts = np.bincount(clusters)
        for i in range(n_clusters):
            stats_text += f"- Cluster {i + 1}: {cluster_counts[i]} typhoons\n"

        return fig, stats_text
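
    # The fixed 100-point resampling above turns every storm into a
    # 200-dimensional vector (100 lon/lat pairs), which is what lets KMeans
    # compare tracks of different durations; each cluster center reshapes back
    # into a (100, 2) mean route for plotting.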
    def get_typhoons_for_year(self, year):
        """Get (label, value) dropdown choices for a year's storms"""
        try:
            season = self.ibtracs.get_season(year)
            storm_summary = season.summary()

            typhoon_options = []
            for i in range(storm_summary['season_storms']):
                storm_id = storm_summary['id'][i]
                storm_name = storm_summary['name'][i]
                # Gradio dropdown choices take (label, value) tuples
                typhoon_options.append((f"{storm_name} ({storm_id})", storm_id))

            return typhoon_options
        except Exception as e:
            print(f"Error getting typhoons for year {year}: {str(e)}")
            return []
    def create_typhoon_animation(self, year, storm_id, standard='atlantic'):
        """Create animated visualization of a typhoon path"""
        if not storm_id:
            return go.Figure(), "Please select a typhoon"

        storm = self.ibtracs.get_storm(storm_id)
        fig = go.Figure()

        # Base map setup
        fig.update_layout(
            title=f"{year} - {storm.name} Typhoon Path",
            geo=dict(
                projection_type='natural earth',
                showland=True,
                landcolor='rgb(243, 243, 243)',
                countrycolor='rgb(204, 204, 204)',
                coastlinecolor='rgb(100, 100, 100)',
                showocean=True,
                oceancolor='rgb(230, 250, 255)',
                lataxis=dict(range=[0, 50]),
                lonaxis=dict(range=[100, 180]),
                center=dict(lat=20, lon=140),
            ),
            updatemenus=[{
                "buttons": [
                    {
                        "args": [None, {"frame": {"duration": 100, "redraw": True},
                                        "fromcurrent": True,
                                        "transition": {"duration": 0}}],
                        "label": "Play",
                        "method": "animate"
                    },
                    {
                        "args": [[None], {"frame": {"duration": 0, "redraw": True},
                                          "mode": "immediate",
                                          "transition": {"duration": 0}}],
                        "label": "Pause",
                        "method": "animate"
                    }
                ],
                "direction": "left",
                "pad": {"r": 10, "t": 87},
                "showactive": False,
                "type": "buttons",
                "x": 0.1,
                "xanchor": "right",
                "y": 0,
                "yanchor": "top"
            }]
        )

        # Create one animation frame per observation time
        frames = []
        for i in range(len(storm.time)):
            category, color = self.categorize_typhoon_by_standard(storm.vmax[i], standard)

            # Include wind radii / eye data in the hover text when available
            radius_info = ""
            if hasattr(storm, 'dict'):
                radius_keys = ['USA_R34_NE', 'USA_R34_SE', 'USA_R34_SW',
                               'USA_R34_NW', 'USA_RMW', 'USA_EYE']
                r34_ne, r34_se, r34_sw, r34_nw, rmw, eye = (
                    storm.dict[key][i] if key in storm.dict else None
                    for key in radius_keys
                )
                if any([r34_ne, r34_se, r34_sw, r34_nw, rmw, eye]):
                    radius_info = f"<br>R34: NE={r34_ne}, SE={r34_se}, SW={r34_sw}, NW={r34_nw}<br>"
                    radius_info += f"RMW: {rmw}<br>Eye Diameter: {eye}"

            frame = go.Frame(
                data=[
                    go.Scattergeo(
                        lon=storm.lon[:i + 1],
                        lat=storm.lat[:i + 1],
                        mode='lines',
                        line=dict(width=2, color='blue'),
                        name='Path Traveled',
                        showlegend=False,
                    ),
                    go.Scattergeo(
                        lon=[storm.lon[i]],
                        lat=[storm.lat[i]],
                        mode='markers+text',
                        marker=dict(size=10, color=color, symbol='star'),
                        text=category,
                        textposition="top center",
                        textfont=dict(size=12, color=color),
                        name='Current Location',
                        hovertemplate=(
                            f"{storm.time[i].strftime('%Y-%m-%d %H:%M')}<br>"
                            f"Category: {category}<br>"
                            f"Wind Speed: {storm.vmax[i]:.1f} kt<br>"
                            f"{radius_info}"
                        ),
                    ),
                ],
                name=f"frame{i}"
            )
            frames.append(frame)

        fig.frames = frames

        # Add the full track and the starting point as static traces
        fig.add_trace(
            go.Scattergeo(
                lon=storm.lon,
                lat=storm.lat,
                mode='lines',
                line=dict(width=2, color='gray'),
                name='Complete Path',
                showlegend=True,
            )
        )
        fig.add_trace(
            go.Scattergeo(
                lon=[storm.lon[0]],
                lat=[storm.lat[0]],
                mode='markers',
                marker=dict(size=10, color='green', symbol='star'),
                name='Starting Point',
                text=storm.time[0].strftime('%Y-%m-%d %H:%M'),
                hoverinfo='text+name',
            )
        )

        # Add a slider that steps through the frames by name
        sliders = [{
            "active": 0,
            "yanchor": "top",
            "xanchor": "left",
            "currentvalue": {
                "font": {"size": 20},
                "prefix": "Time: ",
                "visible": True,
                "xanchor": "right"
            },
            "transition": {"duration": 100, "easing": "cubic-in-out"},
            "pad": {"b": 10, "t": 50},
            "len": 0.9,
            "x": 0.1,
            "y": 0,
            "steps": [
                {
                    "args": [[f"frame{k}"],
                             {"frame": {"duration": 100, "redraw": True},
                              "mode": "immediate",
                              "transition": {"duration": 0}}],
                    "label": storm.time[k].strftime('%Y-%m-%d %H:%M'),
                    "method": "animate"
                }
                for k in range(len(storm.time))
            ]
        }]
        fig.update_layout(sliders=sliders)

        # np.nanmax/np.nanmin skip missing observations in the track arrays
        info_text = f"""
### Typhoon Information
- **Name:** {storm.name}
- **Start Date:** {storm.time[0].strftime('%Y-%m-%d %H:%M')}
- **End Date:** {storm.time[-1].strftime('%Y-%m-%d %H:%M')}
- **Duration:** {(storm.time[-1] - storm.time[0]).total_seconds() / 3600:.1f} hours
- **Maximum Wind Speed:** {np.nanmax(storm.vmax):.1f} kt
- **Minimum Pressure:** {np.nanmin(storm.mslp):.1f} hPa
- **Peak Category:** {self.categorize_typhoon_by_standard(np.nanmax(storm.vmax), standard)[0]}
"""
        return fig, info_text
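
    # Each frame above redraws two traces (the path so far plus the current
    # position marker), and the slider steps target frames by the f"frame{i}"
    # names, so the frame names and the step args must match exactly.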
    def search_typhoons(self, query):
        """Search for typhoons by name"""
        if not query:
            return go.Figure(), "Please enter a typhoon name to search"

        # Limit the search to the last 30 years to keep it fast
        matching_storms = []
        current_year = datetime.now().year
        start_year = current_year - 30

        for year in range(start_year, current_year + 1):
            try:
                season = self.ibtracs.get_season(year)
                for storm_id in season.summary()['id']:
                    storm = self.ibtracs.get_storm(storm_id)
                    if query.lower() in storm.name.lower():
                        matching_storms.append((year, storm))
            except Exception as e:
                print(f"Error searching year {year}: {str(e)}")
                continue

        if not matching_storms:
            return go.Figure(), "No typhoons found matching your search"

        # Create visualization of all matching typhoons
        fig = go.Figure()
        fig.update_layout(
            title=f"Typhoons Matching: '{query}'",
            geo=dict(
                projection_type='natural earth',
                showland=True,
                landcolor='rgb(243, 243, 243)',
                countrycolor='rgb(204, 204, 204)',
                coastlinecolor='rgb(100, 100, 100)',
                showocean=True,
                oceancolor='rgb(230, 250, 255)',
                lataxis=dict(range=[0, 50]),
                lonaxis=dict(range=[100, 180]),
                center=dict(lat=20, lon=140),
            )
        )

        # Plot each matching storm with a different color
        colors = px.colors.qualitative.Plotly
        for i, (year, storm) in enumerate(matching_storms):
            color = colors[i % len(colors)]
            fig.add_trace(go.Scattergeo(
                lon=storm.lon,
                lat=storm.lat,
                mode='lines',
                line=dict(width=3, color=color),
                name=f"{storm.name} ({year})",
                hovertemplate=(
                    f"Name: {storm.name}<br>"
                    f"Year: {year}<br>"
                    f"Max Wind: {np.nanmax(storm.vmax):.1f} kt<br>"
                    f"Min Pressure: {np.nanmin(storm.mslp):.1f} hPa<br>"
                    # Plotly substitutes %{lat}/%{lon} per point; the braces
                    # are doubled so they survive the f-string
                    f"Position: %{{lat:.2f}}°N, %{{lon:.2f}}°E"
                )
            ))

        # Add starting points
        for i, (year, storm) in enumerate(matching_storms):
            color = colors[i % len(colors)]
            fig.add_trace(go.Scattergeo(
                lon=[storm.lon[0]],
                lat=[storm.lat[0]],
                mode='markers',
                marker=dict(size=10, color=color, symbol='circle'),
                name=f"Start: {storm.name} ({year})",
                showlegend=False,
                hoverinfo='name'
            ))

        # Create information text
        info_text = f"### Found {len(matching_storms)} typhoons matching '{query}':\n\n"
        for year, storm in matching_storms:
            info_text += f"- **{storm.name} ({year})**\n"
            info_text += f"  - Time: {storm.time[0].strftime('%Y-%m-%d')} to {storm.time[-1].strftime('%Y-%m-%d')}\n"
            info_text += f"  - Max Wind: {np.nanmax(storm.vmax):.1f} kt\n"
            info_text += f"  - Min Pressure: {np.nanmin(storm.mslp):.1f} hPa\n"
            info_text += f"  - Category: {self.categorize_typhoon_by_standard(np.nanmax(storm.vmax))[0]}\n\n"

        return fig, info_text
    def categorize_typhoon_by_standard(self, wind_speed, standard='atlantic'):
        """
        Categorize a typhoon by wind speed (in knots) under the chosen standard.
        Returns a (category, color) tuple.
        """
        if standard == 'taiwan':
            # Taiwan's CWB grades storms in m/s, so convert from knots;
            # thresholds: mild >= 17.2, moderate >= 32.7, severe >= 51.0 m/s
            wind_speed_ms = wind_speed * 0.514444
            if wind_speed_ms >= 51.0:
                return 'Strong Typhoon', 'rgb(255, 0, 0)'
            elif wind_speed_ms >= 32.7:
                return 'Medium Typhoon', 'rgb(255, 127, 0)'
            elif wind_speed_ms >= 17.2:
                return 'Mild Typhoon', 'rgb(255, 255, 0)'
            else:
                return 'Tropical Depression', 'rgb(173, 216, 230)'
        else:
            # Atlantic (Saffir-Simpson) standard uses knots directly
            if wind_speed >= 137:
                return 'C5 Super Typhoon', 'rgb(255, 0, 0)'
            elif wind_speed >= 113:
                return 'C4 Very Strong Typhoon', 'rgb(255, 63, 0)'
            elif wind_speed >= 96:
                return 'C3 Strong Typhoon', 'rgb(255, 127, 0)'
            elif wind_speed >= 83:
                return 'C2 Typhoon', 'rgb(255, 191, 0)'
            elif wind_speed >= 64:
                return 'C1 Typhoon', 'rgb(255, 255, 0)'
            elif wind_speed >= 34:
                return 'Tropical Storm', 'rgb(0, 255, 255)'
            else:
                return 'Tropical Depression', 'rgb(173, 216, 230)'
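
# A minimal headless usage sketch (not part of the app itself): it shows how
# TyphoonAnalyzer can be exercised without the Gradio UI, assuming the NOAA
# endpoints are reachable so the constructor can fetch its data files.
#
#     analyzer = TyphoonAnalyzer()
#     results = analyzer.analyze_typhoon(2000, 1, 2024, 12, enso_value='el_nino')
#     results['tracks'].write_html('tracks.html')  # Plotly figures save to HTML
#     print(results['stats'])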

def create_interface():
    analyzer = TyphoonAnalyzer()

    with gr.Blocks(title="Typhoon Analysis Dashboard", theme=gr.themes.Base()) as demo:
        gr.Markdown("# Typhoon Analysis Dashboard")

        with gr.Tabs():
            # Main Analysis Tab
            with gr.Tab("Main Analysis"):
                with gr.Row():
                    with gr.Column():
                        start_year = gr.Slider(1900, 2024, 2000, label="Start Year")
                        start_month = gr.Slider(1, 12, 1, label="Start Month")
                    with gr.Column():
                        end_year = gr.Slider(1900, 2024, 2024, label="End Year")
                        end_month = gr.Slider(1, 12, 12, label="End Month")
                enso_dropdown = gr.Dropdown(
                    choices=["all", "el_nino", "la_nina", "neutral"],
                    value="all",
                    label="ENSO Phase"
                )
                analyze_btn = gr.Button("Analyze")
                tracks_plot = gr.Plot(label="Typhoon Tracks")
                with gr.Row():
                    wind_plot = gr.Plot(label="Wind Speed Analysis")
                    pressure_plot = gr.Plot(label="Pressure Analysis")
                stats_text = gr.Markdown()

            # Typhoon Animation Tab
            with gr.Tab("Typhoon Animation"):
                with gr.Row():
                    animation_year = gr.Slider(
                        minimum=1950,
                        maximum=2024,
                        value=2020,
                        step=1,
                        label="Select Year"
                    )
                with gr.Row():
                    animation_typhoon = gr.Dropdown(
                        choices=[],
                        label="Select Typhoon",
                        interactive=True
                    )
                    standard_dropdown = gr.Dropdown(
                        # (label, value) tuples, matching get_typhoons_for_year
                        choices=[
                            ("Atlantic Standard", "atlantic"),
                            ("Taiwan Standard", "taiwan")
                        ],
                        value="atlantic",
                        label="Classification Standard"
                    )
                animation_btn = gr.Button("Show Typhoon Path", variant="primary")
                animation_plot = gr.Plot(label="Typhoon Path Animation")
                animation_info = gr.Markdown()

            # Search Tab
            with gr.Tab("Typhoon Search"):
                with gr.Row():
                    search_input = gr.Textbox(label="Search Typhoon Name")
                    search_btn = gr.Button("Search Typhoons", variant="primary")
                search_results = gr.Plot(label="Search Results")
                search_info = gr.Markdown()

        # Event handlers
        def analyze_callback(start_y, start_m, end_y, end_m, enso):
            # Sliders can deliver floats; datetime() requires ints
            results = analyzer.analyze_typhoon(int(start_y), int(start_m),
                                               int(end_y), int(end_m), enso)
            return [
                results['tracks'],
                results['wind'],
                results['pressure'],
                results['stats']
            ]

        def update_typhoon_choices(year):
            typhoons = analyzer.get_typhoons_for_year(year)
            return gr.update(choices=typhoons, value=None)

        # Connect events for main analysis
        analyze_btn.click(
            analyze_callback,
            inputs=[start_year, start_month, end_year, end_month, enso_dropdown],
            outputs=[tracks_plot, wind_plot, pressure_plot, stats_text]
        )

        # Connect events for Animation tab
        animation_year.change(
            update_typhoon_choices,
            inputs=[animation_year],
            outputs=[animation_typhoon]
        )
        animation_btn.click(
            analyzer.create_typhoon_animation,
            inputs=[animation_year, animation_typhoon, standard_dropdown],
            outputs=[animation_plot, animation_info]
        )

        # Connect events for Search tab
        search_btn.click(
            analyzer.search_typhoons,
            inputs=[search_input],
            outputs=[search_results, search_info]
        )

    return demo

if __name__ == "__main__":
    demo = create_interface()
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=True
    )