Spaces:
Sleeping
Sleeping
| import ee | |
| import os | |
| import json | |
| from google.oauth2 import service_account | |
| from typing import Dict | |
| from datetime import datetime, timedelta | |
| from geopy.distance import geodesic | |
| import requests | |
| import pandas as pd | |
| import folium | |
| import requests | |
| from langchain_core.tools import tool | |
| from langgraph.prebuilt import create_react_agent | |
| from typing_extensions import TypedDict, Literal, Annotated | |
| from typing import Optional | |
| from collections import Counter | |
| from langgraph.graph import StateGraph, END | |
| from typing import TypedDict, Annotated | |
| import operator | |
| from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage, AIMessage | |
| from langchain_community.tools.tavily_search import TavilySearchResults | |
| from langchain.chat_models import init_chat_model | |
| from langchain.schema import HumanMessage | |
| from langchain.tools import tool | |
| import gradio as gr | |
# --- Runtime configuration -------------------------------------------------
# All secrets arrive as environment variables (Hugging Face Space secrets).
SERVICE_KEY_JSON = os.getenv("GEE_SERVICE_KEY")
MISTRAL_API_KEY = os.getenv("MISTRAL_API_KEY")
FIRMS_API_KEY = os.getenv("FIRMS_API_KEY")
OPENCAGE_API_KEY = os.getenv("OPENCAGE_API_KEY")

# Earth Engine cannot be initialised without the service-account key, so stop
# at import time instead of failing later on the first map request.
if SERVICE_KEY_JSON is None:
    raise RuntimeError("Missing Earth Engine service account key in environment.")

# The secret holds the whole service-account JSON document as one string.
SERVICE_KEY_DICT = json.loads(SERVICE_KEY_JSON)
_EE_SCOPES = ['https://www.googleapis.com/auth/cloud-platform']
credentials = service_account.Credentials.from_service_account_info(
    SERVICE_KEY_DICT,
    scopes=_EE_SCOPES,
)

# Authenticate the Earth Engine client once for the whole process.
ee.Initialize(credentials=credentials)
| def get_fire_risk_map(place: str, opencage_key: str, firms_key: str, min_brightness: int = 300, min_confidence: int = 60) -> Optional[str]: | |
| """ | |
| Returns an HTML string of a folium map showing fire locations, nearest water bodies, and fire stations. | |
| Args: | |
| place: Name of the place to fetch the bounding box for. | |
| opencage_key: opencage API key. | |
| firms_key: FIRMS API key. | |
| min_brightness (int, optional): Minimum fire brightness to filter on (e.g., 300). Defaults to 300. | |
| min_confidence (int, optional): Minimum confidence level (0-100) to filter fire data. Defaults to 60. | |
| Returns: | |
| Returns an HTML string of a folium map showing fire locations, nearest water bodies, and fire stations. | |
| """ | |
| try: | |
| opencage_key = os.environ["OPENCAGE_API_KEY"] | |
| firms_key = os.environ["FIRMS_API_KEY"] | |
| # Step 1: Get bounding box from OpenCage | |
| url = f"https://api.opencagedata.com/geocode/v1/json?q={place}&key={opencage_key}" | |
| resp = requests.get(url) | |
| resp.raise_for_status() | |
| data = resp.json() | |
| bounds = data['results'][0]['bounds'] | |
| bbox = { | |
| 'north': bounds['northeast']['lat'], | |
| 'south': bounds['southwest']['lat'], | |
| 'east': bounds['northeast']['lng'], | |
| 'west': bounds['southwest']['lng'] | |
| } | |
| bbox_str = f"{bbox['west']},{bbox['south']},{bbox['east']},{bbox['north']}" | |
| # Step 2: Get FIRMS fire data | |
| source = 'MODIS_NRT' | |
| url = f'https://firms.modaps.eosdis.nasa.gov/api/area/csv/{firms_key}/{source}/{bbox_str}/3' | |
| df = pd.read_csv(url) | |
| now = datetime.utcnow() | |
| df['acq_datetime'] = pd.to_datetime( | |
| df['acq_date'] + ' ' + df['acq_time'].astype(str).str.zfill(4), | |
| format="%Y-%m-%d %H%M" | |
| ) | |
| df_fires_filtered = df[ | |
| (df['brightness'] >= min_brightness) & | |
| (df['confidence'] > min_confidence) & | |
| (df['acq_datetime'] > (now - timedelta(hours=24))) | |
| ].dropna(subset=['latitude', 'longitude']).reset_index(drop=True) | |
| if df_fires_filtered.empty: | |
| return "<b>No significant fire activity detected in the past 24 hours for the specified location.</b>" | |
| # Step 3: Water bodies (Earth Engine) | |
| west, south, east, north = map(float, bbox_str.split(',')) | |
| geom = ee.Geometry.BBox(west, south, east, north) | |
| water = ee.Image('JRC/GSW1_4/GlobalSurfaceWater').select('occurrence') | |
| water_mask = water.gt(50).selfMask() | |
| water_clipped = water_mask.clip(geom) | |
| sampled_points = water_clipped.stratifiedSample( | |
| numPoints=500, | |
| classBand='occurrence', | |
| region=geom, | |
| scale=500, | |
| geometries=True, | |
| seed=42 | |
| ).getInfo() | |
| water_points = [ | |
| { | |
| 'latitude': f['geometry']['coordinates'][1], | |
| 'longitude': f['geometry']['coordinates'][0] | |
| } | |
| for f in sampled_points['features'] | |
| ] | |
| df_water = pd.DataFrame(water_points).dropna().reset_index(drop=True) | |
| # Step 4: Compute nearest water body | |
| def get_nearest_water(fire_lat, fire_lon): | |
| fire_point = (fire_lat, fire_lon) | |
| min_dist = float('inf') | |
| nearest = None | |
| for _, row in df_water.iterrows(): | |
| dist = geodesic(fire_point, (row['latitude'], row['longitude'])).km | |
| if dist < min_dist: | |
| min_dist = dist | |
| nearest = row | |
| return pd.Series({ | |
| 'nearest_water_lat': nearest['latitude'], | |
| 'nearest_water_lon': nearest['longitude'], | |
| 'distance_km': min_dist | |
| }) | |
| nearest_water = df_fires_filtered.apply( | |
| lambda r: get_nearest_water(r['latitude'], r['longitude']), axis=1) | |
| # Step 5: Fire stations using Overpass API | |
| overpass_query = f""" | |
| [out:json][timeout:25]; | |
| ( | |
| node["amenity"="fire_station"]({south},{west},{north},{east}); | |
| ); | |
| out body; | |
| """ | |
| res = requests.get("http://overpass-api.de/api/interpreter", params={"data": overpass_query}) | |
| data = res.json() | |
| stations = [ | |
| { | |
| 'name': e.get('tags', {}).get('name', 'Unnamed Station'), | |
| 'lat': e['lat'], | |
| 'lon': e['lon'] | |
| } | |
| for e in data.get('elements', []) if 'lat' in e and 'lon' in e | |
| ] | |
| df_stations = pd.DataFrame(stations).dropna() | |
| def get_nearest_station(fire_lat, fire_lon): | |
| fire_point = (fire_lat, fire_lon) | |
| min_dist = float('inf') | |
| nearest = None | |
| for _, row in df_stations.iterrows(): | |
| dist = geodesic(fire_point, (row['lat'], row['lon'])).km | |
| if dist < min_dist: | |
| min_dist = dist | |
| nearest = row | |
| return pd.Series({ | |
| 'nearest_station_name': nearest['name'], | |
| 'nearest_station_lat': nearest['lat'], | |
| 'nearest_station_lon': nearest['lon'], | |
| 'station_distance_km': min_dist | |
| }) | |
| nearest_station = df_fires_filtered.apply( | |
| lambda r: get_nearest_station(r['latitude'], r['longitude']), axis=1) | |
| # Final dataframe | |
| df_final = pd.concat([df_fires_filtered, nearest_water, nearest_station], axis=1) | |
| # Step 6: Create a folium map | |
| center_lat = (bbox["north"] + bbox["south"]) / 2 | |
| center_lon = (bbox["east"] + bbox["west"]) / 2 | |
| m = folium.Map(location=[center_lat, center_lon], zoom_start=6) | |
| for _, row in df_final.iterrows(): | |
| fire_loc = [row['latitude'], row['longitude']] | |
| water_loc = [row['nearest_water_lat'], row['nearest_water_lon']] | |
| station_loc = [row['nearest_station_lat'], row['nearest_station_lon']] | |
| # ๐ฅ 1. Uncertainty Zone (large faint circle) FIRST | |
| folium.Circle( | |
| location=fire_loc, | |
| radius=500, # 3 km | |
| color='orange', | |
| fill=True, | |
| fill_opacity=0.2, | |
| popup="โ ๏ธ Possible spread zone (~3 km radius)", | |
| ).add_to(m) | |
| # ๐ด 2. Fire point (small red circle) SECOND | |
| folium.CircleMarker( | |
| location=fire_loc, | |
| radius=6, | |
| color='red', | |
| fill=True, | |
| fill_color='red', | |
| popup=f"๐ฅ Brightness: {row['brightness']}, Confidence: {row['confidence']}, DateTime: {row['acq_datetime']} Latitude: {row['latitude']:.3f}, Longitude: {row['longitude']:.3f}", | |
| ).add_to(m) | |
| # ๐ง 3. Nearest water point | |
| folium.Marker( | |
| location=water_loc, | |
| icon=folium.Icon(color='blue', icon='tint', prefix='fa'), | |
| popup=f"๐ง Nearest Water\nDistance: {row['distance_km']:.2f} km\nLat: {row['nearest_water_lat']:.3f}\nLon: {row['nearest_water_lon']:.3f}", | |
| ).add_to(m) | |
| # ๐ข 4. Line between fire and water | |
| folium.PolyLine( | |
| locations=[fire_loc, water_loc], | |
| color='green', | |
| weight=2, | |
| ).add_to(m) | |
| # ๐ 5. Nearest fire station (NEW) | |
| folium.Marker( | |
| location=station_loc, | |
| icon=folium.Icon(color='darkred', icon='fire-extinguisher', prefix='fa'), | |
| popup=f"๐ Nearest Fire Station\nDistance: {row['station_distance_km']:.2f} km\nLat: {row['nearest_station_lat']:.3f}\nLon: {row['nearest_station_lon']:.3f}", | |
| ).add_to(m) | |
| # ๐งฏ 6. Line between fire and station (NEW) | |
| folium.PolyLine( | |
| locations=[fire_loc, station_loc], | |
| color='purple', | |
| weight=2, | |
| dash_array='5,10' # dashed line | |
| ).add_to(m) | |
| return m._repr_html_() | |
| except Exception as e: | |
| return f"<b>Error generating fire risk map:</b> {str(e)}" | |
class AgentState(TypedDict):
    """State dictionary passed between the nodes of the agent graph."""

    # The conversation so far. ``operator.add`` is attached as the reducer
    # annotation, i.e. list concatenation of message lists.
    messages: Annotated[list[AnyMessage], operator.add]
class Agent:
    """Single-pass tool-calling agent built on a LangGraph ``StateGraph``.

    Graph layout (built in ``__init__``)::

        llm --(tool calls present)--> action --> final --> END
        llm --(no tool calls)-------> END

    There is no edge from ``action`` back to ``llm``: the tool output itself
    becomes the final answer.
    """

    def __init__(self, model, tools, system=""):
        """Bind the tools to the model and compile the graph.

        Args:
            model: Chat model exposing ``bind_tools`` and ``invoke``.
            tools: Tool objects (with ``.name`` / ``.invoke``) or plain
                callables; plain callables are wrapped with ``tool``.
            system: Optional system prompt prepended on every LLM call.
        """
        self.system = system

        # A plain function has neither ``.name`` nor ``.invoke``, which the
        # lookups in this class require, so wrap it with the tool decorator.
        tools = [t if hasattr(t, "invoke") else tool(t) for t in tools]
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_mistral_ai)
        graph.add_node("action", self.take_action)
        graph.add_node("final", self.final_answer)
        graph.add_conditional_edges(
            "llm",
            self.exists_action,
            {True: "action", False: END}
        )
        graph.add_edge("action", "final")
        graph.add_edge("final", END)
        graph.set_entry_point("llm")
        self.graph = graph.compile()

    def exists_action(self, state: AgentState):
        """Return True when the latest message requests at least one tool call."""
        latest = state['messages'][-1]
        # getattr: tolerate message objects that carry no ``tool_calls`` attribute.
        return bool(getattr(latest, "tool_calls", None))

    def call_mistral_ai(self, state: AgentState):
        """Invoke the model on the conversation, prefixed by the system prompt if set."""
        messages = state['messages']
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}

    def take_action(self, state: AgentState):
        """Run every tool call of the latest message and return one ToolMessage per call."""
        tool_calls = state['messages'][-1].tool_calls
        results = []
        for call in tool_calls:
            print(f"Calling: {call}")
            name = call['name']
            if name not in self.tools:  # the LLM asked for a tool that is not registered
                print("\n ....bad tool name....")
                # The graph never routes back to the LLM, so this text is not
                # retried; it reaches the user through ``final_answer``.
                result = "bad tool name, retry"
            else:
                result = self.tools[name].invoke(call['args'])
            results.append(ToolMessage(tool_call_id=call['id'], name=name, content=str(result)))
        return {'messages': results}

    def final_answer(self, state: AgentState):
        """Return the last message's content, stripped, as the final AI message.

        When several tools were called, only the last tool output is kept.
        """
        return {"messages": [AIMessage(content=state['messages'][-1].content.strip())]}
# System prompt for the agent. It names the tool exactly as it is registered
# (the function name, ``get_fire_risk_map``); the earlier name
# ``firespot_summary`` does not exist in this file.
prompt = """ You are a Wildfire Detection Assistant.
When users ask about fire spots, wildfires, or fire mapping in any region,
use the get_fire_risk_map function to analyze the area and generate a fire map or status update.
"""

model = init_chat_model("mistral-large-latest", model_provider="mistralai")

# Agent indexes its tools by ``.name`` and runs them through ``.invoke``.
# A plain function has neither, so wrap it unless it is already a tool object.
fire_risk_tool = (
    get_fire_risk_map
    if hasattr(get_fire_risk_map, "invoke")
    else tool(get_fire_risk_map)
)
abot = Agent(model, [fire_risk_tool], system=prompt)
def process_prompt(prompt):
    """Run the agent on one user request and return the last message's content.

    The content is the tool output (expected to be an HTML string) when a tool
    was called, otherwise whatever text the model replied with.
    """
    initial_state = {"messages": [HumanMessage(content=prompt)]}
    final_state = abot.graph.invoke(initial_state)
    last_message = final_state['messages'][-1]
    return last_message.content
# Reset handler for the "Clear" button
def clear_all():
    """Return empty values for the prompt textbox and the HTML map output."""
    blank_prompt = ""
    blank_html = ""
    return blank_prompt, blank_html
# Disclaimer text for the collapsible notes panel. Kept at column 0 so the
# Markdown list nesting does not depend on the indentation of the UI code.
_NOTES_MARKDOWN = """
- 🔥 **Fire data** is sourced from NASA FIRMS and represents **near real-time satellite detections** from the MODIS sensor.
  - There may be a **delay of up to 3 hours** depending on satellite pass and processing time.
- 💧 **Water bodies** are sampled from the **JRC Global Surface Water dataset**, and represent **historically persistent** water locations (occurrence > 50%).
  - This is **not guaranteed to reflect current water availability** or seasonal changes.
- 🚒 **Fire station locations** are retrieved from **OpenStreetMap** via Overpass API and may vary in completeness or accuracy.
- 📍 Map pins and routes are intended to **assist awareness**, not for operational or emergency decision-making.
- 🔄 Data is retrieved live; occasional delays or errors may occur if external APIs (e.g., FIRMS, OpenCage, Overpass) are temporarily unavailable.
"""

# Build Gradio UI
with gr.Blocks(title="FireLink - Wildfire Intelligence Tool") as demo:
    gr.Markdown("## 🔥 FireLink")
    gr.Markdown(
        "**Visualize recent wildfire activity and nearby response resources—powered by satellite data, water occurrence maps, and open geospatial infrastructure.**"
    )
    gr.Markdown("**Ask about fire risks in a region and view the results as an interactive map.**")

    with gr.Row():
        # Left column: prompt entry and action buttons.
        with gr.Column(scale=1):
            gr.Markdown("### 📝 Input Prompt")
            prompt_box = gr.Textbox(
                label="Enter your request",
                placeholder="e.g., Show fires in California with brightness > 300 and confidence > 80",
                lines=4
            )
            submit_btn = gr.Button("Generate Map")
            clear_btn = gr.Button("Clear", variant="secondary")
        # Right column: rendered map (or status / error message).
        with gr.Column(scale=2):
            gr.Markdown("### 🗺️ Fire Risk Map")
            result_html = gr.HTML(label="Map Output")

    # Button behavior
    submit_btn.click(fn=process_prompt, inputs=prompt_box, outputs=result_html)
    clear_btn.click(fn=clear_all, outputs=[prompt_box, result_html])

    with gr.Accordion("ℹ️ Notes & Disclaimers (click here)", open=False):
        gr.Markdown(_NOTES_MARKDOWN)

# Launch the app
demo.launch(share=True)