import contextlib
import io
import json
import math
import os
import re
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Union
from urllib.parse import quote

import pandas as pd
import requests
from dotenv import load_dotenv

load_dotenv()

# Prompt used by SimpleAgent.run to let the model either answer directly or
# request a tool. Each marker is followed by a placeholder because run() reads
# the text after the marker as the tool argument.
_TOOL_ROUTING_PROMPT = """
Question: {question}

Think step by step. If this question requires:
- Web search for current information, say "NEED_SEARCH: <search query>"
- Mathematical calculation, say "NEED_PYTHON: <python code>"
- Wikipedia lookup, say "NEED_WIKI: <page title>"
- File analysis (if file path mentioned), say "NEED_FILE: <file path>"

Otherwise, provide a direct answer.

Your response:"""

# Matches a snippet that is entirely wrapped in one Markdown code fence.
_CODE_FENCE_RE = re.compile(r"^```[\w+-]*[ \t]*\r?\n(.*?)\r?\n?```$", re.DOTALL)


# Simple tool-based agent without LangGraph for now
class SimpleAgent:
    """Single-step agent that lets an LLM request one tool call per question.

    ``llm`` is any callable taking a prompt string and returning the model's
    reply as a string. Every tool returns a human-readable string and reports
    failures as text instead of raising, so the result can always be fed back
    into a follow-up prompt.
    """

    def __init__(self, llm: Callable[[str], str]):
        self.llm = llm
        self.tools = {
            'search_web': self.search_web,
            'search_wikipedia': self.search_wikipedia,
            'execute_python': self.execute_python,
            'read_excel_file': self.read_excel_file,
            'read_text_file': self.read_text_file,
        }

    def search_web(self, query: str) -> str:
        """Search the web using DuckDuckGo for current information.

        Returns the abstract plus up to three related-topic texts, one per
        line, or a message describing why nothing could be returned.
        """
        try:
            # Passing the query through ``params`` lets requests URL-encode it,
            # so characters such as "&", "#" or "+" cannot corrupt the URL.
            response = requests.get(
                "https://api.duckduckgo.com/",
                params={
                    "q": query,
                    "format": "json",
                    "no_html": 1,
                    "skip_disambig": 1,
                },
                timeout=10,
            )
            if response.status_code != 200:
                return f"Search failed with status code {response.status_code}"

            data = response.json()
            results = []
            if data.get("AbstractText"):
                results.append(f"Abstract: {data['AbstractText']}")
            for topic in (data.get("RelatedTopics") or [])[:3]:
                if isinstance(topic, dict) and topic.get("Text"):
                    results.append(f"Related: {topic['Text']}")

            if results:
                return "\n".join(results)
            return f"Search performed for '{query}' but no specific results found."
        except Exception as e:  # tool boundary: report the failure as text
            return f"Search error: {str(e)}"

    def search_wikipedia(self, query: str) -> str:
        """Search Wikipedia for factual information.

        Treats ``query`` as a page title and returns the first 500 characters
        of the page summary, or a message describing the failure.
        """
        try:
            # The title is a single path segment: percent-encode everything
            # (including "/", "?" and "#") so it cannot alter the URL structure.
            title = quote(query.strip().replace(" ", "_"), safe="")
            search_url = "https://en.wikipedia.org/api/rest_v1/page/summary/" + title
            response = requests.get(search_url, timeout=10)
            if response.status_code != 200:
                return f"Wikipedia search failed for '{query}'"

            extract = response.json().get("extract", "")
            if extract:
                return f"Wikipedia: {extract[:500]}..."
            return f"Wikipedia page found for '{query}' but no extract available."
        except Exception as e:  # tool boundary: report the failure as text
            return f"Wikipedia search error: {str(e)}"

    @staticmethod
    def _strip_code_fence(code: str) -> str:
        """Return ``code`` without a surrounding Markdown code fence, if any."""
        match = _CODE_FENCE_RE.match(code.strip())
        return match.group(1) if match else code

    def execute_python(self, code: str) -> str:
        """Execute Python code and return the result.

        The snippet runs with a small whitelist of builtins plus the ``math``
        and ``json`` modules. Whatever it prints is returned; exceptions are
        reported as a "Python execution error: ..." string.
        """
        # SECURITY NOTE: ``code`` comes from model output. A reduced
        # ``__builtins__`` is a convenience, not a sandbox (object
        # introspection can still reach arbitrary types), and there is no
        # timeout. Do not expose this tool to untrusted input without real
        # isolation.
        safe_globals = {
            '__builtins__': {
                'print': print,
                'len': len,
                'str': str,
                'int': int,
                'float': float,
                'bool': bool,
                'list': list,
                'dict': dict,
                'tuple': tuple,
                'set': set,
                'range': range,
                'sum': sum,
                'max': max,
                'min': min,
                'abs': abs,
                'round': round,
                'sorted': sorted,
                'enumerate': enumerate,
                'zip': zip,
            },
            'math': math,
            'json': json,
        }

        captured = io.StringIO()
        try:
            # redirect_stdout restores sys.stdout even if the snippet raises.
            with contextlib.redirect_stdout(captured):
                exec(self._strip_code_fence(code), safe_globals)
        except Exception as e:
            return f"Python execution error: {str(e)}"

        output = captured.getvalue()
        return output if output else "Code executed successfully (no output)"

    def read_excel_file(self, file_path: str, sheet_name: Optional[str] = None) -> str:
        """Read an Excel file and return its contents.

        Args:
            file_path: Path of the workbook.
            sheet_name: Sheet to load. A string of digits is used as a
                zero-based sheet index, any other non-empty string as a sheet
                name; ``None`` or an empty string selects the first sheet.

        Returns:
            A textual table. Sheets with more than 20 rows are abbreviated to
            the first and last 10 rows. Errors are returned as text.
        """
        try:
            if not Path(file_path).exists():
                return f"Error: File not found at {file_path}"

            sheet: Union[str, int]
            if not sheet_name:
                sheet = 0
            elif sheet_name.isdigit():
                sheet = int(sheet_name)
            else:
                sheet = sheet_name

            df = pd.read_excel(file_path, sheet_name=sheet)
            header = f"Excel file with {len(df)} rows and {len(df.columns)} columns:\n\n"

            if len(df) <= 20:
                return header + df.to_string(index=False)

            return (
                header
                + "First 10 rows:\n" + df.head(10).to_string(index=False)
                + f"\n\n... ({len(df) - 20} rows omitted) ...\n\n"
                + "Last 10 rows:\n" + df.tail(10).to_string(index=False)
            )
        except Exception as e:  # tool boundary: report the failure as text
            return f"Error reading Excel file: {str(e)}"

    def read_text_file(self, file_path: str) -> str:
        """Read a text file and return its contents.

        Tries several encodings in order and returns the content decoded with
        the first one that succeeds, prefixed with the encoding used.
        """
        try:
            file_path_obj = Path(file_path)
            if not file_path_obj.exists():
                return f"Error: File not found at {file_path}"

            # iso-8859-1 maps every possible byte, so it must be last;
            # anything listed after it could never be tried.
            encodings = ['utf-8', 'utf-16', 'cp1252', 'iso-8859-1']
            for encoding in encodings:
                try:
                    with open(file_path_obj, 'r', encoding=encoding) as f:
                        content = f.read()
                except UnicodeError:
                    # Catch the base class: the UTF-16 decoder can raise a
                    # plain UnicodeError (e.g. for a missing BOM), which would
                    # otherwise abort the whole fallback chain.
                    continue
                return f"File content ({encoding} encoding):\n\n{content}"

            return "Error: Could not decode file with any standard encoding"
        except Exception as e:  # tool boundary: report the failure as text
            return f"Error reading file: {str(e)}"

    @staticmethod
    def _first_line(text: str) -> str:
        """Return the first non-blank line of ``text`` stripped, or ``""``."""
        for line in text.splitlines():
            stripped = line.strip()
            if stripped:
                return stripped
        return ""

    def _read_file(self, file_path: str) -> str:
        """Dispatch to the Excel or plain-text reader based on the suffix."""
        # The model may wrap the path in quotes or backticks.
        file_path = file_path.strip('"\'`')
        if file_path.lower().endswith(('.xlsx', '.xls')):
            return self.read_excel_file(file_path)
        return self.read_text_file(file_path)

    def run(self, question: str) -> str:
        """Run the agent with tool usage.

        Asks the model once. If the reply contains a ``NEED_*:`` marker, the
        matching tool is run on the text after the marker and the model is
        asked again with the tool output; otherwise the first reply is
        returned as-is.
        """
        direct_response = self.llm(_TOOL_ROUTING_PROMPT.format(question=question))

        # (marker, label used in the follow-up prompt, tool, single-line arg).
        # Checked in this order; the first marker present in the reply wins.
        routes = (
            ("NEED_SEARCH:", "Search results", self.search_web, True),
            ("NEED_PYTHON:", "Calculation result", self.execute_python, False),
            ("NEED_WIKI:", "Wikipedia info", self.search_wikipedia, True),
            ("NEED_FILE:", "File content", self._read_file, True),
        )
        for marker, label, tool, single_line in routes:
            if marker not in direct_response:
                continue
            argument = direct_response.split(marker)[1].strip()
            if single_line:
                # Queries and paths are one line; drop any reasoning the
                # model appended on following lines.
                argument = self._first_line(argument)
            tool_output = tool(argument)
            return self.llm(
                f"Question: {question}\n\n{label}: {tool_output}\n\nFinal answer:"
            )

        return direct_response


class OpenRouterLLM:
    """Simple OpenRouter LLM wrapper.

    Instances are callable: ``llm(prompt)`` returns the cleaned reply text, or
    a string starting with "Error" / "API Error" when the call fails.
    """

    # Lead-ins removed from the start of a reply by _clean_answer.
    _ANSWER_PREFIXES = (
        "Answer:",
        "The answer is:",
        "Final answer:",
        "Result:",
        "Solution:",
        "Based on",
        "Therefore",
        "In conclusion",
    )

    def __init__(self, model: str = "deepseek/deepseek-v3.1-terminus"):
        # The key is read from OPENROUTER_API_KEY, falling back to "my_key".
        self.api_key = os.getenv("OPENROUTER_API_KEY") or os.getenv("my_key")
        self.model = model
        self.base_url = "https://openrouter.ai/api/v1/chat/completions"

    def __call__(self, prompt: str, max_tokens: int = 1500, temperature: float = 0.1) -> str:
        """Make API call to OpenRouter.

        Args:
            prompt: User message sent to the model.
            max_tokens: Completion length limit forwarded to the API.
            temperature: Sampling temperature forwarded to the API.

        Returns:
            The cleaned reply, or an error description. Never raises.
        """
        if not self.api_key or not self.api_key.startswith('sk-or-v1-'):
            return "Error: Invalid OpenRouter API key"

        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        payload = {
            "model": self.model,
            "messages": [
                {
                    "role": "system",
                    "content": "You are a helpful AI assistant. Provide direct, accurate answers. For GAIA evaluation, be precise and concise."
                },
                {
                    "role": "user",
                    "content": prompt
                }
            ],
            "temperature": temperature,
            "max_tokens": max_tokens,
        }

        try:
            response = requests.post(self.base_url, headers=headers, json=payload, timeout=30)
            if response.status_code != 200:
                return f"API Error: {response.status_code}"

            result = response.json()
            if result.get("choices"):
                answer = result["choices"][0]["message"]["content"].strip()
                return self._clean_answer(answer)
            return "Error: No response content received"
        except Exception as e:  # network, JSON or schema problems
            return f"Error: {str(e)}"

    def _clean_answer(self, answer: str) -> str:
        """Clean the answer for GAIA evaluation.

        Removes at most one known lead-in (case-insensitive) from the start
        and, for answers of three words or fewer, strips surrounding quotes
        and periods.
        """
        answer = answer.strip()

        # Remove common prefixes
        lowered = answer.lower()
        for prefix in self._ANSWER_PREFIXES:
            if lowered.startswith(prefix.lower()):
                answer = answer[len(prefix):].strip()
                if answer.startswith(':'):
                    answer = answer[1:].strip()
                break

        # Remove quotes and periods from short answers
        if len(answer.split()) <= 3:
            answer = answer.strip('"\'.')

        return answer


class GaiaAgent:
    """Simple tool-based agent for GAIA tasks.

    Wires an OpenRouterLLM into a SimpleAgent and post-processes the reply.
    """

    # Forms in which a question may reference a local file. Currently unused:
    # file access is requested by the model through the NEED_FILE marker
    # handled in SimpleAgent.run. Kept as the starting point for real
    # question enhancement.
    _FILE_REFERENCE_PATTERNS = (
        r'/tmp/gaia_cached_files/[^\s]+',
        r'saved locally at:\s*([^\s]+)',
        r'file.*?\.xlsx?',
        r'file.*?\.csv',
        r'file.*?\.txt',
    )

    # Lower-case lead-ins removed from the start of the final answer.
    _CLEANUP_PHRASES = (
        "based on the",
        "according to",
        "the answer is",
        "therefore",
        "in conclusion",
        "as a result",
        "so the answer is",
    )

    def __init__(self):
        print("Initializing GaiaAgent with OpenRouter DeepSeek...")

        # Initialize the LLM
        self.llm = OpenRouterLLM(model="deepseek/deepseek-v3.1-terminus")

        # Initialize the agent with tools
        self.agent = SimpleAgent(self.llm)

        print("GaiaAgent initialized successfully!")

    def __call__(self, task_id: str, question: str) -> str:
        """Process a question and return the answer.

        Any exception is caught, logged to stdout and returned as an
        "Agent error: ..." string.
        """
        try:
            print(f"Processing task {task_id}: {question[:100]}...")

            # Check if there are file references in the question
            enhanced_question = self._enhance_question_with_file_analysis(question)

            # Run the agent
            answer = self.agent.run(enhanced_question)

            # Clean up the answer
            clean_answer = self._clean_final_answer(answer)

            print(f"Agent answer for {task_id}: {clean_answer}")
            return clean_answer
        except Exception as e:
            error_msg = f"Agent error: {str(e)}"
            print(f"Error processing task {task_id}: {error_msg}")
            return error_msg

    def _enhance_question_with_file_analysis(self, question: str) -> str:
        """Return ``question`` unchanged.

        This is a pass-through hook: the question text is not rewritten here,
        and file references are left for the agent to pick up itself.
        """
        return question

    def _clean_final_answer(self, answer: str) -> str:
        """Final cleaning of the answer.

        Keeps only the text after the last "final answer:" marker (matched
        case-insensitively), removes one leading filler phrase, and strips
        surrounding punctuation and quotes.
        """
        answer = answer.strip()

        # Split on the marker itself rather than on ":" so that answers which
        # contain a colon (times, ratios, URLs) are not truncated.
        parts = re.split(r"final answer:", answer, flags=re.IGNORECASE)
        if len(parts) > 1:
            answer = parts[-1].strip()

        # Remove common unnecessary phrases
        lowered = answer.lower()
        for phrase in self._CLEANUP_PHRASES:
            if lowered.startswith(phrase):
                answer = answer[len(phrase):].strip()
                break

        # Clean up formatting
        return answer.strip('.,;:"\'')