# kahsuen's picture
# Upload 1083 files
# cf0f589 verified
from typing import List, Optional, Dict, Any
from fastapi import FastAPI, HTTPException, Security, Depends
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from supabase import create_client, Client
from pydantic import BaseModel
from dotenv import load_dotenv
from pathlib import Path
import httpx
import sys
import os
from pydantic_ai.messages import (
ModelRequest,
ModelResponse,
UserPromptPart,
TextPart
)
from pydantic_mcp_agent import get_pydantic_ai_agent
# Load environment variables before the os.getenv() lookups further down this module
load_dotenv()
# Module-level handle to the agent; stays None until the lifespan startup hook assigns it
mcp_agent = None
# Define a lifespan context manager
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook: build the agent once at startup.

    The agent returned by ``get_pydantic_ai_agent()`` is published through the
    module-level ``mcp_agent`` name so request handlers can use it. Nothing is
    done after the ``yield`` (no shutdown step).
    """
    global mcp_agent
    agent = await get_pydantic_ai_agent()
    mcp_agent = agent
    # Hand control back to FastAPI; the application serves requests from here.
    yield
# Create the FastAPI application; `lifespan` initializes the agent at startup.
app = FastAPI(lifespan=lifespan)

# Bearer-token scheme consumed by verify_token() through Security(security).
security = HTTPBearer()

# CORS is fully open: any origin, method and header is accepted.
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive -- confirm this is intended for the deployment target.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Supabase client built from SUPABASE_URL / SUPABASE_SERVICE_KEY in the environment.
supabase: Client = create_client(
    os.environ.get("SUPABASE_URL"),
    os.environ.get("SUPABASE_SERVICE_KEY"),
)
# Request/Response Models
class AgentRequest(BaseModel):
    # Request body accepted by the agent endpoint.

    # Text of the user's prompt; passed to the agent and stored as a "human" message.
    query: str
    # Identifier of the calling user (not read by the code visible in this file).
    user_id: str
    # Caller-supplied request identifier; stored alongside the agent's reply.
    request_id: str
    # Conversation key used to load and store rows in the "messages" table.
    session_id: str
class AgentResponse(BaseModel):
    # Response body returned by the agent endpoint.

    # True when the request was processed; False when the handler caught an error.
    success: bool
def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)) -> bool:
    """Verify the bearer token against the API_BEARER_TOKEN environment variable.

    Args:
        credentials: Bearer credentials resolved by the module-level
            ``HTTPBearer`` security scheme.

    Returns:
        ``True`` when the presented token matches the configured one.

    Raises:
        HTTPException: 500 if ``API_BEARER_TOKEN`` is unset or empty;
            401 if the presented token does not match.
    """
    # Function-scope import keeps this block self-contained (stdlib only).
    import secrets

    expected_token = os.getenv("API_BEARER_TOKEN")
    if not expected_token:
        raise HTTPException(
            status_code=500,
            detail="API_BEARER_TOKEN environment variable not set"
        )

    # Compare in constant time so response timing does not leak how much of the
    # token matched. Bytes are compared because compare_digest() rejects
    # non-ASCII str arguments.
    token_matches = secrets.compare_digest(
        credentials.credentials.encode("utf-8"),
        expected_token.encode("utf-8"),
    )
    if not token_matches:
        raise HTTPException(
            status_code=401,
            detail="Invalid authentication token",
            # A 401 response should carry a challenge for the expected scheme.
            headers={"WWW-Authenticate": "Bearer"},
        )
    return True
async def fetch_conversation_history(session_id: str, limit: int = 10) -> List[Dict[str, Any]]:
    """Return the newest ``limit`` messages of a session in chronological order.

    Rows are read from the Supabase ``messages`` table, newest first, and then
    reversed so the oldest of the selected rows comes first.

    Raises:
        HTTPException: 500 if the query (or reversing its result) fails.
    """
    try:
        query = (
            supabase.table("messages")
            .select("*")
            .eq("session_id", session_id)
            .order("created_at", desc=True)
            .limit(limit)
        )
        newest_first = query.execute().data
        # The query is newest-first; flip it to oldest-first for the agent.
        return newest_first[::-1]
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to fetch conversation history: {str(e)}")
async def store_message(session_id: str, message_type: str, content: str, data: Optional[Dict] = None):
    """Insert one message row into the Supabase ``messages`` table.

    The row holds ``session_id`` and a ``message`` object with ``type`` and
    ``content``; ``data`` is attached to that object only when it is truthy
    (``None`` or an empty dict is omitted).

    Raises:
        HTTPException: 500 if the insert fails.
    """
    payload = {"type": message_type, "content": content}
    if data:
        payload["data"] = data

    row = {"session_id": session_id, "message": payload}
    try:
        supabase.table("messages").insert(row).execute()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to store message: {str(e)}")
def _history_to_model_messages(conversation_history: List[Dict[str, Any]]) -> List[Any]:
    """Convert stored message rows into the message objects passed to the agent."""
    model_messages = []
    for row in conversation_history:
        stored = row["message"]
        stored_type = stored["type"]
        stored_content = stored["content"]
        if stored_type == "human":
            model_messages.append(ModelRequest(parts=[UserPromptPart(content=stored_content)]))
        else:
            # Every non-"human" row is replayed as a model response.
            model_messages.append(ModelResponse(parts=[TextPart(content=stored_content)]))
    return model_messages


@app.post("/api/pydantic-mcp-agent", response_model=AgentResponse)
async def pydantic_mcp_agent(
    request: AgentRequest,
    authenticated: bool = Depends(verify_token)
):
    """Run the agent on a user query and persist both sides of the exchange.

    The session's recent history is loaded and replayed to the agent, the
    user's query and the agent's reply are stored, and ``success=True`` is
    returned. Any error is logged, recorded in the conversation on a
    best-effort basis, and reported as ``success=False``.
    """
    # `authenticated` is unused in the body; the dependency exists to enforce
    # the bearer-token check before this handler runs.
    try:
        conversation_history = await fetch_conversation_history(request.session_id)
        messages = _history_to_model_messages(conversation_history)

        # Store the user's query before running the agent.
        await store_message(
            session_id=request.session_id,
            message_type="human",
            content=request.query
        )

        result = await mcp_agent.run(
            request.query,
            message_history=messages
        )

        # NOTE(review): newer pydantic-ai releases appear to expose this value
        # as `result.output` -- confirm against the pinned version.
        await store_message(
            session_id=request.session_id,
            message_type="ai",
            content=result.data,
            data={"request_id": request.request_id}
        )
        return AgentResponse(success=True)
    except Exception as e:
        print(f"Error processing agent request: {str(e)}")
        # Recording the failure is best-effort: store_message() can itself
        # raise (e.g. when the database is what failed), and that must not
        # replace the success=False reply with an unhandled error.
        try:
            await store_message(
                session_id=request.session_id,
                message_type="ai",
                content="I apologize, but I encountered an error processing your request.",
                data={"error": str(e), "request_id": request.request_id}
            )
        except Exception as store_error:
            print(f"Failed to store error message: {store_error}")
        return AgentResponse(success=False)
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on port 8001.
    # uvicorn is imported here so it is only needed when run as a script.
    import uvicorn

    uvicorn.run(app, port=8001, host="0.0.0.0")