Spaces:
Sleeping
Sleeping
import ee | |
import os | |
import json | |
from google.oauth2 import service_account | |
from typing import Dict | |
from datetime import datetime, timedelta | |
from geopy.distance import geodesic | |
import requests | |
import pandas as pd | |
import folium | |
import requests | |
from langchain_core.tools import tool | |
from langgraph.prebuilt import create_react_agent | |
from typing_extensions import TypedDict, Literal, Annotated | |
from typing import Optional | |
from collections import Counter | |
from langgraph.graph import StateGraph, END | |
from typing import TypedDict, Annotated | |
import operator | |
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage, AIMessage | |
from langchain_community.tools.tavily_search import TavilySearchResults | |
from langchain.chat_models import init_chat_model | |
from langchain.schema import HumanMessage | |
from langchain.tools import tool | |
import gradio as gr | |
# Load JSON string from Hugging Face secret (as env variable) | |
SERVICE_KEY_JSON = os.environ.get("GEE_SERVICE_KEY") | |
MISTRAL_API_KEY = os.getenv("MISTRAL_API_KEY") | |
FIRMS_API_KEY = os.getenv("FIRMS_API_KEY") | |
OPENCAGE_API_KEY = os.getenv("OPENCAGE_API_KEY") | |
if SERVICE_KEY_JSON is None: | |
raise RuntimeError("Missing Earth Engine service account key in environment.") | |
# Parse the JSON and create credentials | |
SERVICE_KEY_DICT = json.loads(SERVICE_KEY_JSON) | |
credentials = service_account.Credentials.from_service_account_info( | |
SERVICE_KEY_DICT, | |
scopes=['https://www.googleapis.com/auth/cloud-platform'] | |
) | |
# Initialize EE | |
ee.Initialize(credentials=credentials) | |
def get_fire_risk_map(place: str, opencage_key: str, firms_key: str, min_brightness: int = 300, min_confidence: int = 60) -> Optional[str]: | |
""" | |
Returns an HTML string of a folium map showing fire locations, nearest water bodies, and fire stations. | |
Args: | |
place: Name of the place to fetch the bounding box for. | |
opencage_key: opencage API key. | |
firms_key: FIRMS API key. | |
min_brightness (int, optional): Minimum fire brightness to filter on (e.g., 300). Defaults to 300. | |
min_confidence (int, optional): Minimum confidence level (0-100) to filter fire data. Defaults to 60. | |
Returns: | |
Returns an HTML string of a folium map showing fire locations, nearest water bodies, and fire stations. | |
""" | |
try: | |
opencage_key = os.environ["OPENCAGE_API_KEY"] | |
firms_key = os.environ["FIRMS_API_KEY"] | |
# Step 1: Get bounding box from OpenCage | |
url = f"https://api.opencagedata.com/geocode/v1/json?q={place}&key={opencage_key}" | |
resp = requests.get(url) | |
resp.raise_for_status() | |
data = resp.json() | |
bounds = data['results'][0]['bounds'] | |
bbox = { | |
'north': bounds['northeast']['lat'], | |
'south': bounds['southwest']['lat'], | |
'east': bounds['northeast']['lng'], | |
'west': bounds['southwest']['lng'] | |
} | |
bbox_str = f"{bbox['west']},{bbox['south']},{bbox['east']},{bbox['north']}" | |
# Step 2: Get FIRMS fire data | |
source = 'MODIS_NRT' | |
url = f'https://firms.modaps.eosdis.nasa.gov/api/area/csv/{firms_key}/{source}/{bbox_str}/3' | |
df = pd.read_csv(url) | |
now = datetime.utcnow() | |
df['acq_datetime'] = pd.to_datetime( | |
df['acq_date'] + ' ' + df['acq_time'].astype(str).str.zfill(4), | |
format="%Y-%m-%d %H%M" | |
) | |
df_fires_filtered = df[ | |
(df['brightness'] >= min_brightness) & | |
(df['confidence'] > min_confidence) & | |
(df['acq_datetime'] > (now - timedelta(hours=24))) | |
].dropna(subset=['latitude', 'longitude']).reset_index(drop=True) | |
if df_fires_filtered.empty: | |
return "<b>No significant fire activity detected in the past 24 hours for the specified location.</b>" | |
# Step 3: Water bodies (Earth Engine) | |
west, south, east, north = map(float, bbox_str.split(',')) | |
geom = ee.Geometry.BBox(west, south, east, north) | |
water = ee.Image('JRC/GSW1_4/GlobalSurfaceWater').select('occurrence') | |
water_mask = water.gt(50).selfMask() | |
water_clipped = water_mask.clip(geom) | |
sampled_points = water_clipped.stratifiedSample( | |
numPoints=500, | |
classBand='occurrence', | |
region=geom, | |
scale=500, | |
geometries=True, | |
seed=42 | |
).getInfo() | |
water_points = [ | |
{ | |
'latitude': f['geometry']['coordinates'][1], | |
'longitude': f['geometry']['coordinates'][0] | |
} | |
for f in sampled_points['features'] | |
] | |
df_water = pd.DataFrame(water_points).dropna().reset_index(drop=True) | |
# Step 4: Compute nearest water body | |
def get_nearest_water(fire_lat, fire_lon): | |
fire_point = (fire_lat, fire_lon) | |
min_dist = float('inf') | |
nearest = None | |
for _, row in df_water.iterrows(): | |
dist = geodesic(fire_point, (row['latitude'], row['longitude'])).km | |
if dist < min_dist: | |
min_dist = dist | |
nearest = row | |
return pd.Series({ | |
'nearest_water_lat': nearest['latitude'], | |
'nearest_water_lon': nearest['longitude'], | |
'distance_km': min_dist | |
}) | |
nearest_water = df_fires_filtered.apply( | |
lambda r: get_nearest_water(r['latitude'], r['longitude']), axis=1) | |
# Step 5: Fire stations using Overpass API | |
overpass_query = f""" | |
[out:json][timeout:25]; | |
( | |
node["amenity"="fire_station"]({south},{west},{north},{east}); | |
); | |
out body; | |
""" | |
res = requests.get("http://overpass-api.de/api/interpreter", params={"data": overpass_query}) | |
data = res.json() | |
stations = [ | |
{ | |
'name': e.get('tags', {}).get('name', 'Unnamed Station'), | |
'lat': e['lat'], | |
'lon': e['lon'] | |
} | |
for e in data.get('elements', []) if 'lat' in e and 'lon' in e | |
] | |
df_stations = pd.DataFrame(stations).dropna() | |
def get_nearest_station(fire_lat, fire_lon): | |
fire_point = (fire_lat, fire_lon) | |
min_dist = float('inf') | |
nearest = None | |
for _, row in df_stations.iterrows(): | |
dist = geodesic(fire_point, (row['lat'], row['lon'])).km | |
if dist < min_dist: | |
min_dist = dist | |
nearest = row | |
return pd.Series({ | |
'nearest_station_name': nearest['name'], | |
'nearest_station_lat': nearest['lat'], | |
'nearest_station_lon': nearest['lon'], | |
'station_distance_km': min_dist | |
}) | |
nearest_station = df_fires_filtered.apply( | |
lambda r: get_nearest_station(r['latitude'], r['longitude']), axis=1) | |
# Final dataframe | |
df_final = pd.concat([df_fires_filtered, nearest_water, nearest_station], axis=1) | |
# Step 6: Create a folium map | |
center_lat = (bbox["north"] + bbox["south"]) / 2 | |
center_lon = (bbox["east"] + bbox["west"]) / 2 | |
m = folium.Map(location=[center_lat, center_lon], zoom_start=6) | |
for _, row in df_final.iterrows(): | |
fire_loc = [row['latitude'], row['longitude']] | |
water_loc = [row['nearest_water_lat'], row['nearest_water_lon']] | |
station_loc = [row['nearest_station_lat'], row['nearest_station_lon']] | |
# ๐ฅ 1. Uncertainty Zone (large faint circle) FIRST | |
folium.Circle( | |
location=fire_loc, | |
radius=500, # 3 km | |
color='orange', | |
fill=True, | |
fill_opacity=0.2, | |
popup="โ ๏ธ Possible spread zone (~3 km radius)", | |
).add_to(m) | |
# ๐ด 2. Fire point (small red circle) SECOND | |
folium.CircleMarker( | |
location=fire_loc, | |
radius=6, | |
color='red', | |
fill=True, | |
fill_color='red', | |
popup=f"๐ฅ Brightness: {row['brightness']}, Confidence: {row['confidence']}, DateTime: {row['acq_datetime']} Latitude: {row['latitude']:.3f}, Longitude: {row['longitude']:.3f}", | |
).add_to(m) | |
# ๐ง 3. Nearest water point | |
folium.Marker( | |
location=water_loc, | |
icon=folium.Icon(color='blue', icon='tint', prefix='fa'), | |
popup=f"๐ง Nearest Water\nDistance: {row['distance_km']:.2f} km\nLat: {row['nearest_water_lat']:.3f}\nLon: {row['nearest_water_lon']:.3f}", | |
).add_to(m) | |
# ๐ข 4. Line between fire and water | |
folium.PolyLine( | |
locations=[fire_loc, water_loc], | |
color='green', | |
weight=2, | |
).add_to(m) | |
# ๐ 5. Nearest fire station (NEW) | |
folium.Marker( | |
location=station_loc, | |
icon=folium.Icon(color='darkred', icon='fire-extinguisher', prefix='fa'), | |
popup=f"๐ Nearest Fire Station\nDistance: {row['station_distance_km']:.2f} km\nLat: {row['nearest_station_lat']:.3f}\nLon: {row['nearest_station_lon']:.3f}", | |
).add_to(m) | |
# ๐งฏ 6. Line between fire and station (NEW) | |
folium.PolyLine( | |
locations=[fire_loc, station_loc], | |
color='purple', | |
weight=2, | |
dash_array='5,10' # dashed line | |
).add_to(m) | |
return m._repr_html_() | |
except Exception as e: | |
return f"<b>Error generating fire risk map:</b> {str(e)}" | |
class AgentState(TypedDict): | |
messages: Annotated[list[AnyMessage], operator.add] | |
class Agent: | |
def __init__(self, model, tools, system=""): | |
self.system = system | |
graph = StateGraph(AgentState) | |
graph.add_node("llm", self.call_mistral_ai) | |
graph.add_node("action", self.take_action) | |
graph.add_node("final", self.final_answer) | |
graph.add_conditional_edges( | |
"llm", | |
self.exists_action, | |
{True: "action", False: END} | |
) | |
graph.add_edge("action", "final") # ๐ | |
graph.add_edge("final", END) # ๐ | |
graph.set_entry_point("llm") | |
self.graph = graph.compile() | |
self.tools = {t.name: t for t in tools} | |
self.model = model.bind_tools(tools) | |
def exists_action(self, state: AgentState): | |
result = state['messages'][-1] | |
return len(result.tool_calls) > 0 | |
def call_mistral_ai(self, state: AgentState): | |
messages = state['messages'] | |
if self.system: | |
messages = [SystemMessage(content=self.system)] + messages | |
message = self.model.invoke(messages) | |
return {'messages': [message]} | |
def take_action(self, state: AgentState): | |
tool_calls = state['messages'][-1].tool_calls | |
results = [] | |
for t in tool_calls: | |
print(f"Calling: {t}") | |
if not t['name'] in self.tools: # check for bad tool name from LLM | |
print("\n ....bad tool name....") | |
result = "bad tool name, retry" # instruct LLM to retry if bad | |
else: | |
result = self.tools[t['name']].invoke(t['args']) | |
results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result))) | |
return {'messages': results} | |
def final_answer(self, state: AgentState): | |
"""Return the final tool output cleanly.""" | |
return {"messages": [AIMessage(content=state['messages'][-1].content.strip())]} | |
prompt = """ You are a Wildfire Detection Assistant. | |
When users ask about fire spots, wildfires, or fire mapping in any region, | |
use the firespot_summary function to analyze the area and generate a fire map or status update. | |
""" | |
model = init_chat_model("mistral-large-latest", model_provider="mistralai") | |
abot = Agent(model, [get_fire_risk_map], system=prompt) | |
def process_prompt(prompt): | |
messages = [HumanMessage(content=prompt)] | |
result = abot.graph.invoke({"messages": messages}) | |
html_map = result['messages'][-1].content # assuming tool returns HTML string | |
return html_map | |
# Clear function | |
def clear_all(): | |
return "", "" # Reset both prompt and HTML output | |
# Build Gradio UI | |
with gr.Blocks(title="FireLink - Wildfire Intelligence Tool") as demo: | |
gr.Markdown("## ๐ฅ FireLink") | |
gr.Markdown("**Visualize recent wildfire activity and nearby response resourcesโpowered by satellite data, water occurrence maps, and open geospatial infrastructure.**" | |
) | |
gr.Markdown("**Ask about fire risks in a region and view the results as an interactive map.**") | |
with gr.Row(): | |
with gr.Column(scale=1): | |
gr.Markdown("### ๐ Input Prompt") | |
prompt_box = gr.Textbox( | |
label="Enter your request", | |
placeholder="e.g., Show fires in California with brightness > 300 and confidence > 80", | |
lines=4 | |
) | |
submit_btn = gr.Button("Generate Map") | |
clear_btn = gr.Button("Clear", variant="secondary") # Added Clear button | |
with gr.Column(scale=2): | |
gr.Markdown("### ๐บ๏ธ Fire Risk Map") | |
result_html = gr.HTML(label="Map Output") | |
# Button behavior | |
submit_btn.click(fn=process_prompt, inputs=prompt_box, outputs=result_html) | |
clear_btn.click(fn=clear_all, outputs=[prompt_box, result_html]) # Hook clear button | |
with gr.Accordion("โน๏ธ Notes & Disclaimers (click here)", open=False): | |
gr.Markdown(""" | |
- ๐ฅ **Fire data** is sourced from NASA FIRMS and represents **near real-time satellite detections** from MODIS and VIIRS sensors. | |
- There may be a **delay of up to 3 hours** depending on satellite pass and processing time. | |
- ๐ง **Water bodies** are sampled from the **JRC Global Surface Water dataset**, and represent **historically persistent** water locations (occurrence > 50%). | |
- This is **not guaranteed to reflect current water availability** or seasonal changes. | |
- ๐ **Fire station locations** are retrieved from **OpenStreetMap** via Overpass API and may vary in completeness or accuracy. | |
- ๐ Map pins and routes are intended to **assist awareness**, not for operational or emergency decision-making. | |
- ๐ Data is retrieved live; occasional delays or errors may occur if external APIs (e.g., FIRMS, OpenCage, Overpass) are temporarily unavailable. | |
""") | |
# Launch the app | |
demo.launch(share = True) |