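# NOTE: the next six lines are page residue from the Hugging Face file viewer,
# kept as comments so the module remains importable.
# euler314's picture
# Update app.py
# 80709af verified
# raw
# history blame
# 24.4 kB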
import gradio as gr
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import pickle
import tropycal.tracks as tracks
import pandas as pd
import numpy as np
import cachetools
import functools
import hashlib
import os
from datetime import datetime, timedelta
from datetime import date
from scipy import stats
from scipy.optimize import minimize, curve_fit
from sklearn.linear_model import LinearRegression
from sklearn.cluster import KMeans
from scipy.interpolate import interp1d
from fractions import Fraction
import statsmodels.api as sm
import time
import threading
import requests
from io import StringIO
import tempfile
import csv
from collections import defaultdict
import shutil
import filecmp
import warnings
warnings.filterwarnings('ignore')
# ---------------------------------------------------------------------------
# File locations
# ---------------------------------------------------------------------------
# Every data file is kept in the directory the process was started from.
DATA_PATH = os.getcwd()

# ONI table, processed typhoon table and the raw IBTrACS download, in that
# order; each one is simply a file name placed under DATA_PATH.
ONI_DATA_PATH, TYPHOON_DATA_PATH, LOCAL_iBtrace_PATH = (
    os.path.join(DATA_PATH, file_name)
    for file_name in (
        'oni_data.csv',
        'processed_typhoon_data.csv',
        'ibtracs.WP.list.v04r00.csv',
    )
)

# Remote location of the IBTrACS CSV that LOCAL_iBtrace_PATH mirrors.
iBtrace_uri = (
    'https://www.ncei.noaa.gov/data/'
    'international-best-track-archive-for-climate-stewardship-ibtracs/'
    'v04r00/access/csv/ibtracs.WP.list.v04r00.csv'
)

# Pickle cache of the loaded track dataset (relative to the working
# directory) and the number of days it is considered fresh.
CACHE_FILE = 'ibtracs_cache.pkl'
CACHE_EXPIRY_DAYS = 1

# ---------------------------------------------------------------------------
# Color mappings
# ---------------------------------------------------------------------------
# Storm category -> CSS colour string used for plot traces, listed from the
# strongest category to the weakest.
COLOR_MAP = {
    category: f'rgb({red}, {green}, {blue})'
    for category, (red, green, blue) in (
        ('C5 Super Typhoon', (255, 0, 0)),
        ('C4 Very Strong Typhoon', (255, 63, 0)),
        ('C3 Strong Typhoon', (255, 127, 0)),
        ('C2 Typhoon', (255, 191, 0)),
        ('C1 Typhoon', (255, 255, 0)),
        ('Tropical Storm', (0, 255, 255)),
        ('Tropical Depression', (173, 216, 230)),
    )
}
class TyphoonAnalyzer:
    """Prepare ONI and IBTrACS Western Pacific data and build analysis plots.

    Constructing an instance downloads any missing input file, refreshes the
    inputs where appropriate and then populates these attributes:

    * ``oni_df`` - ONI in long format, indexed by a month-start ``Date``.
    * ``ibtracs`` - the ``tropycal`` ``TrackDataset`` for the basin.
    * ``oni_data`` / ``typhoon_data`` - the two CSV files as DataFrames.
    * ``oni_long`` - ONI melted to one row per ``(Year, Month)``.
    * ``typhoon_max`` - one aggregated row per storm and calendar month.
    * ``merged_data`` - ``typhoon_max`` inner-joined with ``oni_long``.
    """

    # NOAA CPC plain-text ONI table that the ONI CSV is derived from.
    ONI_SOURCE_URL = "https://www.cpc.ncep.noaa.gov/data/indices/oni.ascii.txt"

    # Seconds to wait on a stalled connection before giving up; without a
    # timeout a hung server would block start-up forever.
    REQUEST_TIMEOUT = 60

    # (lower wind bound in knots, category) pairs, strongest first.  Anything
    # that matches no bound (including NaN) is a 'Tropical Depression'.
    _CATEGORY_THRESHOLDS = (
        (137, 'C5 Super Typhoon'),
        (113, 'C4 Very Strong Typhoon'),
        (96, 'C3 Strong Typhoon'),
        (83, 'C2 Typhoon'),
        (64, 'C1 Typhoon'),
        (34, 'Tropical Storm'),
    )

    def __init__(self):
        """Make sure the input files exist, then load and derive all tables."""
        self.last_oni_update = None
        self.ensure_data_files_exist()
        self.load_initial_data()

    # ------------------------------------------------------------------
    # Download helpers
    # ------------------------------------------------------------------
    def _download_to_file(self, url, destination):
        """Fetch ``url`` and write the response body to ``destination``.

        The body is written as raw bytes so the local copy matches what the
        server sent, independent of the platform's text encoding.

        Raises:
            requests.RequestException: on connection errors, timeouts or a
                non-success HTTP status.
        """
        response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()
        with open(destination, 'wb') as f:
            f.write(response.content)

    def _refresh_oni_csv(self):
        """Download the ONI ASCII table and rebuild ``ONI_DATA_PATH`` from it.

        The intermediate download is always removed, even when the download
        or the conversion fails.
        """
        temp_file = os.path.join(DATA_PATH, "temp_oni.ascii.txt")
        try:
            self._download_to_file(self.ONI_SOURCE_URL, temp_file)
            self.convert_oni_ascii_to_csv(temp_file, ONI_DATA_PATH)
        finally:
            if os.path.exists(temp_file):
                os.remove(temp_file)

    # ------------------------------------------------------------------
    # Data acquisition
    # ------------------------------------------------------------------
    def ensure_data_files_exist(self):
        """Ensure all required data files exist before loading.

        Creates, in order and only when missing: the ONI CSV, the raw
        IBTrACS CSV and the processed typhoon CSV.

        Raises:
            Exception: any download or conversion error is printed and then
                re-raised, because the analyzer cannot work without the files.
        """
        print("Checking and downloading required data files...")
        # Create data directory if it doesn't exist
        os.makedirs(DATA_PATH, exist_ok=True)

        # Download ONI data if it doesn't exist
        if not os.path.exists(ONI_DATA_PATH):
            print("Downloading ONI data...")
            try:
                self._refresh_oni_csv()
                print("ONI data downloaded and converted successfully")
            except Exception as e:
                print(f"Error downloading ONI data: {e}")
                raise

        # Download IBTrACS data if it doesn't exist
        if not os.path.exists(LOCAL_iBtrace_PATH):
            print("Downloading IBTrACS data...")
            try:
                self._download_to_file(iBtrace_uri, LOCAL_iBtrace_PATH)
                print("IBTrACS data downloaded successfully")
            except Exception as e:
                print(f"Error downloading IBTrACS data: {e}")
                raise

        # Create processed typhoon data if it doesn't exist
        if not os.path.exists(TYPHOON_DATA_PATH):
            print("Processing typhoon data...")
            try:
                self.convert_typhoondata(LOCAL_iBtrace_PATH, TYPHOON_DATA_PATH)
                print("Typhoon data processed successfully")
            except Exception as e:
                print(f"Error processing typhoon data: {e}")
                raise

        print("All required data files are ready")

    def load_initial_data(self):
        """Refresh the inputs and build every DataFrame the plots rely on."""
        print("Loading initial data...")
        self.update_oni_data()
        self.oni_df = self.fetch_oni_data_from_csv()
        self.ibtracs = self.load_ibtracs_data()
        self.update_typhoon_data()
        self.oni_data, self.typhoon_data = self.load_data()
        self.oni_long = self.process_oni_data(self.oni_data)
        self.typhoon_max = self.process_typhoon_data(self.typhoon_data)
        self.merged_data = self.merge_data()
        print("Initial data loading complete")

    def fetch_oni_data_from_csv(self):
        """Load ONI data from CSV.

        Returns:
            DataFrame with columns ``Year``, ``Month`` (three-letter month
            name) and ``ONI``, indexed by a ``Date`` built from year and month.
        """
        df = pd.read_csv(ONI_DATA_PATH)
        df = df.melt(id_vars=['Year'], var_name='Month', value_name='ONI')
        # Convert month numbers to month names
        month_map = {
            '01': 'Jan', '02': 'Feb', '03': 'Mar', '04': 'Apr',
            '05': 'May', '06': 'Jun', '07': 'Jul', '08': 'Aug',
            '09': 'Sep', '10': 'Oct', '11': 'Nov', '12': 'Dec',
        }
        df['Month'] = df['Month'].map(month_map)
        # Now create the date
        df['Date'] = pd.to_datetime(df['Year'].astype(str) + df['Month'], format='%Y%b')
        return df.set_index('Date')

    def should_update_oni(self):
        """Return True on the 1st, the 15th and the last day of a month."""
        today = datetime.now()
        # Tomorrow being the 1st is what makes today the last day of a month.
        is_last_day = (today + timedelta(days=1)).day == 1
        return today.day in (1, 15) or is_last_day

    def convert_typhoondata(self, input_file, output_file):
        """Convert IBTrACS data to processed format.

        Reads the raw CSV (two header rows followed by data rows), keeps the
        columns this app needs and adds each storm's first and last
        ``ISO_TIME`` as ``START_DATE`` / ``END_DATE``.  Rows too short to
        contain every required column are skipped instead of raising.

        Args:
            input_file: path of the raw IBTrACS CSV.
            output_file: path the processed CSV is written to.
        """
        print(f"Converting typhoon data from {input_file} to {output_file}")
        # Highest column index read below (USA_PRES).
        last_needed_col = 24

        rows_by_sid = defaultdict(list)
        with open(input_file, 'r', newline='') as infile:
            reader = csv.reader(infile)
            # Skip the header lines (the default avoids StopIteration on a
            # truncated file).
            next(reader, None)
            next(reader, None)
            for row in reader:
                if len(row) <= last_needed_col:  # blank or malformed line
                    continue
                rows_by_sid[row[0]].append(row)

        fieldnames = ['SID', 'ISO_TIME', 'LAT', 'LON', 'SEASON', 'NAME',
                      'WMO_WIND', 'WMO_PRES', 'USA_WIND', 'USA_PRES',
                      'START_DATE', 'END_DATE']
        with open(output_file, 'w', newline='') as outfile:
            writer = csv.DictWriter(outfile, fieldnames=fieldnames)
            writer.writeheader()
            for rows in rows_by_sid.values():
                # ISO_TIME is compared as text, exactly as stored in the file.
                times = [row[6] for row in rows]
                start_date, end_date = min(times), max(times)
                for row in rows:
                    writer.writerow({
                        'SID': row[0],
                        'ISO_TIME': row[6],
                        'LAT': row[8],
                        'LON': row[9],
                        'SEASON': row[1],
                        'NAME': row[5],
                        # Missing measurements are stored as a single space.
                        'WMO_WIND': row[10].strip() or ' ',
                        'WMO_PRES': row[11].strip() or ' ',
                        'USA_WIND': row[23].strip() or ' ',
                        'USA_PRES': row[24].strip() or ' ',
                        'START_DATE': start_date,
                        'END_DATE': end_date,
                    })

    def update_oni_data(self):
        """Refresh the ONI CSV on update days; failures are only reported.

        This is best-effort: an existing ONI CSV stays usable when the
        download fails, so errors are printed instead of raised.
        """
        if not self.should_update_oni():
            return
        try:
            self._refresh_oni_csv()
            self.last_oni_update = date.today()
        except Exception as e:
            print(f"Error updating ONI data: {e}")

    def convert_oni_ascii_to_csv(self, input_file, output_file):
        """Turn the whitespace-separated ONI table into a Year x month CSV.

        Each input line is read as ``season year ... anomaly``; the last
        field is stored under the month the season code maps to.  ``DJF`` is
        filed under month 12 of the previous year.  Months without a value
        are written as empty cells.

        Args:
            input_file: path of the downloaded ASCII table.
            output_file: path of the CSV to write (header ``Year,01,...,12``).
        """
        season_to_month = {
            'DJF': 12, 'JFM': 1, 'FMA': 2, 'MAM': 3, 'AMJ': 4, 'MJJ': 5,
            'JJA': 6, 'JAS': 7, 'ASO': 8, 'SON': 9, 'OND': 10, 'NDJ': 11,
        }
        data = defaultdict(lambda: [''] * 12)
        with open(input_file, 'r') as f:
            next(f, None)  # Skip header; tolerate an empty file
            for line in f:
                parts = line.split()
                if len(parts) < 4:
                    continue
                season, year, anom = parts[0], parts[1], parts[-1]
                month = season_to_month.get(season)
                if month is None:
                    continue
                if season == 'DJF':
                    year = str(int(year) - 1)
                data[year][month - 1] = anom

        with open(output_file, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(['Year'] + [f"{m:02d}" for m in range(1, 13)])
            for year in sorted(data):
                writer.writerow([year] + data[year])

    def load_ibtracs_data(self):
        """Return the track dataset, from the pickle cache when it is fresh.

        A cache younger than ``CACHE_EXPIRY_DAYS`` is reused; otherwise the
        dataset is rebuilt from the local IBTrACS CSV (downloaded first if it
        is missing) and the cache is rewritten.
        """
        if os.path.exists(CACHE_FILE):
            cache_time = datetime.fromtimestamp(os.path.getmtime(CACHE_FILE))
            if datetime.now() - cache_time < timedelta(days=CACHE_EXPIRY_DAYS):
                # SECURITY NOTE: pickle.load executes code embedded in the
                # file.  This is only acceptable because CACHE_FILE is
                # written by this process; never point it at untrusted data.
                try:
                    with open(CACHE_FILE, 'rb') as f:
                        return pickle.load(f)
                except (pickle.UnpicklingError, EOFError, AttributeError,
                        ImportError, IndexError) as e:
                    # A truncated cache or one written by an incompatible
                    # library version should trigger a rebuild, not a crash.
                    print(f"Ignoring unreadable IBTrACS cache: {e}")

        if not os.path.exists(LOCAL_iBtrace_PATH):
            self._download_to_file(iBtrace_uri, LOCAL_iBtrace_PATH)
        ibtracs = tracks.TrackDataset(basin='west_pacific', source='ibtracs',
                                      ibtracs_url=LOCAL_iBtrace_PATH)
        with open(CACHE_FILE, 'wb') as f:
            pickle.dump(ibtracs, f)
        return ibtracs

    def update_typhoon_data(self):
        """Re-download IBTrACS when the server copy is newer than the local one.

        After a successful download the processed typhoon CSV is regenerated
        as well; otherwise ``load_data`` would keep reading the table built
        from the previous download.  Best-effort: errors are printed only.
        """
        try:
            response = requests.head(iBtrace_uri, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            remote_modified = datetime.strptime(response.headers['Last-Modified'],
                                                '%a, %d %b %Y %H:%M:%S GMT')
            if os.path.exists(LOCAL_iBtrace_PATH):
                # Last-Modified is expressed in GMT, so the local mtime is
                # converted to UTC as well before the two are compared.
                mtime = os.path.getmtime(LOCAL_iBtrace_PATH)
                local_modified = datetime(*time.gmtime(mtime)[:6])
            else:
                local_modified = datetime.min
            if remote_modified > local_modified:
                self._download_to_file(iBtrace_uri, LOCAL_iBtrace_PATH)
                self.convert_typhoondata(LOCAL_iBtrace_PATH, TYPHOON_DATA_PATH)
        except Exception as e:
            print(f"Error updating typhoon data: {e}")

    # ------------------------------------------------------------------
    # Loading and preprocessing
    # ------------------------------------------------------------------
    def load_data(self):
        """Read both CSV files; ``ISO_TIME`` is parsed to datetimes.

        Returns:
            Tuple ``(oni_data, typhoon_data)`` of DataFrames.
        """
        oni_data = pd.read_csv(ONI_DATA_PATH)
        typhoon_data = pd.read_csv(TYPHOON_DATA_PATH, low_memory=False)
        typhoon_data['ISO_TIME'] = pd.to_datetime(typhoon_data['ISO_TIME'])
        return oni_data, typhoon_data

    def process_oni_data(self, oni_data):
        """Process ONI data.

        Melts the wide table (``Year`` plus columns ``'01'``..``'12'``) into
        one row per year and month, with ``Month`` as an integer 1-12.
        """
        oni_long = pd.melt(oni_data, id_vars=['Year'], var_name='Month', value_name='ONI')
        # Column labels are zero-padded strings; map them to month numbers.
        month_map = {f"{month:02d}": month for month in range(1, 13)}
        oni_long['Month'] = oni_long['Month'].map(month_map)
        return oni_long

    def process_typhoon_data(self, typhoon_data):
        """Aggregate track points to one row per storm and calendar month.

        Note that ``typhoon_data`` is modified in place: its wind, pressure
        and time columns are converted and ``Year`` / ``Month`` are added.

        Returns:
            DataFrame with, per ``(SID, Year, Month)``, the maximum
            ``USA_WIND``, minimum ``WMO_PRES``, the first ``NAME``, ``LAT``,
            ``LON`` and ``ISO_TIME``, plus a ``Category`` label derived from
            the maximum wind.
        """
        # Blank measurements become NaN rather than raising.
        typhoon_data['USA_WIND'] = pd.to_numeric(typhoon_data['USA_WIND'], errors='coerce')
        typhoon_data['WMO_PRES'] = pd.to_numeric(typhoon_data['WMO_PRES'], errors='coerce')
        typhoon_data['ISO_TIME'] = pd.to_datetime(typhoon_data['ISO_TIME'])
        typhoon_data['Year'] = typhoon_data['ISO_TIME'].dt.year
        typhoon_data['Month'] = typhoon_data['ISO_TIME'].dt.month

        typhoon_max = typhoon_data.groupby(['SID', 'Year', 'Month']).agg({
            'USA_WIND': 'max',
            'WMO_PRES': 'min',
            'NAME': 'first',
            'LAT': 'first',
            'LON': 'first',
            'ISO_TIME': 'first',
        }).reset_index()
        typhoon_max['Category'] = typhoon_max['USA_WIND'].apply(self.categorize_typhoon)
        return typhoon_max

    def merge_data(self):
        """Join ``typhoon_max`` with ``oni_long`` on ``Year`` and ``Month``."""
        return pd.merge(self.typhoon_max, self.oni_long, on=['Year', 'Month'])

    def categorize_typhoon(self, wind_speed):
        """Map a wind speed (knots) to its category label.

        A value below every threshold, or NaN, is a 'Tropical Depression'.
        """
        for lower_bound, category in TyphoonAnalyzer._CATEGORY_THRESHOLDS:
            if wind_speed >= lower_bound:
                return category
        return 'Tropical Depression'

    # ------------------------------------------------------------------
    # Analysis
    # ------------------------------------------------------------------
    def _filter_merged_data(self, start_year, start_month, end_year, end_month,
                            enso_value='all'):
        """Return the rows of ``merged_data`` inside a period and ENSO phase.

        The period runs from the first day of the start month up to and
        including the last day of the end month.  Year and month arguments
        are coerced with ``int`` because UI sliders may deliver floats.
        """
        start_year, start_month = int(start_year), int(start_month)
        end_year, end_month = int(end_year), int(end_month)

        start_date = datetime(start_year, start_month, 1)
        # Exclusive upper bound: the first instant of the following month.
        # A fixed day such as the 28th would silently drop the 29th-31st.
        if end_month == 12:
            end_exclusive = datetime(end_year + 1, 1, 1)
        else:
            end_exclusive = datetime(end_year, end_month + 1, 1)

        iso_time = self.merged_data['ISO_TIME']
        selected = self.merged_data[(iso_time >= start_date) & (iso_time < end_exclusive)]
        if enso_value == 'all':
            return selected

        oni = selected['ONI']
        if enso_value == 'el_nino':
            phase_mask = oni >= 0.5
        elif enso_value == 'la_nina':
            phase_mask = oni <= -0.5
        else:
            # Any other value is treated as the neutral phase.
            phase_mask = (oni > -0.5) & (oni < 0.5)
        return selected[phase_mask]

    def analyze_typhoon(self, start_year, start_month, end_year, end_month, enso_value='all'):
        """Build every plot and the summary text for a period and ENSO phase.

        Args:
            start_year, start_month: first month of the period (inclusive).
            end_year, end_month: last month of the period (inclusive).
            enso_value: ``'all'``, ``'el_nino'`` (ONI >= 0.5), ``'la_nina'``
                (ONI <= -0.5); anything else selects -0.5 < ONI < 0.5.

        Returns:
            Dict with the figures ``'tracks'``, ``'wind'``, ``'pressure'``
            and ``'clusters'`` and the markdown string ``'stats'``.
        """
        filtered_data = self._filter_merged_data(
            start_year, start_month, end_year, end_month, enso_value)
        return {
            'tracks': self.create_tracks_plot(filtered_data),
            'wind': self.create_wind_analysis(filtered_data),
            'pressure': self.create_pressure_analysis(filtered_data),
            'clusters': self.create_cluster_analysis(filtered_data, 5),
            'stats': self.generate_statistics(filtered_data),
        }

    def create_tracks_plot(self, data):
        """Draw one geo line per storm, coloured by the storm's first category.

        Hover text and colour are taken from the first row of each storm.
        """
        fig = go.Figure()
        for _, storm in data.groupby('SID'):
            name = storm['NAME'].iloc[0]
            category = storm['Category'].iloc[0]
            fig.add_trace(go.Scattergeo(
                lon=storm['LON'],
                lat=storm['LAT'],
                mode='lines',
                name=name,
                line=dict(width=2, color=COLOR_MAP[category]),
                hovertemplate=(
                    f"Name: {name}<br>"
                    f"Category: {category}<br>"
                    f"Wind Speed: {storm['USA_WIND'].iloc[0]:.1f} kt<br>"
                    f"Pressure: {storm['WMO_PRES'].iloc[0]:.1f} hPa<br>"
                    f"Date: {storm['ISO_TIME'].iloc[0]:%Y-%m-%d}"
                ),
            ))
        fig.update_layout(
            title='Typhoon Tracks',
            showlegend=True,
            geo=dict(
                projection_type='mercator',
                showland=True,
                showcoastlines=True,
                landcolor='rgb(243, 243, 243)',
                countrycolor='rgb(204, 204, 204)',
                coastlinecolor='rgb(214, 214, 214)',
                lataxis=dict(range=[0, 50]),
                lonaxis=dict(range=[100, 180]),
            ),
        )
        return fig

    @staticmethod
    def _fit_regression(data, x_col, y_col):
        """Fit ``y = slope * x + intercept`` on the rows with both values.

        Returns:
            ``(slope, intercept, x_min, x_max)``, or ``None`` when fewer than
            two complete rows or fewer than two distinct x values remain, in
            which case no line can be fitted.
        """
        points = data[[x_col, y_col]].dropna()
        if len(points) < 2 or points[x_col].nunique() < 2:
            return None
        slope, intercept = np.polyfit(points[x_col], points[y_col], 1)
        return slope, intercept, points[x_col].min(), points[x_col].max()

    def _scatter_vs_oni(self, data, y_col, title, y_label):
        """Scatter ``y_col`` against ONI and overlay a regression line if any."""
        fig = px.scatter(
            data,
            x='ONI',
            y=y_col,
            color='Category',
            color_discrete_map=COLOR_MAP,
            title=title,
            labels={'ONI': 'Oceanic Niño Index', y_col: y_label},
            hover_data=['NAME', 'ISO_TIME'],
        )
        # Add regression line; it is skipped when the data cannot support one
        # (empty selection or missing values only).
        fit = self._fit_regression(data, 'ONI', y_col)
        if fit is not None:
            slope, intercept, x_min, x_max = fit
            # A straight line is fully described by its two end points.
            x_line = np.array([x_min, x_max])
            fig.add_trace(go.Scatter(
                x=x_line,
                y=slope * x_line + intercept,
                mode='lines',
                name=f'Regression (slope={slope:.2f})',
                line=dict(color='black', dash='dash'),
            ))
        return fig

    def create_wind_analysis(self, data):
        """Scatter plot of ``USA_WIND`` against ONI with a linear trend line."""
        return self._scatter_vs_oni(
            data, 'USA_WIND', 'Wind Speed vs ONI Index', 'Maximum Wind Speed (kt)')

    def create_pressure_analysis(self, data):
        """Scatter plot of ``WMO_PRES`` against ONI with a linear trend line."""
        return self._scatter_vs_oni(
            data, 'WMO_PRES', 'Pressure vs ONI Index', 'Minimum Pressure (hPa)')

    def create_cluster_analysis(self, data, n_clusters=5):
        """Cluster storm routes with k-means and plot routes and centres.

        Every storm with at least two positions is resampled to 100 points
        by linear interpolation so all routes share one length.  The number
        of clusters is capped at the number of usable routes, because
        k-means cannot form more clusters than it has samples.

        Returns:
            A figure; empty when no storm has two or more positions.
        """
        # Prepare data for clustering
        routes = []
        for _, storm in data.groupby('SID'):
            track = storm.dropna(subset=['LON', 'LAT'])
            if len(track) < 2:
                continue
            # Standardize route length
            t = np.linspace(0, 1, len(track))
            t_new = np.linspace(0, 1, 100)
            lon_interp = interp1d(t, track['LON'], kind='linear')(t_new)
            lat_interp = interp1d(t, track['LAT'], kind='linear')(t_new)
            routes.append(np.column_stack((lon_interp, lat_interp)))
        if not routes:
            return go.Figure()

        n_clusters = max(1, min(int(n_clusters), len(routes)))

        # Perform clustering on the flattened (lon, lat) sequences
        routes_array = np.array(routes)
        routes_reshaped = routes_array.reshape(routes_array.shape[0], -1)
        kmeans = KMeans(n_clusters=n_clusters, random_state=42)
        clusters = kmeans.fit_predict(routes_reshaped)

        # Create visualization
        fig = go.Figure()
        # Plot original routes colored by cluster
        for route, cluster_id in zip(routes, clusters):
            fig.add_trace(go.Scattergeo(
                lon=route[:, 0],
                lat=route[:, 1],
                mode='lines',
                line=dict(width=1, color=f'hsl({cluster_id * 360/n_clusters}, 50%, 50%)'),
                showlegend=False,
            ))
        # Plot cluster centers
        for i in range(n_clusters):
            center = kmeans.cluster_centers_[i].reshape(-1, 2)
            fig.add_trace(go.Scattergeo(
                lon=center[:, 0],
                lat=center[:, 1],
                mode='lines',
                name=f'Cluster {i+1} Center',
                line=dict(width=3, color=f'hsl({i * 360/n_clusters}, 100%, 50%)'),
            ))
        fig.update_layout(
            title='Typhoon Route Clusters',
            showlegend=True,
            geo=dict(
                projection_type='mercator',
                showland=True,
                showcoastlines=True,
                landcolor='rgb(243, 243, 243)',
                countrycolor='rgb(204, 204, 204)',
                coastlinecolor='rgb(214, 214, 214)',
                lataxis=dict(range=[0, 50]),
                lonaxis=dict(range=[100, 180]),
            ),
        )
        return fig

    def generate_statistics(self, data):
        """Return a markdown summary of winds, pressures and ONI correlations.

        The text is assembled line by line so its content does not depend on
        how this method happens to be indented.
        """
        # Named 'summary' so the module-level scipy 'stats' import is not
        # shadowed inside this method.
        summary = {
            'total_typhoons': len(data['SID'].unique()),
            'avg_wind': data['USA_WIND'].mean(),
            'max_wind': data['USA_WIND'].max(),
            'avg_pressure': data['WMO_PRES'].mean(),
            'min_pressure': data['WMO_PRES'].min(),
            'oni_correlation_wind': data['ONI'].corr(data['USA_WIND']),
            'oni_correlation_pressure': data['ONI'].corr(data['WMO_PRES']),
            'category_counts': data['Category'].value_counts().to_dict(),
        }
        category_lines = "\n".join(
            f'- {cat}: {count}' for cat, count in summary['category_counts'].items())
        lines = [
            "",
            "### Statistical Summary",
            f"- Total Typhoons: {summary['total_typhoons']}",
            f"- Average Wind Speed: {summary['avg_wind']:.2f} kt",
            f"- Maximum Wind Speed: {summary['max_wind']:.2f} kt",
            f"- Average Pressure: {summary['avg_pressure']:.2f} hPa",
            f"- Minimum Pressure: {summary['min_pressure']:.2f} hPa",
            f"- ONI-Wind Speed Correlation: {summary['oni_correlation_wind']:.3f}",
            f"- ONI-Pressure Correlation: {summary['oni_correlation_pressure']:.3f}",
            "### Category Distribution",
            category_lines,
            "",
        ]
        return "\n".join(lines)
def create_interface():
    """Build the Gradio dashboard and wire its two callbacks.

    The dashboard has a "Main Analysis" tab (period and ENSO phase selection
    feeding four plots and a statistics summary) and a "Typhoon Search" tab
    (name lookup).  Creating the interface instantiates ``TyphoonAnalyzer``,
    which loads all data up front.

    Returns:
        The ``gr.Blocks`` application, not yet launched.
    """
    analyzer = TyphoonAnalyzer()
    # Let the year sliders reach the current year instead of stopping at a
    # fixed one, while never offering less than the previous upper bound.
    latest_year = max(2024, datetime.now().year)

    with gr.Blocks(title="Typhoon Analysis Dashboard", theme=gr.themes.Base()) as demo:
        gr.Markdown("# Typhoon Analysis Dashboard")
        with gr.Tabs():
            # Main Analysis Tab
            with gr.Tab("Main Analysis"):
                with gr.Row():
                    with gr.Column():
                        # step=1 keeps the values whole numbers.
                        start_year = gr.Slider(1900, latest_year, 2000, step=1, label="Start Year")
                        start_month = gr.Slider(1, 12, 1, step=1, label="Start Month")
                    with gr.Column():
                        end_year = gr.Slider(1900, latest_year, latest_year, step=1, label="End Year")
                        end_month = gr.Slider(1, 12, 12, step=1, label="End Month")
                    enso_dropdown = gr.Dropdown(
                        choices=["all", "el_nino", "la_nina", "neutral"],
                        value="all",
                        label="ENSO Phase",
                    )
                analyze_btn = gr.Button("Analyze")
                plots_tabs = gr.Tabs()
                with plots_tabs:
                    with gr.Tab("Tracks"):
                        tracks_plot = gr.Plot()
                    with gr.Tab("Wind Analysis"):
                        wind_plot = gr.Plot()
                    with gr.Tab("Pressure Analysis"):
                        pressure_plot = gr.Plot()
                    with gr.Tab("Clusters"):
                        cluster_plot = gr.Plot()
                stats_text = gr.Markdown()
            # Search Tab
            with gr.Tab("Typhoon Search"):
                with gr.Row():
                    search_input = gr.Textbox(label="Search Typhoon Name")
                    search_btn = gr.Button("Search")
                search_results = gr.Plot()
                typhoon_info = gr.Markdown()

        def analyze_callback(start_y, start_m, end_y, end_m, enso):
            """Run the analysis; results are ordered like the click outputs."""
            results = analyzer.analyze_typhoon(
                int(start_y), int(start_m), int(end_y), int(end_m), enso)
            return [
                results['tracks'],
                results['wind'],
                results['pressure'],
                results['clusters'],
                results['stats'],
            ]

        analyze_btn.click(
            analyze_callback,
            inputs=[start_year, start_month, end_year, end_month, enso_dropdown],
            outputs=[tracks_plot, wind_plot, pressure_plot, cluster_plot, stats_text],
        )

        def search_callback(query):
            """Return a track plot and a markdown list of storms matching a name."""
            query = (query or "").strip()
            if not query:
                return None, "Please enter a typhoon name to search."
            # regex=False: the query is user text, not a pattern.  Characters
            # such as '(' or '*' would otherwise raise or match unexpectedly.
            matches = analyzer.merged_data[
                analyzer.merged_data['NAME'].str.contains(
                    query, case=False, na=False, regex=False)
            ]
            if matches.empty:
                return None, "No typhoons found matching your search."

            fig = analyzer.create_tracks_plot(matches)
            info_parts = [f"### Found {len(matches['SID'].unique())} matching typhoons:\n\n"]
            for _, storm in matches.groupby('SID'):
                # A storm can span several monthly rows, so report its
                # overall extremes rather than the first row's values.
                max_wind = storm['USA_WIND'].max()
                min_pressure = storm['WMO_PRES'].min()
                info_parts.append(
                    f"- {storm['NAME'].iloc[0]} ({storm['ISO_TIME'].iloc[0]:%Y-%m-%d})\n"
                    f" - Category: {analyzer.categorize_typhoon(max_wind)}\n"
                    f" - Max Wind: {max_wind:.1f} kt\n"
                    f" - Min Pressure: {min_pressure:.1f} hPa\n"
                )
            return fig, "".join(info_parts)

        search_btn.click(
            search_callback,
            inputs=[search_input],
            outputs=[search_results, typhoon_info],
        )
    return demo
if __name__ == "__main__":
    # Script entry point: build the dashboard, then serve it on every network
    # interface at port 7860 with Gradio's public share link enabled.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": True,
    }
    demo = create_interface()
    demo.launch(**launch_options)