import os
from typing import Any, Dict, List, Optional, Tuple
from fastapi import FastAPI, HTTPException, Response
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import folium
import json
from collections import OrderedDict
try:
# When Backend is treated as a package (e.g., uvicorn Backend.api:app from repo root)
from .chatbot_backend import GroqRAGChatbot
except Exception:
# When running inside Backend directory (e.g., uvicorn api:app)
from chatbot_backend import GroqRAGChatbot
# Initialize services
chatbot = GroqRAGChatbot()
app = FastAPI(title="SIH Groundwater API", version="1.0.0")
# CORS for Next.js app
frontend_origin = os.getenv("FRONTEND_ORIGIN", "http://localhost:3000")
app.add_middleware(
CORSMiddleware,
allow_origins=[frontend_origin, "http://localhost:3000", "http://127.0.0.1:3000"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"]
)
class ChatRequest(BaseModel):
query: str
def _normalize_query(q: Optional[str]) -> str:
return (q or "").strip().lower()
# Simple LRU cache for last chat results by exact query
_CHAT_CACHE_MAX = 50
_chat_cache: "OrderedDict[str, List[Dict[str, Any]]]" = OrderedDict()
def _cache_put(query: str, rows: List[Dict[str, Any]]) -> None:
key = _normalize_query(query)
if not key:
return
if key in _chat_cache:
del _chat_cache[key]
_chat_cache[key] = rows or []
while len(_chat_cache) > _CHAT_CACHE_MAX:
_chat_cache.popitem(last=False)
def _cache_get(query: Optional[str]) -> List[Dict[str, Any]]:
key = _normalize_query(query)
if not key:
return []
rows = _chat_cache.get(key)
if rows is None:
return []
# move to end (recently used)
del _chat_cache[key]
_chat_cache[key] = rows
return rows
@app.get("/health")
def health() -> Dict[str, Any]:
ok = chatbot.get_db_connection()
return {"ok": ok}
@app.post("/chat")
def chat(req: ChatRequest) -> Dict[str, Any]:
if not req.query or not req.query.strip():
raise HTTPException(status_code=400, detail="Query is required")
result = chatbot.chat(req.query.strip())
if not result.get("success"):
raise HTTPException(status_code=502, detail=result.get("response") or "Failed to process query")
try:
_cache_put(req.query, result.get("results") or [])
except Exception:
pass
return result
@app.get("/stats")
def stats() -> Dict[str, Any]:
return chatbot.get_quick_stats()
class MapQuery(BaseModel):
query: Optional[str] = None
limit: Optional[int] = 100
@app.post("/map-data")
def map_data(req: MapQuery) -> Dict[str, Any]:
"""
Returns lightweight map-ready data from Supabase rows.
- id: synthetic id
- name: district (title-cased)
- state: state (title-cased)
- area: st_area_shape (float or None)
- perimeter: st_length_shape (float or None)
- geometry: WKT/GeoJSON string stored in DB (passed through)
"""
user_query = (req.query or "").strip() or "top districts by area"
intent = chatbot.analyze_user_intent(user_query)
# Ensure geography focus for better map ranking
intent["intent_type"] = "geographic"
query = chatbot.build_supabase_query(user_query, intent)
# Override limit if provided
if req.limit and isinstance(req.limit, int):
query = query.limit(max(1, min(500, req.limit)))
rows = chatbot.execute_supabase_query(query) or []
features: List[Dict[str, Any]] = []
for idx, r in enumerate(rows):
name = (r.get("district") or "").title() if r.get("district") else None
state = (r.get("state") or "").title() if r.get("state") else None
# Best-effort numeric parsing
def to_float(x: Any) -> Optional[float]:
try:
if x in (None, ""):
return None
return float(x)
except Exception:
return None
features.append({
"id": idx + 1,
"name": name,
"state": state,
"area": to_float(r.get("st_area_shape")),
"perimeter": to_float(r.get("st_length_shape")),
"geometry": r.get("geometry")
})
return {
"count": len(features),
"features": features
}
# Uvicorn entrypoint: `python -m uvicorn Backend.api:app --reload --host 0.0.0.0 --port 8000`
@app.get("/api/map")
def map_html(query: Optional[str] = None, limit: int = 100) -> Response:
"""
Builds an HTML map using folium.
Uses the SAME rows as chat (chatbot.chat(query)['results']) to ensure identical filtering/ordering.
Frontend component `MapPlaceholder` expects this endpoint to return HTML.
"""
try:
user_query = (query or "").strip()
features_data: List[Dict[str, Any]] = []
# 1) Primary path: use EXACT results previously produced by /chat for the same query
rows: List[Dict[str, Any]] = _cache_get(user_query)
# 2) Fallback: if no rows from chat, reuse map-data builder
if not rows:
payload = MapQuery(query=user_query, limit=limit)
data = map_data(payload)
features_data = data.get("features") or []
# 3) If still nothing, as a last attempt, run chat now and cache it
if not rows and not features_data and user_query:
chat_out = chatbot.chat(user_query)
rows = chat_out.get("results") or []
_cache_put(user_query, rows)
# Convert rows -> features if we have rows
if rows and not features_data:
for idx, r in enumerate(rows[: max(1, min(500, limit))]):
def to_float(x: Any) -> Optional[float]:
try:
if x in (None, ""):
return None
return float(x)
except Exception:
return None
features_data.append({
"id": idx + 1,
"name": ((r.get("district") or "").title() if r.get("district") else None),
"state": ((r.get("state") or "").title() if r.get("state") else None),
"area": to_float(r.get("st_area_shape")),
"perimeter": to_float(r.get("st_length_shape")),
"geometry": r.get("geometry"),
})
# Initialize map centered on India
fmap = folium.Map(location=[22.9734, 78.6569], zoom_start=5, tiles="OpenStreetMap")
fg = folium.FeatureGroup(name="Underground Coverage")
# Add features; draw GeoJSON when available, otherwise add a label-only marker
# Geometry parsing helpers
def parse_geometry(geom: Any) -> Optional[Any]:
if geom is None:
return None
# Already a mapping (GeoJSON-like)
if isinstance(geom, (dict, list)):
return geom
if isinstance(geom, str):
s = geom.strip()
# JSON string
if s.startswith('{') or s.startswith('['):
try:
return json.loads(s)
except Exception:
pass
# WKT detection
wkt_prefixes = ("POLYGON", "MULTIPOLYGON", "LINESTRING", "MULTILINESTRING", "POINT", "MULTIPOINT")
if any(s.upper().startswith(p) for p in wkt_prefixes):
try:
# Try shapely if available
from shapely import wkt as _wkt
from shapely.geometry import mapping as _mapping
shape_obj = _wkt.loads(s)
return _mapping(shape_obj)
except Exception:
return None
return None
for f in features_data:
name = f.get("name") or "Unknown"
state = f.get("state") or ""
area = f.get("area")
perimeter = f.get("perimeter")
geometry = f.get("geometry")
popup = folium.Popup(
f"{name}, {state}
Area: {area or 'N/A'}
Perimeter: {perimeter or 'N/A'}",
max_width=300
)
# Try to parse geometry (JSON or WKT -> GeoJSON-like) and render
parsed = parse_geometry(geometry)
if parsed is not None:
try:
folium.GeoJson(
parsed,
name=name,
tooltip=name,
popup=popup,
style_function=lambda _:
{"fillColor": "#3186cc", "color": "#3186cc", "weight": 1, "fillOpacity": 0.4}
).add_to(fg)
continue
except Exception:
pass
# Fallback: no geometry or not JSON — add a generic marker at India center (avoids failure)
folium.Marker(
location=[22.9734, 78.6569],
tooltip=name,
popup=popup,
icon=folium.Icon(color="blue", icon="info-sign")
).add_to(fg)
fg.add_to(fmap)
folium.LayerControl().add_to(fmap)
html = fmap.get_root().render()
return Response(content=html, media_type="text/html")
except Exception as e:
return Response(content=f"
Failed to render map: {str(e)}", media_type="text/html", status_code=500) @app.get("/results") def results(query: Optional[str] = None, limit: int = 100) -> Dict[str, Any]: """ Returns the exact rows used by chat for a given query. If absent, runs chat once. Also returns a 'table' projection suited for the Explore page. """ user_query = (query or "").strip() rows: List[Dict[str, Any]] = _cache_get(user_query) if not rows and user_query: chat_out = chatbot.chat(user_query) rows = chat_out.get("results") or [] _cache_put(user_query, rows) # Clamp and normalize rows = (rows or [])[: max(1, min(500, limit))] def to_float(x: Any) -> Optional[float]: try: if x in (None, ""): return None return float(x) except Exception: return None def derive_status(stage: Optional[float]) -> str: if stage is None: return "safe" if stage > 100: return "over-exploited" if 80 <= stage <= 100: return "critical" if 60 <= stage < 80: return "semi-critical" return "safe" table = [] for r in rows: stage = to_float(r.get("stage_of_development")) draft_total = to_float(r.get("annual_gw_draft_total")) underground_area = to_float(r.get("st_area_shape")) table.append({ "district": (r.get("district") or "").title() if r.get("district") else "", "state": (r.get("state") or "").title() if r.get("state") else "", "development_stage": round(stage, 1) if isinstance(stage, (int, float)) else None, "draft_total": draft_total, "availability": to_float(r.get("net_gw_availability")), "underground_area": underground_area, "status": derive_status(stage), }) return {"count": len(rows), "rows": rows, "table": table}