import os
import re
import tempfile
import traceback

import fitz  # PyMuPDF
import pandas as pd
import requests
from smolagents import Tool


class DownloadFileFromTaskTool(Tool):
    name = "download_file_from_task"
    description = """Downloads the file attached to a GAIA task ID and saves it in a temporary directory.
    Use this when the question requires information from an attached file; call it before reading the file."""

    inputs = {
        "task_id": {"type": "string", "description": "The GAIA task ID (REQUIRED)."},
        "filename": {
            "type": "string",
            "description": "Optional custom filename to save the file as (e.g., 'data.xlsx').",
            "nullable": True,
        },
    }
    output_type = "string"

    def forward(self, task_id: str, filename: str = None) -> str:
        # Loose UUID check: 36 characters of lowercase hex digits and hyphens.
        if not task_id or not re.match(r"^[0-9a-f\-]{36}$", task_id):
            return "❌ Invalid or missing task_id."

        file_url = f"https://agents-course-unit4-scoring.hf.space/files/{task_id}"

        try:
            response = requests.get(file_url, timeout=15)
            if response.status_code == 404:
                return "⚠️ No file found for this task."
            response.raise_for_status()

            # Try extracting the original filename and extension from the
            # Content-Disposition header, if the server provides one.
            disposition = response.headers.get("content-disposition", "")
            header_filename_match = re.search(r'filename="(.+?)"', disposition)
            ext = ""
            if header_filename_match:
                ext = os.path.splitext(header_filename_match.group(1))[1]

            # Fall back to the task ID plus the detected extension (or .bin).
            if not filename:
                filename = f"{task_id}{ext or '.bin'}"

            temp_dir = tempfile.mkdtemp()
            file_path = os.path.join(temp_dir, filename)

            with open(file_path, "wb") as f:
                f.write(response.content)

            print(f"File saved at: {file_path}")
            return file_path

        except Exception as e:
            return f"❌ Error: {e}"


class ReadFileContentTool(Tool):
    name = "read_file_content"
    description = """Reads and returns the content of a file.
    Use after downloading a file using `download_file_from_task`.
    For CSV and Excel files, only a preview of the first rows is returned."""

    inputs = {
        "file_path": {"type": "string", "description": "Full path to a file to read."}
    }
    output_type = "string"

    def forward(self, file_path: str) -> str:
        if not os.path.exists(file_path):
            return f"❌ File does not exist: {file_path}"

        ext = os.path.splitext(file_path)[1].lower()

        try:
            # Plain-text formats are returned verbatim.
            if ext in (".txt", ".json", ".py"):
                with open(file_path, "r", encoding="utf-8") as f:
                    return f.read()

            elif ext == ".csv":
                df = pd.read_csv(file_path)
                return df.head().to_string(index=False)

            elif ext == ".xlsx":
                df = pd.read_excel(file_path)
                return df.head().to_string(index=False)

            elif ext == ".pdf":
                doc = fitz.open(file_path)
                text = ""
                for page in doc:
                    text += page.get_text()
                doc.close()
                return text.strip() or "⚠️ PDF contains no readable text."

            elif ext in (".mp3", ".wav"):
                return f"ℹ️ Audio file detected: {os.path.basename(file_path)}. Use the audio processing tool if needed."

            elif ext in (".mp4", ".mov", ".avi"):
                return f"ℹ️ Video file detected: {os.path.basename(file_path)}. Use the video analysis tool if available."

            else:
                return f"ℹ️ Unsupported file type: {ext}. File saved at {file_path}"

        except Exception as e:
            return f"❌ Could not read {file_path}: {e}"


class GetWikipediaInfoTool(Tool):
    name = "get_wikipedia_info"
    description = """Fetches a short summary about a topic from Wikipedia.
    Use this when a user asks for background information, an explanation, or context on a well-known subject."""

    inputs = {
        "topic": {
            "type": "string",
            "description": "The topic to search for on Wikipedia.",
        }
    }
    output_type = "string"

    def forward(self, topic: str) -> str:
        print(f"EXECUTING TOOL: get_wikipedia_info(topic='{topic}')")

        try:
            # Let requests build the query string so the topic is URL-encoded.
            search_response = requests.get(
                "https://en.wikipedia.org/w/api.php",
                params={
                    "action": "query",
                    "list": "search",
                    "srsearch": topic,
                    "format": "json",
                },
                timeout=10,
            )
            search_response.raise_for_status()
            search_data = search_response.json()

            if not search_data.get("query", {}).get("search", []):
                return f"No Wikipedia info for '{topic}'."

            page_id = search_data["query"]["search"][0]["pageid"]

            # Fetch the plain-text introduction of the top search result.
            content_response = requests.get(
                "https://en.wikipedia.org/w/api.php",
                params={
                    "action": "query",
                    "prop": "extracts",
                    "exintro": 1,
                    "explaintext": 1,
                    "pageids": page_id,
                    "format": "json",
                },
                timeout=10,
            )
            content_response.raise_for_status()
            content_data = content_response.json()

            extract = content_data["query"]["pages"][str(page_id)]["extract"]
            if len(extract) > 1500:
                extract = extract[:1500] + "..."

            result = f"Wikipedia summary for '{topic}':\n{extract}"
            print(f"-> Tool Result (Wikipedia): {result[:100]}...")
            return result

        except Exception as e:
            print(f"❌ Error in get_wikipedia_info: {e}")
            traceback.print_exc()
            return f"Error fetching Wikipedia info: {e}"


class VisitWebpageTool(Tool):
    name = "visit_webpage"
    description = """Visits a given URL and returns structured page content,
    including title, metadata, headings, paragraphs, tables, lists, and links."""

    inputs = {
        "url": {
            "type": "string",
            "description": "The full URL of the webpage to visit.",
        }
    }
    output_type = "string"

    def forward(self, url: str) -> str:
        try:
            import json

            from bs4 import BeautifulSoup

            response = requests.get(url, timeout=10)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, "html.parser")

            def clean(text):
                return " ".join(text.strip().split())

            def extract_tables(soup):
                tables_data = []
                for table in soup.find_all("table"):
                    headers = [clean(th.get_text()) for th in table.find_all("th")]
                    rows = []
                    for row in table.find_all("tr"):
                        cells = [clean(td.get_text()) for td in row.find_all("td")]
                        if cells:
                            rows.append(cells)
                    if headers and rows:
                        tables_data.append({"headers": headers, "rows": rows})
                return tables_data

            def extract_lists(soup):
                all_lists = []
                for ul in soup.find_all("ul"):
                    items = [clean(li.get_text()) for li in ul.find_all("li")]
                    if items:
                        all_lists.append(items)
                for ol in soup.find_all("ol"):
                    items = [clean(li.get_text()) for li in ol.find_all("li")]
                    if items:
                        all_lists.append(items)
                return all_lists

            def extract_meta(soup):
                metas = {}
                for meta in soup.find_all("meta"):
                    name = meta.get("name") or meta.get("property")
                    content = meta.get("content")
                    if name and content:
                        metas[name.lower()] = clean(content)
                return metas

            result = {
                # Guard against a <title> tag whose .string is None.
                "title": clean(soup.title.string) if soup.title and soup.title.string else None,
                "meta": extract_meta(soup),
                "headings": {
                    "h1": [clean(h.get_text()) for h in soup.find_all("h1")],
                    "h2": [clean(h.get_text()) for h in soup.find_all("h2")],
                    "h3": [clean(h.get_text()) for h in soup.find_all("h3")],
                },
                "paragraphs": [clean(p.get_text()) for p in soup.find_all("p")],
                "lists": extract_lists(soup),
                "tables": extract_tables(soup),
                "links": [
                    {"text": clean(a.get_text()), "href": a["href"]}
                    for a in soup.find_all("a", href=True)
                ],
            }

            return json.dumps(result, indent=2)

        except Exception as e:
            return f"❌ Failed to fetch or parse webpage: {str(e)}"


class TranscribeAudioTool(Tool):
    name = "transcribe_audio"
    description = """Transcribes spoken audio (e.g. voice memos, lectures) into plain text."""

    inputs = {"file_path": {"type": "string", "description": "Path to an audio file."}}
    output_type = "string"

    def forward(self, file_path: str) -> str:
        try:
            import speech_recognition as sr
            from pydub import AudioSegment

            # Initialize the recognizer
            recognizer = sr.Recognizer()

            # Convert to WAV if not already (speech_recognition requires WAV/AIFF/FLAC)
            file_ext = os.path.splitext(file_path)[1].lower()
            if file_ext != ".wav":
                # Create a temporary WAV file and convert with pydub
                temp_wav = tempfile.NamedTemporaryFile(suffix=".wav", delete=False).name
                audio = AudioSegment.from_file(file_path)
                audio.export(temp_wav, format="wav")
                audio_path = temp_wav
            else:
                audio_path = file_path

            # Transcribe the audio using Google's free speech recognition API
            with sr.AudioFile(audio_path) as source:
                audio_data = recognizer.record(source)
                transcript = recognizer.recognize_google(audio_data)

            # Clean up the temporary file if one was created
            if file_ext != ".wav" and os.path.exists(temp_wav):
                os.remove(temp_wav)

            return transcript.strip()

        except Exception as e:
            return f"❌ Transcription failed: {str(e)}"


class TranscribeVideoFileTool(Tool):
    name = "transcribe_video"
    description = """Transcribes speech from a video file.
    Use this to understand video lectures, tutorials, or visual demos."""

    inputs = {
        "file_path": {
            "type": "string",
            "description": "Path to the video file (e.g., .mp4, .mov).",
        }
    }
    output_type = "string"

    def forward(self, file_path: str) -> str:
        try:
            import moviepy.editor as mp
            import speech_recognition as sr

            # Extract the audio track from the video
            video = mp.VideoFileClip(file_path)

            # Write the audio to a temporary WAV file (required for speech_recognition);
            # the verbose/logger arguments silence progress output in moviepy 1.x
            temp_audio = tempfile.NamedTemporaryFile(suffix=".wav", delete=False).name
            video.audio.write_audiofile(temp_audio, verbose=False, logger=None)
            video.close()

            # Initialize the recognizer
            recognizer = sr.Recognizer()

            # Transcribe the extracted audio
            with sr.AudioFile(temp_audio) as source:
                audio_data = recognizer.record(source)
                transcript = recognizer.recognize_google(audio_data)

            # Clean up the temporary file
            if os.path.exists(temp_audio):
                os.remove(temp_audio)

            return transcript.strip()

        except Exception as e:
            return f"❌ Video processing failed: {str(e)}"
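

# A minimal usage sketch, not part of the original module: it shows one way to
# register these tools with a smolagents CodeAgent. `InferenceClientModel` and
# the example prompt are assumptions; swap in whatever model backend and GAIA
# task your setup actually uses.
if __name__ == "__main__":
    from smolagents import CodeAgent, InferenceClientModel  # assumed model class

    agent = CodeAgent(
        tools=[
            DownloadFileFromTaskTool(),
            ReadFileContentTool(),
            GetWikipediaInfoTool(),
            VisitWebpageTool(),
            TranscribeAudioTool(),
            TranscribeVideoFileTool(),
        ],
        model=InferenceClientModel(),  # assumed; older smolagents versions use HfApiModel
    )
    # Example task; replace <task_id> with a real GAIA task identifier.
    print(agent.run("Download and summarize the file for task <task_id>."))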