Spaces:
Runtime error
Runtime error
| import os | |
| import wiki | |
| import torch | |
| import logging | |
| import requests | |
| import wikipedia | |
| import pytesseract | |
| import pandas as pd | |
| from PIL import Image | |
| from io import BytesIO | |
| import soundfile as sf | |
| from pytube import YouTube | |
| from yt_dlp import YoutubeDL | |
| from transformers import ( | |
| AutoModelForCausalLM, | |
| AutoTokenizer, | |
| BitsAndBytesConfig, | |
| pipeline, | |
| ) | |
| from smolagents import ( | |
| CodeAgent, | |
| DuckDuckGoSearchTool, | |
| PythonInterpreterTool, | |
| HfApiModel, | |
| LiteLLMModel, | |
| Tool, | |
| TransformersModel | |
| ) | |
# Local LLM served through Ollama via LiteLLM; num_ctx raises the context
# window to 8192 tokens so the GAIA prompt + tool traces fit.
model = LiteLLMModel(
    model_id="ollama_chat/qwen3:14b",
    api_base="http://127.0.0.1:11434",
    num_ctx=8192
)
# Alternative local-transformers setup, kept for reference:
#bnb_config = BitsAndBytesConfig(load_in_8bit=True)
#tokenizer = AutoTokenizer.from_pretrained(model_id)
# model = TransformersModel(
#     model_id=model_id,
#     torch_dtype="bfloat16",
#     device_map="cuda",
#     trust_remote_code=True,
#     max_new_tokens=2048
# )
#model = torch.compile(model, mode="default")
from whisper import load_model as load_whisper
# Loaded once at import time; "small" trades some accuracy for speed.
whisper_model = load_whisper("small")
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ──────────────────────────────────────────────────────────
# 1) GAIA system prompt
# ──────────────────────────────────────────────────────────
# Runtime string sent to the LLM verbatim — do not reformat.
# NOTE(review): "AXULIARY" is misspelled, but GAIAAgent.__call__ prefixes
# files with the same misspelled marker, so the two stay consistent.
GAIA_SYSTEM_PROMPT = """
You are a general AI assistant. I will ask you a question.
Report your thoughts, and finish your answer with the following template: FINAL ANSWER: [YOUR FINAL ANSWER].
YOUR FINAL ANSWER should be a number OR as few words as possible OR a comma separated list of numbers and/or strings.
If you are asked for a number, don't use comma to write your number neither use units such as $ or percent sign unless specified otherwise.
If you are asked for a string, don't use articles, neither abbreviations (e.g. for cities), and write the digits in plain text unless specified otherwise.
If you are asked for a comma separated list, apply the above rules depending of whether the element to be put in the list is a number or a string.
All question related files if existant are given to you below as: AXULIARY FILE FOR QUESTION: [FILE_PATH]
"""
# Scoring endpoint that serves questions and their auxiliary files.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# Extension whitelists used to route downloaded files to the right tool.
AUDIO_FILES = ["wav", "mp3", "aac", "ogg"]
IMAGE_FILES = ["png", "jpg", "tiff", "jpeg", "bmp"]
TABULAR_FILES = ["csv", "xlsx"]
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # 2) Custom tools | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # --- File handler --- | |
def file_handler(task_id: str, file_name: str):
    """Download the auxiliary file for a GAIA task from the scoring API.

    Args:
        task_id: Task identifier; used to build the download URL.
        file_name: Original file name; only its extension is used.

    Returns:
        Tuple ``(data, ext)``: the raw file bytes and the lowercased
        extension (without the leading dot).

    Raises:
        Re-raises any download error (e.g. HTTP failure) after logging it.
    """
    try:
        # Timeout keeps the agent from hanging forever on a dead endpoint;
        # the original call had none.
        response = requests.get(f"{DEFAULT_API_URL}/files/{task_id}", timeout=30)
        response.raise_for_status()
        data = response.content
        # NOTE: if file_name has no '.', this yields the whole name lowercased.
        ext = file_name.split('.')[-1].lower()
        return data, ext
    except Exception as e:
        logger.error(f"Failed to fetch file: {e}")
        raise
def fetch_file(args: str) -> str:
    """
    Retrieve the binary payload identified by "task_id, file_name" through
    file_handler, persist it below ./tmp/, and hand back the local path.
    Args:
        args: a single string of the form "task_id, file_name"
    """
    # Exactly two comma-separated fields are expected; anything else raises.
    task_id, file_name = map(str.strip, args.split(','))
    data, ext = file_handler(task_id, file_name)
    local_path = f"./tmp/{task_id}.{ext}"
    # Create ./tmp/ lazily on first download.
    os.makedirs(os.path.dirname(local_path), exist_ok=True)
    with open(local_path, 'wb') as out:
        out.write(data)
    return local_path
class TranscriptionTool(Tool):
    name = "TranscriptionTool"
    description = """
    This tool transcribes spoken content from local audio files such as .wav or .mp3.
    It uses OpenAI's Whisper model to convert speech to text.
    It expects a file path to the audio file and returns a string containing the transcription.
    To call the tool on code just use TranscriptionTool(path).
    """
    inputs = {
        "path": {
            "type": "string",
            "description": "The path to a local audio file (.wav, .mp3, etc.)"
        }
    }
    output_type = "string"

    def forward(self, path: str) -> str:
        """Transcribe the audio file at `path` to English text."""
        # Hand the file path directly to Whisper: transcribe() loads the audio
        # itself (via ffmpeg) and resamples it to the 16 kHz mono input the
        # model requires. The previous soundfile-based read kept the file's
        # native sample rate and channel layout, which corrupts transcription
        # for anything that isn't already 16 kHz mono.
        res = whisper_model.transcribe(path, language='en')
        return f"The transcribed audio text is: {res['text']}\n"
class OCRTool(Tool):
    name = "OCRTool"
    description = """
    This tool extracts text from images using Tesseract OCR.
    It takes a path to an image file (e.g., .png or .jpg) and returns any readable text found in the image.
    To call the tool on code just use OCRTool(path).
    """
    inputs = {
        "path": {
            "type": "string",
            "description": "The path to a local image file (.png, .jpg, etc.)"
        }
    }
    output_type = "string"

    def forward(self, path: str) -> str:
        """Run Tesseract OCR on the image at `path` and return the text."""
        # Context manager closes the underlying file handle; the original
        # leaked it (PIL keeps the file open until the Image is GC'd).
        with Image.open(path) as img:
            text = pytesseract.image_to_string(img)
        return f"Extracted text from image:\n\n{text}"
class TablePreviewTool(Tool):
    name = "TablePreviewTool"
    description = """
    This tool previews a CSV or Excel spreadsheet file.
    It returns the shape (rows, columns), column names, the first few rows of data and some description of the database.
    Useful for understanding the structure of tabular data before processing it.
    To call the tool on code just use TablePreviewTool(path)"""
    inputs = {
        "path": {
            "type": "string",
            "description": "The path to a .csv or .xlsx file"
        }
    }
    output_type = "string"

    def forward(self, path: str) -> str:
        """Load the table at `path` and return a structural preview string."""
        ext = path.rsplit('.', 1)[-1].lower()
        if ext == 'csv':
            df = pd.read_csv(path)
        elif ext in ('xlsx', 'xls'):
            df = pd.read_excel(path)
        else:
            # The original routed every non-csv extension to read_excel,
            # which fails with a cryptic error on unsupported types.
            return f"Unsupported file type '.{ext}': expected .csv or .xlsx."
        return f"""Shape: {df.shape}\n Columns: {list(df.columns)}\n\n
Head: {df.head().to_markdown()}\n\n Description of dataset: {str(df.describe())}"""
class YouTubeInfoTool(Tool):
    name = "YouTubeInfoTool"
    description = """
    This tool fetches metadata and English captions from a given YouTube video.
    It returns the video's title, description, and the English subtitles if available.
    To call the tool on code just use YouTubeInfoTool(url)"""
    inputs = {
        "url": {
            "type": "string",
            "description": "The full URL to a YouTube video"
        }
    }
    output_type = "string"

    def forward(self, url: str) -> str:
        """Fetch title, description and English captions for a YouTube URL."""
        ydl_opts = {
            "skip_download": True,
            "quiet": True,
            "writesubtitles": True,
            "writeautomaticsub": True,
        }
        with YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)
        # yt-dlp can return None for these keys even when they exist,
        # so fall back through falsy values in one step ('== None' removed).
        title = info.get("title") or "None"
        desc = info.get("description") or "None"
        # Prefer manual English subtitles, then auto-generated English ones.
        # The original used `subtitles or automatic_captions`, which never
        # reached the auto captions when manual subs existed only in other
        # languages.
        manual = info.get("subtitles") or {}
        auto = info.get("automatic_captions") or {}
        en_caps = (
            manual.get("en") or manual.get("en-US")
            or auto.get("en") or auto.get("en-US")
            or []
        )
        if en_caps:
            cap_url = en_caps[0]["url"]
            # Timeout prevents an indefinite hang on a dead caption URL.
            captions = requests.get(cap_url, timeout=30).text
        else:
            captions = "No English captions available."
        text = f"Title: {title}\n\nDescription:\n{desc}\n\nCaptions:\n{captions}"
        return f"The Youtube video title, description and captions are respectivelly: {text}"
class WikiTool(Tool):
    name = "WikiTool"
    description = """
    This tool searches Wikipedia for a given query and returns a concise summary.
    It takes a search term (string) as input and returns the first few sentences
    of the corresponding Wikipedia article (or a notice if multiple or no pages are found).
    To call the tool in code, use: WikiTool(query)
    """
    inputs = {
        "query": {
            "type": "string",
            "description": "The search term for Wikipedia (e.g., 'Python programming language')."
        }
    }
    output_type = "string"

    def setup(self):
        # One-time initialization: restrict lookups to the English wiki.
        wikipedia.set_lang("en")

    def forward(self, query: str) -> str:
        """Search Wikipedia for `query` and summarize the top hit."""
        matches = wikipedia.search(query, results=5)
        # Guard clause: nothing matched at all.
        if not matches:
            return f"No Wikipedia pages found for '{query}'."
        # Take the best-ranked result.
        top = matches[0]
        try:
            text = wikipedia.summary(top, auto_suggest=False)
        except wikipedia.DisambiguationError as e:
            # Ambiguous title: surface a handful of candidate pages instead.
            options = ", ".join(e.options[:5])
            return (
                f"Your query '{query}' is ambiguous. "
                f"Here are some options: {options}"
            )
        except Exception as e:
            return f"Error retrieving Wikipedia summary for '{top}': {e}"
        return f"Wikipedia summary for '{top}':\n\n{text}"
class TextFileReaderTool(Tool):
    name = "TextFileReaderTool"
    description = """
    This tool reads the full contents of a local text-based file (e.g., .txt, .py, .md).
    It takes a file path as input and returns the entire file as a single string.
    To call the tool in code, use: TextFileReaderTool(path)
    """
    inputs = {
        "path": {
            "type": "string",
            "description": "The path to a local text based file (.txt, .py, .md, etc.), example: ./tmp/f918266a-b3e0-4914-865d-4faa564f1aef.py"
        }
    }
    output_type = "string"

    def forward(self, path: str) -> str:
        """Return the whole file at `path` as UTF-8 text, or an error string."""
        try:
            with open(path, 'r', encoding='utf-8') as handle:
                body = handle.read()
        except FileNotFoundError:
            return f"Error: File not found at '{path}'."
        except Exception as e:
            return f"Error reading '{path}': {e}"
        return f"Contents of '{path}':\n\n{body}"
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # 3) Built-in smolagents tools | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Ready-made smolagents tools.
# NOTE(review): these module-level instances are never reused below —
# GAIAAgent.__init__ constructs fresh DuckDuckGoSearchTool /
# PythonInterpreterTool instances of its own. Confirm before removing.
search_tool = DuckDuckGoSearchTool()
python_repl = PythonInterpreterTool()
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # 4) GaiaAgent class with file-preloading | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
class GAIAAgent:
    """GAIA benchmark agent: a smolagents CodeAgent wired with the custom
    tools above and prompted with the GAIA answer-format instructions."""

    def __init__(self, model_name: str = None):
        """
        Initialize the GAIA inference agent with your system prompt.
        Args:
            model_name: optional HF model identifier
                (currently unused — the module-level `model` is always used).
        """
        self.system_prompt = GAIA_SYSTEM_PROMPT
        self.model = model
        self.agent = CodeAgent(
            model=self.model,
            tools=[
                TextFileReaderTool(),
                WikiTool(),
                DuckDuckGoSearchTool(),
                PythonInterpreterTool(),
                TranscriptionTool(),
                OCRTool(),
                TablePreviewTool(),
                YouTubeInfoTool(),
            ],
            max_steps=10,
            verbosity_level=2,
            add_base_tools=True,
            additional_authorized_imports=["numpy", "pandas", "wikipedia"],
        )

    @staticmethod
    def _extract_answer(full_resp: str) -> str:
        """Return the text after the first matching final-answer marker.

        Markers are tried in priority order and the split uses the same
        marker that matched. (The original checked for "**Answer**" but
        split on "**Answer**:", so a colon-less marker returned the whole
        response unchanged.)
        """
        for marker in ("FINAL ANSWER:", "**Answer:**", "**Answer**:", "**Answer**"):
            if marker in full_resp:
                return full_resp.split(marker)[-1].strip()
        return full_resp

    def __call__(self, question: str, task_id: str = None, file_name: str = None) -> str:
        """
        Run the agent on `question`. If `task_id` and `file_name` are set,
        download the file into ./tmp/ via fetch_file, then prefix the
        question with the auxiliary-file marker from the system prompt.
        Returns only what's after the final-answer marker.
        """
        prompt = question
        if task_id and file_name:
            local_path = fetch_file(f"{task_id},{file_name}")
            # Marker spelling must match GAIA_SYSTEM_PROMPT ("AXULIARY").
            prompt = f"AXULIARY FILE FOR QUESTION: {local_path}\n\n{question}"
        # Prepend the GAIA system prompt. BUG FIX: the original built this
        # string but then passed the bare `prompt` to agent.run, silently
        # dropping the answer-format instructions.
        full_prompt = f"{self.system_prompt}\n\nQuestion: {prompt}"
        full_resp = self.agent.run(full_prompt)
        if not isinstance(full_resp, str):
            full_resp = str(full_resp)
        return self._extract_answer(full_resp)