import os
import re
import tempfile
import traceback

import fitz  # PyMuPDF
import pandas as pd
import requests
from smolagents import Tool


class DownloadFileFromTaskTool(Tool):
    name = "download_file_from_task"
    description = """Downloads the file attached to a GAIA task ID and saves it in a temporary directory.
Use this when the question requires information from a file mentioned in the task, before reading the file."""
    inputs = {
        "task_id": {"type": "string", "description": "The GAIA task ID (REQUIRED)."},
        "filename": {
            "type": "string",
            "description": "Optional custom filename to save the file as (e.g., 'data.xlsx').",
            "nullable": True,
        },
    }
    output_type = "string"

    def forward(self, task_id: str, filename: str = None) -> str:
        # Validate that task_id is a canonical UUID (8-4-4-4-12 hex groups)
        if not task_id or not re.match(
            r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$", task_id
        ):
            return "❌ Invalid or missing task_id."

        file_url = f"https://agents-course-unit4-scoring.hf.space/files/{task_id}"

        try:
            response = requests.get(file_url, timeout=15)
            if response.status_code == 404:
                return "⚠️ No file found for this task."
            response.raise_for_status()

            # Try extracting the original filename and extension from the header
            disposition = response.headers.get("content-disposition", "")
            header_filename_match = re.search(r'filename="(.+?)"', disposition)
            ext = ""
            if header_filename_match:
                ext = os.path.splitext(header_filename_match.group(1))[1]

            # Final filename logic: fall back to the task ID plus the detected extension
            if not filename:
                filename = f"{task_id}{ext or '.bin'}"

            temp_dir = tempfile.mkdtemp()
            file_path = os.path.join(temp_dir, filename)

            with open(file_path, "wb") as f:
                f.write(response.content)

            print(f"File saved at: {file_path}")
            return file_path
        except Exception as e:
            return f"❌ Error: {e}"


class ReadFileContentTool(Tool):
    name = "read_file_content"
    description = """Reads and returns the content of a file. Use after downloading a file using `download_file_from_task`."""
    inputs = {
        "file_path": {"type": "string", "description": "Full path to a file to read."}
    }
    output_type = "string"

    def forward(self, file_path: str) -> str:
        if not os.path.exists(file_path):
            return f"❌ File does not exist: {file_path}"

        ext = os.path.splitext(file_path)[1].lower()

        try:
            if ext in (".txt", ".json", ".py"):
                # Plain-text formats are returned verbatim
                with open(file_path, "r", encoding="utf-8") as f:
                    return f.read()
            elif ext == ".csv":
                # Return only the first rows as a preview
                df = pd.read_csv(file_path)
                return df.head().to_string(index=False)
            elif ext == ".xlsx":
                df = pd.read_excel(file_path)
                return df.head().to_string(index=False)
            elif ext == ".pdf":
                # Concatenate the extracted text of every page
                doc = fitz.open(file_path)
                text = ""
                for page in doc:
                    text += page.get_text()
                doc.close()
                return text.strip() or "⚠️ PDF contains no readable text."
            elif ext in (".mp3", ".wav"):
                return f"ℹ️ Audio file detected: {os.path.basename(file_path)}. Use audio processing tool if needed."
            elif ext in (".mp4", ".mov", ".avi"):
                return f"ℹ️ Video file detected: {os.path.basename(file_path)}. Use video analysis tool if available."
            else:
                return f"ℹ️ Unsupported file type: {ext}. File saved at {file_path}"
        except Exception as e:
            return f"❌ Could not read {file_path}: {e}"


class GetWikipediaInfoTool(Tool):
    name = "get_wikipedia_info"
    description = """Fetches a short summary about a topic from Wikipedia.
Use this when a user asks for background information, an explanation, or context on a well-known subject."""
    inputs = {
        "topic": {
            "type": "string",
            "description": "The topic to search for on Wikipedia.",
        }
    }
    output_type = "string"

    def forward(self, topic: str) -> str:
        print(f"EXECUTING TOOL: get_wikipedia_info(topic='{topic}')")

        try:
            # Search for the topic; passing params lets requests URL-encode it safely
            search_response = requests.get(
                "https://en.wikipedia.org/w/api.php",
                params={
                    "action": "query",
                    "list": "search",
                    "srsearch": topic,
                    "format": "json",
                },
                timeout=10,
            )
            search_response.raise_for_status()
            search_data = search_response.json()

            if not search_data.get("query", {}).get("search", []):
                return f"No Wikipedia info for '{topic}'."

            page_id = search_data["query"]["search"][0]["pageid"]

            # Fetch the plain-text intro extract of the top search result
            content_response = requests.get(
                "https://en.wikipedia.org/w/api.php",
                params={
                    "action": "query",
                    "prop": "extracts",
                    "exintro": 1,
                    "explaintext": 1,
                    "pageids": page_id,
                    "format": "json",
                },
                timeout=10,
            )
            content_response.raise_for_status()
            content_data = content_response.json()
            extract = content_data["query"]["pages"][str(page_id)]["extract"]

            # Truncate long extracts to keep the tool output compact
            if len(extract) > 1500:
                extract = extract[:1500] + "..."

            result = f"Wikipedia summary for '{topic}':\n{extract}"
            print(f"-> Tool Result (Wikipedia): {result[:100]}...")
            return result
        except Exception as e:
            print(f"❌ Error in get_wikipedia_info: {e}")
            traceback.print_exc()
            return f"Error fetching Wikipedia info: {e}"


class VisitWebpageTool(Tool):
    name = "visit_webpage"
    description = """
    Visits a given URL and returns structured page content including title, metadata, headings, paragraphs,
    tables, lists, and links.
    """
    inputs = {
        "url": {
            "type": "string",
            "description": "The full URL of the webpage to visit.",
        }
    }
    output_type = "string"

    def forward(self, url: str) -> str:
        try:
            # Imported inside the try block so a missing dependency surfaces
            # as a tool error string rather than an import-time crash
            import json

            from bs4 import BeautifulSoup

            response = requests.get(url, timeout=10)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, "html.parser")

            def clean(text):
                # Collapse runs of whitespace into single spaces
                return " ".join(text.strip().split())

            def extract_tables(soup):
                tables_data = []
                for table in soup.find_all("table"):
                    headers = [clean(th.get_text()) for th in table.find_all("th")]
                    rows = []
                    for row in table.find_all("tr"):
                        cells = [clean(td.get_text()) for td in row.find_all("td")]
                        if cells:
                            rows.append(cells)
                    if headers and rows:
                        tables_data.append({"headers": headers, "rows": rows})
                return tables_data

            def extract_lists(soup):
                all_lists = []
                for ul in soup.find_all("ul"):
                    items = [clean(li.get_text()) for li in ul.find_all("li")]
                    if items:
                        all_lists.append(items)
                for ol in soup.find_all("ol"):
                    items = [clean(li.get_text()) for li in ol.find_all("li")]
                    if items:
                        all_lists.append(items)
                return all_lists

            def extract_meta(soup):
                metas = {}
                for meta in soup.find_all("meta"):
                    name = meta.get("name") or meta.get("property")
                    content = meta.get("content")
                    if name and content:
                        metas[name.lower()] = clean(content)
                return metas

            result = {
                "title": clean(soup.title.string) if soup.title else None,
                "meta": extract_meta(soup),
                "headings": {
                    "h1": [clean(h.get_text()) for h in soup.find_all("h1")],
                    "h2": [clean(h.get_text()) for h in soup.find_all("h2")],
                    "h3": [clean(h.get_text()) for h in soup.find_all("h3")],
                },
                "paragraphs": [clean(p.get_text()) for p in soup.find_all("p")],
                "lists": extract_lists(soup),
                "tables": extract_tables(soup),
                "links": [
                    {"text": clean(a.get_text()), "href": a["href"]}
                    for a in soup.find_all("a", href=True)
                ],
            }
            return json.dumps(result, indent=2)
        except Exception as e:
            return f"❌ Failed to fetch or parse webpage: {str(e)}"


class TranscribeAudioTool(Tool):
    name = "transcribe_audio"
    description = """Transcribes spoken audio (e.g. voice memos, lectures) into plain text."""
    inputs = {"file_path": {"type": "string", "description": "Path to an audio file."}}
    output_type = "string"

    def forward(self, file_path: str) -> str:
        try:
            import speech_recognition as sr
            from pydub import AudioSegment

            # Initialize recognizer
            recognizer = sr.Recognizer()

            # Convert to WAV if not already (speech_recognition needs WAV input)
            file_ext = os.path.splitext(file_path)[1].lower()
            if file_ext != ".wav":
                # Create a temp WAV file and convert into it using pydub
                temp_wav = tempfile.NamedTemporaryFile(suffix=".wav", delete=False).name
                audio = AudioSegment.from_file(file_path)
                audio.export(temp_wav, format="wav")
                audio_path = temp_wav
            else:
                audio_path = file_path

            # Transcribe the audio using Google's free web speech API
            with sr.AudioFile(audio_path) as source:
                audio_data = recognizer.record(source)
                transcript = recognizer.recognize_google(audio_data)

            # Clean up the temp file if one was created
            if file_ext != ".wav" and os.path.exists(temp_wav):
                os.remove(temp_wav)

            return transcript.strip()
        except Exception as e:
            return f"❌ Transcription failed: {str(e)}"


class TranscribeVideoFileTool(Tool):
    name = "transcribe_video"
    description = """Transcribes speech from a video file. Use this to understand video lectures, tutorials, or visual demos."""
    inputs = {
        "file_path": {
            "type": "string",
            "description": "Path to the video file (e.g., .mp4, .mov).",
        }
    }
    output_type = "string"

    def forward(self, file_path: str) -> str:
        try:
            import moviepy.editor as mp
            import speech_recognition as sr

            # Extract the audio track from the video
            video = mp.VideoFileClip(file_path)

            # Write it to a temporary WAV file (required by speech_recognition);
            # the verbose/logger flags silence moviepy 1.x progress output
            temp_audio = tempfile.NamedTemporaryFile(suffix=".wav", delete=False).name
            video.audio.write_audiofile(temp_audio, verbose=False, logger=None)
            video.close()

            # Initialize recognizer and transcribe using Google's free web speech API
            recognizer = sr.Recognizer()
            with sr.AudioFile(temp_audio) as source:
                audio_data = recognizer.record(source)
                transcript = recognizer.recognize_google(audio_data)

            # Clean up the temp file
            if os.path.exists(temp_audio):
                os.remove(temp_audio)

            return transcript.strip()
        except Exception as e:
            return f"❌ Video processing failed: {str(e)}"