""" Integration module for connecting MCP agents to the research cycle. """ import asyncio import json import logging from enum import Enum from typing import List, Dict, Any, Optional from academic_research_agent import process_academic_research_questions, enhance_academic_hypotheses from problem_solving_agent import process_problem_solving_questions, enhance_problem_solving_hypotheses from health_research_agent import process_health_research_questions, enhance_health_hypotheses # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger("mcp-integration") class ResearchDomain(Enum): """Enum for research domains.""" ACADEMIC = "academic" PROBLEM_SOLVING = "problem_solving" HEALTH = "health" GENERAL = "general" # Default/fallback async def process_research_questions( research_questions: List[str], domain: ResearchDomain = ResearchDomain.GENERAL, domain_context: Optional[str] = None ) -> Dict[str, Any]: """ Process research questions using the appropriate specialized agent based on the domain. Args: research_questions (List[str]): List of research questions to process. domain (ResearchDomain): The research domain to use for processing. domain_context (str, optional): Additional domain context to provide. Returns: Dict[str, Any]: Enhanced research questions with explanations and context. """ logger.info(f"Processing research questions with {domain.value} agent") if domain == ResearchDomain.ACADEMIC: return await process_academic_research_questions(research_questions, domain_context) elif domain == ResearchDomain.PROBLEM_SOLVING: return await process_problem_solving_questions(research_questions, domain_context) elif domain == ResearchDomain.HEALTH: return await process_health_research_questions(research_questions, domain_context) else: # Default to academic research agent for general queries logger.info("Using academic research agent as default for general domain") return await process_academic_research_questions(research_questions, domain_context) async def enhance_hypotheses( hypotheses: List[Dict[str, Any]], research_goal: str, domain: ResearchDomain = ResearchDomain.GENERAL ) -> Dict[str, Any]: """ Enhance hypotheses using the appropriate specialized agent based on the domain. Args: hypotheses (List[Dict[str, Any]]): List of hypotheses to enhance. research_goal (str): The research goal. domain (ResearchDomain): The research domain to use for processing. Returns: Dict[str, Any]: Enhanced hypotheses with explanations and context. """ logger.info(f"Enhancing hypotheses with {domain.value} agent") if domain == ResearchDomain.ACADEMIC: return await enhance_academic_hypotheses(hypotheses, research_goal) elif domain == ResearchDomain.PROBLEM_SOLVING: return await enhance_problem_solving_hypotheses(hypotheses, research_goal) elif domain == ResearchDomain.HEALTH: return await enhance_health_hypotheses(hypotheses, research_goal) else: # Default to academic research agent for general queries logger.info("Using academic research agent as default for general domain") return await enhance_academic_hypotheses(hypotheses, research_goal) def detect_research_domain(query: str) -> ResearchDomain: """ Detect the research domain based on the query content. Args: query (str): The research query or goal. Returns: ResearchDomain: The detected research domain. """ query_lower = query.lower() # Health-related keywords health_keywords = [ "health", "medical", "clinical", "patient", "disease", "treatment", "therapy", "drug", "medicine", "hospital", "doctor", "nurse", "diagnosis", "symptom", "cancer", "diabetes", "covid", "pandemic", "vaccine", "immunity", "mental health", "psychology", "wellbeing" ] # Problem-solving keywords problem_solving_keywords = [ "problem", "solution", "optimize", "improve", "efficiency", "system", "design", "implement", "strategy", "plan", "decision", "policy", "management", "organization", "business", "industry", "technology", "innovation", "sustainability", "climate change", "energy", "transportation" ] # Academic research keywords academic_keywords = [ "theory", "framework", "literature", "study", "analysis", "methodology", "experiment", "data", "evidence", "hypothesis", "research", "publication", "journal", "university", "academic", "scholar", "discipline", "field", "philosophy", "history", "sociology", "anthropology", "linguistics" ] # Count keyword matches for each domain health_count = sum(1 for keyword in health_keywords if keyword in query_lower) problem_solving_count = sum(1 for keyword in problem_solving_keywords if keyword in query_lower) academic_count = sum(1 for keyword in academic_keywords if keyword in query_lower) # Determine the domain with the most matches max_count = max(health_count, problem_solving_count, academic_count) if max_count == 0: return ResearchDomain.GENERAL elif health_count == max_count: return ResearchDomain.HEALTH elif problem_solving_count == max_count: return ResearchDomain.PROBLEM_SOLVING else: return ResearchDomain.ACADEMIC if __name__ == "__main__": # Example usage async def main(): # Example research questions research_questions = [ "What are the effects of climate change on biodiversity in tropical rainforests?", "How do socioeconomic factors influence access to higher education?", "What role does artificial intelligence play in modern scientific discovery?" ] # Detect domain query = "Investigating the impact of climate change on biodiversity and ecosystem services" domain = detect_research_domain(query) print(f"Detected domain: {domain.value}") # Process questions result = await process_research_questions(research_questions, domain) print(json.dumps(result, indent=2)) asyncio.run(main())