|
|
from fastapi import APIRouter |
|
|
from pydantic import BaseModel |
|
|
from typing import Optional |
|
|
from backend.services.executor import QueryExecutor |
|
|
|
|
|
router = APIRouter() |
|
|
|
|
|
|
|
|
class MessageHistory(BaseModel): |
|
|
role: str |
|
|
content: str |
|
|
|
|
|
|
|
|
class ChatRequest(BaseModel): |
|
|
message: str |
|
|
history: list[MessageHistory] = [] |
|
|
|
|
|
|
|
|
class ChartData(BaseModel): |
|
|
type: str |
|
|
title: Optional[str] = None |
|
|
data: list[dict] = [] |
|
|
xKey: Optional[str] = None |
|
|
yKey: Optional[str] = None |
|
|
lines: Optional[list[dict]] = None |
|
|
|
|
|
|
|
|
class ChatResponse(BaseModel): |
|
|
response: str |
|
|
sql_query: Optional[str] = None |
|
|
geojson: Optional[dict] = None |
|
|
data_citations: list[str] = [] |
|
|
intent: Optional[str] = None |
|
|
chart_data: Optional[ChartData] = None |
|
|
|
|
|
|
|
|
@router.post("/", response_model=ChatResponse) |
|
|
async def chat(request: ChatRequest): |
|
|
""" |
|
|
Main chat endpoint that handles conversation with context. |
|
|
Routes to appropriate handler based on detected intent. |
|
|
""" |
|
|
executor = QueryExecutor() |
|
|
|
|
|
|
|
|
history = [{"role": h.role, "content": h.content} for h in request.history] |
|
|
|
|
|
|
|
|
result = await executor.process_query_with_context( |
|
|
query=request.message, |
|
|
history=history |
|
|
) |
|
|
|
|
|
return ChatResponse( |
|
|
response=result.get("response", "I processed your request."), |
|
|
sql_query=result.get("sql_query"), |
|
|
geojson=result.get("geojson"), |
|
|
data_citations=result.get("data_citations", []), |
|
|
intent=result.get("intent"), |
|
|
chart_data=result.get("chart_data"), |
|
|
raw_data=result.get("raw_data") |
|
|
) |
|
|
|
|
|
|
|
|
from sse_starlette.sse import EventSourceResponse |
|
|
import json |
|
|
import asyncio |
|
|
|
|
|
@router.post("/stream") |
|
|
async def chat_stream(request: ChatRequest): |
|
|
""" |
|
|
Streaming chat endpoint that returns Server-Sent Events (SSE). |
|
|
""" |
|
|
executor = QueryExecutor() |
|
|
history = [{"role": h.role, "content": h.content} for h in request.history] |
|
|
|
|
|
async def event_generator(): |
|
|
try: |
|
|
|
|
|
async for event in executor.process_query_stream(request.message, history): |
|
|
yield event |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Stream error: {e}") |
|
|
yield { |
|
|
"event": "chunk", |
|
|
"data": json.dumps({"type": "text", "content": f"\n\nError: {str(e)}"}) |
|
|
} |
|
|
|
|
|
return EventSourceResponse(event_generator()) |
|
|
|