#!/usr/bin/env python3 """ ECMWF Open Data Weather Forecast Application Access real ECMWF operational forecast data with coordinate-based lookups. """ import gradio as gr import numpy as np import pandas as pd import matplotlib.pyplot as plt import xarray as xr import requests import tempfile import os import time from datetime import datetime, timedelta import warnings import folium import plotly.graph_objects as go import plotly.express as px from plotly.subplots import make_subplots warnings.filterwarnings('ignore') try: from ecmwf.opendata import Client as OpenDataClient OPENDATA_AVAILABLE = True except ImportError: OPENDATA_AVAILABLE = False class ECMWFDataManager: def __init__(self): self.temp_dir = tempfile.mkdtemp() self.client = None if OPENDATA_AVAILABLE: try: self.client = OpenDataClient() except: self.client = None # AWS S3 direct access URLs self.aws_base_url = "https://ecmwf-forecasts.s3.eu-central-1.amazonaws.com" # ECMWF Open Data parameters - verified available as of 2024/2025 self.parameters = { # Surface level parameters (single level) "2t": {"name": "Temperature (2m)", "units": "°C", "description": "2-meter temperature", "level_type": "sfc"}, "msl": {"name": "Sea Level Pressure", "units": "hPa", "description": "Mean sea level pressure", "level_type": "sfc"}, "sp": {"name": "Surface Pressure", "units": "hPa", "description": "Surface pressure", "level_type": "sfc"}, "10u": {"name": "Wind U (10m)", "units": "m/s", "description": "10-meter U wind component", "level_type": "sfc"}, "10v": {"name": "Wind V (10m)", "units": "m/s", "description": "10-meter V wind component", "level_type": "sfc"}, "tp": {"name": "Precipitation", "units": "mm", "description": "Total precipitation", "level_type": "sfc"}, "tcwv": {"name": "Water Vapor", "units": "kg/m²", "description": "Total column water vapor", "level_type": "sfc"}, "skt": {"name": "Skin Temperature", "units": "°C", "description": "Skin temperature", "level_type": "sfc"}, "ro": {"name": "Runoff", "units": "m", "description": "Runoff", "level_type": "sfc"}, "st": {"name": "Soil Temperature", "units": "°C", "description": "Soil temperature", "level_type": "sfc"}, # Severe weather and convective parameters "cape": {"name": "CAPE", "units": "J/kg", "description": "Convective Available Potential Energy", "level_type": "sfc"}, "cin": {"name": "CIN", "units": "J/kg", "description": "Convective Inhibition", "level_type": "sfc"}, "lftx": {"name": "Lifted Index", "units": "K", "description": "Surface Lifted Index", "level_type": "sfc"}, "4lftx": {"name": "Best Lifted Index", "units": "K", "description": "Best (4-layer) Lifted Index", "level_type": "sfc"}, "cp": {"name": "Convective Precipitation", "units": "mm", "description": "Convective precipitation", "level_type": "sfc"}, "lsp": {"name": "Large Scale Precipitation", "units": "mm", "description": "Large-scale precipitation", "level_type": "sfc"}, "sf": {"name": "Snowfall", "units": "m", "description": "Snowfall", "level_type": "sfc"}, "10fg": {"name": "Wind Gust (10m)", "units": "m/s", "description": "10-meter wind gust", "level_type": "sfc"}, # Pressure level parameters (add common levels) "t": {"name": "Temperature", "units": "°C", "description": "Temperature at pressure levels", "level_type": "pl", "levels": [850, 500, 200]}, "gh": {"name": "Geopotential Height", "units": "m", "description": "Geopotential height", "level_type": "pl", "levels": [850, 500, 200]}, "u": {"name": "Wind U", "units": "m/s", "description": "U wind component", "level_type": "pl", "levels": [850, 500, 200]}, "v": {"name": "Wind V", "units": "m/s", "description": "V wind component", "level_type": "pl", "levels": [850, 500, 200]}, "q": {"name": "Specific Humidity", "units": "g/kg", "description": "Specific humidity", "level_type": "pl", "levels": [850, 500]}, "r": {"name": "Relative Humidity", "units": "%", "description": "Relative humidity", "level_type": "pl", "levels": [850, 500]}, } self.forecast_cache = {} self.preloaded_data = {} def get_latest_forecast_info(self, max_retries=3): """Get the most recent available forecast run with retry logic""" now = datetime.utcnow() # Check recent 6-hour cycles for hours_back in range(4, 24, 6): test_time = now - timedelta(hours=hours_back) run_hour = (test_time.hour // 6) * 6 run_time = test_time.replace(hour=run_hour, minute=0, second=0, microsecond=0) date_str = run_time.strftime("%Y%m%d") time_str = f"{run_hour:02d}" # Test availability with retry logic test_url = f"{self.aws_base_url}/{date_str}/{time_str}z/0p25/oper/" for attempt in range(max_retries): try: response = requests.head(test_url, timeout=10) if response.status_code == 429: if attempt < max_retries - 1: print(f"Rate limit hit while checking forecast availability (attempt {attempt + 1}/{max_retries}). Waiting 10 seconds...") time.sleep(10) continue else: print(f"Max retries reached while checking forecast availability due to rate limiting") break elif response.status_code in [200, 403]: return date_str, time_str, run_time else: break except Exception: if attempt < max_retries - 1: time.sleep(2) # Short wait for general errors continue else: break # Fallback return now.strftime("%Y%m%d"), "12", now def download_forecast_data(self, parameter="2t", step=0, level=None, max_retries=3): """Download ECMWF forecast data using multiple methods with retry logic for rate limiting""" date_str, time_str, run_time = self.get_latest_forecast_info() # Get parameter info param_info = self.parameters.get(parameter, {}) level_type = param_info.get('level_type', 'sfc') # Method 1: Official client with retry logic if OPENDATA_AVAILABLE and self.client: for attempt in range(max_retries): try: cache_suffix = f"_{level}" if level else "" filename = os.path.join(self.temp_dir, f'ecmwf_{parameter}{cache_suffix}_{step}h.grib') # Build retrieval request request = { "type": "fc", "param": parameter, "step": step, "target": filename } # Add pressure level if needed if level_type == 'pl' and level: request["levelist"] = level elif level_type == 'pl': # Use first available level if no specific level requested levels = param_info.get('levels', [850]) request["levelist"] = levels[0] self.client.retrieve(**request) if os.path.exists(filename) and os.path.getsize(filename) > 1000: level_info = f" at {level}hPa" if level else "" return filename, f"Downloaded {parameter}{level_info} +{step}h via ECMWF client" except Exception as e: error_msg = str(e).lower() if "429" in error_msg or "rate limit" in error_msg or "too many requests" in error_msg: if attempt < max_retries - 1: # Don't wait on last attempt print(f"Rate limit hit for {parameter} (attempt {attempt + 1}/{max_retries}). Waiting 10 seconds...") time.sleep(10) continue else: print(f"Max retries reached for {parameter} due to rate limiting") else: print(f"Client method failed for {parameter}: {e}") break # Method 2: AWS S3 direct access (for surface parameters only) with retry logic if level_type == 'sfc': for attempt in range(max_retries): try: step_str = f"{step:03d}" filename = f"{date_str}{time_str}0000-{step_str}h-oper-fc.grib2" url = f"{self.aws_base_url}/{date_str}/{time_str}z/0p25/oper/{filename}" response = requests.get(url, timeout=120, stream=True) if response.status_code == 429: if attempt < max_retries - 1: # Don't wait on last attempt print(f"Rate limit hit for {parameter} AWS download (attempt {attempt + 1}/{max_retries}). Waiting 10 seconds...") time.sleep(10) continue else: print(f"Max retries reached for {parameter} AWS download due to rate limiting") break elif response.status_code == 200: local_file = os.path.join(self.temp_dir, f'ecmwf_{parameter}_{step}h.grib2') with open(local_file, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) if os.path.getsize(local_file) > 1000: return local_file, f"Downloaded {parameter} +{step}h via AWS S3" else: print(f"AWS returned status {response.status_code} for {parameter}") break except Exception as e: print(f"AWS method failed for {parameter}: {e}") break return None, f"Failed to download {parameter} at +{step}h" def extract_point_data(self, filename, lat, lon, parameter): """Extract weather data at specific coordinates""" try: ds = xr.open_dataset(filename, engine='cfgrib', backend_kwargs={'indexpath': ''}) data_vars = list(ds.data_vars.keys()) if not data_vars: return None data = ds[data_vars[0]] # Handle coordinates if 'latitude' in ds.coords: lats, lons = ds.latitude, ds.longitude elif 'lat' in ds.coords: lats, lons = ds.lat, ds.longitude else: return None # Select first time if multiple if 'time' in data.dims and len(data.time) > 1: data = data.isel(time=0) elif 'valid_time' in data.dims: data = data.isel(valid_time=0) # Find nearest point try: point_data = data.sel(latitude=lat, longitude=lon, method='nearest') except: try: point_data = data.sel(lat=lat, lon=lon, method='nearest') except: return None value = float(point_data.values) ds.close() return self.convert_units(value, parameter) except Exception as e: print(f"Error extracting point data: {e}") return None def convert_units(self, value, parameter): """Convert values to standard meteorological units""" if parameter in ['2t', 'skt', 't', 'st'] and value > 100: return value - 273.15 # K to °C elif parameter in ['msl', 'sp']: return value / 100 # Pa to hPa elif parameter == 'tp': return value * 1000 # m to mm elif parameter == 'q': return value * 1000 # kg/kg to g/kg elif parameter == 'ro': return value * 1000 # m to mm elif parameter == 'gh': return value / 9.80665 # m²/s² to meters (geopotential to height) return value def preload_all_data(self): """Preload all forecast data for quick access""" # 3-hourly for first 24 hours (ECMWF operational availability), then longer intervals forecast_steps = [0, 3, 6, 9, 12, 15, 18, 21, 24, 30, 36, 42, 48, 60, 72, 96, 120] # Include basic surface parameters and severe weather indicators surface_params = ["2t", "msl", "sp", "10u", "10v", "tp", "tcwv", "skt", "cape", "10fg", "cp", "lsp"] total_files = len(surface_params) * len(forecast_steps) loaded_count = 0 for param in surface_params: for step in forecast_steps: try: cache_key = f"{param}_{step}" filename, msg = self.download_forecast_data(param, step) if filename: self.preloaded_data[cache_key] = filename loaded_count += 1 except Exception as e: print(f"Failed to preload {param} at step {step}: {e}") continue return loaded_count, total_files def get_point_forecast(self, latitude, longitude): """Get comprehensive forecast for a specific location""" forecast_data = [] # 3-hourly for first 24 hours (ECMWF operational availability), then longer intervals forecast_steps = [0, 3, 6, 9, 12, 15, 18, 21, 24, 30, 36, 42, 48, 60, 72, 96, 120] # Focus on surface parameters including severe weather indicators surface_params = ["2t", "msl", "sp", "10u", "10v", "tp", "tcwv", "skt", "cape", "10fg", "cp", "lsp"] for param in surface_params: if param not in self.parameters: continue param_data = [] for step in forecast_steps: try: cache_key = f"{param}_{step}" # Try to use preloaded data first if cache_key in self.preloaded_data: filename = self.preloaded_data[cache_key] else: filename, _ = self.download_forecast_data(param, step) if filename and os.path.exists(filename): value = self.extract_point_data(filename, latitude, longitude, param) if value is not None: param_data.append({ 'step': step, 'value': value, 'datetime': datetime.utcnow() + timedelta(hours=step) }) except Exception as e: print(f"Error getting {param} at step {step}: {e}") continue if param_data: forecast_data.append({ 'parameter': param, 'name': self.parameters[param]['name'], 'units': self.parameters[param]['units'], 'data': param_data }) return forecast_data class WeatherNarrativeGenerator: """Generate natural language weather descriptions""" def __init__(self): pass def get_temperature_descriptor(self, temp_f): """Convert temperature to descriptive terms (input in Fahrenheit)""" if temp_f < 14: return "extremely cold" elif temp_f < 32: return "very cold" elif temp_f < 50: return "cold" elif temp_f < 68: return "cool" elif temp_f < 77: return "mild" elif temp_f < 86: return "warm" elif temp_f < 95: return "hot" else: return "very hot" def get_wind_descriptor(self, wind_speed_ms): """Convert wind speed to descriptive terms""" wind_speed_mph = wind_speed_ms * 2.237 # Convert m/s to mph if wind_speed_mph < 1: return "calm" elif wind_speed_mph < 8: return "light winds" elif wind_speed_mph < 18: return "moderate winds" elif wind_speed_mph < 25: return "strong winds" elif wind_speed_mph < 39: return "very strong winds" else: return "extremely strong winds" def get_precipitation_descriptor(self, precip_mm): """Convert precipitation to descriptive terms""" if precip_mm < 0.1: return None elif precip_mm < 1: return "light showers" elif precip_mm < 5: return "moderate rain" elif precip_mm < 10: return "heavy rain" else: return "very heavy rain" def get_sky_condition(self, cloud_cover_percent=None, precip_mm=0): """Determine sky conditions based on available data""" if precip_mm > 0.1: if precip_mm < 1: return "mostly cloudy with light showers" elif precip_mm < 5: return "overcast with rain" else: return "stormy with heavy rain" # If no cloud data available, infer from other conditions return "partly cloudy" def get_wind_direction_text(self, u_wind, v_wind): """Convert wind components to direction description""" if abs(u_wind) < 0.5 and abs(v_wind) < 0.5: return "" # Calculate wind direction (meteorological convention) import math wind_dir = (270 - math.degrees(math.atan2(v_wind, u_wind))) % 360 directions = [ "north", "northeast", "east", "southeast", "south", "southwest", "west", "northwest" ] idx = int((wind_dir + 22.5) / 45) % 8 return f"from the {directions[idx]}" def analyze_weather_trend(self, weather_data): """Analyze weather trends to create smart groupings""" trends = [] # Sort by hour sorted_hours = sorted(weather_data.keys()) current_trend = { 'start_hour': sorted_hours[0] if sorted_hours else 0, 'end_hour': sorted_hours[0] if sorted_hours else 0, 'conditions': {}, 'temp_range': [] } for hour in sorted_hours: data = weather_data[hour] # Calculate current conditions temp = data.get('temp', 0) wind_u = data.get('wind_u', 0) wind_v = data.get('wind_v', 0) wind_speed = (wind_u**2 + wind_v**2)**0.5 precip = data.get('precip', 0) # Determine if conditions are similar to current trend current_wind_speed = current_trend['conditions'].get('wind_speed', wind_speed) current_precip = current_trend['conditions'].get('precip', precip) # Check for significant changes wind_change = abs(wind_speed - current_wind_speed) > 2 # 2 m/s change precip_change = abs(precip - current_precip) > 0.5 # 0.5mm change # If conditions changed significantly, start new trend if wind_change or precip_change: if current_trend['temp_range']: # Save previous trend trends.append(current_trend.copy()) # Start new trend current_trend = { 'start_hour': hour, 'end_hour': hour, 'conditions': { 'wind_speed': wind_speed, 'wind_u': wind_u, 'wind_v': wind_v, 'precip': precip }, 'temp_range': [temp] } else: # Continue current trend current_trend['end_hour'] = hour current_trend['temp_range'].append(temp) # Update average conditions current_trend['conditions']['wind_speed'] = wind_speed current_trend['conditions']['wind_u'] = wind_u current_trend['conditions']['wind_v'] = wind_v current_trend['conditions']['precip'] = precip # Add final trend if current_trend['temp_range']: trends.append(current_trend) return trends def hour_to_time_description(self, hour): """Convert hour to natural time description""" if hour == 0: return "midnight" elif hour < 6: return "early morning" elif hour < 12: return "morning" elif hour == 12: return "noon" elif hour < 18: return "afternoon" elif hour < 21: return "evening" else: return "night" def generate_time_range_text(self, start_hour, end_hour): """Generate natural time range description""" if start_hour == end_hour: if start_hour == 0: return "around midnight" elif start_hour == 12: return "around noon" else: period = self.hour_to_time_description(start_hour) return f"in the {period}" else: start_desc = self.hour_to_time_description(start_hour) end_desc = self.hour_to_time_description(end_hour) if start_desc == end_desc: return f"throughout the {start_desc}" else: return f"from {start_desc} through {end_desc}" def get_comfort_description(self, temp_f, wind_speed_ms, precip_mm): """Generate comfort and activity descriptions""" descriptions = [] # Temperature comfort if temp_f < 32: descriptions.append("Bundle up in winter clothing and watch for icy conditions") elif temp_f < 50: descriptions.append("A warm jacket or coat will be needed for outdoor activities") elif temp_f < 68: descriptions.append("Light layers are recommended for comfortable outdoor time") elif temp_f < 80: descriptions.append("Perfect weather for outdoor activities and recreation") else: descriptions.append("Stay hydrated and seek shade during extended outdoor time") # Wind impact wind_mph = wind_speed_ms * 2.237 if wind_mph > 15: descriptions.append("Strong winds may affect driving conditions and outdoor events") elif wind_mph > 8: descriptions.append("Moderate winds will be noticeable, especially in exposed areas") # Precipitation impact if precip_mm > 5: descriptions.append("Heavy rain expected - indoor activities recommended, roads may be slick") elif precip_mm > 1: descriptions.append("Keep an umbrella handy and allow extra time for travel") elif precip_mm > 0.1: descriptions.append("Light rain possible - consider bringing a light rain jacket") return descriptions def get_pressure_description(self, pressure_hpa): """Generate atmospheric pressure description""" if pressure_hpa > 1025: return "High pressure systems typically bring stable, clear conditions" elif pressure_hpa < 1005: return "Low pressure systems often bring unsettled weather and possible storms" else: return "Pressure conditions are typical for the season" def _max_risk_level(self, current, new): """Helper function to compare risk levels""" levels = {"none": 0, "low": 1, "moderate": 2, "high": 3, "extreme": 4} if levels.get(new, 0) > levels.get(current, 0): return new return current def assess_thunderstorm_risk(self, cape, cin=None, lifted_index=None, wind_gust=None, convective_precip=None): """Assess thunderstorm potential based on atmospheric parameters""" risks = [] risk_level = "none" # CAPE analysis (Convective Available Potential Energy) if cape is not None and cape > 0: if cape > 4000: risks.append("Very high atmospheric instability with explosive thunderstorm potential") risk_level = "extreme" elif cape > 2500: risks.append("High atmospheric instability favoring severe thunderstorm development") risk_level = "high" elif cape > 1500: risks.append("Moderate atmospheric instability supporting thunderstorm development") risk_level = "moderate" elif cape > 500: risks.append("Low to moderate instability with isolated thunderstorm potential") risk_level = "low" # Lifted Index analysis (lower values = higher instability) if lifted_index is not None: if lifted_index < -6: risks.append("Extremely unstable atmosphere - severe thunderstorms likely") risk_level = self._max_risk_level(risk_level, "extreme") elif lifted_index < -3: risks.append("Very unstable conditions favoring strong thunderstorms") risk_level = self._max_risk_level(risk_level, "high") elif lifted_index < 0: risks.append("Unstable atmosphere conducive to thunderstorm development") risk_level = self._max_risk_level(risk_level, "moderate") # Wind gust analysis if wind_gust is not None: wind_gust_mph = wind_gust * 2.237 # Convert to mph if wind_gust_mph > 58: # Severe thunderstorm criteria risks.append(f"Damaging wind gusts up to {wind_gust_mph:.0f} mph possible - severe weather likely") risk_level = self._max_risk_level(risk_level, "high") elif wind_gust_mph > 39: risks.append(f"Strong wind gusts up to {wind_gust_mph:.0f} mph expected") risk_level = self._max_risk_level(risk_level, "moderate") # Convective precipitation analysis if convective_precip is not None and convective_precip > 0: if convective_precip > 25: # Heavy convective precipitation risks.append("Heavy convective rainfall with flash flood potential") risk_level = self._max_risk_level(risk_level, "high") elif convective_precip > 10: risks.append("Significant convective rainfall expected") risk_level = self._max_risk_level(risk_level, "moderate") return risks, risk_level def get_hazard_warnings(self, weather_data): """Generate weather hazard warnings based on forecast data""" warnings = [] max_risk_level = "none" for hour, data in weather_data.items(): cape = data.get('cape', 0) wind_gust = data.get('wind_gust', 0) conv_precip = data.get('conv_precip', 0) temp = data.get('temp', 0) # Thunderstorm risk assessment storm_risks, risk_level = self.assess_thunderstorm_risk( cape=cape, wind_gust=wind_gust, convective_precip=conv_precip ) if storm_risks: max_risk_level = self._max_risk_level(max_risk_level, risk_level) for risk in storm_risks[:2]: # Limit to top 2 risks per hour if risk not in warnings: warnings.append(risk) # Extreme temperature warnings if temp < -10: # Very cold conditions warning = "Extreme cold conditions - risk of frostbite and hypothermia" if warning not in warnings: warnings.append(warning) max_risk_level = self._max_risk_level(max_risk_level, "moderate") elif temp > 35: # Very hot conditions in Celsius warning = "Extreme heat conditions - risk of heat exhaustion and heat stroke" if warning not in warnings: warnings.append(warning) max_risk_level = self._max_risk_level(max_risk_level, "moderate") # High wind warnings if wind_gust > 15: # > 33 mph warning = f"High wind warning - gusts up to {wind_gust * 2.237:.0f} mph" if warning not in warnings: warnings.append(warning) max_risk_level = self._max_risk_level(max_risk_level, "moderate") return warnings, max_risk_level def get_snow_analysis(self, snowfall): """Analyze snowfall potential""" if snowfall > 0.3: # 30cm+ return "Heavy snow expected - significant travel disruption likely" elif snowfall > 0.1: # 10cm+ return "Moderate snowfall expected - travel may be impacted" elif snowfall > 0.02: # 2cm+ return "Light snow possible - minor travel impacts" return None def generate_detailed_period_description(self, trend, trend_index, total_trends): """Generate a comprehensive description for a weather period""" start_hour = trend['start_hour'] end_hour = trend['end_hour'] temp_range = trend['temp_range'] conditions = trend['conditions'] if not temp_range: return "" # Time description with more context time_desc = self.generate_time_range_text(start_hour, end_hour) # Enhanced temperature analysis avg_temp = sum(temp_range) / len(temp_range) temp_desc = self.get_temperature_descriptor(avg_temp) if len(temp_range) > 1: min_temp = min(temp_range) max_temp = max(temp_range) temp_trend = "rising" if temp_range[-1] > temp_range[0] else "falling" if temp_range[-1] < temp_range[0] else "steady" if max_temp - min_temp > 5: temp_text = f"temperatures {temp_desc}, {temp_trend} from {min_temp:.0f}°F to {max_temp:.0f}°F" else: temp_text = f"temperatures {temp_desc} around {avg_temp:.0f}°F, remaining {temp_trend}" else: temp_text = f"temperatures {temp_desc} around {avg_temp:.0f}°F" # Weather conditions analysis precip = conditions.get('precip', 0) wind_speed = conditions.get('wind_speed', 0) wind_u = conditions.get('wind_u', 0) wind_v = conditions.get('wind_v', 0) pressure = conditions.get('pressure', 1013) # Sky conditions with more detail sky_desc = self.get_sky_condition(precip_mm=precip) # Enhanced wind analysis wind_desc = self.get_wind_descriptor(wind_speed) wind_dir = self.get_wind_direction_text(wind_u, wind_v) wind_mph = wind_speed * 2.237 # Build comprehensive description description = f"{time_desc.capitalize()}, expect {sky_desc} with {temp_text}" # Add precipitation details precip_desc = self.get_precipitation_descriptor(precip) if precip_desc and "with rain" not in sky_desc and "with showers" not in sky_desc: description += f" and {precip_desc}" # Add detailed wind information if wind_speed > 2: # Lowered threshold for more wind reporting description += f". {wind_desc.capitalize()}" if wind_dir: description += f" {wind_dir}" if wind_mph > 10: description += f" at {wind_mph:.0f} mph" description += "." # Add comfort and activity guidance comfort_items = self.get_comfort_description(avg_temp, wind_speed, precip) if comfort_items: description += f" {comfort_items[0]}." if len(comfort_items) > 1: description += f" {comfort_items[1]}." return description def generate_24_hour_forecast(self, forecast_data): """Generate a comprehensive, detailed 24-hour narrative forecast""" if not forecast_data: return "Weather forecast data is not available at this time." # Convert forecast data to time-indexed format weather_by_hour = {} for param_info in forecast_data: param_name = param_info['parameter'] for data_point in param_info['data']: hour = data_point['step'] if hour <= 24: # Only use first 24 hours if hour not in weather_by_hour: weather_by_hour[hour] = {} # Map parameter names to our internal names if param_name == '2t': weather_by_hour[hour]['temp'] = data_point['value'] elif param_name == 'tp': weather_by_hour[hour]['precip'] = data_point['value'] elif param_name == '10u': weather_by_hour[hour]['wind_u'] = data_point['value'] elif param_name == '10v': weather_by_hour[hour]['wind_v'] = data_point['value'] elif param_name == 'msl': weather_by_hour[hour]['pressure'] = data_point['value'] elif param_name == 'cape': weather_by_hour[hour]['cape'] = data_point['value'] elif param_name == '10fg': weather_by_hour[hour]['wind_gust'] = data_point['value'] elif param_name == 'cp': weather_by_hour[hour]['conv_precip'] = data_point['value'] elif param_name == 'lsp': weather_by_hour[hour]['large_precip'] = data_point['value'] elif param_name == 'sf': weather_by_hour[hour]['snowfall'] = data_point['value'] if not weather_by_hour: return "Insufficient weather data to generate forecast." # Analyze trends for smart grouping trends = self.analyze_weather_trend(weather_by_hour) # Assess severe weather hazards first hazard_warnings, max_risk_level = self.get_hazard_warnings(weather_by_hour) # Generate comprehensive narrative with hazard alerts forecast_text = "📝 **24-Hour Detailed Weather Forecast**\n\n" # Add severe weather alerts if present if hazard_warnings: risk_emoji = {"extreme": "🚨", "high": "⚠️", "moderate": "⚡", "low": "🌩️"}.get(max_risk_level, "⚠️") forecast_text += f"{risk_emoji} **WEATHER HAZARDS ALERT - {max_risk_level.upper()} RISK**\n\n" for warning in hazard_warnings[:3]: # Limit to top 3 warnings forecast_text += f"• {warning}\n" forecast_text += "\n" # Enhanced overall summary all_temps = [] all_pressures = [] total_precip = 0 total_snowfall = 0 max_wind = 0 max_wind_gust = 0 max_cape = 0 for hour_data in weather_by_hour.values(): if 'temp' in hour_data: all_temps.append(hour_data['temp']) if 'precip' in hour_data: total_precip += hour_data['precip'] if 'pressure' in hour_data: all_pressures.append(hour_data['pressure']) if 'snowfall' in hour_data: total_snowfall += hour_data['snowfall'] if 'wind_gust' in hour_data: max_wind_gust = max(max_wind_gust, hour_data['wind_gust']) if 'cape' in hour_data: max_cape = max(max_cape, hour_data['cape']) # Calculate wind speed if 'wind_u' in hour_data and 'wind_v' in hour_data: wind_speed = (hour_data['wind_u']**2 + hour_data['wind_v']**2)**0.5 max_wind = max(max_wind, wind_speed) if all_temps: high_temp = max(all_temps) low_temp = min(all_temps) temp_range_desc = self.get_temperature_descriptor((high_temp + low_temp) / 2) temp_swing = high_temp - low_temp # Enhanced summary with more details forecast_text += f"**Today's Overview:** Expect a {temp_range_desc} day with temperatures ranging from {low_temp:.0f}°F to {high_temp:.0f}°F" if temp_swing > 15: forecast_text += f" - a significant {temp_swing:.0f}-degree temperature range" elif temp_swing > 10: forecast_text += f" - a moderate {temp_swing:.0f}-degree temperature variation" else: forecast_text += " with relatively stable temperatures" # Add precipitation summary if total_precip > 0.1: precip_desc = self.get_precipitation_descriptor(total_precip) if precip_desc: forecast_text += f". {precip_desc.capitalize()} is expected" if total_precip > 5: forecast_text += " - plan for wet conditions and potential travel delays" elif total_precip > 1: forecast_text += " - keep rain gear accessible" else: forecast_text += ". No significant precipitation expected" # Add wind summary if max_wind_gust > 5: wind_desc = self.get_wind_descriptor(max_wind_gust) forecast_text += f". {wind_desc.capitalize()} with gusts up to {max_wind_gust * 2.237:.0f} mph possible" elif max_wind > 5: wind_desc = self.get_wind_descriptor(max_wind) forecast_text += f". {wind_desc.capitalize()}" # Add snowfall summary if present if total_snowfall > 0.01: # > 1cm snow_desc = self.get_snow_analysis(total_snowfall) if snow_desc: forecast_text += f". {snow_desc}" # Add thunderstorm potential if significant CAPE if max_cape > 500: cape_risks, _ = self.assess_thunderstorm_risk(cape=max_cape) if cape_risks: forecast_text += f". {cape_risks[0]}" # Add pressure context if available if all_pressures: avg_pressure = sum(all_pressures) / len(all_pressures) pressure_desc = self.get_pressure_description(avg_pressure) forecast_text += f". {pressure_desc}" forecast_text += ".\n\n" # Generate detailed trend-based narrative forecast_text += "**Detailed Period Forecast:**\n\n" for i, trend in enumerate(trends): period_desc = self.generate_detailed_period_description(trend, i, len(trends)) if period_desc: forecast_text += period_desc + "\n\n" # Add comprehensive guidance section forecast_text += "**Planning Guidance:**\n\n" # Clothing recommendations if all_temps: avg_temp = sum(all_temps) / len(all_temps) if avg_temp < 32: forecast_text += "• **Clothing:** Heavy winter coat, insulated boots, gloves, and warm hat recommended.\n" elif avg_temp < 50: forecast_text += "• **Clothing:** Warm jacket or coat, long pants, and closed-toe shoes recommended.\n" elif avg_temp < 68: forecast_text += "• **Clothing:** Light jacket or sweater, comfortable layers for temperature changes.\n" elif avg_temp < 80: forecast_text += "• **Clothing:** Light, comfortable clothing perfect for most outdoor activities.\n" else: forecast_text += "• **Clothing:** Light, breathable fabrics and sun protection recommended.\n" # Activity recommendations if total_precip > 5: forecast_text += "• **Activities:** Indoor activities recommended due to heavy rain. Outdoor events should be postponed or moved indoors.\n" elif total_precip > 1: forecast_text += "• **Activities:** Outdoor activities possible with proper rain gear. Indoor backup plans advised.\n" elif max_wind > 8: forecast_text += "• **Activities:** Outdoor activities should account for windy conditions. Secure loose objects.\n" else: forecast_text += "• **Activities:** Good conditions for most outdoor activities and events.\n" # Travel considerations if total_precip > 1 or max_wind > 6: forecast_text += "• **Travel:** Allow extra time for travel, reduced visibility and wet roads possible.\n" else: forecast_text += "• **Travel:** Normal travel conditions expected.\n" forecast_text += "\n" # Add disclaimer forecast_text += "*This detailed forecast is based on ECMWF operational model data. Weather conditions can change rapidly - check for updates before making important outdoor plans.*" return forecast_text class WeatherApp: def __init__(self): self.ecmwf = ECMWFDataManager() self.narrative_generator = WeatherNarrativeGenerator() self.preload_status = {"loaded": False, "count": 0, "total": 0} def create_map(self): """Create interactive map for location selection""" try: m = folium.Map( location=[45.0, 0.0], zoom_start=2, tiles='OpenStreetMap' ) # Add click functionality m.add_child(folium.ClickForMarker(popup="Click for coordinates")) return m._repr_html_() except: return """

🗺️ World Map

Map unavailable - use coordinate inputs below

""" def preload_data(self): """Preload forecast data for faster access""" try: loaded_count, total_files = self.ecmwf.preload_all_data() self.preload_status = {"loaded": True, "count": loaded_count, "total": total_files} return f"""✅ Data Preloaded Successfully! 📊 Status: {loaded_count}/{total_files} files cached 🌍 Coverage: Global forecast data ready ⚡ Ready for instant weather lookups anywhere on Earth! Now you can click on the map or enter coordinates for instant forecasts.""" except Exception as e: return f"❌ Preload failed: {str(e)}" def get_weather_forecast(self, latitude, longitude): """Get weather forecast for specified coordinates""" try: if not (-90 <= latitude <= 90) or not (-180 <= longitude <= 180): return "Invalid coordinates", "", "", "Cannot generate forecast for invalid coordinates." forecast_data = self.ecmwf.get_point_forecast(latitude, longitude) if not forecast_data: return "No forecast data available", "", "", "No weather data available to generate narrative forecast." # Create visualization with calculated wind speed fig = go.Figure() colors = ['#e74c3c', '#3498db', '#2ecc71', '#f39c12', '#9b59b6', '#1abc9c', '#e67e22', '#34495e'] color_idx = 0 # Find wind components for speed calculation wind_u_data = None wind_v_data = None for param_info in forecast_data: if param_info['parameter'] == '10u': wind_u_data = param_info['data'] elif param_info['parameter'] == '10v': wind_v_data = param_info['data'] for param_info in forecast_data: if param_info['data']: steps = [d['step'] for d in param_info['data']] values = [d['value'] for d in param_info['data']] fig.add_trace(go.Scatter( x=steps, y=values, mode='lines+markers', name=f"{param_info['name']} ({param_info['units']})", line=dict(color=colors[color_idx % len(colors)], width=2), marker=dict(size=4), connectgaps=True )) color_idx += 1 # Add calculated wind speed if both components available if wind_u_data and wind_v_data and len(wind_u_data) == len(wind_v_data): wind_speeds = [] wind_steps = [] for i, u_data in enumerate(wind_u_data): if i < len(wind_v_data): v_data = wind_v_data[i] if u_data['step'] == v_data['step']: wind_speed = np.sqrt(u_data['value']**2 + v_data['value']**2) wind_speeds.append(wind_speed) wind_steps.append(u_data['step']) if wind_speeds: fig.add_trace(go.Scatter( x=wind_steps, y=wind_speeds, mode='lines+markers', name="Wind Speed (m/s)", line=dict(color=colors[color_idx % len(colors)], width=3, dash='dash'), marker=dict(size=4), connectgaps=True )) fig.update_layout( title=f"🌍 ECMWF 3-Hourly Forecast - {latitude:.3f}°N, {longitude:.3f}°E", xaxis_title="Hours Ahead", yaxis_title="Values", height=700, hovermode='x unified', xaxis=dict( tickmode='array', tickvals=[0, 3, 6, 9, 12, 15, 18, 21, 24, 48, 72, 96, 120], ticktext=['0h', '3h', '6h', '9h', '12h', '15h', '18h', '21h', '1d', '2d', '3d', '4d', '5d'], gridcolor='lightgray', gridwidth=1 ), legend=dict( orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1 ), margin=dict(t=80) ) # Create enhanced data table with calculated parameters table_data = [] for param_info in forecast_data: for data_point in param_info['data']: table_data.append({ 'Parameter': param_info['name'], 'Hours': f"+{data_point['step']}h", 'Value': f"{data_point['value']:.2f} {param_info['units']}", 'Valid Time': data_point['datetime'].strftime('%Y-%m-%d %H:%M UTC') }) # Add calculated wind parameters if wind_u_data and wind_v_data: for i, u_data in enumerate(wind_u_data): if i < len(wind_v_data): v_data = wind_v_data[i] if u_data['step'] == v_data['step']: # Wind speed wind_speed = np.sqrt(u_data['value']**2 + v_data['value']**2) # Wind direction (meteorological convention) wind_dir = (270 - np.degrees(np.arctan2(v_data['value'], u_data['value']))) % 360 table_data.extend([{ 'Parameter': 'Wind Speed (calculated)', 'Hours': f"+{u_data['step']}h", 'Value': f"{wind_speed:.2f} m/s", 'Valid Time': u_data['datetime'].strftime('%Y-%m-%d %H:%M UTC') }, { 'Parameter': 'Wind Direction (calculated)', 'Hours': f"+{u_data['step']}h", 'Value': f"{wind_dir:.0f} degrees", 'Valid Time': u_data['datetime'].strftime('%Y-%m-%d %H:%M UTC') }]) df = pd.DataFrame(table_data) table_html = df.to_html(index=False, classes="table table-striped") status = f"""✅ 3-Hourly Forecast Retrieved! 📍 Location: {latitude:.4f}°N, {longitude:.4f}°E 📊 Parameters: {len(forecast_data)} weather variables ⏰ Forecast range: 3-hourly for first 24h, then extended to 120h 🔄 Data points: {len(table_data)} measurements 🕐 Resolution: 3-hour intervals for first day, then 6-hour+""" # Generate narrative forecast narrative_forecast = self.narrative_generator.generate_24_hour_forecast(forecast_data) return status, fig, table_html, narrative_forecast except Exception as e: return f"Error: {str(e)}", None, "", "Unable to generate narrative forecast due to data error." # Initialize the application weather_app = WeatherApp() # Gradio interface with gr.Blocks(title="ECMWF Weather Forecast") as app: gr.Markdown(""" # 🌍 ECMWF Global Weather Forecast ## Real-time weather data from ECMWF operational forecasts **Features:** - 🌐 Global coverage at 25km resolution - 🕐 **3-hourly forecasts for first 24 hours** - 📝 **Plain English narrative forecasts** (like weather.gov) - 🔄 Updated every 6 hours - 📊 Professional meteorological data - 🆓 No API keys required """) with gr.Row(): with gr.Column(scale=2): gr.Markdown("### 🗺️ Interactive World Map") map_display = gr.HTML(value=weather_app.create_map()) with gr.Column(scale=1): gr.Markdown("### ⚡ Quick Setup") preload_btn = gr.Button("🚀 Preload Global Data", variant="primary", size="lg") preload_status = gr.Textbox(label="Status", lines=8, interactive=False) gr.Markdown("### 📍 Enter Coordinates") latitude = gr.Number( label="Latitude (-90 to 90)", value=40.7128, minimum=-90, maximum=90, step=0.001 ) longitude = gr.Number( label="Longitude (-180 to 180)", value=-74.0060, minimum=-180, maximum=180, step=0.001 ) get_forecast_btn = gr.Button("🌤️ Get Weather Forecast", variant="secondary", size="lg") with gr.Row(): with gr.Column(): forecast_status = gr.Textbox(label="Forecast Status", lines=6) forecast_plot = gr.Plot(label="Weather Forecast Chart") with gr.Column(): narrative_forecast = gr.Markdown(label="24-Hour Weather Forecast", value="Click 'Get Weather Forecast' to generate a narrative forecast...") forecast_table = gr.HTML(label="Detailed Forecast Data") # Event handlers preload_btn.click( weather_app.preload_data, outputs=[preload_status] ) get_forecast_btn.click( weather_app.get_weather_forecast, inputs=[latitude, longitude], outputs=[forecast_status, forecast_plot, forecast_table, narrative_forecast] ) if __name__ == "__main__": app.launch(server_name="0.0.0.0", server_port=7860)