Spaces:
Runtime error
Runtime error
""" | |
API for MCP agents. | |
This module provides a FastAPI application to expose the MCP agent functionality. | |
""" | |
import logging | |
import asyncio | |
import os | |
from typing import List, Dict, Any, Optional, Literal | |
from fastapi import FastAPI, HTTPException | |
from pydantic import BaseModel | |
from deep_research_integration import post_question_generation_hook, post_hypothesis_generation_hook | |
from cognee_integration import search_knowledge_graph | |
from ai_co_scientist_integration import generate_and_enhance_hypotheses, get_active_hypotheses | |
from hypothesis_questions import generate_questions_from_hypothesis | |
# Configure logging | |
logging.basicConfig( | |
level=logging.INFO, | |
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" | |
) | |
logger = logging.getLogger("mcp-agents-api") | |
# Create FastAPI app | |
app = FastAPI( | |
title="MCP Agents API", | |
description="API for specialized MCP agents to enhance the research cycle", | |
version="1.0.0" | |
) | |
# Define request and response models | |
class ResearchQuestionsRequest(BaseModel): | |
query: str | |
questions: List[str] | |
domain_context: Optional[str] = None | |
class ResearchQuestionsResponse(BaseModel): | |
domain: str | |
enhanced_questions: Dict[str, Any] | |
class HypothesisItem(BaseModel): | |
id: str | |
title: str | |
text: str | |
novelty_review: Optional[str] = None | |
feasibility_review: Optional[str] = None | |
elo_score: Optional[float] = None | |
review_comments: Optional[List[str]] = None | |
references: Optional[List[str]] = None | |
is_active: Optional[bool] = None | |
parent_ids: Optional[List[str]] = None | |
class HypothesesRequest(BaseModel): | |
query: str | |
hypotheses: List[HypothesisItem] | |
research_goal: str | |
class HypothesesResponse(BaseModel): | |
domain: str | |
enhanced_hypotheses: Dict[str, Any] | |
class ChatRequest(BaseModel): | |
message: str | |
searchType: Literal['GRAPH_COMPLETION', 'COMPLETION', 'INSIGHTS'] = 'GRAPH_COMPLETION' | |
class ChatResponse(BaseModel): | |
response: str | |
sources: Optional[List[Dict[str, Any]]] = None | |
async def root(): | |
"""Root endpoint.""" | |
return {"message": "MCP Agents API is running"} | |
async def enhance_questions(request: ResearchQuestionsRequest): | |
""" | |
Enhance research questions using the appropriate specialized agent. | |
Args: | |
request (ResearchQuestionsRequest): The request containing the query and questions. | |
Returns: | |
ResearchQuestionsResponse: Enhanced research questions with explanations and context. | |
""" | |
try: | |
logger.info(f"Enhancing questions for query: {request.query}") | |
result = post_question_generation_hook(request.query, request.questions) | |
return result | |
except Exception as e: | |
logger.error(f"Error enhancing questions: {str(e)}") | |
raise HTTPException(status_code=500, detail=f"Error enhancing questions: {str(e)}") | |
async def enhance_hypotheses(request: HypothesesRequest): | |
""" | |
Enhance hypotheses using the appropriate specialized agent. | |
Args: | |
request (HypothesesRequest): The request containing the query, hypotheses, and research goal. | |
Returns: | |
HypothesesResponse: Enhanced hypotheses with explanations and context. | |
""" | |
try: | |
logger.info(f"Enhancing hypotheses for query: {request.query}") | |
# Convert Pydantic models to dictionaries | |
hypotheses = [h.dict() for h in request.hypotheses] | |
result = post_hypothesis_generation_hook(request.query, hypotheses, request.research_goal) | |
return result | |
except Exception as e: | |
logger.error(f"Error enhancing hypotheses: {str(e)}") | |
raise HTTPException(status_code=500, detail=f"Error enhancing hypotheses: {str(e)}") | |
async def chat(request: ChatRequest): | |
""" | |
Process a chat message using the knowledge graph. | |
Args: | |
request (ChatRequest): The chat message and search type. | |
Returns: | |
ChatResponse: The response from the knowledge graph. | |
""" | |
try: | |
logger.info(f"Processing chat message: {request.message}") | |
result = await search_knowledge_graph(request.message, request.searchType) | |
return result | |
except Exception as e: | |
logger.error(f"Error processing chat message: {str(e)}") | |
raise HTTPException(status_code=500, detail=f"Error processing chat message: {str(e)}") | |
class AgentConfig(BaseModel): | |
type: str | |
provider: str | |
model: str | |
apiKey: str | |
class ResearchConfig(BaseModel): | |
provider: str | |
model: str | |
apiKey: str | |
class ConfigRequest(BaseModel): | |
agent: AgentConfig | |
research: ResearchConfig | |
class ConfigResponse(BaseModel): | |
success: bool | |
message: str | |
async def update_config(request: ConfigRequest): | |
""" | |
Update the agent and research configuration. | |
Args: | |
request (ConfigRequest): The configuration to update. | |
Returns: | |
ConfigResponse: The response indicating success or failure. | |
""" | |
try: | |
logger.info(f"Updating configuration for agent type: {request.agent.type}") | |
# Set environment variables for the agent | |
os.environ["SELECTED_AGENT"] = request.agent.type | |
os.environ["AGENT_PROVIDER"] = request.agent.provider | |
os.environ["AGENT_MODEL"] = request.agent.model | |
os.environ["AGENT_API_KEY"] = request.agent.apiKey | |
# Set environment variables for research | |
os.environ["RESEARCH_PROVIDER"] = request.research.provider | |
os.environ["RESEARCH_MODEL"] = request.research.model | |
os.environ["RESEARCH_API_KEY"] = request.research.apiKey | |
return ConfigResponse(success=True, message="Configuration updated successfully") | |
except Exception as e: | |
logger.error(f"Error updating configuration: {str(e)}") | |
raise HTTPException(status_code=500, detail=f"Error updating configuration: {str(e)}") | |
async def generate_hypotheses_endpoint(request: HypothesesRequest): | |
""" | |
Generate and enhance hypotheses using the AI Co-Scientist. | |
Args: | |
request (HypothesesRequest): The request containing the query, hypotheses, and research goal. | |
Returns: | |
HypothesesResponse: Enhanced hypotheses with explanations and context. | |
""" | |
try: | |
logger.info(f"Generating hypotheses for query: {request.query}") | |
result = await generate_and_enhance_hypotheses( | |
query=request.query, | |
research_goal=request.research_goal, | |
num_hypotheses=len(request.hypotheses) if request.hypotheses else 3 | |
) | |
return result | |
except Exception as e: | |
logger.error(f"Error generating hypotheses: {str(e)}") | |
raise HTTPException(status_code=500, detail=f"Error generating hypotheses: {str(e)}") | |
async def active_hypotheses_endpoint(): | |
""" | |
Get the active hypotheses from the AI Co-Scientist. | |
Returns: | |
List[HypothesisItem]: The active hypotheses. | |
""" | |
try: | |
logger.info("Getting active hypotheses") | |
hypotheses = get_active_hypotheses() | |
return hypotheses | |
except Exception as e: | |
logger.error(f"Error getting active hypotheses: {str(e)}") | |
raise HTTPException(status_code=500, detail=f"Error getting active hypotheses: {str(e)}") | |
async def generate_questions_from_hypothesis_endpoint(request: HypothesesRequest): | |
""" | |
Generate research questions from a hypothesis. | |
Args: | |
request (HypothesesRequest): The request containing the hypothesis and query. | |
Returns: | |
Dict[str, Any]: The generated research questions. | |
""" | |
try: | |
logger.info(f"Generating questions from hypothesis: {request.hypotheses[0].title if request.hypotheses else 'Unknown'}") | |
if not request.hypotheses or len(request.hypotheses) == 0: | |
raise HTTPException(status_code=400, detail="Hypothesis is required") | |
# Convert the first hypothesis to a dictionary | |
hypothesis = request.hypotheses[0].dict() | |
# Generate questions from the hypothesis | |
questions = await generate_questions_from_hypothesis( | |
hypothesis=hypothesis, | |
query=request.query | |
) | |
return {"questions": questions} | |
except Exception as e: | |
logger.error(f"Error generating questions from hypothesis: {str(e)}") | |
raise HTTPException(status_code=500, detail=f"Error generating questions from hypothesis: {str(e)}") | |
async def health_check(): | |
"""Health check endpoint.""" | |
return {"status": "healthy"} | |
if __name__ == "__main__": | |
import uvicorn | |
uvicorn.run(app, host="0.0.0.0", port=8080) | |