import requests import xarray as xr import numpy as np import pandas as pd from datetime import datetime, timedelta import logging import tempfile import os import pygrib from typing import Dict, List, Optional from ecmwf.opendata import Client logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class RealWaveDataFetcher: """ Real GRIB wave data fetcher - NO SYNTHETIC DATA Based on NWPS SWAN implementation for actual data retrieval """ def __init__(self): self.temp_dir = tempfile.mkdtemp() self.base_urls = { 'noaa_gfs': 'https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod', 'noaa_nwps': 'https://nomads.ncep.noaa.gov/pub/data/nccf/com/nwps/prod', 'ecmwf': None # Uses client } def fetch_global_wave_data(self, regions: List[str] = None) -> Dict: """ Fetch real wave data from NOAA/ECMWF sources """ if regions is None: regions = ['atlantic', 'pacific', 'arctic'] logger.info("Starting real wave data fetch from GRIB sources") all_points = [] successful_regions = [] # Try ECMWF first for global data try: logger.info("Attempting ECMWF global wave data fetch") ecmwf_data = self._fetch_ecmwf_wave_data() if ecmwf_data: all_points.extend(ecmwf_data) successful_regions.append('ECMWF_Global') logger.info(f"Successfully fetched {len(ecmwf_data)} points from ECMWF") except Exception as e: logger.warning(f"ECMWF fetch failed: {e}") # Try NOAA regional data for region in regions: try: logger.info(f"Fetching NOAA {region} wave data") noaa_data = self._fetch_noaa_regional_data(region) if noaa_data: all_points.extend(noaa_data) successful_regions.append(f'NOAA_{region}') logger.info(f"Successfully fetched {len(noaa_data)} points from NOAA {region}") except Exception as e: logger.error(f"NOAA {region} fetch failed: {e}") if not all_points: raise Exception("No real wave data could be fetched from any source") return { 'points': all_points, 'metadata': { 'timestamp': datetime.utcnow().isoformat(), 'sources': successful_regions, 'total_points': len(all_points), 'data_type': 'REAL_GRIB_DATA' } } def _fetch_ecmwf_wave_data(self) -> List[Dict]: """ Fetch real wave data from ECMWF Open Data """ try: client = Client() # Try different ECMWF wave parameters that are actually available available_params = [ ["HTSGW"], # Significant height of combined wind waves and swell ["swh"], # Significant wave height ["tp"], # Total precipitation (fallback) ] for params in available_params: try: logger.info(f"Trying ECMWF parameters: {params}") client.retrieve( type="fc", step=[0, 6], param=params, target="ecmwf_wave_data.grib2" ) grib_file = "ecmwf_wave_data.grib2" return self._process_grib_file(grib_file, 'ECMWF') except Exception as e: logger.warning(f"ECMWF params {params} failed: {e}") continue raise Exception("No ECMWF wave parameters available") except Exception as e: logger.error(f"ECMWF data fetch error: {e}") return [] def _fetch_noaa_regional_data(self, region: str) -> List[Dict]: """ Fetch real NOAA wave data for specific region """ model_run = self._get_latest_model_run() grib_urls = self._get_noaa_grib_urls(region, model_run) regional_data = [] for url in grib_urls: try: logger.info(f"Downloading {os.path.basename(url)}") local_file = self._download_grib_file(url) if local_file: data_points = self._process_grib_file(local_file, f'NOAA_{region}') regional_data.extend(data_points) except Exception as e: logger.warning(f"Failed to process {url}: {e}") continue return regional_data def _get_latest_model_run(self) -> str: """ Get latest available NOAA model run """ now = datetime.utcnow() # NOAA runs at 00, 06, 12, 18 UTC model_hours = [0, 6, 12, 18] for hour in reversed(model_hours): model_time = now.replace(hour=hour, minute=0, second=0, microsecond=0) if model_time <= now - timedelta(hours=2): # Allow processing time return model_time.strftime('%Y%m%d%H') # Fallback to previous day prev_day = now - timedelta(days=1) return prev_day.replace(hour=18, minute=0, second=0, microsecond=0).strftime('%Y%m%d%H') def _get_noaa_grib_urls(self, region: str, model_run: str) -> List[str]: """ Get real NOAA GRIB file URLs """ date_str = model_run[:8] hour = model_run[8:] urls = [] if region == 'atlantic': # Try multiple NOAA sources base_gfs = f"https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod/gfs.{date_str}/{hour}/wave/gridded" for fhour in [0, 6, 12]: # Atlantic basin url = f"{base_gfs}/gfswave.t{hour}z.atlantic.0p16.f{fhour:03d}.grib2" urls.append(url) elif region == 'pacific': base_gfs = f"https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod/gfs.{date_str}/{hour}/wave/gridded" for fhour in [0, 6, 12]: # Pacific basin url = f"{base_gfs}/gfswave.t{hour}z.pacific.0p16.f{fhour:03d}.grib2" urls.append(url) elif region == 'arctic': base_gfs = f"https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod/gfs.{date_str}/{hour}/wave/gridded" for fhour in [0, 6]: # Arctic basin url = f"{base_gfs}/gfswave.t{hour}z.arctic.9km.f{fhour:03d}.grib2" urls.append(url) return urls def _download_grib_file(self, url: str) -> Optional[str]: """ Download GRIB file from NOAA """ try: filename = os.path.basename(url) local_path = os.path.join(self.temp_dir, filename) response = requests.get(url, timeout=300) response.raise_for_status() with open(local_path, 'wb') as f: f.write(response.content) logger.info(f"Downloaded {filename} ({len(response.content)} bytes)") return local_path except requests.exceptions.RequestException as e: logger.error(f"Download failed for {url}: {e}") return None def _process_grib_file(self, grib_path: str, source: str) -> List[Dict]: """ Process GRIB file and extract real wave data """ points = [] try: # First try with cfgrib/xarray points = self._process_with_xarray(grib_path, source) if not points and 'arctic' in grib_path.lower(): # Special handling for Arctic polar stereographic points = self._process_arctic_with_pygrib(grib_path, source) except Exception as e: logger.error(f"Error processing {grib_path}: {e}") finally: # Clean up if os.path.exists(grib_path): os.remove(grib_path) return points def _process_with_xarray(self, grib_path: str, source: str) -> List[Dict]: """ Process GRIB with xarray/cfgrib """ try: ds = xr.open_dataset(grib_path, engine='cfgrib') # Extract wave parameters wave_data = {} # Wave height for var in ['swh', 'HTSGW', 'HTSGW_surface']: if var in ds.variables: wave_data['wave_height'] = ds[var].values break # Wave direction for var in ['mwd', 'WVDIR', 'WVDIR_surface']: if var in ds.variables: wave_data['wave_direction'] = ds[var].values break # Wave period for var in ['mwp', 'WVPER', 'WVPER_surface']: if var in ds.variables: wave_data['wave_period'] = ds[var].values break # Coordinates if 'latitude' in ds.coords and 'longitude' in ds.coords: lats = ds['latitude'].values lons = ds['longitude'].values elif 'lat' in ds.coords and 'lon' in ds.coords: lats = ds['lat'].values lons = ds['lon'].values else: raise Exception("No coordinate data found") ds.close() # Convert to points return self._extract_data_points(wave_data, lats, lons, source) except Exception as e: logger.error(f"xarray processing failed: {e}") return [] def _process_arctic_with_pygrib(self, grib_path: str, source: str) -> List[Dict]: """ Process Arctic GRIB with pygrib for polar stereographic """ try: grbs = pygrib.open(grib_path) points = [] for grb in grbs: if 'Significant height' in grb.name: lats, lons = grb.latlons() values = grb.values # Filter for valid Arctic region mask = (lats > 50.0) & (lats < 90.0) & (~np.isnan(values)) & (values > 0) valid_lats = lats[mask] valid_lons = lons[mask] valid_heights = values[mask] # Sample data to manage memory if len(valid_lats) > 500: indices = np.random.choice(len(valid_lats), 500, replace=False) valid_lats = valid_lats[indices] valid_lons = valid_lons[indices] valid_heights = valid_heights[indices] for lat, lon, height in zip(valid_lats, valid_lons, valid_heights): points.append({ 'lat': float(lat), 'lon': float(lon), 'wave_height': float(height), 'wave_direction': 0.0, # Default 'wave_period': 6.0, # Default 'region': source, 'u': 0.0, 'v': 0.0 }) break grbs.close() logger.info(f"Processed {len(points)} Arctic points with pygrib") return points except Exception as e: logger.error(f"pygrib Arctic processing failed: {e}") return [] def _extract_data_points(self, wave_data: Dict, lats, lons, source: str) -> List[Dict]: """ Extract data points from gridded wave data """ points = [] try: # Ensure we have 2D coordinate arrays if lats.ndim == 1 and lons.ndim == 1: lon_grid, lat_grid = np.meshgrid(lons, lats) else: lat_grid, lon_grid = lats, lons # Flatten arrays lat_flat = lat_grid.flatten() lon_flat = lon_grid.flatten() # Sample points to manage memory (max 1000 points per file) n_points = len(lat_flat) if n_points > 1000: indices = np.random.choice(n_points, 1000, replace=False) lat_flat = lat_flat[indices] lon_flat = lon_flat[indices] else: indices = np.arange(n_points) # Extract wave parameters wave_heights = None wave_directions = None wave_periods = None if 'wave_height' in wave_data: wave_heights = wave_data['wave_height'].flatten() if len(indices) < len(wave_heights): wave_heights = wave_heights[indices] if 'wave_direction' in wave_data: wave_directions = wave_data['wave_direction'].flatten() if len(indices) < len(wave_directions): wave_directions = wave_directions[indices] if 'wave_period' in wave_data: wave_periods = wave_data['wave_period'].flatten() if len(indices) < len(wave_periods): wave_periods = wave_periods[indices] # Create data points for i, (lat, lon) in enumerate(zip(lat_flat, lon_flat)): if np.isnan(lat) or np.isnan(lon): continue height = wave_heights[i] if wave_heights is not None and i < len(wave_heights) else 1.0 direction = wave_directions[i] if wave_directions is not None and i < len(wave_directions) else 0.0 period = wave_periods[i] if wave_periods is not None and i < len(wave_periods) else 6.0 if np.isnan(height) or height <= 0: continue # Calculate velocity components direction_rad = np.radians(direction) speed = height * 0.3 u = speed * np.sin(direction_rad) v = speed * np.cos(direction_rad) points.append({ 'lat': float(lat), 'lon': float(lon), 'wave_height': float(height), 'wave_direction': float(direction), 'wave_period': float(period), 'region': source, 'u': float(u), 'v': float(v) }) logger.info(f"Extracted {len(points)} valid data points from {source}") return points except Exception as e: logger.error(f"Data extraction failed: {e}") return [] def cleanup(self): """Clean up temporary files""" try: import shutil if os.path.exists(self.temp_dir): shutil.rmtree(self.temp_dir) except Exception as e: logger.warning(f"Cleanup warning: {e}") if __name__ == "__main__": fetcher = RealWaveDataFetcher() try: data = fetcher.fetch_global_wave_data(['atlantic']) print(f"Fetched {len(data['points'])} real wave data points") print(f"Sources: {data['metadata']['sources']}") finally: fetcher.cleanup()