# Source provenance (non-code residue from file hosting page, commented out so the
# module parses): uploaded by kahsuen, "Upload 1083 files", commit cf0f589 (verified).
"""
API for MCP agents.
This module provides a FastAPI application to expose the MCP agent functionality.
"""
import logging
import asyncio
import os
from typing import List, Dict, Any, Optional, Literal
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from deep_research_integration import post_question_generation_hook, post_hypothesis_generation_hook
from cognee_integration import search_knowledge_graph
from ai_co_scientist_integration import generate_and_enhance_hypotheses, get_active_hypotheses
from hypothesis_questions import generate_questions_from_hypothesis
# Configure logging once at import time; all handlers below share this format.
LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)

# Module-level logger used by every endpoint in this file.
logger = logging.getLogger("mcp-agents-api")
# Create the FastAPI app; title/description/version surface in the generated OpenAPI docs.
app = FastAPI(
    title="MCP Agents API",
    description="API for specialized MCP agents to enhance the research cycle",
    version="1.0.0"
)
# Define request and response models
class ResearchQuestionsRequest(BaseModel):
    """Request body for /enhance-questions."""
    query: str  # the research query the questions were generated for
    questions: List[str]  # the questions to enhance
    domain_context: Optional[str] = None  # optional extra domain context; unused by the handler below
class ResearchQuestionsResponse(BaseModel):
    """Response body for /enhance-questions."""
    domain: str  # domain detected/assigned by the enhancement hook
    enhanced_questions: Dict[str, Any]  # enhanced questions keyed by the hook's output structure
class HypothesisItem(BaseModel):
    """A single research hypothesis exchanged with the MCP agents."""
    id: str  # unique hypothesis identifier
    title: str  # short human-readable title
    text: str  # full hypothesis statement
    novelty_review: Optional[str] = None  # reviewer notes on novelty, if any
    feasibility_review: Optional[str] = None  # reviewer notes on feasibility, if any
    elo_score: Optional[float] = None  # presumably an Elo-style ranking score — confirm against producer
    review_comments: Optional[List[str]] = None  # free-form review comments
    references: Optional[List[str]] = None  # supporting references/citations
    is_active: Optional[bool] = None  # whether the hypothesis is still under consideration
    parent_ids: Optional[List[str]] = None  # ids of hypotheses this one was derived from
class HypothesesRequest(BaseModel):
    """Request body for the hypothesis endpoints (/enhance-hypotheses and related)."""
    query: str  # the research query driving hypothesis generation
    hypotheses: List[HypothesisItem]  # hypotheses to enhance (may be empty for generation endpoints)
    research_goal: str  # the overall research goal guiding enhancement
class HypothesesResponse(BaseModel):
    """Response body for /enhance-hypotheses."""
    domain: str  # domain detected/assigned by the enhancement hook
    enhanced_hypotheses: Dict[str, Any]  # enhanced hypotheses keyed by the hook's output structure
class ChatRequest(BaseModel):
    """Request body for /chat."""
    message: str  # the user's chat message
    # Search mode forwarded verbatim to search_knowledge_graph.
    searchType: Literal['GRAPH_COMPLETION', 'COMPLETION', 'INSIGHTS'] = 'GRAPH_COMPLETION'
class ChatResponse(BaseModel):
    """Response body for /chat."""
    response: str  # the answer text produced by the knowledge graph
    sources: Optional[List[Dict[str, Any]]] = None  # optional source documents backing the answer
@app.get("/")
async def root():
    """Root endpoint reporting that the service is up."""
    payload = {"message": "MCP Agents API is running"}
    return payload
@app.post("/enhance-questions", response_model=ResearchQuestionsResponse)
async def enhance_questions(request: ResearchQuestionsRequest):
    """
    Enhance research questions using the appropriate specialized agent.

    Args:
        request (ResearchQuestionsRequest): The request containing the query and questions.

    Returns:
        ResearchQuestionsResponse: Enhanced research questions with explanations and context.

    Raises:
        HTTPException: 500 when the enhancement hook fails.
    """
    try:
        logger.info(f"Enhancing questions for query: {request.query}")
        # Delegate the actual enhancement to the deep-research hook.
        return post_question_generation_hook(request.query, request.questions)
    except Exception as exc:
        detail = f"Error enhancing questions: {exc}"
        logger.error(detail)
        raise HTTPException(status_code=500, detail=detail)
@app.post("/enhance-hypotheses", response_model=HypothesesResponse)
async def enhance_hypotheses(request: HypothesesRequest):
    """
    Enhance hypotheses using the appropriate specialized agent.

    Args:
        request (HypothesesRequest): The request containing the query, hypotheses, and research goal.

    Returns:
        HypothesesResponse: Enhanced hypotheses with explanations and context.

    Raises:
        HTTPException: 500 when the enhancement hook fails.
    """
    try:
        logger.info(f"Enhancing hypotheses for query: {request.query}")
        # Pydantic models -> plain dicts, the shape the hook expects.
        hypothesis_dicts = [item.dict() for item in request.hypotheses]
        return post_hypothesis_generation_hook(
            request.query, hypothesis_dicts, request.research_goal
        )
    except Exception as exc:
        detail = f"Error enhancing hypotheses: {exc}"
        logger.error(detail)
        raise HTTPException(status_code=500, detail=detail)
@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """
    Process a chat message using the knowledge graph.

    Args:
        request (ChatRequest): The chat message and search type.

    Returns:
        ChatResponse: The response from the knowledge graph.

    Raises:
        HTTPException: 500 when the knowledge-graph search fails.
    """
    try:
        logger.info(f"Processing chat message: {request.message}")
        # searchType is forwarded as-is; its valid values are constrained by ChatRequest.
        return await search_knowledge_graph(request.message, request.searchType)
    except Exception as exc:
        detail = f"Error processing chat message: {exc}"
        logger.error(detail)
        raise HTTPException(status_code=500, detail=detail)
class AgentConfig(BaseModel):
    """Configuration for the selected agent, applied via environment variables."""
    type: str  # agent type identifier (stored in SELECTED_AGENT)
    provider: str  # LLM provider name (stored in AGENT_PROVIDER)
    model: str  # model identifier (stored in AGENT_MODEL)
    apiKey: str  # provider API key (stored in AGENT_API_KEY)
class ResearchConfig(BaseModel):
    """Configuration for the research backend, applied via environment variables."""
    provider: str  # LLM provider name (stored in RESEARCH_PROVIDER)
    model: str  # model identifier (stored in RESEARCH_MODEL)
    apiKey: str  # provider API key (stored in RESEARCH_API_KEY)
class ConfigRequest(BaseModel):
    """Request body for /config: agent and research settings to apply."""
    agent: AgentConfig  # agent-side configuration
    research: ResearchConfig  # research-side configuration
class ConfigResponse(BaseModel):
    """Response body for /config."""
    success: bool  # whether the configuration was applied
    message: str  # human-readable status message
@app.post("/config", response_model=ConfigResponse)
async def update_config(request: ConfigRequest):
    """
    Update the agent and research configuration.

    Args:
        request (ConfigRequest): The configuration to update.

    Returns:
        ConfigResponse: The response indicating success or failure.

    Raises:
        HTTPException: 500 when applying the configuration fails.
    """
    try:
        logger.info(f"Updating configuration for agent type: {request.agent.type}")
        # Configuration is applied process-wide via environment variables.
        # NOTE(review): API keys end up in os.environ in plain text — confirm
        # this is acceptable for the deployment environment.
        env_updates = {
            "SELECTED_AGENT": request.agent.type,
            "AGENT_PROVIDER": request.agent.provider,
            "AGENT_MODEL": request.agent.model,
            "AGENT_API_KEY": request.agent.apiKey,
            "RESEARCH_PROVIDER": request.research.provider,
            "RESEARCH_MODEL": request.research.model,
            "RESEARCH_API_KEY": request.research.apiKey,
        }
        os.environ.update(env_updates)
        return ConfigResponse(success=True, message="Configuration updated successfully")
    except Exception as exc:
        detail = f"Error updating configuration: {exc}"
        logger.error(detail)
        raise HTTPException(status_code=500, detail=detail)
@app.post("/generate-hypotheses")
async def generate_hypotheses_endpoint(request: HypothesesRequest):
    """
    Generate and enhance hypotheses using the AI Co-Scientist.

    Args:
        request (HypothesesRequest): The request containing the query, hypotheses, and research goal.

    Returns:
        HypothesesResponse: Enhanced hypotheses with explanations and context.

    Raises:
        HTTPException: 500 when hypothesis generation fails.
    """
    try:
        logger.info(f"Generating hypotheses for query: {request.query}")
        # Match the number of supplied hypotheses; default to 3 when none are given.
        requested_count = len(request.hypotheses) if request.hypotheses else 3
        return await generate_and_enhance_hypotheses(
            query=request.query,
            research_goal=request.research_goal,
            num_hypotheses=requested_count,
        )
    except Exception as exc:
        detail = f"Error generating hypotheses: {exc}"
        logger.error(detail)
        raise HTTPException(status_code=500, detail=detail)
@app.get("/active-hypotheses")
async def active_hypotheses_endpoint():
    """
    Get the active hypotheses from the AI Co-Scientist.

    Returns:
        List[HypothesisItem]: The active hypotheses.

    Raises:
        HTTPException: 500 when retrieval fails.
    """
    try:
        logger.info("Getting active hypotheses")
        return get_active_hypotheses()
    except Exception as exc:
        detail = f"Error getting active hypotheses: {exc}"
        logger.error(detail)
        raise HTTPException(status_code=500, detail=detail)
@app.post("/generate-questions-from-hypothesis")
async def generate_questions_from_hypothesis_endpoint(request: HypothesesRequest):
    """
    Generate research questions from a hypothesis.

    Only the first hypothesis in the request is used; any others are ignored.

    Args:
        request (HypothesesRequest): The request containing the hypothesis and query.

    Returns:
        Dict[str, Any]: The generated research questions under the "questions" key.

    Raises:
        HTTPException: 400 if no hypothesis is supplied; 500 on generation errors.
    """
    # BUG FIX: validate before entering the try block. Previously the 400
    # HTTPException was raised inside the try, caught by the blanket
    # `except Exception`, and re-raised as a 500 — clients never saw the 400.
    if not request.hypotheses:
        raise HTTPException(status_code=400, detail="Hypothesis is required")
    try:
        logger.info(f"Generating questions from hypothesis: {request.hypotheses[0].title}")
        # Convert the first hypothesis to a dictionary for the generator.
        hypothesis = request.hypotheses[0].dict()
        # Generate questions from the hypothesis
        questions = await generate_questions_from_hypothesis(
            hypothesis=hypothesis,
            query=request.query
        )
        return {"questions": questions}
    except HTTPException:
        # Let deliberate HTTP errors keep their original status code.
        raise
    except Exception as e:
        logger.error(f"Error generating questions from hypothesis: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error generating questions from hypothesis: {str(e)}")
@app.get("/health")
async def health_check():
    """Liveness probe: always reports healthy while the process is serving."""
    status_payload = {"status": "healthy"}
    return status_payload
# Run a standalone development server when this module is executed directly.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8080)