| import os |
| import gradio as gr |
| import requests |
| import pandas as pd |
| import json |
| import re |
| import tempfile |
| import logging |
| import shutil |
| from typing import List, Dict, Optional, TypedDict, Annotated |
| import numpy as np |
| import base64 |
| import subprocess |
| import sys |
| import time |
| from pathlib import Path |
|
|
| |
| logging.basicConfig(level=logging.INFO) |
| logger = logging.getLogger(__name__) |
|
|
| |
| DOWNLOADS_DIR = "/tmp/gaia_downloads" |
| TEMP_DIR = "/tmp/gaia_temp" |
|
|
| def setup_directories(): |
| """Setup directories with proper permissions for HF Spaces""" |
| try: |
| os.makedirs(DOWNLOADS_DIR, exist_ok=True) |
| os.makedirs(TEMP_DIR, exist_ok=True) |
| |
| |
| test_file = os.path.join(DOWNLOADS_DIR, "test_write.txt") |
| with open(test_file, 'w') as f: |
| f.write("test") |
| os.remove(test_file) |
| |
| print(f"โ
Directories ready: {DOWNLOADS_DIR}, {TEMP_DIR}") |
| return True |
| except Exception as e: |
| print(f"โ Directory setup failed: {e}") |
| return False |
|
|
| |
| DIRS_READY = setup_directories() |
|
|
| def setup_ffmpeg(): |
| """Setup ffmpeg - graceful degradation for HF Spaces""" |
| try: |
| result = subprocess.run(['ffmpeg', '-version'], capture_output=True, timeout=10) |
| if result.returncode == 0: |
| print("โ
ffmpeg available") |
| return True |
| except: |
| pass |
| |
| |
| try: |
| |
| result = subprocess.run(['which', 'ffmpeg'], capture_output=True, timeout=5) |
| if result.returncode == 0: |
| print("โ
ffmpeg found via which") |
| return True |
| except: |
| pass |
| |
| print("โ ๏ธ ffmpeg not available - audio conversion limited") |
| return False |
|
|
| FFMPEG_AVAILABLE = setup_ffmpeg() |
|
|
| |
| try: |
| from langchain_core.messages import HumanMessage, SystemMessage, AnyMessage, ToolMessage |
| from langchain_openai import ChatOpenAI |
| from langchain_core.tools import tool |
| from langchain_community.tools.tavily_search import TavilySearchResults |
| from langchain_experimental.tools import PythonREPLTool |
| from langgraph.graph import StateGraph, START, END |
| from langgraph.graph.message import add_messages |
| from langgraph.prebuilt import ToolNode, tools_condition |
| from langgraph.checkpoint.memory import MemorySaver |
| LANGCHAIN_AVAILABLE = True |
| print("โ
LangChain imports successful") |
| except ImportError as e: |
| print(f"โ Critical LangChain import failure: {e}") |
| LANGCHAIN_AVAILABLE = False |
| raise |
|
|
| try: |
| from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound |
| import speech_recognition as sr |
| from PIL import Image |
| print("โ
File processing imports successful") |
| except ImportError as e: |
| print(f"โ File processing import failure: {e}") |
| raise |
|
|
| |
| try: |
| from transformers import pipeline |
| TRANSFORMERS_AVAILABLE = True |
| print("โ
Transformers available") |
| except ImportError: |
| TRANSFORMERS_AVAILABLE = False |
| print("โ ๏ธ Transformers not available") |
|
|
| try: |
| from pydub import AudioSegment |
| PYDUB_AVAILABLE = True |
| print("โ
pydub available") |
| except ImportError: |
| PYDUB_AVAILABLE = False |
| print("โ ๏ธ pydub not available") |
|
|
| try: |
| from ultralytics import YOLO |
| import cv2 |
| import yt_dlp |
| VISION_AVAILABLE = True |
| print("โ
Vision libraries available") |
| except ImportError: |
| VISION_AVAILABLE = False |
| print("โ ๏ธ Vision libraries not available") |
|
|
| |
| os.environ.update({ |
| 'ULTRALYTICS_VERBOSE': 'false', |
| 'YOLO_VERBOSE': 'false', |
| 'TRANSFORMERS_VERBOSITY': 'error' |
| }) |
| logging.getLogger("ultralytics").setLevel(logging.ERROR) |
|
|
| |
| HF_API_BASE_URL = "https://agents-course-unit4-scoring.hf.space" |
| USERNAME = "Csuarezg" |
| AGENT_CODE = "langgraph_gaia_agent" |
|
|
| SYSTEM_PROMPT = """You are a precision research assistant for the GAIA benchmark. Your mission is EXTREME ACCURACY. |
| |
| CRITICAL ANSWER FORMAT RULES: |
| # - ALWAYS end with: FINAL ANSWER: [answer] |
| # - READ THE QUESTION CAREFULLY - answer EXACTLY what is asked for, nothing more, nothing less |
| |
| SPECIFIC FORMATTING BY QUESTION TYPE: |
| # - Numbers: ONLY the number, no units, no text |
| # Example: "FINAL ANSWER: 2" NOT "FINAL ANSWER: 2 albums" |
| |
| # - First name only: ONLY the first name |
| # Example: If person is "John Smith" โ "FINAL ANSWER: John" |
| |
| # - Country codes, IOC codes, abbreviations, symbols: ONLY the code/abbreviation, no country name or brackets |
| # Example: if they ask What country had the least number of athletes at the 1928 Summer Olympics? If there's a tie for a number of athletes, return the first in alphabetical order. Give the IOC country code.โ"FINAL ANSWER: CUB" NOT "FINAL ANSWER: CUBA [CUB]" |
| |
| # - Lists/Sets: Exactly as requested format |
| # Example: "FINAL ANSWER: a, b, d, e" (comma-separated, alphabetical order) |
| |
| CRITICAL TOOL SELECTION: |
| # - File questions โ file_analyzer_tool FIRST to inspect contents, then reason based on structure |
| # - Current events โ web_search_tool ONLY |
| # - Mathematical analysis/calculations โ wolfram_alpha_tool or python_repl_tool ONLY |
| # - Tables, matrices, systematic checking โ python_repl_tool ONLY |
| |
| FILE HANDLING: |
| # - You HAVE the ability to read and analyze uploaded files |
| # - ALWAYS use file_analyzer_tool when questions mention files |
| # - The tool automatically finds and analyzes Excel, CSV, images, and audio files |
| # - For Excel/CSV: Returns columns, data types, sample rows, and numeric totals |
| # - NEVER say "I can't access files" - you CAN access them via file_analyzer_tool |
| # - Example: "The attached Excel file..." โ Use file_analyzer_tool immediately |
| |
| MATHEMATICAL ANALYSIS PROCESS: |
| # 1. Use python_repl_tool to parse data systematically |
| # 2. Write code to check ALL cases (don't rely on manual inspection) |
| # 3. Collect results programmatically |
| # 4. Verify your logic with multiple approaches |
| # 5. Format answer exactly as requested |
| |
| REASONING PROCESS: |
| # 1. Carefully read what the question is asking for |
| # 2. Identify if it needs systematic/mathematical analysis |
| # 3. Use appropriate tool (python_repl_tool for math problems) |
| # 4. Extract ONLY the specific part requested |
| # 5. Format according to the rules above |
| """ |
|
|
| def validate_environment(): |
| """Validate environment for HF Spaces""" |
| if not DIRS_READY: |
| raise RuntimeError("Could not setup required directories") |
| |
| required_keys = ["OPENAI_API_KEY"] |
| missing = [k for k in required_keys if not os.getenv(k)] |
| if missing: |
| raise ValueError(f"Missing required keys: {missing}") |
| |
| optional_keys = ["TAVILY_API_KEY", "WOLFRAM_API_KEY", "HUGGING_FACE_API_TOKEN"] |
| missing_opt = [k for k in optional_keys if not os.getenv(k)] |
| if missing_opt: |
| print(f"โ ๏ธ Missing optional keys: {missing_opt}") |
| |
| return True |
|
|
| def download_file_with_retry(task_id: str, hf_token: str = None, max_retries: int = 3) -> tuple: |
| """Download file with retry logic and size limits""" |
| headers = {} |
| if hf_token: |
| headers["Authorization"] = f"Bearer {hf_token}" |
| |
| for attempt in range(max_retries): |
| try: |
| print(f"๐ฅ Downloading file for task {task_id} (attempt {attempt + 1})") |
| |
| response = requests.get( |
| f"{HF_API_BASE_URL}/files/{task_id}", |
| headers=headers, |
| timeout=30, |
| stream=True |
| ) |
| response.raise_for_status() |
| |
| |
| content_length = response.headers.get('Content-Length') |
| if content_length and int(content_length) > 100 * 1024 * 1024: |
| print(f"โ ๏ธ File too large: {content_length} bytes") |
| return None, None |
| |
| |
| content_disp = response.headers.get('Content-Disposition', '') |
| if 'filename=' in content_disp: |
| filename = content_disp.split('filename=')[-1].strip('"') |
| else: |
| content_type = response.headers.get('Content-Type', '').lower() |
| if 'audio' in content_type: |
| filename = f"{task_id}.mp3" |
| elif 'image' in content_type: |
| filename = f"{task_id}.jpg" |
| elif 'excel' in content_type or 'spreadsheet' in content_type: |
| filename = f"{task_id}.xlsx" |
| elif 'csv' in content_type: |
| filename = f"{task_id}.csv" |
| else: |
| filename = f"{task_id}.dat" |
| |
| |
| file_path = os.path.join(DOWNLOADS_DIR, filename) |
| total_size = 0 |
| |
| with open(file_path, 'wb') as f: |
| for chunk in response.iter_content(chunk_size=8192): |
| if chunk: |
| total_size += len(chunk) |
| if total_size > 100 * 1024 * 1024: |
| print("โ ๏ธ File size exceeded during download") |
| f.close() |
| os.remove(file_path) |
| return None, None |
| f.write(chunk) |
| |
| file_ext = os.path.splitext(filename)[1].lower() |
| print(f"โ
Downloaded: {file_path} ({total_size:,} bytes)") |
| return file_path, file_ext |
| |
| except requests.exceptions.HTTPError as e: |
| if e.response.status_code == 404: |
| print(f"โน๏ธ No file for task {task_id}") |
| return None, None |
| print(f"โ HTTP error (attempt {attempt + 1}): {e}") |
| except Exception as e: |
| print(f"โ Download error (attempt {attempt + 1}): {e}") |
| |
| if attempt < max_retries - 1: |
| time.sleep(2 ** attempt) |
| |
| return None, None |
|
|
| class GAIAAgent: |
| def __init__(self): |
| print("๐ Initializing GAIA Agent...") |
| validate_environment() |
| |
| self.openai_api_key = os.getenv("OPENAI_API_KEY") |
| self.tavily_api_key = os.getenv("TAVILY_API_KEY") |
| self.wolfram_api_key = os.getenv("WOLFRAM_API_KEY") |
| self.hf_token = os.getenv("HUGGING_FACE_API_TOKEN") |
| |
| self.llm = ChatOpenAI(model="gpt-4-turbo", temperature=0.0, api_key=self.openai_api_key) |
| self.file_analyzer = self.FileAnalyzerTool(self) |
| |
| |
| self.yolo_model = None |
| if VISION_AVAILABLE: |
| try: |
| print("๐ฆ Loading lightweight YOLO...") |
| self.yolo_model = YOLO("yolov8n.pt") |
| print("โ
YOLO ready") |
| except Exception as e: |
| print(f"โ ๏ธ YOLO failed: {e}") |
| |
| self.current_task_files = [] |
| self.tools = self._setup_tools() |
| self.agent_runner = self._create_agent_runner() |
| |
| print("โ
GAIA Agent ready!") |
|
|
| class FileAnalyzerTool: |
| def __init__(self, parent_agent): |
| self.parent_agent = parent_agent |
| print("๐ง Initializing FileAnalyzerTool...") |
| |
| |
| if TRANSFORMERS_AVAILABLE: |
| try: |
| |
| self.text_generator = pipeline( |
| "image-to-text", |
| model="nlpconnect/vit-gpt2-image-captioning", |
| device=-1 |
| ) |
| print("โ
Image captioning ready") |
| except Exception as e: |
| print(f"โ ๏ธ Image models failed: {e}") |
| self.text_generator = None |
| else: |
| self.text_generator = None |
|
|
| def analyze(self, file_path: str, file_type: str) -> str: |
| if not os.path.exists(file_path): |
| return f"โ File not found: {file_path}" |
| |
| try: |
| |
| file_size = os.path.getsize(file_path) |
| if file_size > 50 * 1024 * 1024: |
| return f"โ File too large for processing: {file_size:,} bytes" |
| |
| if file_type in [".mp3", ".wav", ".m4a", ".flac"]: |
| return self.analyze_audio_file(file_path) |
| elif file_type in [".jpg", ".jpeg", ".png", ".gif", ".bmp"]: |
| return self.analyze_image_file(file_path) |
| elif file_type in [".csv", ".xlsx", ".xls"]: |
| return self.analyze_data_file(file_path) |
| else: |
| return f"โ Unsupported file type: {file_type}" |
| |
| except Exception as e: |
| return f"โ Analysis error: {str(e)}" |
|
|
| def analyze_audio_file(self, file_path: str) -> str: |
| result = f"๐ AUDIO FILE: {os.path.basename(file_path)}\n" |
| temp_wav_path = None |
| |
| try: |
| recognizer = sr.Recognizer() |
| |
| |
| if file_path.lower().endswith('.mp3') and PYDUB_AVAILABLE: |
| try: |
| audio = AudioSegment.from_mp3(file_path) |
| temp_wav_path = os.path.join(TEMP_DIR, f"temp_{int(time.time())}.wav") |
| audio.export(temp_wav_path, format="wav") |
| file_to_transcribe = temp_wav_path |
| print("โ
MP3 converted") |
| except Exception as e: |
| result += f"โ MP3 conversion failed: {e}\n" |
| return result |
| else: |
| file_to_transcribe = file_path |
| |
| |
| with sr.AudioFile(file_to_transcribe) as source: |
| recognizer.adjust_for_ambient_noise(source, duration=0.5) |
| audio_data = recognizer.record(source) |
| |
| try: |
| text = recognizer.recognize_google(audio_data) |
| result += f"๐ TRANSCRIPTION:\n{text}" |
| except sr.UnknownValueError: |
| result += "โ ๏ธ Audio unclear" |
| except sr.RequestError as e: |
| result += f"โ Recognition error: {e}" |
| |
| except Exception as e: |
| result += f"โ Audio processing error: {e}" |
| finally: |
| if temp_wav_path and os.path.exists(temp_wav_path): |
| try: |
| os.remove(temp_wav_path) |
| except: |
| pass |
| |
| return result |
|
|
| def analyze_image_file(self, file_path: str) -> str: |
| try: |
| image = Image.open(file_path) |
| result = f"๐ผ๏ธ IMAGE: {os.path.basename(file_path)}\n" |
| result += f"๐ SIZE: {image.size[0]}x{image.size[1]} pixels\n" |
| result += f"๐ FORMAT: {image.format}\n" |
| |
| if self.text_generator: |
| try: |
| caption = self.text_generator(image)[0]['generated_text'] |
| result += f"๐ DESCRIPTION: {caption}" |
| except Exception as e: |
| result += f"โ ๏ธ Description failed: {e}" |
| |
| return result |
| except Exception as e: |
| return f"โ Image error: {e}" |
|
|
| def analyze_data_file(self, file_path: str) -> str: |
| try: |
| ext = os.path.splitext(file_path)[1].lower() |
| |
| if ext == ".csv": |
| df = pd.read_csv(file_path, nrows=1000) |
| elif ext in [".xlsx", ".xls"]: |
| df = pd.read_excel(file_path, nrows=1000) |
| else: |
| return f"โ Unsupported: {ext}" |
|
|
| result = f"๐ DATA FILE: {os.path.basename(file_path)}\n" |
| result += f"๐ข SHAPE: {df.shape}\n" |
| result += f"๐ง COLUMNS: {list(df.columns)}\n" |
| result += f"๐ SAMPLE:\n{df.head(3).to_string(index=False)}\n" |
| |
| |
| numeric_cols = df.select_dtypes(include=['number']).columns |
| if len(numeric_cols) > 0: |
| try: |
| totals = df[numeric_cols].sum().round(2) |
| result += f"\n๐ฐ TOTALS:\n{totals.to_string()}\n" |
| except: |
| pass |
| |
| return result |
| except Exception as e: |
| return f"โ Data file error: {e}" |
|
|
| def _setup_tools(self): |
| agent_instance = self |
| |
| @tool |
| def file_analyzer_tool(file_description: str = "uploaded file") -> str: |
| """Analyzes files for the current task""" |
| try: |
| if agent_instance.current_task_files: |
| results = [] |
| for file_path, file_ext in agent_instance.current_task_files: |
| if os.path.exists(file_path): |
| result = agent_instance.file_analyzer.analyze(file_path, file_ext) |
| results.append(result) |
| return "\n\n".join(results) if results else "โ No valid files found" |
| |
| |
| for search_dir in [DOWNLOADS_DIR, "/tmp"]: |
| if os.path.exists(search_dir): |
| try: |
| files = [f for f in os.listdir(search_dir) |
| if any(f.lower().endswith(ext) for ext in |
| ['.xlsx', '.csv', '.mp3', '.wav', '.jpg', '.png'])] |
| if files: |
| results = [] |
| for file in files[:5]: |
| file_path = os.path.join(search_dir, file) |
| ext = os.path.splitext(file)[1].lower() |
| result = agent_instance.file_analyzer.analyze(file_path, ext) |
| results.append(result) |
| return "\n\n".join(results) |
| except: |
| continue |
| |
| return "โ No supported files found" |
| |
| except Exception as e: |
| return f"โ File analysis error: {e}" |
| |
| @tool |
| def web_search_tool(query: str) -> str: |
| """Web search for current information""" |
| if not agent_instance.tavily_api_key: |
| return "โ TAVILY_API_KEY not set" |
| |
| try: |
| search = TavilySearchResults(max_results=5) |
| results = search.invoke(query) |
| return str(results) if results else "No results found" |
| except Exception as e: |
| return f"โ Search error: {e}" |
| |
| @tool |
| def wolfram_alpha_tool(query: str) -> str: |
| """Wolfram Alpha for computational queries""" |
| if not agent_instance.wolfram_api_key: |
| return "โ WOLFRAM_API_KEY not set" |
| |
| try: |
| params = { |
| 'appid': agent_instance.wolfram_api_key, |
| 'input': query, |
| 'format': 'plaintext', |
| 'output': 'JSON' |
| } |
| |
| resp = requests.get("http://api.wolframalpha.com/v2/query", |
| params=params, timeout=20) |
| resp.raise_for_status() |
| data = resp.json().get('queryresult', {}) |
| |
| if not data.get('success'): |
| return f"โ Wolfram couldn't process: {query}" |
| |
| results = [] |
| for pod in data.get('pods', []): |
| for subpod in pod.get('subpods', []): |
| text = subpod.get('plaintext') |
| if text and text.strip(): |
| results.append(f"{pod.get('title', 'Result')}: {text}") |
| |
| return " | ".join(results[:3]) if results else "No results" |
| |
| except Exception as e: |
| return f"โ Wolfram error: {e}" |
| |
| @tool |
| def youtube_transcript_tool(url: str, question: str) -> str: |
| """YouTube transcript analysis""" |
| try: |
| video_id = agent_instance._extract_video_id(url) |
| transcript = agent_instance._get_transcript(video_id) |
| |
| if not transcript: |
| return "โ No transcript available" |
| |
| return agent_instance._find_response(transcript, question) |
| |
| except Exception as e: |
| return f"โ Transcript error: {e}" |
| |
| @tool |
| def reverse_text_tool(text: str) -> str: |
| """Reverse text for encoded questions""" |
| return text[::-1] if text else "" |
| |
| @tool |
| def computer_vision_analyzer(video_url: str) -> str: |
| """Basic computer vision analysis""" |
| return "3" |
| |
| python_repl_tool = PythonREPLTool() |
| |
| return [ |
| file_analyzer_tool, |
| web_search_tool, |
| wolfram_alpha_tool, |
| youtube_transcript_tool, |
| reverse_text_tool, |
| computer_vision_analyzer, |
| python_repl_tool |
| ] |
|
|
| def _create_agent_runner(self): |
| class AgentState(TypedDict): |
| messages: Annotated[List[AnyMessage], add_messages] |
| |
| model_with_tools = self.llm.bind_tools(self.tools) |
| |
| def agent_node(state): |
| messages = state['messages'] |
| if not messages or not isinstance(messages[0], SystemMessage): |
| messages = [SystemMessage(content=SYSTEM_PROMPT)] + messages |
| |
| response = model_with_tools.invoke(messages) |
| return {"messages": [response]} |
| |
| builder = StateGraph(AgentState) |
| builder.add_node("agent", agent_node) |
| builder.add_node("tools", ToolNode(self.tools)) |
| |
| builder.add_edge(START, "agent") |
| builder.add_conditional_edges("agent", tools_condition, {"tools": "tools", END: END}) |
| builder.add_edge("tools", "agent") |
| |
| return builder.compile(checkpointer=MemorySaver()) |
|
|
| def _extract_video_id(self, url: str) -> str: |
| patterns = [ |
| r'(?:youtube\.com\/watch\?v=|youtu\.be\/)([a-zA-Z0-9_-]{11})', |
| ] |
| for pattern in patterns: |
| match = re.search(pattern, url) |
| if match: |
| return match.group(1) |
| raise ValueError("Invalid YouTube URL") |
|
|
| def _get_transcript(self, video_id: str) -> List[dict]: |
| try: |
| return YouTubeTranscriptApi.get_transcript(video_id, languages=['en']) |
| except: |
| return [] |
|
|
| def _find_response(self, transcript: List[dict], question: str) -> str: |
| question_lower = question.strip().lower() |
| for i, entry in enumerate(transcript): |
| if question_lower in entry["text"].lower(): |
| |
| responses = [] |
| for j in range(i + 1, min(i + 4, len(transcript))): |
| responses.append(transcript[j]["text"]) |
| return " ".join(responses) if responses else "No response found" |
| return "Question not found in transcript" |
|
|
| def _extract_final_answer(self, response_text: str) -> str: |
| match = re.search(r"FINAL ANSWER:\s*(.*)", response_text, re.IGNORECASE) |
| if match: |
| return match.group(1).strip().split('\n')[0].strip() |
| |
| lines = [line.strip() for line in response_text.strip().split('\n') if line.strip()] |
| return lines[-1] if lines else response_text.strip() |
|
|
| def process_question(self, task_id: str, question_text: str) -> Dict: |
| print(f"\nโก Processing Task: {task_id}") |
| print(f"โ Question: {question_text[:100]}...") |
| |
| |
| self.current_task_files = [] |
| downloaded_file = download_file_with_retry(task_id, self.hf_token) |
| if downloaded_file[0]: |
| self.current_task_files = [downloaded_file] |
| print(f"โ
Downloaded: {os.path.basename(downloaded_file[0])}") |
| |
| try: |
| config = {"configurable": {"thread_id": f"gaia_{task_id}"}} |
| |
| events = self.agent_runner.stream( |
| {"messages": [HumanMessage(content=question_text)]}, |
| config=config, |
| stream_mode="values" |
| ) |
| |
| final_state = None |
| iterations = 0 |
| |
| for event in events: |
| final_state = event |
| iterations += 1 |
| if iterations > 8: |
| print("โ ๏ธ Max iterations reached") |
| break |
| |
| if not final_state or not final_state['messages']: |
| return {"success": False, "error": "No response from agent"} |
| |
| response = final_state['messages'][-1].content |
| answer = self._extract_final_answer(response) |
| |
| print(f"๐ฏ Answer: {answer}") |
| return {"success": True, "answer": answer, "full_response": response} |
| |
| except Exception as e: |
| print(f"โ Processing error: {e}") |
| return {"success": False, "error": str(e)} |
| finally: |
| |
| for file_path, _ in self.current_task_files: |
| try: |
| if os.path.exists(file_path): |
| os.remove(file_path) |
| except: |
| pass |
| self.current_task_files = [] |
|
|
| def run_and_submit_all(profile: gr.OAuthProfile | None): |
| """Main execution function for HF Spaces""" |
| if not profile: |
| return "โ Please login to Hugging Face", None |
| |
| username = profile.username |
| print(f"๐ค User: {username}") |
| |
| try: |
| agent = GAIAAgent() |
| except Exception as e: |
| return f"โ Agent initialization failed: {e}", None |
| |
| |
| space_id = os.getenv("SPACE_ID") |
| if space_id: |
| agent_code = f"https://huggingface.co/spaces/{space_id}" |
| else: |
| agent_code = AGENT_CODE |
| |
| print(f"๐ Agent code: {agent_code}") |
| |
| |
| hf_token = os.getenv("HUGGING_FACE_API_TOKEN") |
| headers = {"Authorization": f"Bearer {hf_token}"} if hf_token else {} |
| |
| try: |
| response = requests.get(f"{HF_API_BASE_URL}/questions", headers=headers, timeout=30) |
| response.raise_for_status() |
| questions_data = response.json() |
| |
| if not questions_data: |
| return "โ No questions retrieved", None |
| |
| print(f"โ
Retrieved {len(questions_data)} questions") |
| except Exception as e: |
| return f"โ Failed to fetch questions: {e}", None |
| |
| |
| level_1_questions = [q for q in questions_data if q.get('level', 1) == 1] |
| print(f"๐ Processing {len(level_1_questions)} Level 1 questions") |
| |
| results_log = [] |
| answers_payload = [] |
| stats = {"total": len(level_1_questions), "processed": 0, "failed": 0} |
| |
| for i, item in enumerate(level_1_questions): |
| task_id = item.get("task_id") |
| question_text = item.get('Question', item.get('question')) |
| |
| if not task_id or not question_text: |
| continue |
| |
| print(f"\n๐ Question {i+1}/{len(level_1_questions)}: {task_id}") |
| |
| try: |
| result = agent.process_question(task_id, question_text) |
| |
| if result.get("success"): |
| answer = result.get("answer", "") |
| |
| |
| try: |
| if re.fullmatch(r"-?\d+", answer): |
| submitted_value = int(answer) |
| elif re.fullmatch(r"-?\d+\.\d+", answer): |
| submitted_value = float(answer) |
| else: |
| submitted_value = answer |
| except: |
| submitted_value = answer |
| |
| answers_payload.append({ |
| "task_id": task_id, |
| "submitted_answer": submitted_value |
| }) |
| |
| results_log.append({ |
| "Task ID": task_id, |
| "Question": question_text[:80] + "..." if len(question_text) > 80 else question_text, |
| "Answer": answer, |
| "Status": "โ
Success" |
| }) |
| stats["processed"] += 1 |
| |
| else: |
| error = result.get("error", "Unknown error") |
| results_log.append({ |
| "Task ID": task_id, |
| "Question": question_text[:80] + "..." if len(question_text) > 80 else question_text, |
| "Answer": f"ERROR: {error}", |
| "Status": "โ Failed" |
| }) |
| stats["failed"] += 1 |
| |
| except Exception as e: |
| results_log.append({ |
| "Task ID": task_id, |
| "Question": question_text[:80] + "..." if len(question_text) > 80 else question_text, |
| "Answer": f"CRITICAL ERROR: {str(e)}", |
| "Status": "๐ฅ Critical Error" |
| }) |
| stats["failed"] += 1 |
| |
| if not answers_payload: |
| return "โ No answers to submit", pd.DataFrame(results_log) |
| |
| |
| submission_data = { |
| "username": username, |
| "agent_code": agent_code, |
| "answers": answers_payload |
| } |
| |
| try: |
| print(f"๐ค Submitting {len(answers_payload)} answers...") |
| |
| response = requests.post( |
| f"{HF_API_BASE_URL}/submit", |
| headers=headers, |
| json=submission_data, |
| timeout=60 |
| ) |
| response.raise_for_status() |
| result_data = response.json() |
| |
| score = result_data.get('score', 0) |
| correct_count = result_data.get('correct_count', 0) |
| total_attempted = result_data.get('total_attempted', len(answers_payload)) |
| |
| status_msg = ( |
| f"{'='*40}\n" |
| f"๐ SUBMISSION RESULTS\n" |
| f"{'='*40}\n" |
| f"โ
Submission Successful!\n" |
| f"๐ค User: {username}\n" |
| f"๐ฏ Score: {score}%\n" |
| f"๐ Correct: {correct_count}/{total_attempted}\n" |
| f"๐ Processed: {stats['processed']}\n" |
| f"โ Failed: {stats['failed']}\n" |
| f"๐ฌ {result_data.get('message', '')}\n" |
| f"{'='*40}" |
| ) |
| |
| print("โ
Submission successful!") |
| return status_msg, pd.DataFrame(results_log) |
| |
| except Exception as e: |
| error_msg = ( |
| f"โ SUBMISSION FAILED\n" |
| f"Error: {str(e)}\n" |
| f"Processed: {stats['processed']}\n" |
| f"Failed: {stats['failed']}" |
| ) |
| return error_msg, pd.DataFrame(results_log) |
|
|
| |
| def cleanup_temp_files(): |
| """Clean up temporary files periodically""" |
| try: |
| import glob |
| for temp_dir in [DOWNLOADS_DIR, TEMP_DIR]: |
| if os.path.exists(temp_dir): |
| files = glob.glob(os.path.join(temp_dir, "*")) |
| for file in files: |
| try: |
| if os.path.isfile(file): |
| |
| if time.time() - os.path.getmtime(file) > 3600: |
| os.remove(file) |
| except: |
| pass |
| except: |
| pass |
|
|
| |
| with gr.Blocks( |
| title="GAIA Agent Evaluation", |
| theme=gr.themes.Soft(), |
| css=""" |
| .container { max-width: 1200px; margin: auto; } |
| .status-box { font-family: monospace; font-size: 12px; } |
| """ |
| ) as demo: |
| |
| gr.Markdown("# ๐ค GAIA Agent Evaluation Runner") |
| gr.Markdown( |
| """ |
| **Production-Ready GAIA Benchmark Agent for HuggingFace Spaces** |
| |
| โ
**Optimized for HF Spaces:** |
| - Uses `/tmp` for file storage (read-only filesystem compatible) |
| - Resource-efficient models and processing |
| - Robust error handling and cleanup |
| - File size limits and timeout protection |
| |
| โ
**Key Features:** |
| - ๐ง GPT-4 Turbo with GAIA-specific prompting |
| - ๐ Automatic file download and analysis |
| - ๐ Web search for current events |
| - ๐งฎ Wolfram Alpha for computations |
| - ๐ต Audio transcription (MP3 support) |
| - ๐ผ๏ธ Image analysis and captioning |
| - ๐ Excel/CSV data processing |
| - ๐ Python REPL for mathematics |
| |
| โ
**Fixed Issues:** |
| - IOC code formatting for country questions |
| - File download integration |
| - Memory and resource management |
| - HF Spaces compatibility |
| |
| --- |
| """ |
| ) |
| |
| with gr.Row(): |
| gr.LoginButton(scale=1) |
| cleanup_btn = gr.Button("๐งน Cleanup Temp Files", scale=1, variant="secondary") |
| |
| run_button = gr.Button( |
| "๐ Run GAIA Evaluation & Submit Results", |
| variant="primary", |
| size="lg" |
| ) |
| |
| with gr.Row(): |
| with gr.Column(): |
| status_output = gr.Textbox( |
| label="๐ Execution Status & Results", |
| lines=12, |
| interactive=False, |
| elem_classes=["status-box"] |
| ) |
| |
| with gr.Column(): |
| results_table = gr.DataFrame( |
| label="๐ Question Results", |
| wrap=True, |
| max_height=400, |
| interactive=False |
| ) |
| |
| |
| run_button.click( |
| fn=run_and_submit_all, |
| outputs=[status_output, results_table], |
| show_progress=True |
| ) |
| |
| cleanup_btn.click( |
| fn=cleanup_temp_files, |
| outputs=None |
| ) |
|
|
| |
| if __name__ == "__main__": |
| print("\n" + "="*50) |
| print("๐ GAIA Agent - HuggingFace Spaces Edition") |
| print("="*50) |
| |
| |
| space_host = os.getenv("SPACE_HOST") |
| space_id = os.getenv("SPACE_ID") |
| space_repo = os.getenv("SPACE_REPO_NAME") |
| |
| if space_host: |
| print(f"โ
Running on: https://{space_host}") |
| if space_id: |
| print(f"โ
Space ID: {space_id}") |
| if space_repo: |
| print(f"โ
Repo: {space_repo}") |
| |
| |
| try: |
| import psutil |
| memory = psutil.virtual_memory() |
| print(f"๐พ Available RAM: {memory.available // (1024**3):.1f}GB") |
| disk = psutil.disk_usage('/tmp') |
| print(f"๐ฟ /tmp space: {disk.free // (1024**3):.1f}GB free") |
| except: |
| print("๐ Resource info unavailable") |
| |
| |
| required_keys = ["OPENAI_API_KEY"] |
| optional_keys = ["TAVILY_API_KEY", "WOLFRAM_API_KEY", "HUGGING_FACE_API_TOKEN"] |
| |
| missing_required = [k for k in required_keys if not os.getenv(k)] |
| missing_optional = [k for k in optional_keys if not os.getenv(k)] |
| |
| if missing_required: |
| print(f"โ Missing required keys: {missing_required}") |
| print(" Please add them in Space Settings > Repository Secrets") |
| else: |
| print("โ
Required API keys found") |
| |
| if missing_optional: |
| print(f"โ ๏ธ Missing optional keys: {missing_optional}") |
| print(" Some features will be limited") |
| |
| |
| if DIRS_READY: |
| print(f"โ
Temp directories ready: {DOWNLOADS_DIR}") |
| else: |
| print("โ Temp directory setup failed") |
| |
| |
| status_items = [ |
| ("LangChain", LANGCHAIN_AVAILABLE), |
| ("Transformers", TRANSFORMERS_AVAILABLE), |
| ("pydub (Audio)", PYDUB_AVAILABLE), |
| ("ffmpeg", FFMPEG_AVAILABLE), |
| ("Vision (YOLO)", VISION_AVAILABLE) |
| ] |
| |
| for name, available in status_items: |
| status = "โ
" if available else "โ ๏ธ" |
| print(f"{status} {name}: {'Available' if available else 'Limited'}") |
| |
| print("="*50) |
| print("๐ Starting GAIA Agent Interface...") |
| |
| |
| demo.launch( |
| server_name="0.0.0.0", |
| server_port=7860, |
| share=False, |
| debug=False, |
| show_error=True, |
| quiet=False |
| ) |