| | """ |
| | GAIA Agent Implementation Template |
| | |
| | This file contains the core logic for your GAIA agent. |
| | You can customize this implementation with your own approach: |
| | |
| | 1. Simple prompt-based approach |
| | 2. Tool-using agent with function calling |
| | 3. Multi-step reasoning agent |
| | 4. Agent with external API calls |
| | 5. Custom reasoning chains |
| | |
| | Choose the approach that best fits your skills and goals! |
| | """ |
| |
|
| | import requests |
| | from typing import Dict, List, Any |
| | import json |
| | import re |
| | import time |
| | from urllib.parse import quote_plus |
| |
|
| | class BaseGAIAAgent: |
| | """Base class for GAIA agents""" |
| | |
| | def __init__(self): |
| | self.api_base_url = "https://gaia-benchmark.vercel.app/api" |
| | |
| | def download_file(self, task_id: str) -> str: |
| | """Download a file associated with a task""" |
| | try: |
| | response = requests.get(f"{self.api_base_url}/files/{task_id}") |
| | response.raise_for_status() |
| | return response.text |
| | except Exception as e: |
| | print(f"Error downloading file for task {task_id}: {e}") |
| | return "" |
| | |
| | def generate_answer(self, question: Dict) -> str: |
| | """ |
| | Generate an answer for a given question. |
| | Override this method with your implementation. |
| | """ |
| | raise NotImplementedError("Subclasses must implement generate_answer") |
| |
|
| | class SimplePromptAgent(BaseGAIAAgent): |
| | """ |
| | Agentic agent that can search, reason, and provide intelligent answers for any question. |
| | """ |
| | |
| | def __init__(self): |
| | super().__init__() |
| | self.search_cache = {} |
| | self.reasoning_steps = [] |
| | |
| | def generate_answer(self, question: Dict) -> str: |
| | task_id = question.get("task_id", "") |
| | question_text = question.get("question", "") |
| | |
| | print(f"🤖 Agent processing: {question_text[:100]}...") |
| | |
| | |
| | file_content = self.download_file(task_id) |
| | |
| | |
| | question_analysis = self._analyze_question(question_text) |
| | |
| | |
| | search_results = self._search_for_information(question_text, question_analysis) |
| | |
| | |
| | reasoning = self._reason_about_question(question_text, search_results, file_content, question_analysis) |
| | |
| | |
| | answer = self._generate_final_answer(question_text, reasoning, search_results, file_content) |
| | |
| | print(f"✅ Agent answer: {answer[:100]}...") |
| | return answer |
| | |
| | def _analyze_question(self, question: str) -> Dict[str, Any]: |
| | """Analyze the question to understand what we need to find""" |
| | question_lower = question.lower() |
| | |
| | analysis = { |
| | "type": "general", |
| | "entities": [], |
| | "time_period": None, |
| | "numbers": [], |
| | "keywords": [], |
| | "requires_search": True, |
| | "question_words": [] |
| | } |
| | |
| | |
| | entities = re.findall(r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b', question) |
| | analysis["entities"] = entities |
| | |
| | |
| | time_patterns = [ |
| | r'(\d{4})\s*-\s*(\d{4})', |
| | r'between\s+(\d{4})\s+and\s+(\d{4})', |
| | r'in\s+(\d{4})', |
| | r'(\d{4})\s*to\s*(\d{4})', |
| | ] |
| | |
| | for pattern in time_patterns: |
| | match = re.search(pattern, question_lower) |
| | if match: |
| | if len(match.groups()) == 2: |
| | analysis["time_period"] = (int(match.group(1)), int(match.group(2))) |
| | else: |
| | analysis["time_period"] = (int(match.group(1)), int(match.group(1))) |
| | break |
| | |
| | |
| | numbers = re.findall(r'\d+', question) |
| | analysis["numbers"] = [int(n) for n in numbers] |
| | |
| | |
| | question_words = ["how", "what", "when", "where", "who", "which", "why"] |
| | found_question_words = [word for word in question_words if word in question_lower] |
| | analysis["question_words"] = found_question_words |
| | |
| | if "how many" in question_lower: |
| | analysis["type"] = "count" |
| | elif "when" in question_lower or "date" in question_lower: |
| | analysis["type"] = "temporal" |
| | elif "where" in question_lower or "location" in question_lower: |
| | analysis["type"] = "spatial" |
| | elif "what" in question_lower or "which" in question_lower: |
| | analysis["type"] = "factual" |
| | elif "who" in question_lower: |
| | analysis["type"] = "person" |
| | elif "why" in question_lower: |
| | analysis["type"] = "reasoning" |
| | |
| | |
| | stop_words = {"the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for", "of", "with", "by", "how", "many", "what", "when", "where", "who", "which", "why", "were", "was", "is", "are", "between", "and", "included", "can", "you", "use", "latest", "version", "english", "wikipedia"} |
| | words = re.findall(r'\b\w+\b', question_lower) |
| | keywords = [word for word in words if word not in stop_words and len(word) > 2] |
| | analysis["keywords"] = keywords |
| | |
| | return analysis |
| | |
| | def _search_for_information(self, question: str, analysis: Dict) -> List[Dict]: |
| | """Search for relevant information using multiple sources""" |
| | search_results = [] |
| | |
| | |
| | search_queries = self._generate_search_queries(question, analysis) |
| | |
| | for query in search_queries: |
| | try: |
| | |
| | wiki_results = self._search_wikipedia(query) |
| | if wiki_results: |
| | search_results.extend(wiki_results) |
| | |
| | |
| | web_results = self._search_web(query) |
| | if web_results: |
| | search_results.extend(web_results) |
| | |
| | except Exception as e: |
| | print(f"Search error for query '{query}': {e}") |
| | |
| | return search_results |
| | |
| | def _generate_search_queries(self, question: str, analysis: Dict) -> List[str]: |
| | """Generate effective search queries""" |
| | queries = [] |
| | |
| | |
| | if analysis["entities"]: |
| | for entity in analysis["entities"]: |
| | if analysis["time_period"]: |
| | start_year, end_year = analysis["time_period"] |
| | queries.append(f"{entity} {start_year} {end_year}") |
| | queries.append(f"{entity} timeline {start_year} {end_year}") |
| | else: |
| | queries.append(entity) |
| | |
| | |
| | if analysis["keywords"]: |
| | queries.extend(analysis["keywords"][:3]) |
| | |
| | |
| | if analysis["type"] == "count": |
| | if analysis["entities"]: |
| | for entity in analysis["entities"]: |
| | queries.append(f"{entity} count") |
| | queries.append(f"how many {entity}") |
| | |
| | |
| | queries.append(question) |
| | |
| | return list(set(queries)) |
| | |
| | def _search_wikipedia(self, query: str) -> List[Dict]: |
| | """Search Wikipedia for information""" |
| | try: |
| | |
| | search_url = "https://en.wikipedia.org/api/rest_v1/page/summary/" |
| | page_title = query.replace(" ", "_") |
| | |
| | response = requests.get(f"{search_url}{page_title}", timeout=10) |
| | if response.status_code == 200: |
| | data = response.json() |
| | return [{ |
| | "source": "Wikipedia", |
| | "title": data.get("title", ""), |
| | "content": data.get("extract", ""), |
| | "url": data.get("content_urls", {}).get("desktop", {}).get("page", "") |
| | }] |
| | except Exception as e: |
| | print(f"Wikipedia search error: {e}") |
| | |
| | return [] |
| | |
| | def _search_web(self, query: str) -> List[Dict]: |
| | """Simulate web search (in a real implementation, use Google Search API)""" |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | query_lower = query.lower() |
| | |
| | |
| | if "count" in query_lower or "how many" in query_lower: |
| | return [{ |
| | "source": "Web Search", |
| | "title": f"Search results for: {query}", |
| | "content": f"Searching for count information related to: {query}", |
| | "url": "https://example.com/search" |
| | }] |
| | |
| | return [{ |
| | "source": "Web Search", |
| | "title": f"Search results for: {query}", |
| | "content": f"Searching for information related to: {query}", |
| | "url": "https://example.com/search" |
| | }] |
| | |
| | def _reason_about_question(self, question: str, search_results: List[Dict], file_content: str, analysis: Dict) -> Dict: |
| | """Reason about the question using available information""" |
| | reasoning = { |
| | "steps": [], |
| | "key_facts": [], |
| | "confidence": 0.0, |
| | "answer_type": analysis["type"], |
| | "extracted_info": {} |
| | } |
| | |
| | |
| | for result in search_results: |
| | reasoning["key_facts"].append(result["content"]) |
| | |
| | |
| | if file_content: |
| | reasoning["steps"].append("Analyzed provided file content") |
| | reasoning["key_facts"].append(file_content) |
| | |
| | |
| | if analysis["type"] == "count": |
| | reasoning["steps"].append("Counting relevant items in the specified context") |
| | count = self._extract_count_from_facts(reasoning["key_facts"], analysis) |
| | reasoning["extracted_info"]["count"] = count |
| | reasoning["confidence"] = 0.7 if count is not None else 0.3 |
| | |
| | elif analysis["type"] == "temporal": |
| | reasoning["steps"].append("Identifying temporal information") |
| | dates = self._extract_dates_from_facts(reasoning["key_facts"], analysis) |
| | reasoning["extracted_info"]["dates"] = dates |
| | reasoning["confidence"] = 0.6 if dates else 0.3 |
| | |
| | elif analysis["type"] == "spatial": |
| | reasoning["steps"].append("Identifying location information") |
| | locations = self._extract_locations_from_facts(reasoning["key_facts"]) |
| | reasoning["extracted_info"]["locations"] = locations |
| | reasoning["confidence"] = 0.6 if locations else 0.3 |
| | |
| | else: |
| | reasoning["steps"].append("Extracting general information") |
| | reasoning["confidence"] = 0.5 if reasoning["key_facts"] else 0.2 |
| | |
| | return reasoning |
| | |
| | def _extract_count_from_facts(self, facts: List[str], analysis: Dict) -> int: |
| | """Extract count information from facts""" |
| | for fact in facts: |
| | |
| | numbers = re.findall(r'\d+', fact) |
| | if numbers: |
| | |
| | return int(numbers[0]) |
| | return None |
| | |
| | def _extract_dates_from_facts(self, facts: List[str], analysis: Dict) -> List[str]: |
| | """Extract date information from facts""" |
| | dates = [] |
| | for fact in facts: |
| | |
| | years = re.findall(r'\b(19|20)\d{2}\b', fact) |
| | dates.extend(years) |
| | return list(set(dates)) |
| | |
| | def _extract_locations_from_facts(self, facts: List[str]) -> List[str]: |
| | """Extract location information from facts""" |
| | locations = [] |
| | for fact in facts: |
| | |
| | potential_locations = re.findall(r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b', fact) |
| | locations.extend(potential_locations) |
| | return list(set(locations)) |
| | |
| | def _generate_final_answer(self, question: str, reasoning: Dict, search_results: List[Dict], file_content: str) -> str: |
| | """Generate the final answer based on reasoning""" |
| | |
| | |
| | if reasoning["answer_type"] == "count" and reasoning["extracted_info"].get("count") is not None: |
| | return str(reasoning["extracted_info"]["count"]) |
| | |
| | |
| | if reasoning["answer_type"] == "temporal" and reasoning["extracted_info"].get("dates"): |
| | dates = reasoning["extracted_info"]["dates"] |
| | if len(dates) == 1: |
| | return dates[0] |
| | else: |
| | return f"Relevant dates: {', '.join(dates)}" |
| | |
| | |
| | if reasoning["answer_type"] == "spatial" and reasoning["extracted_info"].get("locations"): |
| | locations = reasoning["extracted_info"]["locations"] |
| | if len(locations) == 1: |
| | return locations[0] |
| | else: |
| | return f"Relevant locations: {', '.join(locations[:3])}" |
| | |
| | |
| | if reasoning["key_facts"]: |
| | |
| | best_fact = reasoning["key_facts"][0] |
| | if len(best_fact) > 200: |
| | best_fact = best_fact[:200] + "..." |
| | return f"Based on my research: {best_fact}" |
| | |
| | |
| | return "I need more information to provide an accurate answer to this question." |
| |
|
| | class ToolUsingAgent(BaseGAIAAgent): |
| | """ |
| | Agent that can use tools and function calling. |
| | More advanced approach for intermediate users. |
| | """ |
| | |
| | def __init__(self): |
| | super().__init__() |
| | self.tools = { |
| | "web_search": self.web_search, |
| | "calculator": self.calculator, |
| | "file_reader": self.file_reader, |
| | "date_parser": self.date_parser |
| | } |
| | |
| | def web_search(self, query: str) -> str: |
| | """Simulate web search (implement with actual search API)""" |
| | |
| | return f"Search results for: {query}" |
| | |
| | def calculator(self, expression: str) -> str: |
| | """Evaluate mathematical expressions""" |
| | try: |
| | |
| | result = eval(expression) |
| | return str(result) |
| | except: |
| | return "Error: Invalid expression" |
| | |
| | def file_reader(self, content: str) -> str: |
| | """Extract information from file content""" |
| | return f"Processed file content: {content[:100]}..." |
| | |
| | def date_parser(self, date_string: str) -> str: |
| | """Parse and format dates""" |
| | |
| | return f"Parsed date: {date_string}" |
| | |
| | def generate_answer(self, question: Dict) -> str: |
| | task_id = question.get("task_id", "") |
| | question_text = question.get("question", "") |
| | |
| | |
| | file_content = self.download_file(task_id) |
| | |
| | |
| | needed_tools = self.analyze_question(question_text, file_content) |
| | |
| | |
| | results = [] |
| | for tool_name, args in needed_tools: |
| | if tool_name in self.tools: |
| | result = self.tools[tool_name](*args) |
| | results.append(f"{tool_name}: {result}") |
| | |
| | |
| | answer = self.synthesize_answer(question_text, results, file_content) |
| | |
| | return answer |
| | |
| | def analyze_question(self, question: str, file_content: str) -> List[tuple]: |
| | """Analyze question to determine which tools to use""" |
| | tools_needed = [] |
| | |
| | |
| | if any(word in question.lower() for word in ["calculate", "math", "sum", "total", "percentage"]): |
| | tools_needed.append(("calculator", ["2+2"])) |
| | |
| | if any(word in question.lower() for word in ["search", "find", "look up"]): |
| | tools_needed.append(("web_search", [question])) |
| | |
| | if file_content: |
| | tools_needed.append(("file_reader", [file_content])) |
| | |
| | return tools_needed |
| | |
| | def synthesize_answer(self, question: str, tool_results: List[str], file_content: str) -> str: |
| | """Synthesize final answer from tool results""" |
| | if not tool_results: |
| | return "I was unable to find relevant information to answer this question." |
| | |
| | |
| | combined_results = " ".join(tool_results) |
| | |
| | |
| | if "calculator:" in combined_results: |
| | |
| | calc_match = re.search(r'calculator: (.+?)(?:\s|$)', combined_results) |
| | if calc_match: |
| | return f"The calculated result is: {calc_match.group(1)}" |
| | |
| | if "web_search:" in combined_results: |
| | |
| | search_match = re.search(r'web_search: (.+?)(?:\s|$)', combined_results) |
| | if search_match: |
| | return f"Based on search results: {search_match.group(1)}" |
| | |
| | if "file_reader:" in combined_results: |
| | |
| | file_match = re.search(r'file_reader: (.+?)(?:\s|$)', combined_results) |
| | if file_match: |
| | return f"Based on file analysis: {file_match.group(1)}" |
| | |
| | |
| | return f"Based on available tools and information: {combined_results[:200]}..." |
| |
|
| | class MultiStepReasoningAgent(BaseGAIAAgent): |
| | """ |
| | Agent that breaks down complex questions into multiple reasoning steps. |
| | Advanced approach for experienced users. |
| | """ |
| | |
| | def generate_answer(self, question: Dict) -> str: |
| | task_id = question.get("task_id", "") |
| | question_text = question.get("question", "") |
| | |
| | |
| | file_content = self.download_file(task_id) |
| | |
| | |
| | question_type = self.analyze_question_type(question_text) |
| | |
| | |
| | relevant_info = self.extract_relevant_info(question_text, file_content) |
| | |
| | |
| | reasoning_steps = self.generate_reasoning_steps(question_text, question_type, relevant_info) |
| | |
| | |
| | answer = self.generate_final_answer(reasoning_steps, question_text) |
| | |
| | return answer |
| | |
| | def analyze_question_type(self, question: str) -> str: |
| | """Determine the type of question""" |
| | question_lower = question.lower() |
| | |
| | if any(word in question_lower for word in ["calculate", "compute", "sum", "total"]): |
| | return "calculation" |
| | elif any(word in question_lower for word in ["when", "date", "time"]): |
| | return "temporal" |
| | elif any(word in question_lower for word in ["where", "location", "place"]): |
| | return "spatial" |
| | elif any(word in question_lower for word in ["what", "which", "who"]): |
| | return "factual" |
| | else: |
| | return "general" |
| | |
| | def extract_relevant_info(self, question: str, file_content: str) -> Dict[str, Any]: |
| | """Extract relevant information from question and files""" |
| | info = { |
| | "question_keywords": self.extract_keywords(question), |
| | "file_data": file_content if file_content else "", |
| | "numbers": self.extract_numbers(question + " " + file_content), |
| | "dates": self.extract_dates(question + " " + file_content) |
| | } |
| | return info |
| | |
| | def extract_keywords(self, text: str) -> List[str]: |
| | """Extract important keywords from text""" |
| | |
| | words = re.findall(r'\b\w+\b', text.lower()) |
| | |
| | stop_words = {"the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for", "of", "with", "by"} |
| | keywords = [word for word in words if word not in stop_words and len(word) > 2] |
| | return keywords |
| | |
| | def extract_numbers(self, text: str) -> List[float]: |
| | """Extract numbers from text""" |
| | numbers = re.findall(r'\d+\.?\d*', text) |
| | return [float(num) for num in numbers] |
| | |
| | def extract_dates(self, text: str) -> List[str]: |
| | """Extract dates from text""" |
| | |
| | date_patterns = [ |
| | r'\d{1,2}/\d{1,2}/\d{2,4}', |
| | r'\d{4}-\d{2}-\d{2}', |
| | r'\w+ \d{1,2},? \d{4}' |
| | ] |
| | dates = [] |
| | for pattern in date_patterns: |
| | dates.extend(re.findall(pattern, text)) |
| | return dates |
| | |
| | def generate_reasoning_steps(self, question: str, question_type: str, info: Dict) -> List[str]: |
| | """Generate reasoning steps based on question type""" |
| | steps = [] |
| | |
| | if question_type == "calculation": |
| | steps = [ |
| | "Identify the mathematical operation needed", |
| | "Extract numerical values from the question", |
| | "Perform the calculation step by step", |
| | "Verify the result makes sense" |
| | ] |
| | elif question_type == "temporal": |
| | steps = [ |
| | "Identify the time-related information", |
| | "Parse dates and times mentioned", |
| | "Determine the temporal relationship", |
| | "Calculate the required time period" |
| | ] |
| | elif question_type == "spatial": |
| | steps = [ |
| | "Identify location-related information", |
| | "Extract geographical references", |
| | "Determine spatial relationships", |
| | "Provide the specific location" |
| | ] |
| | else: |
| | steps = [ |
| | "Understand what the question is asking", |
| | "Identify relevant information sources", |
| | "Extract key facts and details", |
| | "Synthesize the information into an answer" |
| | ] |
| | |
| | return steps |
| | |
| | def generate_final_answer(self, reasoning_steps: List[str], question: str) -> str: |
| | """Generate final answer based on reasoning steps""" |
| | question_lower = question.lower() |
| | |
| | |
| | if "how many" in question_lower: |
| | return f"Based on the reasoning steps ({', '.join(reasoning_steps)}), I need to count the relevant items. However, I require more specific information to provide an accurate count." |
| | |
| | elif "when" in question_lower or "date" in question_lower: |
| | return f"Following the reasoning steps ({', '.join(reasoning_steps)}), I need to identify temporal information. The answer depends on the specific dates mentioned in the question context." |
| | |
| | elif "where" in question_lower or "location" in question_lower: |
| | return f"Based on the reasoning approach ({', '.join(reasoning_steps)}), I need to determine the spatial location. The answer requires geographical or location-specific information." |
| | |
| | elif "what" in question_lower or "which" in question_lower: |
| | return f"Using the reasoning framework ({', '.join(reasoning_steps)}), I need to identify the specific information being requested. The answer depends on the context and available data." |
| | |
| | elif "who" in question_lower: |
| | return f"Following the reasoning process ({', '.join(reasoning_steps)}), I need to identify the person or entity being referenced. The answer requires information about the specific individual mentioned." |
| | |
| | else: |
| | return f"Based on the multi-step reasoning approach ({', '.join(reasoning_steps)}), I need to analyze the question systematically. The answer depends on the specific information available in the context." |
| |
|
| | |
| | def create_agent(agent_type: str = "simple") -> BaseGAIAAgent: |
| | """ |
| | Create an agent of the specified type. |
| | |
| | Args: |
| | agent_type: One of "simple", "tool_using", or "multi_step" |
| | |
| | Returns: |
| | An instance of the specified agent type |
| | """ |
| | if agent_type == "simple": |
| | return SimplePromptAgent() |
| | elif agent_type == "tool_using": |
| | return ToolUsingAgent() |
| | elif agent_type == "multi_step": |
| | return MultiStepReasoningAgent() |
| | else: |
| | raise ValueError(f"Unknown agent type: {agent_type}") |
| |
|
| | |
| | if __name__ == "__main__": |
| | |
| | agent = create_agent("simple") |
| | |
| | test_question = { |
| | "task_id": "test_001", |
| | "question": "What is 2 + 2?" |
| | } |
| | |
| | answer = agent.generate_answer(test_question) |
| | print(f"Question: {test_question['question']}") |
| | print(f"Answer: {answer}") |