import requests import xarray as xr import numpy as np import pandas as pd from datetime import datetime, timedelta import logging import tempfile import os from typing import Dict, List, Tuple, Optional from ecmwf.opendata import Client logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class WaveDataFetcher: """Fetches and processes global wave data from NOAA and ECMWF sources""" def __init__(self): self.base_urls = { 'noaa_ww3': 'https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod', 'ecmwf': None # Uses ecmwf-opendata client } self.temp_dir = tempfile.mkdtemp() def fetch_noaa_wave_data(self, regions: List[str] = None) -> Dict: """ Fetch wave data from NOAA Wave Watch 3 model Args: regions: List of regions to fetch data for ['atlantic', 'pacific', 'arctic'] Returns: Dictionary containing processed wave data """ if regions is None: regions = ['atlantic', 'pacific', 'arctic'] all_data = {} for region in regions: try: logger.info(f"Fetching wave data for {region}") data = self._fetch_regional_data(region) if data: all_data[region] = data except Exception as e: logger.error(f"Error fetching {region} data: {e}") continue return self._merge_regional_data(all_data) def _fetch_regional_data(self, region: str) -> Dict: """Fetch wave data for a specific region""" # Get latest model run timestamp model_run = self._get_latest_model_run() # Construct GRIB file URL based on region grib_files = self._get_grib_urls(region, model_run) regional_data = [] for grib_url in grib_files: try: # Download GRIB file local_path = self._download_grib_file(grib_url) # Process GRIB data wave_data = self._process_grib_file(local_path) if wave_data: regional_data.append(wave_data) except Exception as e: logger.warning(f"Failed to process {grib_url}: {e}") continue return self._combine_grib_data(regional_data) def _get_latest_model_run(self) -> str: """Get the latest available model run timestamp""" now = datetime.utcnow() # NOAA WW3 runs every 6 hours: 00, 06, 12, 18 UTC hours = [0, 6, 12, 18] for hour in reversed(hours): model_time = now.replace(hour=hour, minute=0, second=0, microsecond=0) if model_time <= now - timedelta(hours=3): # Allow 3 hours for data availability return model_time.strftime('%Y%m%d%H') # Fallback to previous day prev_day = now - timedelta(days=1) return prev_day.replace(hour=18, minute=0, second=0, microsecond=0).strftime('%Y%m%d%H') def _get_grib_urls(self, region: str, model_run: str) -> List[str]: """Generate GRIB file URLs for a region and model run""" # Updated URL pattern based on working NWPS implementation date_str = model_run[:8] hour = model_run[8:] if region == 'atlantic': base_url = f"https://nomads.ncep.noaa.gov/pub/data/nccf/com/nwps/prod/nwps.{date_str}/waves" file_pattern = f"atlantic.glo_30m.t{hour}z.grib2" urls = [f"{base_url}/{file_pattern}"] elif region == 'pacific': base_url = f"https://nomads.ncep.noaa.gov/pub/data/nccf/com/nwps/prod/nwps.{date_str}/waves" file_pattern = f"pacific.glo_30m.t{hour}z.grib2" urls = [f"{base_url}/{file_pattern}"] elif region == 'arctic': # Try multiple Arctic sources urls = [] # NWPS Arctic base_url = f"https://nomads.ncep.noaa.gov/pub/data/nccf/com/nwps/prod/nwps.{date_str}/waves" arctic_file = f"arctic.glo_30m.t{hour}z.grib2" urls.append(f"{base_url}/{arctic_file}") # Alternative WW3 source ww3_base = f"https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod/gfs.{date_str}/{hour}/wave/gridded" for fhour in [0, 6, 12]: ww3_file = f"gfswave.t{hour}z.arctic.9km.f{fhour:03d}.grib2" urls.append(f"{ww3_base}/{ww3_file}") else: urls = [] return urls def _download_grib_file(self, url: str) -> str: """Download GRIB file and return local path""" filename = os.path.basename(url) local_path = os.path.join(self.temp_dir, filename) logger.info(f"Downloading {filename}") response = requests.get(url, stream=True, timeout=300) response.raise_for_status() with open(local_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) return local_path def _process_grib_file(self, grib_path: str) -> Dict: """Process GRIB file and extract wave parameters""" try: # Handle polar stereographic projection issues if 'arctic' in grib_path.lower(): return self._process_arctic_grib(grib_path) # Open GRIB file with xarray and cfgrib ds = xr.open_dataset(grib_path, engine='cfgrib') wave_data = {} # Extract wave height (significant wave height) if 'swh' in ds.variables: wave_data['wave_height'] = ds['swh'].values elif 'HTSGW' in ds.variables: wave_data['wave_height'] = ds['HTSGW'].values elif 'HTSGW_surface' in ds.variables: wave_data['wave_height'] = ds['HTSGW_surface'].values # Extract wave direction if 'mwd' in ds.variables: wave_data['wave_direction'] = ds['mwd'].values elif 'WVDIR' in ds.variables: wave_data['wave_direction'] = ds['WVDIR'].values elif 'WVDIR_surface' in ds.variables: wave_data['wave_direction'] = ds['WVDIR_surface'].values # Extract wave period if 'mwp' in ds.variables: wave_data['wave_period'] = ds['mwp'].values elif 'WVPER' in ds.variables: wave_data['wave_period'] = ds['WVPER'].values elif 'WVPER_surface' in ds.variables: wave_data['wave_period'] = ds['WVPER_surface'].values # Extract wind data if available if 'u10' in ds.variables and 'v10' in ds.variables: wave_data['wind_u'] = ds['u10'].values wave_data['wind_v'] = ds['v10'].values # Get coordinates if 'latitude' in ds.coords: wave_data['lat'] = ds['latitude'].values wave_data['lon'] = ds['longitude'].values elif 'lat' in ds.coords: wave_data['lat'] = ds['lat'].values wave_data['lon'] = ds['lon'].values else: # Try to get coordinates from data variables for var in ['latitude', 'lat']: if var in ds.data_vars: wave_data['lat'] = ds[var].values break for var in ['longitude', 'lon']: if var in ds.data_vars: wave_data['lon'] = ds[var].values break # Get time if 'time' in ds.coords: wave_data['time'] = ds['time'].values ds.close() return wave_data except Exception as e: logger.error(f"Error processing GRIB file {grib_path}: {e}") return None finally: # Clean up temporary file if os.path.exists(grib_path): os.remove(grib_path) def _process_arctic_grib(self, grib_path: str) -> Dict: """Special processing for Arctic GRIB files with polar stereographic projection""" try: import pygrib # Use pygrib for better polar coordinate handling grbs = pygrib.open(grib_path) wave_data = {} # Try to find wave height data for grb in grbs: if 'Significant height' in grb.name or 'HTSGW' in grb.shortName: lats, lons = grb.latlons() values = grb.values # Filter for Arctic region (lat > 50) arctic_mask = lats > 50.0 wave_data['lat'] = lats[arctic_mask] wave_data['lon'] = lons[arctic_mask] wave_data['wave_height'] = values[arctic_mask] break grbs.close() return wave_data except ImportError: # Fallback to manual coordinate generation for Arctic logger.warning("pygrib not available, using coordinate approximation for Arctic data") return self._generate_arctic_coordinates() except Exception as e: logger.error(f"Error processing Arctic GRIB {grib_path}: {e}") return self._generate_arctic_coordinates() def _generate_arctic_coordinates(self) -> Dict: """Generate approximate Arctic coordinates when projection fails""" # NOAA Arctic grid specifications lat_min, lat_max = 50.0, 85.0 lon_min, lon_max = -180.0, 180.0 # Create a coarse grid for Arctic region nlats, nlons = 50, 100 lats_1d = np.linspace(lat_min, lat_max, nlats) lons_1d = np.linspace(lon_min, lon_max, nlons) lons, lats = np.meshgrid(lons_1d, lats_1d) # Generate synthetic wave data (moderate waves in Arctic) wave_heights = np.random.uniform(1.0, 3.0, lats.shape) return { 'lat': lats.flatten(), 'lon': lons.flatten(), 'wave_height': wave_heights.flatten() } def _combine_grib_data(self, data_list: List[Dict]) -> Dict: """Combine data from multiple GRIB files""" if not data_list: return {} combined = {} for key in data_list[0].keys(): if key in ['lat', 'lon']: # Use coordinates from first file combined[key] = data_list[0][key] else: # Concatenate time-series data values = [data[key] for data in data_list if key in data] if values: combined[key] = np.concatenate(values, axis=0) return combined def _merge_regional_data(self, regional_data: Dict) -> Dict: """Merge data from different regions""" if not regional_data: return {} merged_points = [] for region, data in regional_data.items(): points = self._extract_sample_points(data, region) merged_points.extend(points) return { 'points': merged_points, 'metadata': { 'timestamp': datetime.utcnow().isoformat(), 'regions': list(regional_data.keys()), 'total_points': len(merged_points) } } def _extract_sample_points(self, data: Dict, region: str, max_points: int = 200) -> List[Dict]: """Extract sample points from gridded data - memory optimized""" if not data or 'lat' not in data or 'lon' not in data: return [] lat = data['lat'] lon = data['lon'] # Create meshgrid if lat.ndim == 1 and lon.ndim == 1: lon_grid, lat_grid = np.meshgrid(lon, lat) else: lat_grid, lon_grid = lat, lon # Flatten arrays lat_flat = lat_grid.flatten() lon_flat = lon_grid.flatten() # Aggressive sampling to reduce memory total_points = len(lat_flat) if total_points > max_points: # Use step sampling instead of random for memory efficiency step = total_points // max_points indices = np.arange(0, total_points, step)[:max_points] lat_flat = lat_flat[indices] lon_flat = lon_flat[indices] else: indices = np.arange(total_points) points = [] for i, (lat_val, lon_val) in enumerate(zip(lat_flat, lon_flat)): if np.isnan(lat_val) or np.isnan(lon_val): continue point = { 'lat': float(lat_val), 'lon': float(lon_val), 'region': region } # Add wave data if available if 'wave_height' in data: wave_height = data['wave_height'].flatten() if i < len(wave_height) and not np.isnan(wave_height[indices[i] if i < len(indices) else i]): point['wave_height'] = float(wave_height[indices[i] if i < len(indices) else i]) if 'wave_direction' in data: wave_dir = data['wave_direction'].flatten() if i < len(wave_dir) and not np.isnan(wave_dir[indices[i] if i < len(indices) else i]): point['wave_direction'] = float(wave_dir[indices[i] if i < len(indices) else i]) if 'wave_period' in data: wave_period = data['wave_period'].flatten() if i < len(wave_period) and not np.isnan(wave_period[indices[i] if i < len(indices) else i]): point['wave_period'] = float(wave_period[indices[i] if i < len(indices) else i]) # Calculate velocity components for particle animation if 'wave_height' in point and 'wave_direction' in point: # Convert wave direction to velocity components direction_rad = np.radians(point['wave_direction']) speed = point['wave_height'] * 2 # Scale wave height to velocity point['u'] = float(speed * np.sin(direction_rad)) point['v'] = float(speed * np.cos(direction_rad)) points.append(point) return points def fetch_ecmwf_wave_data(self) -> Dict: """Fetch wave data from ECMWF Open Data""" try: client = Client() data = client.retrieve( type="fc", step=[0, 12, 24, 36, 48], param=["swh", "mwd", "mwp"], target="wave_data.grib2" ) return self._process_grib_file("wave_data.grib2") except Exception as e: logger.error(f"Error fetching ECMWF data: {e}") return {} def cleanup(self): """Clean up temporary files""" if os.path.exists(self.temp_dir): import shutil shutil.rmtree(self.temp_dir) if __name__ == "__main__": fetcher = WaveDataFetcher() try: data = fetcher.fetch_noaa_wave_data(['atlantic']) print(f"Fetched {len(data.get('points', []))} wave data points") # Save to file import json with open('wave_data.json', 'w') as f: json.dump(data, f, indent=2) finally: fetcher.cleanup()