#!/usr/bin/env python3
import os
import sys
import time
import xml.etree.ElementTree as ET
from datetime import datetime, timezone

import requests

S3_BUCKET = "https://noaa-rrfs-pds.s3.amazonaws.com"
PREFIX_ROOT = "rrfs_a"
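
# Public NOAA RRFS bucket on AWS; objects are readable anonymously over HTTPS,
# so the listing and download calls below use plain requests with no AWS
# credentials or SDK.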


def list_bucket(prefix: str):
    params = {"delimiter": "/", "prefix": prefix}
    r = requests.get(S3_BUCKET + "/", params=params, timeout=20)
    r.raise_for_status()
    return ET.fromstring(r.text)
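
# list_bucket() returns the parsed XML root of a standard S3 list-objects response
# (namespace http://s3.amazonaws.com/doc/2006-03-01/): callers below read
# <CommonPrefixes>/<Prefix> for "directories" collapsed by the delimiter and
# <Contents>/<Key> for object keys. Assumption: a single response (at most 1000
# keys, no pagination handled here) is enough for one day/cycle prefix.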


def find_latest_cycle(day_ymd: str) -> str | None:
    root = list_bucket(f"{PREFIX_ROOT}/rrfs.{day_ymd}/")
    hours = []
    for cp in root.findall("{http://s3.amazonaws.com/doc/2006-03-01/}CommonPrefixes"):
        pref = cp.find("{http://s3.amazonaws.com/doc/2006-03-01/}Prefix").text
        parts = pref.strip("/").split("/")
        if len(parts) >= 3:
            hh = parts[2]
            if hh.isdigit() and len(hh) == 2:
                hours.append(hh)
    return max(hours) if hours else None
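
# Illustrative prefix layout assumed above: "rrfs_a/rrfs.20250101/00/" splits into
# ["rrfs_a", "rrfs.20250101", "00"], so parts[2] is the two-digit cycle hour; the
# latest cycle is the lexicographic max of those zero-padded hours.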


def list_prslev_keys(day_ymd: str, hh: str) -> list[str]:
    # Returns keys like rrfs_a/rrfs.YYYYMMDD/HH/rrfs.tHHz.prslev.2p5km.fNNN.DOM.grib2(.idx)
    root = list_bucket(f"{PREFIX_ROOT}/rrfs.{day_ymd}/{hh}/")
    keys = []
    for ct in root.findall("{http://s3.amazonaws.com/doc/2006-03-01/}Contents"):
        key = ct.find("{http://s3.amazonaws.com/doc/2006-03-01/}Key").text
        if "/rrfs.t" in key and ".prslev" in key and key.endswith(".grib2"):
            keys.append(key)
    return keys


def choose_smallest_refc_candidate(keys: list[str]) -> str | None:
    # Prefer smaller domains to keep downloads reasonable (hi, pr), then others
    domain_order = ["hi", "pr", "ak", "conus", "na"]
    # Prefer f000 first
    sorted_keys = sorted(
        keys,
        key=lambda k: ("f000" not in k,
                       next((i for i, d in enumerate(domain_order) if f".{d}.grib2" in k), 99),
                       k),
    )
    return sorted_keys[0] if sorted_keys else None
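
# Sort key above: False sorts before True, so any f000 key beats a non-f000 key;
# the next() index then ranks domains in the order listed (unlisted suffixes fall
# back to 99); the key string itself is the final tie-break. Assumption: domain
# suffixes appear in the filenames as ".hi.grib2", ".pr.grib2", and so on.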


def ensure_refc_in_idx(grib_url: str) -> bool:
    idx_url = grib_url + ".idx"
    r = requests.get(idx_url, timeout=20)
    if r.status_code != 200:
        return False
    return "REFC:" in r.text


def download(url: str, out_path: str):
    with requests.get(url, stream=True, timeout=30) as r:
        r.raise_for_status()
        with open(out_path, "wb") as f:
            for chunk in r.iter_content(chunk_size=1024 * 1024):
                if chunk:
                    f.write(chunk)
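
# Streaming in 1 MiB chunks keeps memory flat even for large CONUS/NA files; note
# that the requests timeout is a per-read limit, not a cap on the whole transfer.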


def main():
    day = datetime.now(timezone.utc).strftime("%Y%m%d")
    latest = find_latest_cycle(day)
    if latest is None:
        print(f"No cycles found for {day} under {S3_BUCKET}/{PREFIX_ROOT}", file=sys.stderr)
        sys.exit(2)
    keys = list_prslev_keys(day, latest)
    if not keys:
        print(f"No prslev GRIB2 keys found for {day} {latest}Z", file=sys.stderr)
        sys.exit(2)
    candidate = choose_smallest_refc_candidate(keys)
    if candidate is None:
        print("No candidate GRIB2 key found", file=sys.stderr)
        sys.exit(2)
    grib_url = f"{S3_BUCKET}/{candidate}"
    if not ensure_refc_in_idx(grib_url):
        print("Chosen file does not contain REFC in index; aborting per requirements.", file=sys.stderr)
        sys.exit(3)
    os.makedirs("data", exist_ok=True)
    out = os.path.join("data", os.path.basename(candidate))
    print(f"Downloading: {grib_url}\n -> {out}")
    t0 = time.time()
    download(grib_url, out)
    dt = time.time() - t0
    size_mb = os.path.getsize(out) / (1024 * 1024)
    print(f"Done: {size_mb:.1f} MiB in {dt:.1f}s")
    # Save index for quick verification
    idx_path = out + ".idx"
    r = requests.get(grib_url + ".idx", timeout=20)
    r.raise_for_status()
    with open(idx_path, "wb") as f:
        f.write(r.content)
    # Echo REFC lines
    lines = [ln for ln in r.text.splitlines() if "REFC:" in ln]
    print("REFC index lines:")
    for ln in lines[:5]:
        print(ln)


if __name__ == "__main__":
    main()
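
# Usage sketch (assumes this file is saved as download_latest_refc.py):
#   python download_latest_refc.py
# Exit codes: 2 if no cycle, prslev key, or candidate is found for today's UTC
# date; 3 if the chosen file's .idx lacks REFC. On success, the GRIB2 file and
# its .idx are written under ./data/.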