Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import requests | |
| import folium | |
| from datetime import datetime, timedelta | |
| import tempfile | |
| import os | |
| from PIL import Image, ImageDraw, ImageFont | |
| import io | |
| import boto3 | |
| import botocore | |
# Named Montana summits mapped to (latitude, longitude) in decimal degrees.
# Used both for the map markers and for the quick-access buttons.
MONTANA_PEAKS = {
    # 45°16′41″N 111°27′01″W
    "Lone Peak (Big Sky)": (
        45.27806,
        -111.45028,
    ),
    # 45°53′45″N 110°58′7″W
    "Sacajawea Peak": (
        45.89583,
        -110.96861,
    ),
    # 45°13′55″N 111°27′2″W
    "Pioneer Mountain": (
        45.231835,
        -111.450505,
    ),
}
def _parse_nexrad_key_time(key):
    """Return the timestamp embedded in an S3 key, or None if it cannot be parsed.

    The key is split on underscores; the second field is read as ``YYYYMMDD``
    and the digits of the third field as ``HHMM``.
    """
    parts = key.split("_")
    if len(parts) < 3:
        return None
    # The time field may carry a non-numeric suffix; keep only its digits.
    time_digits = "".join(ch for ch in parts[2] if ch.isdigit())
    try:
        return datetime.strptime(parts[1] + time_digits, "%Y%m%d%H%M")
    except ValueError:
        return None


def get_nexrad_file(station, product="N0S", hours_back=6):
    """Download the newest matching radar file for a station from NOAA's S3 bucket.

    The ``noaa-nexrad-level2`` bucket is searched under day prefixes of the
    form ``{station}/{YYYY}/{MM}/{DD}/`` for keys that contain ``_{product}.``
    and whose embedded timestamp (expected filename shape
    ``{station}_{YYYYMMDD}_{HHMM}_{product}.gz``) lies inside the search window.
    The window ends 20 minutes before the current UTC time (to allow for
    publication delay) and starts ``hours_back`` hours before that.

    NOTE(review): the prefix/filename layout and the presence of product codes
    such as "N0S" in this bucket are assumptions inherited from the original
    implementation -- confirm them against NOAA's published bucket layout.

    Args:
        station: Radar station identifier used as the first path component.
        product: Product code that must appear in the key as ``_{product}.``.
        hours_back: Length of the search window in hours.

    Returns:
        ``(local_file_path, s3_key)`` for the most recent matching object, or
        ``(None, "")`` when nothing matched.

    Raises:
        Any exception raised by the S3 download is propagated after the
        partially written temporary file has been removed.
    """
    # Imported locally so the module's import block does not need to change.
    from botocore import UNSIGNED
    from botocore.config import Config

    # Unsigned requests let this work on hosts without AWS credentials
    # (assumes the bucket allows anonymous reads).
    s3 = boto3.client("s3", config=Config(signature_version=UNSIGNED))
    bucket = "noaa-nexrad-level2"
    now = datetime.utcnow() - timedelta(minutes=20)  # allow for delay
    start_time = now - timedelta(hours=hours_back)

    # Prefixes are per day, so list each calendar day in the window exactly
    # once instead of once per hour (which produced duplicate candidates).
    day_prefixes = []
    day = start_time.date()
    while day <= now.date():
        day_prefixes.append(f"{station}/{day:%Y}/{day:%m}/{day:%d}/")
        day += timedelta(days=1)

    marker = f"_{product}."
    candidates = []
    paginator = s3.get_paginator("list_objects_v2")
    for prefix in day_prefixes:
        try:
            # Paginate: a single list call returns at most 1000 keys.
            for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
                for obj in page.get("Contents", []):
                    key = obj["Key"]
                    if marker not in key:
                        continue
                    file_dt = _parse_nexrad_key_time(key)
                    if file_dt is not None and start_time <= file_dt <= now:
                        candidates.append((file_dt, key))
        except (botocore.exceptions.BotoCoreError,
                botocore.exceptions.ClientError):
            # Best effort: a failed listing for one day must not hide the others.
            continue

    if not candidates:
        return None, ""

    latest_file_key = max(candidates, key=lambda item: item[0])[1]

    # Reserve a path and close the handle immediately; download_file opens
    # the path itself, so keeping our handle open would only leak it.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".gz") as tmp_file:
        local_path = tmp_file.name
    try:
        s3.download_file(bucket, latest_file_key, local_path)
    except Exception:
        # Do not leave an empty/partial temp file behind on failure.
        if os.path.exists(local_path):
            os.remove(local_path)
        raise
    return local_path, latest_file_key
def get_noaa_forecast(lat, lon):
    """Return a human-readable NOAA forecast for a coordinate pair.

    Looks up the forecast URL through ``https://api.weather.gov/points/{lat},{lon}``
    and formats every forecast period (name, temperature, wind, detailed text).
    A snow warning line is appended to any period whose detailed forecast
    mentions snow, flurries, wintry mix or blizzard.

    Args:
        lat: Latitude in decimal degrees.
        lon: Longitude in decimal degrees.

    Returns:
        The formatted forecast text, or ``"Error getting forecast: ..."`` if any
        request or parsing step fails (this function never raises).
    """
    # api.weather.gov asks clients to identify themselves with a User-Agent.
    headers = {"User-Agent": "montana-mountain-weather (gradio app)"}
    snow_words = ('snow', 'flurries', 'wintry mix', 'blizzard')
    try:
        points_url = f"https://api.weather.gov/points/{lat},{lon}"
        points_resp = requests.get(points_url, headers=headers, timeout=10)
        # Surface HTTP errors directly instead of as a KeyError on the JSON body.
        points_resp.raise_for_status()
        forecast_url = points_resp.json()['properties']['forecast']

        forecast_resp = requests.get(forecast_url, headers=headers, timeout=10)
        forecast_resp.raise_for_status()
        periods = forecast_resp.json()['properties']['periods']

        chunks = ["Weather Forecast:\n\n"]
        for period in periods:
            # Guard against a null detailed forecast in the payload.
            details = period['detailedForecast'] or ""
            chunks.append(f"{period['name']}:\n")
            chunks.append(
                f"Temperature: {period['temperature']}°{period['temperatureUnit']}\n"
            )
            chunks.append(f"Wind: {period['windSpeed']} {period['windDirection']}\n")
            chunks.append(f"{details}\n\n")
            lowered = details.lower()
            if any(word in lowered for word in snow_words):
                chunks.append("⚠️ SNOW EVENT PREDICTED ⚠️\n\n")
        return "".join(chunks)
    except Exception as e:
        # Top-level boundary for the UI: report the failure as text.
        return f"Error getting forecast: {str(e)}"
def get_forecast_products(lat, lon):
    """Fetch the CONUS forecast graphics, crop and label them, and save them as PNGs.

    Each product image is downloaded from graphical.weather.gov, converted to
    RGB, cropped around (lat, lon), stamped with its title and the current UTC
    time, and written to a temporary ``.png`` file.

    Returns:
        List of temporary file paths, one per product that was processed
        successfully. Products that fail for any reason are skipped.
    """
    stamp = datetime.utcnow().strftime('%Y-%m-%d %H:%M UTC')
    image_root = "https://graphical.weather.gov/images/conus"
    catalogue = (
        ("MaxT1_conus.png", "Maximum Temperature"),
        ("MinT1_conus.png", "Minimum Temperature"),
        ("QPF06_conus.png", "6-Hour Precipitation"),
        ("QPF12_conus.png", "12-Hour Precipitation"),
        ("QPF24_conus.png", "24-Hour Precipitation"),
        ("Snow1_conus.png", "Snowfall Amount"),
        ("Snow2_conus.png", "Snowfall Day 2"),
        ("Wx1_conus.png", "Weather Type"),
    )

    saved_paths = []
    for image_name, caption in catalogue:
        try:
            reply = requests.get(f"{image_root}/{image_name}", timeout=10)
            if reply.status_code != 200:
                continue
            frame = Image.open(io.BytesIO(reply.content)).convert('RGB')
            frame = crop_to_region(frame, lat, lon)
            draw_text_with_outline(
                ImageDraw.Draw(frame), f"{caption}\n{stamp}", (10, 10)
            )
            with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as tmp:
                frame.save(tmp.name)
                saved_paths.append(tmp.name)
        except Exception:
            # Best effort: one broken product must not block the others.
            continue
    return saved_paths
def _crop_span(center, window, extent):
    """Return (start, end) of a span of length ``window`` around ``center`` kept inside [0, extent]."""
    window = min(window, extent)
    # Shift (rather than shrink) the span so it keeps its full length at the edges.
    start = min(max(center - window / 2, 0), extent - window)
    return start, start + window


def crop_to_region(img, lat, lon, zoom=1.5):
    """Crop an image around a coordinate and scale the crop back to the original size.

    The image is treated as a linear lat/lon grid spanning latitude 25..50 and
    longitude -125..-65. A window of ``1/zoom`` of the image size is centred on
    the pixel for (lat, lon); when the window would cross an image edge it is
    shifted back inside instead of being truncated, so the crop always keeps
    the image's aspect ratio and the final resize does not distort it.

    Args:
        img: Image object exposing ``size``, ``crop`` and ``resize``.
        lat: Latitude of the point of interest in decimal degrees.
        lon: Longitude of the point of interest in decimal degrees.
        zoom: Magnification factor; must be positive. Values below 1 are
            limited to the full image.

    Returns:
        The cropped region resized to the original image dimensions.

    Raises:
        ValueError: If ``zoom`` is not positive.
    """
    if zoom <= 0:
        raise ValueError("zoom must be positive")

    img_width, img_height = img.size
    lat_min, lat_max = 25.0, 50.0
    lon_min, lon_max = -125.0, -65.0

    # Linear mapping from geographic coordinates to pixel coordinates
    # (y grows downward, hence lat_max - lat).
    x = (lon - lon_min) / (lon_max - lon_min) * img_width
    y = (lat_max - lat) / (lat_max - lat_min) * img_height

    x1, x2 = _crop_span(x, img_width / zoom, img_width)
    y1, y2 = _crop_span(y, img_height / zoom, img_height)

    cropped = img.crop((x1, y1, x2, y2))
    return cropped.resize((img_width, img_height), Image.Resampling.LANCZOS)
def draw_text_with_outline(draw, text, pos, font_size=20, center=False):
    """Draw white text with a thin black outline so it stays legible on any background.

    Args:
        draw: Drawing object providing ``text`` and ``textbbox``.
        text: The string to render.
        pos: ``(x, y)`` position; the top-left of the text, or its centre when
            ``center`` is true.
        font_size: Point size used for the DejaVu Sans font.
        center: When true, ``pos`` is treated as the centre of the text box.
    """
    try:
        font = ImageFont.truetype(
            "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", font_size
        )
    except (OSError, ImportError):
        # Font file missing/unreadable or FreeType support unavailable:
        # fall back to the built-in font instead of hiding unrelated errors.
        font = ImageFont.load_default()

    x, y = pos
    if center:
        bbox = draw.textbbox((0, 0), text, font=font)
        text_width = bbox[2] - bbox[0]
        text_height = bbox[3] - bbox[1]
        x = x - text_width // 2
        y = y - text_height // 2

    # Black copies offset by one pixel on each diagonal form the outline.
    for dx, dy in [(-1, -1), (-1, 1), (1, -1), (1, 1)]:
        draw.text((x + dx, y + dy), text, fill='black', font=font)
    draw.text((x, y), text, fill='white', font=font)
def get_map(lat, lon):
    """Build the folium map for a location and return it as embeddable HTML.

    The map is centred on (lat, lon) at zoom level 9 and carries one marker
    per entry in MONTANA_PEAKS. If the requested point is not exactly one of
    those peaks, an extra "Selected Location" marker is added. Clicking the
    map is handled by folium's ClickForLatLng.
    """
    peak_map = folium.Map(location=[lat, lon], zoom_start=9)

    for name, position in MONTANA_PEAKS.items():
        peak_lat, peak_lon = position[0], position[1]
        marker = folium.Marker(
            position,
            popup=f"{name}<br>Lat: {peak_lat:.4f}, Lon: {peak_lon:.4f}",
            tooltip=name,
        )
        marker.add_to(peak_map)

    on_known_peak = (lat, lon) in MONTANA_PEAKS.values()
    if not on_known_peak:
        selection = folium.Marker(
            [lat, lon],
            popup=f"Selected Location<br>Lat: {lat:.4f}, Lon: {lon:.4f}",
        )
        selection.add_to(peak_map)

    peak_map.add_child(folium.ClickForLatLng())
    return peak_map._repr_html_()
def update_weather(lat, lon, station, product):
    """Collect everything the UI shows for one location.

    Args:
        lat: Latitude (anything ``float()`` accepts).
        lon: Longitude (anything ``float()`` accepts).
        station: Radar station ID passed to ``get_nexrad_file``.
        product: Radar product code passed to ``get_nexrad_file``.

    Returns:
        A 5-tuple ``(forecast_text, gallery_paths, radar_file_path, map_html,
        radar_key)``. On invalid input or an unexpected error the tuple holds a
        message, an empty gallery, ``"No radar data"``, the default map and an
        empty key. ``radar_file_path`` is also ``"No radar data"`` (optionally
        with the error) when no radar file could be retrieved.
    """
    try:
        lat = float(lat)
        lon = float(lon)
        if not (-90 <= lat <= 90 and -180 <= lon <= 180):
            return "Invalid coordinates", [], "No radar data", get_map(45.5, -111.0), ""

        forecast_text = get_noaa_forecast(lat, lon)

        # Textbox input may carry stray whitespace (or be None when cleared).
        station = (station or "").strip()
        product = (product or "").strip()

        # Radar retrieval is isolated so that an S3 failure does not discard
        # the forecast and gallery that were fetched successfully.
        try:
            radar_file_path, radar_key = get_nexrad_file(station, product, hours_back=6)
        except Exception as radar_error:
            radar_file_path, radar_key = f"No radar data ({radar_error})", ""
        if not radar_file_path:
            # Same placeholder the error paths use, instead of None.
            radar_file_path = "No radar data"

        gallery_data = get_forecast_products(lat, lon)
        map_html = get_map(lat, lon)
        return forecast_text, gallery_data, radar_file_path, map_html, radar_key
    except Exception as e:
        return f"Error: {str(e)}", [], "No radar data", get_map(45.5, -111.0), ""
# Gradio UI definition: builds the page, wires the buttons to update_weather,
# and launches the app. NOTE(review): indentation was lost in this copy, so
# the exact nesting of the layout containers below should be confirmed.
| with gr.Blocks(title="Montana Mountain Weather") as demo: | |
| gr.Markdown("# Montana Mountain Weather") | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
# Input controls; the defaults match the fallback map centre (45.5, -111.0).
| lat_input = gr.Number(label="Latitude", value=45.5, minimum=-90, maximum=90) | |
| lon_input = gr.Number(label="Longitude", value=-111.0, minimum=-180, maximum=180) | |
| station_input = gr.Textbox(label="Radar Station ID", value="KMSX") | |
| product_input = gr.Textbox(label="Radar Product Code", value="N0S") | |
| gr.Markdown("### Quick Access - Montana Peaks") | |
# One button per MONTANA_PEAKS entry, kept in dict order so they can be
# matched back to their peak by index when the click handlers are attached.
| peak_buttons = [] | |
| for peak_name in MONTANA_PEAKS: | |
| peak_buttons.append(gr.Button(f"📍 {peak_name}")) | |
| submit_btn = gr.Button("Get Weather", variant="primary") | |
| with gr.Column(scale=2): | |
# Initial map is rendered once at build time for the default coordinates.
| map_display = gr.HTML(get_map(45.5, -111.0)) | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| forecast_output = gr.Textbox(label="Weather Forecast", lines=12, | |
| placeholder="Select a location to see the forecast...") | |
| with gr.Column(scale=2): | |
| radar_data_output = gr.Textbox(label="Raw Radar Data File Path", | |
| placeholder="Radar data file path will appear here") | |
| with gr.Row(): | |
| forecast_gallery = gr.Gallery(label="Forecast Products", show_label=True, | |
| columns=4, height=600, object_fit="contain") | |
| radar_key_output = gr.Textbox(label="S3 Key for Radar Data", | |
| placeholder="S3 key will appear here") | |
# "Get Weather": the output order must match the 5-tuple returned by
# update_weather (forecast text, gallery, radar path, map HTML, S3 key).
| submit_btn.click( | |
| fn=update_weather, | |
| inputs=[lat_input, lon_input, station_input, product_input], | |
| outputs=[forecast_output, forecast_gallery, radar_data_output, map_display, radar_key_output] | |
| ) | |
# Peak buttons: first write the peak's (lat, lon) into the two number inputs,
# then run the same update as the submit button. The lambda binds peak_name
# as a default argument so each button keeps its own peak.
| for i, peak_name in enumerate(MONTANA_PEAKS.keys()): | |
| peak_buttons[i].click( | |
| fn=lambda name=peak_name: MONTANA_PEAKS[name], | |
| inputs=[], | |
| outputs=[lat_input, lon_input] | |
| ).then( | |
| fn=update_weather, | |
| inputs=[lat_input, lon_input, station_input, product_input], | |
| outputs=[forecast_output, forecast_gallery, radar_data_output, map_display, radar_key_output] | |
| ) | |
# Static help text shown to the user.
| gr.Markdown(""" | |
| ## Instructions | |
| 1. Use the quick access buttons to check specific Montana peaks. | |
| 2. Or enter coordinates manually (or click on the map). | |
| 3. Enter the Radar Station ID (e.g., KMSX) and Radar Product Code (e.g., N0S). | |
| 4. Click "Get Weather" to see the forecast, forecast product images, and to download the latest raw radar data file from NOAA’s AWS bucket. | |
| **Montana Peaks Included:** | |
| - Lone Peak (Big Sky): 45°16′41″N 111°27′01″W | |
| - Sacajawea Peak: 45°53′45″N 110°58′7″W | |
| - Pioneer Mountain: 45°13′55″N 111°27′2″W | |
| **Radar Data:** | |
| - This app now retrieves raw NEXRAD Level‑II data (e.g. the “N0S” product) from NOAA’s AWS S3 bucket. | |
| - You can process this raw file with external tools (like Py‑ART) to generate images. | |
| **Forecast Products:** | |
| - Temperature (Max/Min) | |
| - Precipitation (6/12/24 Hour) | |
| - Snowfall Amount | |
| - Weather Type | |
| **Note:** NOAA’s raw radar data is available via AWS and covers nearly all U.S. radars. For global coverage, you may need to explore additional sources. | |
| """) | |
# Enable request queuing and start the server when the module is executed.
| demo.queue().launch() | |