import os from typing import Any, Dict, List, Optional, Tuple from fastapi import FastAPI, HTTPException, Response from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel import folium import json from collections import OrderedDict try: # When Backend is treated as a package (e.g., uvicorn Backend.api:app from repo root) from .chatbot_backend import GroqRAGChatbot except Exception: # When running inside Backend directory (e.g., uvicorn api:app) from chatbot_backend import GroqRAGChatbot # Initialize services chatbot = GroqRAGChatbot() app = FastAPI(title="SIH Groundwater API", version="1.0.0") # CORS for Next.js app frontend_origin = os.getenv("FRONTEND_ORIGIN", "http://localhost:3000") app.add_middleware( CORSMiddleware, allow_origins=[frontend_origin, "http://localhost:3000", "http://127.0.0.1:3000"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"] ) class ChatRequest(BaseModel): query: str def _normalize_query(q: Optional[str]) -> str: return (q or "").strip().lower() # Simple LRU cache for last chat results by exact query _CHAT_CACHE_MAX = 50 _chat_cache: "OrderedDict[str, List[Dict[str, Any]]]" = OrderedDict() def _cache_put(query: str, rows: List[Dict[str, Any]]) -> None: key = _normalize_query(query) if not key: return if key in _chat_cache: del _chat_cache[key] _chat_cache[key] = rows or [] while len(_chat_cache) > _CHAT_CACHE_MAX: _chat_cache.popitem(last=False) def _cache_get(query: Optional[str]) -> List[Dict[str, Any]]: key = _normalize_query(query) if not key: return [] rows = _chat_cache.get(key) if rows is None: return [] # move to end (recently used) del _chat_cache[key] _chat_cache[key] = rows return rows @app.get("/health") def health() -> Dict[str, Any]: ok = chatbot.get_db_connection() return {"ok": ok} @app.post("/chat") def chat(req: ChatRequest) -> Dict[str, Any]: if not req.query or not req.query.strip(): raise HTTPException(status_code=400, detail="Query is required") result = chatbot.chat(req.query.strip()) if not result.get("success"): raise HTTPException(status_code=502, detail=result.get("response") or "Failed to process query") try: _cache_put(req.query, result.get("results") or []) except Exception: pass return result @app.get("/stats") def stats() -> Dict[str, Any]: return chatbot.get_quick_stats() class MapQuery(BaseModel): query: Optional[str] = None limit: Optional[int] = 100 @app.post("/map-data") def map_data(req: MapQuery) -> Dict[str, Any]: """ Returns lightweight map-ready data from Supabase rows. - id: synthetic id - name: district (title-cased) - state: state (title-cased) - area: st_area_shape (float or None) - perimeter: st_length_shape (float or None) - geometry: WKT/GeoJSON string stored in DB (passed through) """ user_query = (req.query or "").strip() or "top districts by area" intent = chatbot.analyze_user_intent(user_query) # Ensure geography focus for better map ranking intent["intent_type"] = "geographic" query = chatbot.build_supabase_query(user_query, intent) # Override limit if provided if req.limit and isinstance(req.limit, int): query = query.limit(max(1, min(500, req.limit))) rows = chatbot.execute_supabase_query(query) or [] features: List[Dict[str, Any]] = [] for idx, r in enumerate(rows): name = (r.get("district") or "").title() if r.get("district") else None state = (r.get("state") or "").title() if r.get("state") else None # Best-effort numeric parsing def to_float(x: Any) -> Optional[float]: try: if x in (None, ""): return None return float(x) except Exception: return None features.append({ "id": idx + 1, "name": name, "state": state, "area": to_float(r.get("st_area_shape")), "perimeter": to_float(r.get("st_length_shape")), "geometry": r.get("geometry") }) return { "count": len(features), "features": features } # Uvicorn entrypoint: `python -m uvicorn Backend.api:app --reload --host 0.0.0.0 --port 8000` @app.get("/api/map") def map_html(query: Optional[str] = None, limit: int = 100) -> Response: """ Builds an HTML map using folium. Uses the SAME rows as chat (chatbot.chat(query)['results']) to ensure identical filtering/ordering. Frontend component `MapPlaceholder` expects this endpoint to return HTML. """ try: user_query = (query or "").strip() features_data: List[Dict[str, Any]] = [] # 1) Primary path: use EXACT results previously produced by /chat for the same query rows: List[Dict[str, Any]] = _cache_get(user_query) # 2) Fallback: if no rows from chat, reuse map-data builder if not rows: payload = MapQuery(query=user_query, limit=limit) data = map_data(payload) features_data = data.get("features") or [] # 3) If still nothing, as a last attempt, run chat now and cache it if not rows and not features_data and user_query: chat_out = chatbot.chat(user_query) rows = chat_out.get("results") or [] _cache_put(user_query, rows) # Convert rows -> features if we have rows if rows and not features_data: for idx, r in enumerate(rows[: max(1, min(500, limit))]): def to_float(x: Any) -> Optional[float]: try: if x in (None, ""): return None return float(x) except Exception: return None features_data.append({ "id": idx + 1, "name": ((r.get("district") or "").title() if r.get("district") else None), "state": ((r.get("state") or "").title() if r.get("state") else None), "area": to_float(r.get("st_area_shape")), "perimeter": to_float(r.get("st_length_shape")), "geometry": r.get("geometry"), }) # Initialize map centered on India fmap = folium.Map(location=[22.9734, 78.6569], zoom_start=5, tiles="OpenStreetMap") fg = folium.FeatureGroup(name="Underground Coverage") # Add features; draw GeoJSON when available, otherwise add a label-only marker # Geometry parsing helpers def parse_geometry(geom: Any) -> Optional[Any]: if geom is None: return None # Already a mapping (GeoJSON-like) if isinstance(geom, (dict, list)): return geom if isinstance(geom, str): s = geom.strip() # JSON string if s.startswith('{') or s.startswith('['): try: return json.loads(s) except Exception: pass # WKT detection wkt_prefixes = ("POLYGON", "MULTIPOLYGON", "LINESTRING", "MULTILINESTRING", "POINT", "MULTIPOINT") if any(s.upper().startswith(p) for p in wkt_prefixes): try: # Try shapely if available from shapely import wkt as _wkt from shapely.geometry import mapping as _mapping shape_obj = _wkt.loads(s) return _mapping(shape_obj) except Exception: return None return None for f in features_data: name = f.get("name") or "Unknown" state = f.get("state") or "" area = f.get("area") perimeter = f.get("perimeter") geometry = f.get("geometry") popup = folium.Popup( f"{name}, {state}
Area: {area or 'N/A'}
Perimeter: {perimeter or 'N/A'}", max_width=300 ) # Try to parse geometry (JSON or WKT -> GeoJSON-like) and render parsed = parse_geometry(geometry) if parsed is not None: try: folium.GeoJson( parsed, name=name, tooltip=name, popup=popup, style_function=lambda _: {"fillColor": "#3186cc", "color": "#3186cc", "weight": 1, "fillOpacity": 0.4} ).add_to(fg) continue except Exception: pass # Fallback: no geometry or not JSON — add a generic marker at India center (avoids failure) folium.Marker( location=[22.9734, 78.6569], tooltip=name, popup=popup, icon=folium.Icon(color="blue", icon="info-sign") ).add_to(fg) fg.add_to(fmap) folium.LayerControl().add_to(fmap) html = fmap.get_root().render() return Response(content=html, media_type="text/html") except Exception as e: return Response(content=f"
Failed to render map: {str(e)}
", media_type="text/html", status_code=500) @app.get("/results") def results(query: Optional[str] = None, limit: int = 100) -> Dict[str, Any]: """ Returns the exact rows used by chat for a given query. If absent, runs chat once. Also returns a 'table' projection suited for the Explore page. """ user_query = (query or "").strip() rows: List[Dict[str, Any]] = _cache_get(user_query) if not rows and user_query: chat_out = chatbot.chat(user_query) rows = chat_out.get("results") or [] _cache_put(user_query, rows) # Clamp and normalize rows = (rows or [])[: max(1, min(500, limit))] def to_float(x: Any) -> Optional[float]: try: if x in (None, ""): return None return float(x) except Exception: return None def derive_status(stage: Optional[float]) -> str: if stage is None: return "safe" if stage > 100: return "over-exploited" if 80 <= stage <= 100: return "critical" if 60 <= stage < 80: return "semi-critical" return "safe" table = [] for r in rows: stage = to_float(r.get("stage_of_development")) draft_total = to_float(r.get("annual_gw_draft_total")) underground_area = to_float(r.get("st_area_shape")) table.append({ "district": (r.get("district") or "").title() if r.get("district") else "", "state": (r.get("state") or "").title() if r.get("state") else "", "development_stage": round(stage, 1) if isinstance(stage, (int, float)) else None, "draft_total": draft_total, "availability": to_float(r.get("net_gw_availability")), "underground_area": underground_area, "status": derive_status(stage), }) return {"count": len(rows), "rows": rows, "table": table}