import os
import re
import io
import base64
import time
import logging
import xml.etree.ElementTree as ET
from datetime import datetime, timezone
from typing import Callable, List, Optional, Tuple

import gradio as gr
import requests
import numpy as np
import xarray as xr
from PIL import Image
from matplotlib import cm, colors
from scipy.interpolate import griddata
from eccodes import (
    codes_grib_new_from_file,
    codes_release,
    codes_get,
    codes_get_values,
    codes_get_double_array,
    codes_get_long,
    codes_get_size,
)

S3_BUCKET = "https://noaa-rrfs-pds.s3.amazonaws.com"
PREFIX_ROOT = "rrfs_a"

logging.basicConfig(level=logging.INFO, format="[%(asctime)s] %(levelname)s: %(message)s")
LOG = logging.getLogger("rrfs")


def list_bucket(prefix: str):
    params = {"delimiter": "/", "prefix": prefix}
    r = requests.get(S3_BUCKET + "/", params=params, timeout=20)
    r.raise_for_status()
    return ET.fromstring(r.text)
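
# The response is standard S3 bucket-listing XML in the 2006-03-01 namespace
# used by the findall() calls below. A trimmed illustration of the shape this
# module relies on (element names are real; the prefix/key values are made up):
#
#   <ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
#     <CommonPrefixes><Prefix>rrfs_a/rrfs.20240101/00/</Prefix></CommonPrefixes>
#     <Contents><Key>rrfs_a/rrfs.20240101/00/some_file.grib2</Key></Contents>
#   </ListBucketResult>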


def latest_day_and_cycle() -> Tuple[str, str]:
    day = datetime.now(timezone.utc).strftime("%Y%m%d")
    root = list_bucket(f"{PREFIX_ROOT}/rrfs.{day}/")
    hours = []
    for cp in root.findall("{http://s3.amazonaws.com/doc/2006-03-01/}CommonPrefixes"):
        pref = cp.find("{http://s3.amazonaws.com/doc/2006-03-01/}Prefix").text
        parts = pref.strip("/").split("/")
        if len(parts) >= 3 and parts[2].isdigit():
            hours.append(parts[2])
    if not hours:
        raise gr.Error(f"No cycles found for {day}")
    return day, max(hours)
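
# Example: a CommonPrefixes entry such as "rrfs_a/rrfs.20240101/06/"
# (illustrative date) splits into ["rrfs_a", "rrfs.20240101", "06"], so
# parts[2] is the cycle hour and max(hours) picks the day's most recent cycle.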


def list_prslev(day: str, hh: str) -> List[str]:
    root = list_bucket(f"{PREFIX_ROOT}/rrfs.{day}/{hh}/")
    keys = []
    for ct in root.findall("{http://s3.amazonaws.com/doc/2006-03-01/}Contents"):
        key = ct.find("{http://s3.amazonaws.com/doc/2006-03-01/}Key").text
        if key.endswith(".grib2") and ".prslev" in key:
            keys.append(key)
    return sorted(keys)


def parse_domains_and_hours(keys: List[str]) -> Tuple[List[str], List[str]]:
    domains = set()
    hours = set()
    for k in keys:
        m = re.search(r"\.f(\d{3})\.([a-z]+)\.grib2$", k)
        if m:
            hours.add(m.group(1))
            domains.add(m.group(2))
    return sorted(domains), sorted(hours)
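
# Example: a key ending in "rrfs.t06z.prslev.3km.f003.na.grib2" (illustrative
# filename) matches the regex above, capturing forecast hour "003" and
# domain "na".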


def _wrap_iframe(inner_html: str) -> str:
    b = inner_html.encode("utf-8")
    enc = base64.b64encode(b).decode("ascii")
    return f'<iframe src="data:text/html;base64,{enc}" style="width:100%; height:520px; border:none;"></iframe>'
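
# Example: _wrap_iframe("<p>hi</p>") returns an <iframe> whose src is
# "data:text/html;base64,PHA+aGk8L3A+". Serving the generated Leaflet page as
# a data: URI keeps its scripts and styles isolated from the Gradio host page.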


def build_key(day: str, hh: str, dom: str, fhr: str) -> str:
    # Build candidate keys for the selected domain
    if dom == "na":
        # Defer to the NA resolver, which tries multiple official patterns
        key, _ = resolve_na_key(day, hh, fhr)
        return key
    candidates = [
        f"{PREFIX_ROOT}/rrfs.{day}/{hh}/rrfs.t{hh}z.prslev.2p5km.f{fhr}.{dom}.grib2",
        f"{PREFIX_ROOT}/rrfs.{day}/{hh}/rrfs.t{hh}z.prslev.f{fhr}.{dom}.grib2",
    ]
    for c in candidates:
        # Check existence via the small .idx sidecar rather than the GRIB itself
        r = requests.get(f"{S3_BUCKET}/{c}.idx", timeout=20)
        if r.status_code == 200:
            return c
    raise gr.Error("No matching GRIB key found for selection")


def ensure_refc_in_idx(key: str) -> Tuple[bool, str]:
    idx_url = f"{S3_BUCKET}/{key}.idx"
    r = requests.get(idx_url, timeout=20)
    if r.status_code != 200:
        return False, "Index not found"
    refc_lines = "\n".join(ln for ln in r.text.splitlines() if "REFC:" in ln)
    return ("REFC:" in r.text), refc_lines


def fetch_latest(dom: str, fhr: str, quality: str = "high"):
    t0 = time.time()
    debug: List[str] = []

    def emit(msg: str):
        LOG.info(msg)
        debug.append(msg)

    day, hh = latest_day_and_cycle()
    emit(f"Cycle resolved: {day} {hh}Z; domain={dom}; fhr={fhr}")
    keys = list_prslev(day, hh)
    if not keys:
        raise gr.Error("No prslev keys available for latest cycle")
    if dom == "na":
        key, idx_text = resolve_na_key(day, hh, fhr)
        if "REFC:" not in idx_text:
            raise gr.Error("Selected NA file does not contain REFC")
        refc_lines = "\n".join(ln for ln in idx_text.splitlines() if "REFC:" in ln)
        emit(f"NA key: {key}")
    else:
        key = build_key(day, hh, dom, fhr)
        ok, refc_lines = ensure_refc_in_idx(key)
        if not ok:
            raise gr.Error("Selected file does not contain REFC")
    emit(f"Key: {key}")
    url = f"{S3_BUCKET}/{key}"
    os.makedirs("data", exist_ok=True)
    # For NA, slice out only the REFC message via HTTP Range to keep size practical
    if dom == "na":
        out_path = os.path.join("data", os.path.basename(key).replace(".grib2", ".refc.grib2"))
        # idx_text was already validated in resolve_na_key
        t1 = time.time()
        slice_refc_grib(url, idx_text, out_path, emit)
        emit(f"NA slice saved: {out_path} ({os.path.getsize(out_path)/1e6:.2f} MB) in {time.time()-t1:.2f}s")
    else:
        out_path = os.path.join("data", os.path.basename(key))
        t1 = time.time()
        with requests.get(url, stream=True, timeout=60) as r:
            r.raise_for_status()
            with open(out_path, "wb") as f:
                for chunk in r.iter_content(chunk_size=1024 * 1024):
                    if chunk:
                        f.write(chunk)
        emit(f"Download complete: {out_path} ({os.path.getsize(out_path)/1e6:.2f} MB) in {time.time()-t1:.2f}s")
    size_mb = os.path.getsize(out_path) / (1024 * 1024)
    # Choose quality (stride + target grid)
    stride, (gx, gy) = quality_to_stride_and_grid(quality)
    emit(f"Quality: {quality} -> stride={stride}, grid={gx}x{gy}")
    t2 = time.time()
    html = generate_leaflet_overlay(out_path, emit, stride=stride, grid=(gx, gy))
    emit(f"Render time: {time.time()-t2:.2f}s; total elapsed: {time.time()-t0:.2f}s")
    return (
        f"Saved: {out_path} (\u2248 {size_mb:.1f} MiB)\nCycle: {day} {hh}Z\nURL: {url}",
        refc_lines or "(REFC present; see .idx for details)",
        url,
        html,
        "\n".join(debug),
    )


def resolve_na_key(day: str, hh: str, fhr: str) -> Tuple[str, str]:
    """Try multiple naming patterns for NA and return (key, idx_text) with REFC present."""
    patterns = [
        f"{PREFIX_ROOT}/rrfs.{day}/{hh}/rrfs.t{hh}z.prslev.3km.f{fhr}.na.grib2",
        f"{PREFIX_ROOT}/rrfs.{day}/{hh}/rrfs.t{hh}z.prslev.f{fhr}.grib2",  # NA variant without domain tag
        f"{PREFIX_ROOT}/rrfs.{day}/{hh}/rrfs.t{hh}z.natlev.3km.f{fhr}.na.grib2",
    ]
    for key in patterns:
        idx_url = f"{S3_BUCKET}/{key}.idx"
        r = requests.get(idx_url, timeout=15)
        if r.status_code == 200 and "REFC:" in r.text:
            return key, r.text
    # If none matched, scan the hour directory for any prslev GRIB with REFC
    listing = list_bucket(f"{PREFIX_ROOT}/rrfs.{day}/{hh}/")
    for ct in listing.findall("{http://s3.amazonaws.com/doc/2006-03-01/}Contents"):
        key = ct.find("{http://s3.amazonaws.com/doc/2006-03-01/}Key").text
        if key.endswith(".grib2") and ".prslev" in key:
            r = requests.get(f"{S3_BUCKET}/{key}.idx", timeout=15)
            if r.status_code == 200 and "REFC:" in r.text:
                return key, r.text
    raise gr.Error("No NA GRIB with REFC found for the latest cycle; try a different hour")


def slice_refc_grib(grib_url: str, idx_text: str, out_path: str, emit: Optional[Callable] = None) -> None:
    # Parse idx lines: format "<n>:<offset>:d=...:<shortName>:..."
    entries = []
    for ln in idx_text.splitlines():
        parts = ln.split(":")
        if len(parts) < 4:
            continue
        try:
            off = int(parts[1])
        except ValueError:
            continue
        entries.append((off, parts[3]))
    if not entries:
        raise gr.Error("Empty .idx; cannot slice")
    # Keep only the first REFC record (keeps the file small and simple)
    refc_offsets = sorted(off for off, short in entries if short == "REFC")
    if not refc_offsets:
        raise gr.Error("No REFC entries found in .idx")
    start = refc_offsets[0]
    # Read the 16-byte GRIB header to determine the exact message length
    hdr = requests.get(grib_url, headers={"Range": f"bytes={start}-{start+15}"}, timeout=30)
    hdr.raise_for_status()
    if len(hdr.content) < 16 or hdr.content[:4] != b"GRIB":
        raise gr.Error("Could not read GRIB header for NA slice")
    total_len = int.from_bytes(hdr.content[8:16], "big")
    end = start + total_len - 1
    if emit:
        emit(f"HTTP Range: {start}-{end} (len={total_len} bytes)")
    headers = {"Range": f"bytes={start}-{end}"}
    with requests.get(grib_url, headers=headers, stream=True, timeout=120) as r:
        if r.status_code not in (200, 206):
            r.raise_for_status()
        with open(out_path, "wb") as out:
            for chunk in r.iter_content(chunk_size=1024 * 512):
                if chunk:
                    out.write(chunk)
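
# GRIB2 "Section 0" (the indicator section) is 16 bytes: the ASCII magic
# "GRIB" in bytes 0-3, two reserved bytes, the discipline byte, the edition
# byte (2 for GRIB2), then the total message length as an 8-byte big-endian
# integer in bytes 8-15. A minimal sketch of the length check performed above,
# factored into a hypothetical helper (not called by the app):
def _grib2_message_length(section0: bytes) -> int:
    """Return the total message length encoded in a 16-byte GRIB2 Section 0."""
    if len(section0) < 16 or section0[:4] != b"GRIB":
        raise ValueError("not a GRIB message")
    if section0[7] != 2:
        raise ValueError("not GRIB edition 2")
    return int.from_bytes(section0[8:16], "big")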


DEFAULT_NA_STRIDE = 1  # Use full resolution for accuracy


def quality_to_stride_and_grid(quality: str) -> Tuple[int, Tuple[int, int]]:
    """Map a quality preset to (stride, (nx, ny)) for rendering.

    - fast: stride=3, 900x540 grid (~20 s)
    - balanced: stride=2, 1200x720 grid (~30 s)
    - high: stride=1, 1799x1059 grid (full RRFS NA resolution, ~45 s)
    """
    q = (quality or "balanced").strip().lower()
    if q in ("high", "hi"):
        # Full resolution: stride=1, full RRFS NA 1799x1059 grid
        return 1, (1799, 1059)
    if q in ("balanced", "bal", "med", "medium"):
        return 2, (1200, 720)
    # Anything else falls through to "fast"
    return 3, (900, 540)
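
# Example: quality_to_stride_and_grid("high") -> (1, (1799, 1059));
# quality_to_stride_and_grid(None) -> (2, (1200, 720)) via the "balanced"
# default; any other unrecognized string falls through to (3, (900, 540)).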


def generate_leaflet_overlay(grib_path: str, emit: Optional[Callable] = None, stride: int = DEFAULT_NA_STRIDE, grid: Tuple[int, int] = (640, 480)) -> str:
    """Render a Leaflet PNG overlay from a GRIB file containing REFC.

    Uses ecCodes lat/lon arrays (already accurate) with fast interpolation.
    """
    try:
        ds = xr.open_dataset(
            grib_path,
            engine="cfgrib",
            backend_kwargs={
                "indexpath": "",
                "filter_by_keys": {"shortName": "refc"},
            },
        )
        if not list(ds.data_vars):
            raise RuntimeError("cfgrib returned empty dataset for REFC")
        if emit:
            emit("Parse path: cfgrib")
        # Pick the first data variable
        var_name = list(ds.data_vars)[0]
        da = ds[var_name]
        # Drop singleton time dimensions if present
        for dim in ["time", "valid_time", "step"]:
            if dim in da.dims and da.sizes.get(dim, 1) == 1:
                da = da.isel({dim: 0})
        # Lat/lon coordinates: avoid `or`-chaining DataArrays, whose truth
        # value is ambiguous; check for None explicitly instead
        lat = ds.coords.get("latitude")
        lon = ds.coords.get("longitude")
        if lat is None or lon is None:
            # Some cfgrib versions expose lat/lon on the DataArray instead
            lat = da.coords.get("latitude")
            lon = da.coords.get("longitude")
        if lat is None or lon is None:
            raise RuntimeError("No latitude/longitude coords in cfgrib dataset")
        latv = np.array(lat)
        lonv = np.array(lon)
        data = np.array(da)
        return _render_leaflet_from_fields(latv, lonv, data, emit, grid)
    except Exception:
        # Prefer accurate geolocation for NA using cached lat/lon arrays (built once)
        try:
            if grib_path.endswith(".refc.grib2"):
                lat2d, lon2d, meta = _get_or_build_na_latlon_cache(grib_path, stride, emit)
                # Read values from the slice and reshape/decimate to match the cached arrays
                with open(grib_path, "rb") as f:
                    gid = codes_grib_new_from_file(f)
                    if gid is None:
                        raise RuntimeError("Empty GRIB slice")
                    try:
                        nx = meta["nx"]
                        ny = meta["ny"]
                        vals = np.array(codes_get_values(gid))
                    finally:
                        codes_release(gid)
                if nx * ny != vals.size:
                    # Fallback: infer dimensions from the message size
                    n = int(vals.size)
                    nx = int(np.sqrt(n))
                    ny = max(1, n // max(1, nx))
                vals2d = vals.reshape(ny, nx)
                s = meta.get("stride", DEFAULT_NA_STRIDE)
                vals2d = vals2d[::s, ::s]
                if emit:
                    vmin = float(np.nanmin(vals2d)) if vals2d.size else float("nan")
                    vmax = float(np.nanmax(vals2d)) if vals2d.size else float("nan")
                    emit(f"Parse path: cached NA lat/lon; vals shape={vals2d.shape} range=[{vmin:.1f},{vmax:.1f}]")
                return _render_leaflet_from_fields(lat2d, lon2d, vals2d, emit, grid)
        except Exception as e_cache:
            if emit:
                emit(f"NA cache path failed: {e_cache}")
        # Try decimated ecCodes arrays (no cache)
        try:
            # Higher point limit for full resolution: RRFS NA 3 km is 1799x1059 ≈ 1.9M points
            latv, lonv, data = _read_refc_with_eccodes_arrays_decimated(grib_path, max_points=2_500_000)
            if emit:
                lat_min = float(np.nanmin(latv))
                lat_max = float(np.nanmax(latv))
                lon_min = float(np.nanmin(lonv))
                lon_max = float(np.nanmax(lonv))
                emit(f"Parse path: ecCodes arrays (decimated) extents lat=[{lat_min:.3f},{lat_max:.3f}] lon=[{lon_min:.3f},{lon_max:.3f}]")
            return _render_leaflet_from_fields(latv, lonv, data, emit, grid)
        except Exception as e_arr:
            if emit:
                emit(f"ecCodes arrays fallback failed: {e_arr}")
        # Last resort: fast bbox (may misalign on Lambert grids)
        is_slice = grib_path.endswith(".refc.grib2")
        lat_min, lat_max, lon_min, lon_max, grid2d = _read_refc_with_eccodes(
            grib_path, require_shortname=not is_slice
        )
        if emit:
            emit("Parse path: ecCodes bbox")
            try:
                shape = np.array(grid2d).shape
            except Exception:
                shape = ("?", "?")
            emit(f"BBox raw lat=[{lat_min:.3f},{lat_max:.3f}] lon=[{lon_min:.3f},{lon_max:.3f}] grid_shape={shape}")
        return _render_leaflet_from_bbox_grid(lat_min, lat_max, lon_min, lon_max, grid2d, emit)
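
# A minimal way to exercise the renderer offline, assuming a previously
# downloaded REFC slice at a hypothetical path:
#
#   html = generate_leaflet_overlay("data/sample.refc.grib2", emit=print,
#                                   stride=2, grid=(1200, 720))
#   with open("preview.html", "w") as fh:
#       fh.write(html)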


def _render_leaflet_from_fields(latv: np.ndarray, lonv: np.ndarray, data: np.ndarray, emit: Optional[Callable] = None, grid: Tuple[int, int] = (640, 480)) -> str:
    # Ensure 2D arrays; some products expose y/x dims named differently
    if data.ndim == 3:
        squeeze_axes = tuple(i for i, s in enumerate(data.shape) if s == 1)
        if squeeze_axes:
            data = np.squeeze(data, axis=squeeze_axes)
            if isinstance(latv, np.ndarray) and latv.ndim == len(squeeze_axes) + 2:
                latv = np.squeeze(latv, axis=squeeze_axes)
            if isinstance(lonv, np.ndarray) and lonv.ndim == len(squeeze_axes) + 2:
                lonv = np.squeeze(lonv, axis=squeeze_axes)
    # Keep full source resolution for maximum quality and alignment accuracy
    # (dynamic decimation was removed because it degraded quality)
    # Normalize longitudes to [-180, 180] before interpolation
    try:
        lonv = ((lonv + 180.0) % 360.0) - 180.0
    except Exception:
        pass
    # Mask fill values and unrealistic values
    data = np.where((data > 900) | (data < -100), np.nan, data)
    # Build a regular target lat/lon grid for the Leaflet overlay
    lat_min = float(np.nanmin(latv))
    lat_max = float(np.nanmax(latv))
    lon_min = float(np.nanmin(lonv))
    lon_max = float(np.nanmax(lonv))
    if emit:
        emit(f"Field extents lat=[{lat_min:.3f},{lat_max:.3f}] lon=[{lon_min:.3f},{lon_max:.3f}]")
    # Output grid size per quality setting
    ny, nx = grid[1], grid[0]
    tgt_lats = np.linspace(lat_min, lat_max, ny)
    tgt_lons = np.linspace(lon_min, lon_max, nx)
    grid_lon, grid_lat = np.meshgrid(tgt_lons, tgt_lats)
    # Interpolate to the regular grid
    points = np.column_stack((lonv.ravel(), latv.ravel()))
    values = data.ravel()
    # Mask missing/extreme values
    mask = np.isfinite(points[:, 0]) & np.isfinite(points[:, 1]) & np.isfinite(values)
    points = points[mask]
    values = values[mask]
    # Linear interpolation is fast and accurate for ecCodes lat/lon arrays
    interp_method = "linear"
    try:
        gridded = griddata(points, values, (grid_lon, grid_lat), method="linear")
    except Exception:
        interp_method = "nearest"
        gridded = griddata(points, values, (grid_lon, grid_lat), method="nearest")
    if emit:
        emit(f"Interpolation: {interp_method}; {len(points)} pts → {ny}x{nx} grid")
    # Color mapping for reflectivity (0..75 dBZ); transparent under 5 dBZ
    vmin, vmax = 0.0, 75.0
    norm = colors.Normalize(vmin=vmin, vmax=vmax)
    cmap = cm.get_cmap("turbo")
    rgba = cmap(norm(np.clip(gridded, vmin, vmax)))  # (ny, nx, 4)
    alpha = np.where(np.isnan(gridded) | (gridded < 5.0), 0.0, 0.65)
    rgba[..., 3] = alpha
    img = (rgba * 255).astype(np.uint8)
    image = Image.fromarray(img, mode="RGBA")
    buf = io.BytesIO()
    image.save(buf, format="PNG")
    encoded = base64.b64encode(buf.getvalue()).decode("ascii")
    if emit:
        emit(f"Overlay PNG size: {len(buf.getvalue())} bytes")
    html = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0"/>
<link rel="stylesheet" href="https://unpkg.com/leaflet@1.9.4/dist/leaflet.css"/>
<style>#map {{ height: 520px; width: 100%; }}</style>
</head>
<body>
<div id="map"></div>
<script src="https://unpkg.com/leaflet@1.9.4/dist/leaflet.js"></script>
<script>
var map = L.map('map').setView([{(lat_min + lat_max)/2:.4f}, {(lon_min + lon_max)/2:.4f}], 6);
L.tileLayer('https://tile.openstreetmap.org/{{z}}/{{x}}/{{y}}.png', {{
    maxZoom: 12,
    attribution: '© OpenStreetMap contributors'
}}).addTo(map);
var bounds = L.latLngBounds([[{lat_min:.6f}, {lon_min:.6f}], [{lat_max:.6f}, {lon_max:.6f}]]);
var img = 'data:image/png;base64,{encoded}';
L.imageOverlay(img, bounds, {{opacity: 1.0, interactive: false}}).addTo(map);
map.fitBounds(bounds);
</script>
</body>
</html>
"""
    return _wrap_iframe(html)
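
# griddata() above resamples the curvilinear model grid onto a regular
# lat/lon raster. A self-contained toy version of the same call (synthetic
# corner points, not RRFS data):
#
#   pts = np.array([[0.0, 0.0], [1.0, 0.0], [0.0, 1.0], [1.0, 1.0]])
#   vals = np.array([0.0, 1.0, 1.0, 2.0])
#   gx, gy = np.meshgrid(np.linspace(0, 1, 3), np.linspace(0, 1, 3))
#   griddata(pts, vals, (gx, gy), method="linear")  # 3x3 plane, center == 1.0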


def _read_refc_with_eccodes(grib_path: str, require_shortname: bool = True):
    """Read the first REFC field in a GRIB via ecCodes and return its bbox and grid.

    Returns: (lat_min, lat_max, lon_min, lon_max, grid2d)
    """
    def _dims_from_gid(gid):
        # Try Nx/Ny (Lambert), then Ni/Nj (lat-lon), else derive from numberOfPoints
        try:
            nx = int(codes_get(gid, "Nx"))
            ny = int(codes_get(gid, "Ny"))
            return nx, ny
        except Exception:
            pass
        try:
            nx = int(codes_get(gid, "Ni"))
            ny = int(codes_get(gid, "Nj"))
            return nx, ny
        except Exception:
            pass
        n = int(codes_get(gid, "numberOfPoints"))
        nx = int(np.sqrt(n)) or 1
        ny = max(1, n // max(1, nx))
        return nx, ny

    def _bbox_from_gid(gid):
        try:
            la1 = float(codes_get(gid, "latitudeOfFirstGridPointInDegrees"))
            lo1 = float(codes_get(gid, "longitudeOfFirstGridPointInDegrees"))
            la2 = float(codes_get(gid, "latitudeOfLastGridPointInDegrees"))
            lo2 = float(codes_get(gid, "longitudeOfLastGridPointInDegrees"))
            return min(la1, la2), max(la1, la2), min(lo1, lo2), max(lo1, lo2)
        except Exception:
            # Broad fallback
            return -90.0, 90.0, -180.0, 180.0

    with open(grib_path, "rb") as f:
        if not require_shortname:
            gid = codes_grib_new_from_file(f)
            if gid is None:
                raise gr.Error("Empty GRIB file")
            nx, ny = _dims_from_gid(gid)
            vals = np.array(codes_get_values(gid))
            lat_min, lat_max, lon_min, lon_max = _bbox_from_gid(gid)
            codes_release(gid)
            try:
                grid = vals.reshape(ny, nx)
            except Exception:
                grid = vals.reshape(-1, nx)
            return lat_min, lat_max, lon_min, lon_max, grid
        # Require a REFC message: scan until found
        while True:
            gid = codes_grib_new_from_file(f)
            if gid is None:
                break
            try:
                short = str(codes_get(gid, "shortName")).lower()
            except Exception:
                codes_release(gid)
                continue
            if short != "refc":
                codes_release(gid)
                continue
            nx, ny = _dims_from_gid(gid)
            vals = np.array(codes_get_values(gid))
            lat_min, lat_max, lon_min, lon_max = _bbox_from_gid(gid)
            codes_release(gid)
            try:
                grid = vals.reshape(ny, nx)
            except Exception:
                grid = vals.reshape(-1, nx)
            return lat_min, lat_max, lon_min, lon_max, grid
    raise gr.Error("REFC field not found in GRIB file (eccodes)")


def _render_leaflet_from_bbox_grid(lat_min: float, lat_max: float, lon_min: float, lon_max: float, grid: np.ndarray, emit: Optional[Callable] = None) -> str:
    # Normalize, colorize, and overlay with bbox extents
    grid = np.array(grid)
    vmin, vmax = 0.0, 75.0
    norm = colors.Normalize(vmin=vmin, vmax=vmax)
    cmap = cm.get_cmap("turbo")
    rgba = cmap(norm(np.clip(grid, vmin, vmax)))
    alpha = np.where(np.isnan(grid) | (grid < 5.0), 0.0, 0.65)
    rgba[..., 3] = alpha
    image = Image.fromarray((rgba * 255).astype(np.uint8), mode="RGBA")
    buf = io.BytesIO()
    image.save(buf, format="PNG")
    encoded = base64.b64encode(buf.getvalue()).decode("ascii")

    # Normalize longitudes to [-180, 180] so Leaflet positions the overlay correctly
    def _norm_lon(x):
        # Convert any [0, 360] longitude to [-180, 180]
        return ((x + 180.0) % 360.0) - 180.0

    lon_min_n = _norm_lon(lon_min)
    lon_max_n = _norm_lon(lon_max)
    # If normalization inverted the order, swap
    if lon_min_n > lon_max_n:
        lon_min_n, lon_max_n = lon_max_n, lon_min_n
    if emit:
        try:
            gshape = grid.shape
        except Exception:
            gshape = ("?", "?")
        emit(f"BBox normalized lat=[{lat_min:.3f},{lat_max:.3f}] lon=[{lon_min_n:.3f},{lon_max_n:.3f}] grid={gshape}")
    html = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0"/>
<link rel="stylesheet" href="https://unpkg.com/leaflet@1.9.4/dist/leaflet.css"/>
<style>#map {{ height: 520px; width: 100%; }}</style>
</head>
<body>
<div id="map"></div>
<script src="https://unpkg.com/leaflet@1.9.4/dist/leaflet.js"></script>
<script>
var map = L.map('map').setView([{(lat_min + lat_max)/2:.4f}, {(lon_min_n + lon_max_n)/2:.4f}], 4);
L.tileLayer('https://tile.openstreetmap.org/{{z}}/{{x}}/{{y}}.png', {{
    maxZoom: 9,
    attribution: '© OpenStreetMap contributors'
}}).addTo(map);
var bounds = L.latLngBounds([[{lat_min:.6f}, {lon_min_n:.6f}], [{lat_max:.6f}, {lon_max_n:.6f}]]);
var img = 'data:image/png;base64,{encoded}';
L.imageOverlay(img, bounds, {{opacity: 1.0, interactive: false}}).addTo(map);
map.fitBounds(bounds);
</script>
</body>
</html>
"""
    return _wrap_iframe(html)


def _read_refc_with_eccodes_arrays_decimated(grib_path: str, max_points: int = 200_000):
    """Read REFC values plus lat/lon arrays from a GRIB's first message, decimated to <= max_points.

    Returns (lat2d, lon2d, vals2d), possibly decimated, suitable for interpolation.
    """
    with open(grib_path, "rb") as f:
        # Read the first message; for REFC slices it is the REFC message
        gid = codes_grib_new_from_file(f)
        if gid is None:
            raise RuntimeError("Empty GRIB")
        try:
            # Extract value and coordinate arrays
            vals = np.array(codes_get_values(gid))  # (npoints,)
            lats = np.array(codes_get_double_array(gid, "latitudes"))
            lons = np.array(codes_get_double_array(gid, "longitudes"))
            # Determine the grid shape: Nx/Ny (Lambert), then Ni/Nj (lat-lon),
            # else estimate from the number of values; note codes_get_long
            # raises on a missing key, so try/except rather than `or`-chaining
            try:
                nx = int(codes_get_long(gid, "Nx"))
                ny = int(codes_get_long(gid, "Ny"))
            except Exception:
                try:
                    nx = int(codes_get_long(gid, "Ni"))
                    ny = int(codes_get_long(gid, "Nj"))
                except Exception:
                    n = int(codes_get_size(gid, "values"))
                    nx = max(1, int(np.sqrt(n)))
                    ny = max(1, n // nx)
        finally:
            codes_release(gid)
    # Safeguard: if array lengths mismatch, trim to the shortest
    m = min(len(vals), len(lats), len(lons))
    vals = vals[:m]
    lats = lats[:m]
    lons = lons[:m]
    # Reshape if possible
    if nx * ny == m:
        vals2d = vals.reshape(ny, nx)
        lat2d = lats.reshape(ny, nx)
        lon2d = lons.reshape(ny, nx)
    else:
        # Fall back to 1D arrays
        lat2d = lats
        lon2d = lons
        vals2d = vals
    # Normalize lon to [-180, 180]
    lon2d = ((lon2d + 180.0) % 360.0) - 180.0
    # Decimate to limit the point count for interpolation
    if lat2d.ndim == 2:
        ny0, nx0 = lat2d.shape
        s = max(1, int(np.ceil(np.sqrt((ny0 * nx0) / max_points))))
        lat2d = lat2d[::s, ::s]
        lon2d = lon2d[::s, ::s]
        vals2d = vals2d[::s, ::s]
    else:
        # 1D: evenly spaced subsample down to max_points
        n = lat2d.shape[0]
        if n > max_points:
            idx = np.linspace(0, n - 1, max_points, dtype=int)
            lat2d = lat2d[idx]
            lon2d = lon2d[idx]
            vals2d = vals2d[idx]
    return lat2d, lon2d, vals2d
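
# Stride arithmetic: s = ceil(sqrt(total_points / max_points)) guarantees the
# decimated grid keeps at most ~max_points samples. For the full RRFS NA grid
# (1799 x 1059 = 1,905,141 points) with the default max_points=200_000:
# s = ceil(sqrt(9.53)) = 4, leaving a 265 x 450 grid (~119k points).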


def _na_cache_path(stride: int) -> str:
    os.makedirs("data/cache", exist_ok=True)
    return os.path.join("data/cache", f"rrfs_na_3km_latlon_s{stride}.npz")


def _grid_key_from_gid(gid) -> str:
    # Fingerprint the grid definition so cached lat/lon arrays can be tied to
    # a specific projection/grid; keys absent from the message are skipped
    parts = []
    for k in [
        "gridType",
        "Nx",
        "Ny",
        "Ni",
        "Nj",
        "latitudeOfFirstGridPointInDegrees",
        "longitudeOfFirstGridPointInDegrees",
        "latitudeOfLastGridPointInDegrees",
        "longitudeOfLastGridPointInDegrees",
        "scanningMode",
        "iScansNegatively",
        "jScansPositively",
        "jPointsAreConsecutive",
        "LoV",
        "Latin1",
        "Latin2",
        "angleOfRotation",
        "latitudeOfSouthernPoleInDegrees",
        "longitudeOfSouthernPoleInDegrees",
    ]:
        try:
            v = codes_get(gid, k)
            parts.append(f"{k}={v}")
        except Exception:
            continue
    return "|".join(parts)


def _get_or_build_na_latlon_cache(grib_path: str, stride: int, emit: Optional[Callable] = None):
    cache_path = _na_cache_path(stride)
    if os.path.exists(cache_path):
        try:
            data = np.load(cache_path)
            lat2d = data["lat"]
            lon2d = data["lon"]
            # Note: grid_key is stored for provenance but not re-validated on
            # load, so a stale cache must be cleared manually if the NA grid changes
            meta = {k: data[k].item() for k in data.files if k not in ("lat", "lon")}
            if emit:
                emit(f"Loaded NA lat/lon cache: {cache_path} shape={lat2d.shape}")
            return lat2d, lon2d, meta
        except Exception as e:
            if emit:
                emit(f"Failed to load cache ({e}); rebuilding")
    # Build from the provided GRIB
    with open(grib_path, "rb") as f:
        gid = codes_grib_new_from_file(f)
        if gid is None:
            raise RuntimeError("Empty GRIB slice for cache build")
        try:
            key = _grid_key_from_gid(gid)
            try:
                gt = codes_get(gid, "gridType")
            except Exception:
                gt = "?"
            # codes_get_long raises on a missing key, so try/except rather
            # than `or`-chaining
            try:
                nx = int(codes_get_long(gid, "Nx"))
                ny = int(codes_get_long(gid, "Ny"))
            except Exception:
                try:
                    nx = int(codes_get_long(gid, "Ni"))
                    ny = int(codes_get_long(gid, "Nj"))
                except Exception:
                    n = int(codes_get_size(gid, "values"))
                    nx = max(1, int(np.sqrt(n)))
                    ny = max(1, n // nx)
            lats = np.array(codes_get_double_array(gid, "latitudes"))
            lons = np.array(codes_get_double_array(gid, "longitudes"))
        finally:
            codes_release(gid)
    # Reshape and decimate
    if nx * ny == lats.size == lons.size:
        lat2d = lats.reshape(ny, nx)
        lon2d = lons.reshape(ny, nx)
    else:
        raise RuntimeError("Unexpected lat/lon array sizes for NA grid")
    # Normalize lon to [-180, 180]
    lon2d = ((lon2d + 180.0) % 360.0) - 180.0
    s = max(1, int(stride))
    lat2d_d = lat2d[::s, ::s]
    lon2d_d = lon2d[::s, ::s]
    meta = {"nx": nx, "ny": ny, "stride": s, "grid_key": key}
    # Save the cache; failure to save is non-fatal
    try:
        np.savez_compressed(cache_path, lat=lat2d_d, lon=lon2d_d, **meta)
        if emit:
            emit(f"Built NA lat/lon cache: {cache_path} shape={lat2d_d.shape} stride={s}; gridType={gt}")
    except Exception as e:
        if emit:
            emit(f"Failed to save cache ({e})")
    return lat2d_d, lon2d_d, meta
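
# The cache is a plain compressed .npz and can be inspected offline, e.g. for
# the stride-1 file (path produced by _na_cache_path(1)):
#
#   with np.load("data/cache/rrfs_na_3km_latlon_s1.npz") as npz:
#       print(npz["lat"].shape, npz["nx"], npz["ny"], npz["stride"])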


def build_ui():
    with gr.Blocks(title="RRFS REFC Downloader (NOAA S3)") as demo:
        gr.Markdown("""
        Downloads a current Rapid Refresh Forecast System (RRFS) GRIB2 file that contains REFC from NOAA’s official S3 bucket (noaa-rrfs-pds).
        """)
        with gr.Row():
            dom = gr.Dropdown(label="Domain", choices=["hi", "pr", "na"], value="na", info="NA uses accurate eccodes lat/lon arrays")
            fhr = gr.Dropdown(label="Forecast Hour", choices=[f"{i:03d}" for i in range(0, 10)], value="000")
            quality = gr.Dropdown(label="Quality", choices=["fast", "balanced", "high"], value="balanced", info="balanced=30s, high=45s (full res)")
        run = gr.Button("Fetch Latest RRFS REFC GRIB")
        status = gr.Textbox(label="Download Status", interactive=False)
        idx = gr.Textbox(label="REFC lines from .idx", lines=6, interactive=False)
        link = gr.Textbox(label="Source URL", interactive=False)
        leaflet = gr.HTML(label="Leaflet Map Overlay")
        debug = gr.Textbox(label="Debug Log", lines=12, interactive=False)
        run.click(fn=fetch_latest, inputs=[dom, fhr, quality], outputs=[status, idx, link, leaflet, debug])
    return demo


if __name__ == "__main__":
    app = build_ui()
    app.launch()