import os
import sys
import tempfile
import logging
import subprocess
import shutil
from datetime import datetime, timedelta

import numpy as np
import xarray as xr
from ecmwf.opendata import Client
import requests
from typing import Dict, List, Optional, Tuple

# Setup logging first
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class EnhancedWaveDataFetcher:
    """
    Enhanced wave data fetcher using proven GRIB processing methods
    from the NWPS_SWAN implementation for particle-based visualization.
    """

    def __init__(self):
        self.client = Client("ecmwf")
        self.output_dir = os.getenv('OUTPUT_DIR', '/tmp/wave_data')
        os.makedirs(self.output_dir, exist_ok=True)

        # Set ECCODES environment variables to handle polar stereographic issues
        self._setup_eccodes_environment()

    def _setup_eccodes_environment(self):
        """Set ECCODES environment variables to handle projection issues."""
        try:
            # Environment variables that might help with polar stereographic processing
            os.environ['ECCODES_GRIB_STRICT_PARSING'] = '0'          # Relaxed parsing
            os.environ['ECCODES_GRIB_IGNORE_GRID_DEFINITION'] = '1'  # Ignore grid definition errors
            logger.info("Set ECCODES environment variables for relaxed parsing")
        except Exception as e:
            logger.warning(f"Could not set ECCODES environment variables: {e}")

    def _check_cdo_available(self):
        """Check if CDO (Climate Data Operators) is available."""
        try:
            result = subprocess.run(['cdo', '--version'],
                                    capture_output=True, text=True, timeout=10)
            return result.returncode == 0
        except (subprocess.TimeoutExpired, FileNotFoundError, subprocess.SubprocessError):
            return False

    def _reproject_arctic_with_cdo(self, grib_file_path):
        """Reproject an Arctic GRIB file using CDO as an alternative to wgrib2."""
        try:
            if not self._check_cdo_available():
                logger.warning("CDO not available for Arctic reprojection")
                return None

            logger.info("Attempting to reproject Arctic GRIB file using CDO")

            # Create a temporary file for the reprojected data
            temp_reprojected = tempfile.NamedTemporaryFile(delete=False, suffix='_cdo_reprojected.grib2')
            temp_reprojected.close()

            # Use CDO to reproject to a regular lat-lon grid
            # remapbil = bilinear interpolation onto a regular lat-lon grid
            cmd = [
                'cdo',
                'remapbil,r720x360',  # 0.5° resolution global grid
                grib_file_path,
                temp_reprojected.name
            ]

            logger.info(f"Running CDO command: {' '.join(cmd)}")
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)

            if result.returncode == 0:
                logger.info("Successfully reprojected Arctic GRIB file with CDO")
                return temp_reprojected.name
            else:
                logger.error(f"CDO failed: {result.stderr}")
                if os.path.exists(temp_reprojected.name):
                    os.unlink(temp_reprojected.name)
                return None

        except Exception as e:
            logger.error(f"Error reprojecting with CDO: {e}")
            return None

    def fetch_noaa_wave_grib(self, forecast_hour=0):
        """Fetch global wave data from the NOAA WW3 model."""
        try:
            logger.info(f"Fetching NOAA WW3 global wave GRIB data for forecast hour {forecast_hour}...")

            # NOAA GFS/WW3 wave data URL pattern
            base_url = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod"

            # Try the current date and previous days (in case of delayed updates)
            now = datetime.utcnow()
            dates_to_try = [
                now.strftime("%Y%m%d"),
                (now - timedelta(days=1)).strftime("%Y%m%d"),
                (now - timedelta(days=2)).strftime("%Y%m%d")
            ]

            # Try different model runs (00, 06, 12, 18 UTC) to find available data
            model_runs = ["00", "06", "12", "18"]
            current_hour = now.hour

            # Start with the most recent available run
            if current_hour >= 18:
                preferred_runs = ["18", "12", "06", "00"]
            elif current_hour >= 12:
                preferred_runs = ["12", "06", "00", "18"]
            elif current_hour >= 6:
                preferred_runs = ["06", "00", "18", "12"]
            else:
                preferred_runs = ["00", "18", "12", "06"]

            # Try different dates and model runs
            for date_str in dates_to_try:
                logger.info(f"Trying date: {date_str}")
                for hour in preferred_runs:
                    try:
                        # Format forecast hour with leading zeros (f000, f001, f002, etc.)
                        forecast_str = f"f{forecast_hour:03d}"

                        # Download multiple regional files for global coverage
                        successful_downloads = []

                        # Try different regional GRIB files available on NOAA (skip Arctic for now)
                        regional_files = [
                            (f"gfswave.t{hour}z.atlocn.0p16.{forecast_str}.grib2", "Atlantic"),
                            (f"gfswave.t{hour}z.epacif.0p16.{forecast_str}.grib2", "East_Pacific"),
                            (f"gfswave.t{hour}z.wcoast.0p16.{forecast_str}.grib2", "West_Coast"),
                            (f"gfswave.t{hour}z.global.0p16.{forecast_str}.grib2", "Global"),
                            # Skip Arctic for now:
                            # (f"gfswave.t{hour}z.arctic.9km.{forecast_str}.grib2", "Arctic"),
                        ]

                        # Try to download each regional file
                        for filename, region_name in regional_files:
                            try:
                                url = f"{base_url}/gfs.{date_str}/{hour}/wave/gridded/{filename}"
                                logger.info(f"Attempting to download {region_name} region: {filename}")

                                temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.grib2')
                                response = requests.get(url, timeout=300)

                                if response.status_code == 200:
                                    temp_file.write(response.content)
                                    temp_file.close()
                                    successful_downloads.append((temp_file.name, region_name, hour, forecast_hour))
                                    logger.info(f"{region_name} GRIB file downloaded: {temp_file.name}")
                                else:
                                    logger.debug(f"HTTP {response.status_code} for {region_name}")
                                    temp_file.close()
                                    os.unlink(temp_file.name)
                                    continue

                            except Exception as file_error:
                                logger.debug(f"Error downloading {region_name}: {file_error}")
                                continue

                        # If we got at least one regional file, return the list
                        if successful_downloads:
                            logger.info(f"Successfully downloaded {len(successful_downloads)} regional files")
                            return successful_downloads

                    except Exception as run_error:
                        logger.warning(f"Error trying {hour}Z run on {date_str}: {run_error}")
                        continue

            logger.error("Failed to download NOAA data from any model run")
            return None

        except Exception as e:
            logger.error(f"Error in fetch_noaa_wave_grib: {e}")
            return None

    def process_grib_file(self, grib_file_path, region_name=None):
        """Process a GRIB file and extract wave data with velocity vectors for particle animation."""
        try:
            logger.info(f"Processing GRIB file: {grib_file_path}")

            # Skip Arctic processing for now - focus on Atlantic/Pacific
            is_arctic = False

            # Try to open the GRIB file and extract all available wave parameters
            try:
                datasets = []

                # Normal processing for regional files (no Arctic)
                ds_height = xr.open_dataset(grib_file_path, engine='cfgrib', decode_timedelta=True)
                datasets.append(ds_height)

                # Try to get wave direction and period by opening with different filters
                try:
                    ds_ocean = xr.open_dataset(grib_file_path, engine='cfgrib',
                                               filter_by_keys={'discipline': 10},
                                               decode_timedelta=True)
                    if ds_ocean.variables.keys() != ds_height.variables.keys():
                        datasets.append(ds_ocean)
                except Exception:
                    logger.info("Could not open oceanographic discipline data separately")

                # Combine all available variables
                all_vars = {}
                for ds in datasets:
                    all_vars.update(ds.variables)

                logger.info(f"Available variables: {list(all_vars.keys())}")

            except Exception as e:
                error_msg = str(e)
                logger.error(f"Error opening GRIB file: {error_msg}")
                return None

            # Extract wave height data
            wave_height_var = None
            wave_heights = None
            for var_name in ['swh', 'HTSGW', 'htsgw']:
                if var_name in all_vars:
                    wave_height_var = var_name
                    wave_heights = all_vars[var_name].values
                    logger.info(f"Using wave height variable: {wave_height_var}")
                    break

            if wave_heights is None:
                # Try a broader search
                for var_name in all_vars:
                    if any(keyword in var_name.lower() for keyword in ['wave', 'height', 'swh']):
                        wave_height_var = var_name
                        wave_heights = all_vars[var_name].values
                        logger.info(f"Found wave height variable: {wave_height_var}")
                        break

            if wave_heights is None:
                logger.error("No wave height variables found in GRIB file")
                for ds in datasets:
                    ds.close()
                return None

            # Extract wave direction data
            wave_directions = None
            wave_dir_var = None
            for var_name in ['dirpw', 'DIRPW', 'dp', 'wvdir', 'WVDIR', 'dir']:
                if var_name in all_vars:
                    wave_dir_var = var_name
                    wave_directions = all_vars[var_name].values
                    logger.info(f"Found wave direction variable: {wave_dir_var}")
                    break

            # Extract wave period data
            wave_periods = None
            wave_period_var = None
            for var_name in ['perpw', 'PERPW', 'tp', 'wvper', 'WVPER', 'per']:
                if var_name in all_vars:
                    wave_period_var = var_name
                    wave_periods = all_vars[var_name].values
                    logger.info(f"Found wave period variable: {wave_period_var}")
                    break

            # Get coordinates from the first dataset
            ds_main = datasets[0]
            lats = ds_main.latitude.values if 'latitude' in ds_main else ds_main.lat.values
            lons = ds_main.longitude.values if 'longitude' in ds_main else ds_main.lon.values

            # Log what we found
            if wave_directions is not None:
                logger.info(f"Wave directions shape: {wave_directions.shape}, "
                            f"range: {np.nanmin(wave_directions):.1f}-{np.nanmax(wave_directions):.1f} degrees")
            if wave_periods is not None:
                logger.info(f"Wave periods shape: {wave_periods.shape}, "
                            f"range: {np.nanmin(wave_periods):.1f}-{np.nanmax(wave_periods):.1f} seconds")

            # Extract particle data for visualization
            particle_points = self._extract_particle_points(lats, lons, wave_heights,
                                                            wave_directions, wave_periods)

            # Close all datasets
            for ds in datasets:
                ds.close()

            return particle_points

        except Exception as e:
            logger.error(f"Error processing GRIB file: {e}")
            return None

    def _extract_particle_points(self, lats, lons, wave_heights,
                                 wave_directions=None, wave_periods=None, max_particles=2000):
        """Extract particle points with velocity vectors for wave animation."""
        try:
            # Create a meshgrid for coordinates
            lon_grid, lat_grid = np.meshgrid(lons, lats)

            # Flatten arrays
            flat_lats = lat_grid.flatten()
            flat_lons = lon_grid.flatten()
            flat_waves = wave_heights.flatten()
            flat_dirs = None
            flat_periods = None

            if wave_directions is not None:
                flat_dirs = wave_directions.flatten()
            if wave_periods is not None:
                flat_periods = wave_periods.flatten()

            # Remove NaN values and invalid data
            valid_mask = (~np.isnan(flat_waves)) & (flat_waves > 0) & (flat_waves < 30)
            if flat_dirs is not None:
                valid_mask = valid_mask & ~np.isnan(flat_dirs)

            valid_lats = flat_lats[valid_mask]
            valid_lons = flat_lons[valid_mask]
            valid_waves = flat_waves[valid_mask]

            if flat_dirs is not None:
                valid_dirs = flat_dirs[valid_mask]
            else:
                # Generate synthetic wave directions based on location patterns
                valid_dirs = self._generate_synthetic_directions(valid_lats, valid_lons)

            if flat_periods is not None:
                valid_periods = flat_periods[valid_mask]
            else:
                # Generate synthetic periods based on wave height
                valid_periods = np.clip(4 + valid_waves * 2, 3, 15)

            if len(valid_waves) == 0:
                return []

            # Sample points for particle visualization
            sample_size = min(max_particles, len(valid_waves))
            if sample_size < len(valid_waves):
                sample_indices = np.random.choice(len(valid_waves), size=sample_size, replace=False)
            else:
                sample_indices = np.arange(len(valid_waves))
            particle_points = []
            for idx in sample_indices:
                lat = float(valid_lats[idx])
                lon = float(valid_lons[idx])
                height = float(valid_waves[idx])
                direction = float(valid_dirs[idx])
                period = float(valid_periods[idx])

                # Calculate velocity components for particle movement.
                # Wave direction is "coming from" in the meteorological convention;
                # convert to the direction of travel (still a compass bearing from north).
                travel_direction = (direction + 180) % 360
                dir_rad = np.radians(travel_direction)

                # Velocity magnitude based on wave period.
                # Deep-water wave celerity approximation: c = g*T / (2*pi)
                wave_speed = 9.81 * period / (2 * np.pi)  # m/s

                # Scale for visualization (convert to degrees per animation frame)
                velocity_scale = 0.001  # Adjust this for particle speed
                # For a compass bearing, the eastward component uses sin and the northward uses cos
                u_velocity = wave_speed * np.sin(dir_rad) * velocity_scale
                v_velocity = wave_speed * np.cos(dir_rad) * velocity_scale

                particle_points.append({
                    'lat': lat,
                    'lon': lon,
                    'wave_height': height,
                    'wave_direction': direction,
                    'wave_period': period,
                    'u_velocity': u_velocity,  # eastward component (degrees/frame)
                    'v_velocity': v_velocity,  # northward component (degrees/frame)
                    'particle_size': max(1, min(8, height * 2)),  # Size based on wave height
                    'color_intensity': min(1.0, height / 8.0),    # Color intensity based on height
                    'region': 'Global'
                })

            logger.info(f"Generated {len(particle_points)} particle points for visualization")
            return particle_points

        except Exception as e:
            logger.error(f"Error extracting particle points: {e}")
            return []

    def _generate_synthetic_directions(self, lats, lons):
        """Generate realistic wave directions based on geographic patterns."""
        try:
            directions = np.zeros_like(lats)

            for i, (lat, lon) in enumerate(zip(lats, lons)):
                # Simplified wind/wave pattern generation
                if abs(lat) < 30:  # Trade wind regions
                    if lon < 0:  # Atlantic/Americas
                        directions[i] = np.random.normal(90, 30)   # Generally eastward
                    else:  # Pacific/Asia
                        directions[i] = np.random.normal(270, 30)  # Generally westward
                elif abs(lat) > 60:  # Polar regions
                    directions[i] = np.random.uniform(0, 360)      # More variable
                else:  # Mid-latitudes
                    if lat > 0:  # Northern hemisphere
                        directions[i] = np.random.normal(225, 45)  # SW generally
                    else:  # Southern hemisphere
                        directions[i] = np.random.normal(315, 45)  # NW generally

                # Ensure direction is in the [0, 360) range
                directions[i] = directions[i] % 360

            return directions

        except Exception as e:
            logger.error(f"Error generating synthetic directions: {e}")
            return np.random.uniform(0, 360, len(lats))

    def process_multiple_regional_files(self, regional_files):
        """Process multiple regional GRIB files and combine their particle data."""
        try:
            logger.info(f"Processing {len(regional_files)} regional GRIB files for global particle coverage...")

            all_particles = []
            regions_processed = []

            for grib_file_path, region_name, model_run, forecast_hour in regional_files:
                try:
                    logger.info(f"Processing {region_name} region: {grib_file_path}")

                    # Process this regional file
                    particles = self.process_grib_file(grib_file_path, region_name=region_name)

                    if particles:
                        # Add region info to each particle
                        for particle in particles:
                            particle['region'] = region_name
                            particle['model_run'] = model_run

                        all_particles.extend(particles)
                        regions_processed.append(region_name)
                        logger.info(f"Successfully processed {region_name}: {len(particles)} particles")
                    else:
                        logger.warning(f"Failed to process {region_name} region")

                    # Clean up temp file
                    if os.path.exists(grib_file_path):
                        os.unlink(grib_file_path)

                except Exception as e:
                    logger.error(f"Error processing {region_name} region: {e}")
                    # Clean up temp file on error
                    if os.path.exists(grib_file_path):
                        os.unlink(grib_file_path)
                    continue

            if not all_particles:
                logger.error("No valid particle data found in any regional file")
                return None

            logger.info(f"Combined particle data from {len(regions_processed)} regions: {regions_processed}")
            logger.info(f"Total particles: {len(all_particles)}")

            # Calculate global statistics
            wave_heights = [p['wave_height'] for p in all_particles if p.get('wave_height')]

            return {
                'timestamp': datetime.utcnow().isoformat(),
                'data_source': f'NOAA_MULTI_REGIONAL_GRIB ({"_".join(regions_processed)})',
                'total_particles': len(all_particles),
                'regions_processed': regions_processed,
                'wave_statistics': {
                    'max_wave_height': float(max(wave_heights)) if wave_heights else None,
                    'min_wave_height': float(min(wave_heights)) if wave_heights else None,
                    'mean_wave_height': float(np.mean(wave_heights)) if wave_heights else None,
                    'std_wave_height': float(np.std(wave_heights)) if wave_heights else None
                },
                'particles': all_particles
            }

        except Exception as e:
            logger.error(f"Error processing multiple regional files: {e}")
            # Clean up any remaining temp files
            for grib_file_path, region_name, _, _ in regional_files:
                if os.path.exists(grib_file_path):
                    os.unlink(grib_file_path)
            return None

    def fetch_global_wave_particles(self, forecast_hour=0):
        """Main method to fetch global wave data formatted for particle animation."""
        try:
            logger.info("Fetching wave data from NOAA WW3 model for particle animation...")

            # Try NOAA for wave data
            result = self.fetch_noaa_wave_grib(forecast_hour)

            if result and isinstance(result, list):
                # Multiple regional files downloaded
                regional_files = result
                model_run = regional_files[0][2] if regional_files else None

                # Process the regional files and combine the particles
                particle_data = self.process_multiple_regional_files(regional_files)

                if particle_data:
                    # Add forecast metadata
                    particle_data['forecast_info'] = {
                        'forecast_hour': forecast_hour,
                        'model_run': model_run,
                        'forecast_valid_time': (datetime.utcnow() + timedelta(hours=forecast_hour)).isoformat(),
                        'is_current': forecast_hour == 0
                    }
                    return particle_data

            # Fall back to demo data when the download or the GRIB processing fails
            logger.error("NOAA failed - generating fallback demo data for particle animation")
            return self._generate_demo_particle_data(forecast_hour)

        except Exception as e:
            logger.error(f"Error in fetch_global_wave_particles: {e}")
            return self._generate_demo_particle_data(forecast_hour)

    def _generate_demo_particle_data(self, forecast_hour=0):
        """Generate demo wave particle data for visualization when GRIB data is unavailable."""
        logger.info(f"Generating demo wave particle data for +{forecast_hour}h forecast...")

        # Create a grid of sample particles around the world with realistic patterns
        particles = []

        # Atlantic Ocean patterns
        for lat in range(-40, 61, 8):
            for lon in range(-80, 21, 10):
                if lat > 60 or lat < -60:  # Skip polar regions in the Atlantic
                    continue

                # Simulate realistic Atlantic wave patterns
                base_height = np.random.uniform(1.0, 3.5)
                if abs(lat) > 40:  # Higher latitudes = bigger waves
                    base_height += np.random.uniform(0.5, 2.0)

                # Atlantic wave directions (generally eastward in the tropics, variable in the north)
                if abs(lat) < 30:  # Trade wind region
                    wave_dir = np.random.normal(90, 20)   # Eastward
                else:
                    wave_dir = np.random.normal(225, 45)  # SW in northern latitudes
                wave_dir = wave_dir % 360

                period = np.clip(4 + base_height * 1.5, 4, 14)

                # Calculate velocity components (compass bearing: east = sin, north = cos)
                travel_direction = (wave_dir + 180) % 360
                dir_rad = np.radians(travel_direction)
                wave_speed = 9.81 * period / (2 * np.pi)
                velocity_scale = 0.001
                u_velocity = wave_speed * np.sin(dir_rad) * velocity_scale
                v_velocity = wave_speed * np.cos(dir_rad) * velocity_scale

                particles.append({
                    'lat': float(lat + np.random.uniform(-2, 2)),
                    'lon': float(lon + np.random.uniform(-3, 3)),
                    'wave_height': round(float(base_height), 2),
                    'wave_direction': round(float(wave_dir), 1),
                    'wave_period': round(float(period), 1),
                    'u_velocity': u_velocity,
                    'v_velocity': v_velocity,
                    'particle_size': max(1, min(6, base_height * 1.5)),
                    'color_intensity': min(1.0, base_height / 6.0),
                    'region': 'Atlantic_Demo'
                })

        # Pacific Ocean patterns
        for lat in range(-50, 61, 8):
            for lon in range(120, 241, 12):
                if lat > 60 or lat < -60:  # Skip polar regions
                    continue

                base_height = np.random.uniform(1.2, 4.0)
                if abs(lat) > 35:  # Storm regions
                    base_height += np.random.uniform(0.8, 2.5)

                # Pacific wave patterns
                if abs(lat) < 25:  # Tropical Pacific
                    wave_dir = np.random.normal(270, 25)  # Westward
                elif lat > 25:  # North Pacific
                    wave_dir = np.random.normal(315, 40)  # NW
                else:  # South Pacific
                    wave_dir = np.random.normal(225, 40)  # SW
                wave_dir = wave_dir % 360

                period = np.clip(5 + base_height * 1.3, 5, 16)

                # Calculate velocity (compass bearing: east = sin, north = cos)
                travel_direction = (wave_dir + 180) % 360
                dir_rad = np.radians(travel_direction)
                wave_speed = 9.81 * period / (2 * np.pi)
                velocity_scale = 0.001
                u_velocity = wave_speed * np.sin(dir_rad) * velocity_scale
                v_velocity = wave_speed * np.cos(dir_rad) * velocity_scale

                particles.append({
                    'lat': float(lat + np.random.uniform(-2, 2)),
                    'lon': float(lon + np.random.uniform(-4, 4)),
                    'wave_height': round(float(base_height), 2),
                    'wave_direction': round(float(wave_dir), 1),
                    'wave_period': round(float(period), 1),
                    'u_velocity': u_velocity,
                    'v_velocity': v_velocity,
                    'particle_size': max(1, min(6, base_height * 1.5)),
                    'color_intensity': min(1.0, base_height / 6.0),
                    'region': 'Pacific_Demo'
                })

        # Indian Ocean patterns
        for lat in range(-45, 31, 10):
            for lon in range(40, 121, 15):
                if abs(lat) > 50:
                    continue

                base_height = np.random.uniform(1.5, 3.8)

                # Indian Ocean monsoon patterns
                if lat > 0:  # Northern Indian Ocean
                    wave_dir = np.random.normal(135, 30)  # SE monsoon influence
                else:  # Southern Indian Ocean
                    wave_dir = np.random.normal(270, 35)  # Westward
                wave_dir = wave_dir % 360

                period = np.clip(4.5 + base_height * 1.4, 4, 15)

                travel_direction = (wave_dir + 180) % 360
                dir_rad = np.radians(travel_direction)
                wave_speed = 9.81 * period / (2 * np.pi)
                velocity_scale = 0.001
                u_velocity = wave_speed * np.sin(dir_rad) * velocity_scale
                v_velocity = wave_speed * np.cos(dir_rad) * velocity_scale

                particles.append({
                    'lat': float(lat + np.random.uniform(-3, 3)),
                    'lon': float(lon + np.random.uniform(-5, 5)),
                    'wave_height': round(float(base_height), 2),
                    'wave_direction': round(float(wave_dir), 1),
                    'wave_period': round(float(period), 1),
                    'u_velocity': u_velocity,
                    'v_velocity': v_velocity,
                    'particle_size': max(1, min(6, base_height * 1.5)),
                    'color_intensity': min(1.0, base_height / 6.0),
                    'region': 'Indian_Demo'
                })

        wave_heights = [p['wave_height'] for p in particles]

        return {
            'timestamp': datetime.utcnow().isoformat(),
            'data_source': 'DEMO_WAVE_PARTICLES',
            'total_particles': len(particles),
            'regions_processed': ['Atlantic_Demo', 'Pacific_Demo', 'Indian_Demo'],
            'wave_statistics': {
                'max_wave_height': float(max(wave_heights)),
                'min_wave_height': float(min(wave_heights)),
                'mean_wave_height': float(np.mean(wave_heights)),
                'std_wave_height': float(np.std(wave_heights))
            },
            'forecast_info': {
                'forecast_hour': forecast_hour,
                'model_run': 'DEMO',
                'forecast_valid_time': (datetime.utcnow() + timedelta(hours=forecast_hour)).isoformat(),
                'is_current': forecast_hour == 0
            },
            'particles': particles
        }
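

# A minimal usage sketch, not part of the original module: it assumes the class above is
# used as-is and that writing the particle payload to OUTPUT_DIR as JSON is an acceptable
# output path. The file name 'wave_particles.json' is an illustrative choice, not an
# established interface of this fetcher.
if __name__ == "__main__":
    import json

    fetcher = EnhancedWaveDataFetcher()

    # forecast_hour=0 requests the current analysis; the fetcher falls back to demo
    # particles if the NOAA download or the GRIB processing fails.
    data = fetcher.fetch_global_wave_particles(forecast_hour=0)

    if data:
        out_path = os.path.join(fetcher.output_dir, 'wave_particles.json')  # hypothetical output file
        with open(out_path, 'w') as f:
            json.dump(data, f)
        logger.info(f"Wrote {data['total_particles']} particles from "
                    f"{data['data_source']} to {out_path}")
    else:
        logger.error("No particle data produced")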