|
|
|
|
| from langchain_community.tools import WikipediaQueryRun, ArxivQueryRun, DuckDuckGoSearchRun |
| from langchain_community.utilities import WikipediaAPIWrapper, ArxivAPIWrapper, DuckDuckGoSearchAPIWrapper |
|
|
| from langchain_core.tools import tool |
| from datetime import datetime |
|
|
| import requests |
| import io |
| import pandas as pd |
|
|
| wikipedia_tool = WikipediaQueryRun( |
| api_wrapper=WikipediaAPIWrapper(top_k_results=2, doc_content_chars_max=4000,lang="en") |
| ) |
| ddg_search_tool = DuckDuckGoSearchRun( |
| api_wrapper=DuckDuckGoSearchAPIWrapper(max_results=5) |
| ) |
| arxiv_tool = ArxivQueryRun( |
| api_wrapper=ArxivAPIWrapper(top_k_results=3, doc_content_chars_max=4000) |
| ) |
| @tool |
| def get_current_year() -> str: |
| """returns the current year""" |
| return str(datetime.now().year) |
| @tool |
| def get_youtube_transcript(url: str) -> str: |
| """ |
| Get the full transcript/subtitles from a YouTube video. |
| Use this tool whenever the question contains a YouTube URL (youtube.com or youtu.be). |
| Extract the URL from the question and pass it as the argument. |
| Example: if question says 'In the video https://www.youtube.com/watch?v=ABC123, what...' |
| then call this tool with url='https://www.youtube.com/watch?v=ABC123' |
| """ |
| try: |
| from youtube_transcript_api import YouTubeTranscriptApi |
| if "v=" in url: |
| video_id = url.split("v=")[-1].split("&")[0] |
| elif "youtu.be/" in url: |
| video_id = url.split("youtu.be/")[-1].split("?")[0] |
| else: |
| return "Could not extract video ID from URL." |
| transcript = YouTubeTranscriptApi.get_transcript(video_id) |
| return " ".join([t["text"] for t in transcript])[:5000] |
| except Exception as e: |
| return f"Transcript unavailable: {e}" |
| @tool |
| def fetch_url_content(url: str) -> str: |
| """Fetch and return the text content of any URL. Useful for web pages and articles.""" |
| |
| try: |
| headers = {'User-Agent': 'Mozilla/5.0'} |
| response = requests.get(url, headers=headers, timeout=10) |
| response.raise_for_status() |
| text = response.text |
| import re |
| text = re.sub(r'<[^>]+>', ' ', text) |
| text = re.sub(r'\s+', ' ', text).strip() |
| return text[:6000] |
| except Exception as e: |
| return f"Could not fetch URL: {e}" |
| @tool |
| def get_gaia_file(task_id: str) -> str: |
| """Download and read a file attachment for a GAIA task. Use when question mentions an attached file.""" |
| try: |
| url = f"https://agents-course-unit4-scoring.hf.space/files/{task_id}" |
| resp = requests.get(url, timeout=15) |
| if resp.status_code != 200: |
| return f"Could not fetch file for task {task_id}" |
| content_type = resp.headers.get("content-type", "") |
| content_disposition = resp.headers.get("content-disposition", "") |
|
|
| filename = "" |
| if "filename=" in content_disposition: |
| filename = content_disposition.split("filename=")[-1].strip('"') |
| if filename.endswith(".py") or "text/plain" in content_type: |
| return resp.text |
| if filename.endswith(".xlsx") or filename.endswith(".xls") or "spreadsheet" in content_type or "excel" in content_type: |
| df = pd.read_excel(io.BytesIO(resp.content)) |
| return df.to_string() |
| if filename.endswith(".csv") or "csv" in content_type: |
| df = pd.read_csv(io.BytesIO(resp.content)) |
| return df.to_string() |
| if filename.endswith(".mp3") or "audio" in content_type: |
| return "This is an audio file. Audio transcription is not supported." |
|
|
| try: |
| return resp.text[:5000] |
| except: |
| return "File could not be read." |
| except Exception as e: |
| return f"Error fetching file: {e}" |
|
|
|
|