|
|
""" |
|
|
Query Executor Service |
|
|
|
|
|
Handles query processing with intent detection, data querying, and response generation. |
|
|
Uses semantic search for scalable dataset discovery and session-scoped layer storage. |
|
|
""" |
|
|
|
|
|
from backend.core.llm_gateway import LLMGateway |
|
|
from backend.services.data_loader import get_data_loader |
|
|
from backend.core.geo_engine import get_geo_engine |
|
|
from backend.services.response_formatter import ResponseFormatter |
|
|
from backend.core.session_store import get_session_store |
|
|
from backend.core.semantic_search import get_semantic_search |
|
|
from backend.core.data_catalog import get_data_catalog |
|
|
from backend.core.query_planner import get_query_planner |
|
|
from typing import List, Dict, Any, Optional |
|
|
import json |
|
|
import datetime |
|
|
import uuid |
|
|
import logging |
|
|
|
|
|
# Module-level logger, named after this module per the stdlib convention.
logger: logging.Logger = logging.getLogger(__name__)

# Session id used by QueryExecutor for every layer it stores in and reads
# from the session store; all requests in this module share this one id.
DEFAULT_SESSION_ID: str = "default-session"
|
|
|
|
|
|
|
|
class QueryExecutor:
    """Route user queries to the right handler and build GeoQuery responses.

    Two entry points share the same handling logic:

    * ``process_query_with_context`` returns one response dict.
    * ``process_query_stream`` is an async generator of event dicts shaped
      ``{"event": "status"|"intent"|"chunk"|"result", "data": <JSON string>}``.

    Both detect the user's intent with the LLM and then either chat, run an
    LLM-written analytical SQL query (optionally published as a map layer),
    or run a spatial SQL operation that may reference layers created earlier
    in the session.
    """

    # Intents the router understands. Order matters when matching free-form
    # LLM output: the first listed intent contained in the text wins.
    _KNOWN_INTENTS = ("GENERAL_CHAT", "DATA_QUERY", "MAP_REQUEST", "SPATIAL_OP", "STAT_QUERY")

    # Intents answered by the analytical text-to-SQL streaming path.
    _DATA_INTENTS = ("DATA_QUERY", "MAP_REQUEST", "STAT_QUERY")

    def __init__(self):
        self.llm = LLMGateway()
        self.data_loader = get_data_loader()
        self.geo_engine = get_geo_engine()
        self.session_store = get_session_store()
        self.semantic_search = get_semantic_search()
        self.catalog = get_data_catalog()
        self.query_planner = get_query_planner()

    # ------------------------------------------------------------------ #
    # Small shared helpers
    # ------------------------------------------------------------------ #

    @classmethod
    def _normalize_intent(cls, raw_intent: Any) -> str:
        """Map free-form LLM intent output onto one of ``_KNOWN_INTENTS``.

        The text is stripped and upper-cased, and the first known intent it
        contains is returned. Empty or unrecognised output falls back to
        ``"GENERAL_CHAT"`` so every request is routed to a handler that
        produces a response.
        """
        text = str(raw_intent or "").strip().upper()
        for known in cls._KNOWN_INTENTS:
            if known in text:
                return known
        return "GENERAL_CHAT"

    @staticmethod
    def _strip_sql_fences(sql: str) -> str:
        """Remove Markdown code fences the LLM may wrap around generated SQL."""
        return sql.replace("```sql", "").replace("```", "").strip()

    @staticmethod
    def _is_unavailable_sql(sql: str) -> bool:
        """Return True when generated SQL is the LLM's "cannot answer" sentinel.

        Matches a ``DATA_UNAVAILABLE`` marker anywhere, or a leading
        ``-- ERROR`` comment in any capitalisation (``-- Error`` included).
        """
        return "DATA_UNAVAILABLE" in sql or sql.upper().startswith("-- ERROR")

    @staticmethod
    def _unavailable_message(sql: str) -> str:
        """Build the user-facing reply for a DATA_UNAVAILABLE / -- ERROR sentinel.

        Optional ``Requested:`` / ``Available:`` lines in the sentinel text
        override the generic wording.
        """
        requested = "the requested data"
        available = "administrative boundaries (provinces, districts, corregimientos)"
        for line in sql.split("\n"):
            if "Requested:" in line:
                requested = line.split("Requested:")[-1].strip()
            elif "Available:" in line:
                available = line.split("Available:")[-1].strip()
        return (
            f"I couldn't find data for **{requested}** in the current database.\n\n"
            "**Available datasets include:**\n"
            f"- {available}\n\n"
            "If you need additional data, please let me know and I can help you "
            "understand what's currently available or suggest alternative queries."
        )

    @staticmethod
    def _status_event(message: str) -> Dict[str, str]:
        """Stream event carrying a progress/status line."""
        return {"event": "status", "data": json.dumps({"status": message})}

    @staticmethod
    def _chunk_event(chunk_type: str, content: Any) -> Dict[str, str]:
        """Stream event carrying a piece of "thought" or "text" output."""
        return {"event": "chunk", "data": json.dumps({"type": chunk_type, "content": content})}

    @staticmethod
    def _result_event(payload: Dict[str, Any]) -> Dict[str, str]:
        """Terminal stream event carrying the final response payload."""
        return {"event": "result", "data": json.dumps(payload)}

    @staticmethod
    def _failed_payload(response: str, sql: Optional[str]) -> Dict[str, Any]:
        """Response payload for a request that produced no data.

        Carries the same data keys as a successful response so consumers can
        read them unconditionally.
        """
        return {
            "response": response,
            "sql_query": sql,
            "geojson": None,
            "data_citations": [],
            "chart_data": None,
            "raw_data": [],
        }

    def _get_schema_context(self) -> str:
        """Returns the database schema for the LLM context."""
        return self.data_loader.get_schema_context()

    def _candidate_summaries(self, query: str, top_k: int = 15):
        """Catalog summaries for the tables semantically closest to ``query``.

        Falls back to every table summary when semantic search returns nothing.
        """
        candidate_tables = self.semantic_search.search_table_names(query, top_k=top_k)
        if candidate_tables:
            return self.catalog.get_summaries_for_tables(candidate_tables)
        return self.catalog.get_all_table_summaries()

    def _load_tables(self, tables) -> List[str]:
        """Ask the GeoEngine to load each table; return the ones that loaded.

        Tables that fail to load are logged and skipped. ``None`` is treated as
        an empty list.
        """
        loaded = []
        for table in tables or []:
            if self.geo_engine.ensure_table_loaded(table):
                loaded.append(table)
            else:
                logger.warning("[GeoQuery] Could not load relevant table '%s'", table)
        return loaded

    def _build_spatial_context(self, session_id: str) -> str:
        """Schema of loaded tables followed by the session's user-created layers.

        Spatial SQL can reference earlier results through the listed table names.
        """
        base_table_schema = self.geo_engine.get_table_schemas()
        session_layers = self.session_store.get_layers(session_id)
        if session_layers:
            layer_lines = "".join(
                f"Layer {position}: {layer['name']} (Table: {layer['table_name']})\n"
                for position, layer in enumerate(session_layers, start=1)
            )
        else:
            layer_lines = "(No user layers created yet.)\n"
        return f"{base_table_schema}\n\nUser-Created Layers:\n{layer_lines}"

    async def _publish_layer(
        self,
        session_id: str,
        query: str,
        sql: str,
        geojson: Dict[str, Any],
        features: List[Any],
        *,
        default_name: str = "Map Layer",
        default_emoji: str = "📍",
        failure_label: str = "Failed to register layer",
    ) -> Dict[str, Any]:
        """Style ``geojson`` as a named map layer and record it in the session.

        The LLM proposes a name, emoji and point style for the layer. The styled
        GeoJSON is registered with the GeoEngine and added to the session store,
        which is what later spatial queries list as "User-Created Layers".
        Registration failures are logged and do not fail the request.

        Returns:
            The styled GeoJSON produced by ``ResponseFormatter.format_geojson_layer``.
        """
        layer_info = await self.llm.generate_layer_name(query, sql)
        layer_name_ai = layer_info.get("name", default_name)
        layer_emoji = layer_info.get("emoji", default_emoji)
        point_style = layer_info.get("pointStyle", None)
        geojson, layer_id, layer_name = ResponseFormatter.format_geojson_layer(
            query, geojson, features, layer_name_ai, layer_emoji, point_style
        )
        try:
            table_name = self.geo_engine.register_layer(layer_id, geojson)
            self.session_store.add_layer(session_id, {
                "id": layer_id,
                "name": layer_name,
                "table_name": table_name,
                "timestamp": datetime.datetime.now().isoformat(),
            })
        except Exception as e:
            # Best effort: the map layer is still returned to the client even
            # if it cannot be referenced by later spatial queries.
            logger.warning("%s: %s", failure_label, e)
        return geojson

    async def _relay_stream(self, stream, parts: List[str], *, echo_text: bool = False):
        """Relay an LLM chunk stream as chunk events.

        ``thought`` chunks are forwarded. ``content`` text is appended to
        ``parts`` (the caller joins it afterwards), and is also forwarded as a
        "text" chunk when ``echo_text`` is true.
        """
        async for chunk in stream:
            if chunk["type"] == "thought":
                yield self._chunk_event("thought", chunk["text"])
            elif chunk["type"] == "content":
                parts.append(chunk["text"])
                if echo_text:
                    yield self._chunk_event("text", chunk["text"])

    # ------------------------------------------------------------------ #
    # Non-streaming entry point
    # ------------------------------------------------------------------ #

    async def process_query_with_context(self, query: str, history: List[Dict[str, str]]) -> Dict[str, Any]:
        """Orchestrate the full query-processing flow with conversation context.

        Args:
            query: The user's message.
            history: Prior conversation turns, passed through to the LLM.

        Returns:
            The response dict produced by the handler for the detected intent.
            Unrecognised intents are answered as general chat.
        """
        raw_intent = await self.llm.detect_intent(query, history)
        intent = self._normalize_intent(raw_intent)
        logger.info("[GeoQuery] Detected intent: %s", intent)

        if intent in ("DATA_QUERY", "MAP_REQUEST"):
            return await self._handle_data_query(query, history, include_map=True)
        if intent == "SPATIAL_OP":
            return await self._handle_spatial_op(query, history)
        if intent == "STAT_QUERY":
            return await self._handle_stat_query(query, history)
        return await self._handle_general_chat(query, history)

    # ------------------------------------------------------------------ #
    # Streaming entry point
    # ------------------------------------------------------------------ #

    async def process_query_stream(self, query: str, history: List[Dict[str, str]]):
        """Streamable version of process_query_with_context.

        Yields: {"event": "status"|"intent"|"chunk"|"result", "data": <JSON string>}

        Intent detection is streamed first (its thoughts are forwarded), then
        one "intent" event is emitted. Each routed path finishes with a
        single "result" event. Unrecognised intents are answered as general
        chat, matching the non-streaming path.
        """
        yield self._status_event("🧠 Understanding intent...")

        intent_parts: List[str] = []
        try:
            async for event in self._relay_stream(self.llm.stream_intent(query, history), intent_parts):
                yield event
        except Exception as e:
            # Best effort: use whatever intent text arrived (or general chat)
            # instead of failing the whole request.
            logger.warning("Intent stream error: %s", e)

        intent = self._normalize_intent("".join(intent_parts))
        yield {"event": "intent", "data": json.dumps({"intent": intent})}
        logger.info("[GeoQuery] Detected intent: %s", intent)

        if intent in self._DATA_INTENTS:
            handler = self._stream_data_query(query, history, intent)
        elif intent == "SPATIAL_OP":
            handler = self._stream_spatial_op(query, history)
        else:
            handler = self._stream_general_chat(query, history)

        async for event in handler:
            yield event

    async def _stream_general_chat(self, query: str, history: List[Dict[str, str]]):
        """Stream a conversational reply. The text is delivered via chunk events."""
        async for chunk in self.llm.generate_response_stream(query, history):
            chunk_type = chunk.get("type")
            if chunk_type == "content":
                yield self._chunk_event("text", chunk.get("text"))
            elif chunk_type == "thought":
                # Other LLM streams carry thoughts under "text"; "content" is
                # kept as a fallback key.
                yield self._chunk_event("thought", chunk.get("text", chunk.get("content")))

        # The reply was already sent as chunks; the closing result is empty.
        yield self._result_event({"response": ""})

    async def _stream_data_query(self, query: str, history: List[Dict[str, str]], intent: str):
        """Stream the analytical text-to-SQL path for DATA/MAP/STAT intents.

        Complex queries are delegated to the multi-step planner. Otherwise the
        flow is:
        1. Find candidate tables and load the relevant ones.
        2. Stream LLM-written SQL and execute it, with one automatic repair attempt.
        3. Build chart, raw-table and citation data, plus a map layer unless the
           intent is STAT_QUERY.
        4. Stream an explanation.
        """
        include_map = intent != "STAT_QUERY"
        session_id = DEFAULT_SESSION_ID

        complexity = self.query_planner.detect_complexity(query)
        if complexity["is_complex"]:
            yield self._status_event("🔀 Complex query detected, planning steps...")
            logger.info("Complex query detected: %s", complexity["reason"])
            async for event in self._execute_multi_step_query(query, history, include_map, session_id):
                yield event
            return

        yield self._status_event("🔍 Searching data catalog...")
        candidate_summaries = self._candidate_summaries(query, top_k=15)

        yield self._status_event("📋 Identifying relevant tables...")
        relevant_tables = await self.llm.identify_relevant_tables(query, candidate_summaries)

        if relevant_tables:
            yield self._status_event(f"💾 Loading tables: {', '.join(relevant_tables)}...")
            self._load_tables(relevant_tables)

        table_schema = self.geo_engine.get_table_schemas()

        yield self._status_event("✏️ Writing SQL query...")
        sql_parts: List[str] = []
        async for event in self._relay_stream(
            self.llm.stream_analytical_sql(query, table_schema, history), sql_parts
        ):
            yield event
        sql = self._strip_sql_fences("".join(sql_parts))

        if self._is_unavailable_sql(sql):
            yield self._status_event("ℹ️ Data not available")
            yield self._result_event(self._failed_payload(self._unavailable_message(sql), sql))
            return

        yield self._status_event("⚡ Executing query...")
        geojson = None
        features: List[Any] = []
        error_message = None
        try:
            geojson = self.geo_engine.execute_spatial_query(sql)
            features = geojson.get("features", [])
            yield self._status_event(f"✅ Found {len(features)} results")
        except Exception as e:
            error_message = str(e)
            yield self._status_event("⚠️ Query error, attempting repair...")
            try:
                sql = await self.llm.correct_sql(query, sql, error_message, str(table_schema))
                geojson = self.geo_engine.execute_spatial_query(sql)
                features = geojson.get("features", [])
                error_message = None
            except Exception as e2:
                logger.warning("Repair failed: %s", e2)

        if error_message:
            yield self._result_event(self._failed_payload(
                "I was unable to process your request because the data query failed. "
                f"\n\nError details: {error_message}",
                sql,
            ))
            return

        citations = ResponseFormatter.generate_citations(relevant_tables, features)
        chart_data = ResponseFormatter.generate_chart_data(sql, features)
        if intent == "STAT_QUERY" and not chart_data and features:
            # Statistics answers should get a chart. The dummy "GROUP BY" SQL
            # nudges the formatter into charting (it appears to key off the SQL
            # text — confirm in ResponseFormatter).
            chart_data = ResponseFormatter.generate_chart_data("GROUP BY forced", features)
        raw_data = ResponseFormatter.prepare_raw_data(features)

        if include_map and features and geojson:
            geojson = await self._publish_layer(session_id, query, sql, geojson, features)

        yield self._status_event("💬 Generating explanation...")
        data_summary = ResponseFormatter.generate_data_summary(features)
        explanation_parts: List[str] = []
        async for event in self._relay_stream(
            self.llm.stream_explanation(query, sql, data_summary, history),
            explanation_parts,
            echo_text=True,
        ):
            yield event

        yield self._result_event({
            "response": "".join(explanation_parts),
            "sql_query": sql,
            "geojson": geojson if include_map and features else None,
            "chart_data": chart_data,
            "raw_data": raw_data,
            "data_citations": citations,
        })

    async def _stream_spatial_op(self, query: str, history: List[Dict[str, str]]):
        """Stream a spatial SQL operation over base tables and session layers.

        The result is published as a new session layer when it has features.
        """
        yield self._status_event("🌐 Preparing spatial operation...")
        session_id = DEFAULT_SESSION_ID

        candidate_summaries = self._candidate_summaries(query, top_k=15)
        relevant_tables = await self.llm.identify_relevant_tables(query, candidate_summaries)
        self._load_tables(relevant_tables)
        full_context = self._build_spatial_context(session_id)

        yield self._status_event("✏️ Writing spatial SQL...")
        sql = await self.llm.generate_spatial_sql(query, full_context, history)

        yield self._status_event("⚙️ Processing geometry...")
        geojson = None
        features: List[Any] = []
        try:
            geojson = self.geo_engine.execute_spatial_query(sql)
            features = geojson.get("features", [])
            yield self._status_event(f"✅ Result contains {len(features)} features")
        except Exception as e:
            yield self._status_event("⚠️ Spatial error, attempting repair...")
            try:
                sql = await self.llm.correct_sql(query, sql, str(e), full_context)
                geojson = self.geo_engine.execute_spatial_query(sql)
                features = geojson.get("features", [])
            except Exception as e2:
                logger.warning("Spatial repair failed: %s", e2)
                yield self._result_event(self._failed_payload(
                    f"I tried to perform the spatial operation but encountered an error: {str(e)}\n\nQuery: {sql}",
                    sql,
                ))
                return

        if features:
            geojson = await self._publish_layer(session_id, query, sql, geojson, features)

        yield self._status_event("💬 Explaining results...")
        data_summary = f"Spatial operation resulted in {len(features)} features."
        explanation_parts: List[str] = []
        async for event in self._relay_stream(
            self.llm.stream_explanation(query, sql, data_summary, history),
            explanation_parts,
            echo_text=True,
        ):
            yield event

        yield self._result_event({
            "response": "".join(explanation_parts),
            "sql_query": sql,
            "geojson": geojson,
            "chart_data": None,
            "raw_data": [],
            "data_citations": [],
        })

    # ------------------------------------------------------------------ #
    # Non-streaming handlers
    # ------------------------------------------------------------------ #

    async def _handle_general_chat(self, query: str, history: List[Dict[str, str]]) -> Dict[str, Any]:
        """Handles general conversational queries.

        The prompt is prefixed with the counts of loaded administrative units
        so the LLM can describe the available data.
        """
        loader = self.data_loader
        enhanced_query = (
            "The user is asking about Panama geographic data.\n\n"
            f"Available data: {len(loader.admin1)} provinces, {len(loader.admin2)} districts, "
            f"{len(loader.admin3)} corregimientos.\n\n"
            f"User question: {query}\n\n"
            "Respond helpfully as GeoQuery, the territorial intelligence assistant."
        )
        response = await self.llm.generate_response(enhanced_query, history)
        return {
            "response": response,
            "sql_query": None,
            "geojson": None,
            "data_citations": [],
            "intent": "GENERAL_CHAT",
        }

    async def _handle_data_query(self, query: str, history: List[Dict[str, str]], include_map: bool = True) -> Dict[str, Any]:
        """Answer a data question with LLM-generated analytical SQL.

        Args:
            query: The user's question.
            history: Prior conversation turns.
            include_map: When true and the query returns features, publish them
                as a session map layer and include the GeoJSON in the response.

        Returns:
            A response dict with the explanation, SQL, optional GeoJSON,
            citations, chart data, raw rows and an ``intent`` label. Failure
            responses carry the same keys with empty data.
        """
        logger.info("[GeoQuery] Starting Data Query: %s", query)

        summaries = self.catalog.get_all_table_summaries()
        relevant_tables = await self.llm.identify_relevant_tables(query, summaries)
        feature_tables = self._load_tables(relevant_tables)

        table_schema = self.geo_engine.get_table_schemas()
        if len(table_schema) < 50:
            # A near-empty schema means the GeoEngine has nothing usable loaded.
            # Describe the relevant tables plus the admin boundaries from
            # catalog metadata instead.
            logger.info("[GeoQuery] GeoEngine schema empty. Fetching from Catalog Metadata.")
            fallback_tables = list(set(feature_tables + ["pan_admin1", "pan_admin2", "pan_admin3"]))
            table_schema = self.catalog.get_specific_table_schemas(fallback_tables)

        logger.info("[GeoQuery] Generating SQL with context size: %d chars", len(table_schema))
        sql = self._strip_sql_fences(await self.llm.generate_analytical_sql(query, table_schema, history))

        if self._is_unavailable_sql(sql):
            available_data = ", ".join(feature_tables) if feature_tables else "Administrative Boundaries"
            return {
                **self._failed_payload(
                    f"I couldn't find the specific data you asked for. I have access to: {available_data}. "
                    f"\n\nOriginal request: {query}",
                    sql,
                ),
                "intent": "DATA_QUERY",
            }

        try:
            geojson = self.geo_engine.execute_spatial_query(sql)
            features = geojson.get("features", [])
            logger.info("[GeoQuery] Query returned %d features", len(features))
        except Exception as e:
            error_message = str(e)
            logger.warning("[GeoQuery] SQL execution error: %s", error_message)
            try:
                sql = await self.llm.correct_sql(query, sql, error_message, str(table_schema))
                geojson = self.geo_engine.execute_spatial_query(sql)
                features = geojson.get("features", [])
            except Exception as e2:
                return {
                    **self._failed_payload(
                        "The SQL query failed to execute even after an automatic repair attempt."
                        f"\nOriginal Error: {error_message}\nRepair Error: {str(e2)}",
                        sql,
                    ),
                    "intent": "DATA_QUERY",
                }

        citations = ResponseFormatter.generate_citations(relevant_tables, features)
        data_summary = ResponseFormatter.generate_data_summary(features)
        explanation = await self.llm.generate_explanation(query, sql, data_summary, history)

        if include_map and features:
            geojson = await self._publish_layer(
                DEFAULT_SESSION_ID, query, sql, geojson, features,
                failure_label="Failed to register layer in GeoEngine",
            )

        chart_data = ResponseFormatter.generate_chart_data(sql, features)
        raw_data = ResponseFormatter.prepare_raw_data(features)

        return {
            "response": explanation,
            "sql_query": sql,
            "geojson": geojson if include_map and features else None,
            "data_citations": citations,
            "chart_data": chart_data,
            "raw_data": raw_data,
            "intent": "DATA_QUERY" if not include_map else "MAP_REQUEST",
        }

    async def _handle_spatial_op(self, query: str, history: List[Dict[str, str]]) -> Dict[str, Any]:
        """Handle spatial operations (difference, intersection, etc.) via the GeoEngine.

        The LLM sees the loaded table schemas plus the session's user-created
        layers. A non-empty result is published as a new session layer; a
        registration failure is logged rather than failing the request.
        """
        summaries = self.catalog.get_all_table_summaries()
        relevant_tables = await self.llm.identify_relevant_tables(query, summaries)
        self._load_tables(relevant_tables)

        full_context = self._build_spatial_context(DEFAULT_SESSION_ID)
        sql = await self.llm.generate_spatial_sql(query, full_context, history)

        geojson = None
        features: List[Any] = []
        try:
            geojson = self.geo_engine.execute_spatial_query(sql)
            features = geojson.get("features", [])
        except Exception as e:
            try:
                sql = await self.llm.correct_sql(query, sql, str(e), full_context)
                geojson = self.geo_engine.execute_spatial_query(sql)
                features = geojson.get("features", [])
            except Exception as e2:
                logger.warning("Spatial repair failed: %s", e2)
                return {
                    **self._failed_payload(
                        f"I tried to perform the spatial operation but encountered an error: {str(e)}\n\nQuery: {sql}",
                        sql,
                    ),
                    "intent": "SPATIAL_OP",
                }

        if features:
            geojson = await self._publish_layer(DEFAULT_SESSION_ID, query, sql, geojson, features)

        data_summary = f"Spatial operation resulted in {len(features)} features."
        explanation = await self.llm.generate_explanation(query, sql, data_summary, history)

        return {
            "response": explanation,
            "sql_query": sql,
            "geojson": geojson,
            "data_citations": [],
            "intent": "SPATIAL_OP",
        }

    async def _handle_stat_query(self, query: str, history: List[Dict[str, str]]) -> Dict[str, Any]:
        """Handle statistical queries, where charts/tables matter more than maps.

        Runs the data-query path without a map. If no chart was produced but
        raw rows exist, it retries chart generation from those rows.
        """
        result = await self._handle_data_query(query, history, include_map=False)
        result["intent"] = "STAT_QUERY"

        if not result.get("chart_data") and result.get("raw_data"):
            # Re-wrap raw rows as feature-shaped dicts for the chart formatter.
            features_mock = [{"properties": row} for row in result["raw_data"]]
            result["chart_data"] = ResponseFormatter.generate_chart_data(result.get("sql_query", ""), features_mock)

        return result

    # ------------------------------------------------------------------ #
    # Multi-step execution
    # ------------------------------------------------------------------ #

    async def _execute_multi_step_query(
        self,
        query: str,
        history: List[Dict[str, str]],
        include_map: bool,
        session_id: str
    ):
        """Execute a complex query by breaking it into multiple planned steps.

        Yields streaming events throughout, ending with one "result" event.
        The steps of each ``plan.parallel_groups`` entry are executed one after
        another. The step result with the most features becomes the map layer
        and chart source.
        """
        yield self._status_event("🔍 Discovering relevant datasets...")
        candidate_tables = self.semantic_search.search_table_names(query, top_k=20)
        if not candidate_tables:
            candidate_tables = list(self.catalog.catalog.keys())

        yield self._status_event("📋 Creating execution plan...")
        plan = await self.query_planner.plan_query(query, candidate_tables, self.llm)

        if not plan.is_complex or not plan.steps:
            async for event in self._stream_planless_query(query, history, include_map, candidate_tables):
                yield event
            return

        step_descriptions = [f"Step {number}: {s.description}" for number, s in enumerate(plan.steps, start=1)]
        yield self._chunk_event("thought", "Planning multi-step execution:\n" + "\n".join(step_descriptions))

        all_tables = set()
        for step in plan.steps:
            all_tables.update(step.tables_needed)
        if all_tables:
            yield self._status_event(f"💾 Loading {len(all_tables)} datasets...")
            self._load_tables(all_tables)

        intermediate_results: Dict[str, Dict[str, Any]] = {}
        all_sql: List[str] = []

        for group_idx, group in enumerate(plan.parallel_groups):
            group_steps = [s for s in plan.steps if s.step_id in group]
            yield self._status_event(f"⚡ Executing step group {group_idx + 1}/{len(plan.parallel_groups)}...")

            for step in group_steps:
                yield self._status_event(f"📌 {step.description}...")

                # Re-read schemas each step: earlier steps may have loaded tables.
                table_schema = self.geo_engine.get_table_schemas()
                step_query = (
                    f"Execute this step: {step.description}\n\n"
                    f"Original user request: {query}\n\n"
                    f"SQL Hint: {step.sql_template or 'None'}\n\n"
                    f"Previous step results available: {list(intermediate_results.keys())}"
                )
                sql = self._strip_sql_fences(
                    await self.llm.generate_analytical_sql(step_query, table_schema, history)
                )

                if self._is_unavailable_sql(sql):
                    logger.warning("Step %s indicated data unavailable", step.step_id)
                    intermediate_results[step.result_name] = {"features": [], "sql": sql}
                    continue

                try:
                    geojson = self.geo_engine.execute_spatial_query(sql)
                    features = geojson.get("features", [])
                    intermediate_results[step.result_name] = {
                        "features": features,
                        "sql": sql,
                        "geojson": geojson,
                    }
                    all_sql.append(f"-- {step.description}\n{sql}")
                    yield self._status_event(f"✅ Step got {len(features)} results")
                except Exception as e:
                    logger.error("Step %s failed: %s", step.step_id, e)
                    try:
                        sql = await self.llm.correct_sql(step_query, sql, str(e), table_schema)
                        geojson = self.geo_engine.execute_spatial_query(sql)
                        features = geojson.get("features", [])
                        intermediate_results[step.result_name] = {
                            "features": features,
                            "sql": sql,
                            "geojson": geojson,
                        }
                        all_sql.append(f"-- {step.description} (repaired)\n{sql}")
                    except Exception as e2:
                        logger.error("Step repair also failed: %s", e2)
                        intermediate_results[step.result_name] = {"features": [], "sql": sql, "error": str(e2)}

        yield self._status_event("💬 Generating combined analysis...")

        result_summary = [
            f"{name}: {len(result.get('features', []))} records"
            for name, result in intermediate_results.items()
        ]
        combined_summary = (
            f"Multi-step query completed with {len(plan.steps)} steps.\n\n"
            "Results:\n"
            + "\n".join(result_summary)
            + f"\n\nCombination logic: {plan.final_combination_logic}"
        )

        explanation_parts: List[str] = []
        async for event in self._relay_stream(
            self.llm.stream_explanation(query, "\n\n".join(all_sql), combined_summary, history),
            explanation_parts,
            echo_text=True,
        ):
            yield event

        # The step result with the most features (first one wins ties) is used
        # for the map and chart. Its own SQL is kept for naming the layer.
        best_geojson = None
        best_features: List[Any] = []
        best_sql = ""
        for result in intermediate_results.values():
            features = result.get("features", [])
            if len(features) > len(best_features):
                best_features = features
                best_geojson = result.get("geojson")
                best_sql = result.get("sql", "")

        if include_map and best_features and best_geojson:
            best_geojson = await self._publish_layer(
                session_id, query, best_sql, best_geojson, best_features,
                default_name="Multi-Step Result",
                default_emoji="📊",
                failure_label="Failed to register multi-step layer",
            )

        chart_data = ResponseFormatter.generate_chart_data("\n".join(all_sql), best_features)
        raw_data = ResponseFormatter.prepare_raw_data(best_features)

        yield self._result_event({
            "response": "".join(explanation_parts),
            "sql_query": "\n\n".join(all_sql),
            "geojson": best_geojson if include_map and best_features else None,
            "chart_data": chart_data,
            "raw_data": raw_data,
            "data_citations": [],
            "multi_step": True,
            "steps_executed": len(plan.steps),
        })

    async def _stream_planless_query(
        self,
        query: str,
        history: List[Dict[str, str]],
        include_map: bool,
        candidate_tables: List[str],
    ):
        """Fallback when the planner yields no steps: run one analytical query.

        Uses the same candidate tables the planner saw. No repair attempt is
        made and no session layer is registered on this path.
        """
        yield self._status_event("🔄 Executing as simple query...")

        candidate_summaries = self.catalog.get_summaries_for_tables(candidate_tables)
        relevant_tables = await self.llm.identify_relevant_tables(query, candidate_summaries)
        self._load_tables(relevant_tables)
        table_schema = self.geo_engine.get_table_schemas()

        yield self._status_event("✏️ Writing SQL query...")
        sql = self._strip_sql_fences(await self.llm.generate_analytical_sql(query, table_schema, history))

        try:
            geojson = self.geo_engine.execute_spatial_query(sql)
            features = geojson.get("features", [])
        except Exception as e:
            yield self._result_event(self._failed_payload(f"Query execution failed: {str(e)}", sql))
            return

        data_summary = ResponseFormatter.generate_data_summary(features)
        explanation = await self.llm.generate_explanation(query, sql, data_summary, history)

        yield self._result_event({
            "response": explanation,
            "sql_query": sql,
            "geojson": geojson if include_map and features else None,
            "chart_data": ResponseFormatter.generate_chart_data(sql, features),
            "raw_data": ResponseFormatter.prepare_raw_data(features),
            "data_citations": [],
        })
|
|
|