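"""Tool collection for a smolagents agent: local file reading, Wikipedia and web
search, audio/video transcription, image captioning and QA, file downloads,
Chroma-based document retrieval, webpage browsing, and arXiv search."""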
import html
import json
import mimetypes
import os
import re
import time
import traceback
from pathlib import Path
from typing import Dict, List
from urllib.parse import quote_plus, urlparse
import chromadb
import chromadb.utils.embedding_functions as embedding_functions
import fitz # PyMuPDF
import pandas as pd
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from duckduckgo_search import DDGS
from duckduckgo_search.exceptions import (
ConversationLimitException,
DuckDuckGoSearchException,
RatelimitException,
TimeoutException,
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import (
BSHTMLLoader,
JSONLoader,
PyPDFLoader,
TextLoader,
UnstructuredFileLoader,
)
from langchain_community.tools import BraveSearch
from markdownify import markdownify
from ollama import chat
from PIL import Image
from smolagents import Tool, tool
from smolagents.utils import truncate_content
load_dotenv()
class ReadFileContentTool(Tool):
name = "read_file_content"
description = """Reads local files in various formats (text, CSV, Excel, PDF, HTML, etc.) and returns their content as readable text. Automatically detects and processes the appropriate file format."""
inputs = {
"file_path": {
"type": "string",
"description": "The full path to the file from which the content should be read.",
}
}
output_type = "string"
def forward(self, file_path: str) -> str:
if not os.path.exists(file_path):
return f"❌ File does not exist: {file_path}"
ext = os.path.splitext(file_path)[1].lower()
try:
if ext == ".txt":
with open(file_path, "r", encoding="utf-8") as f:
return truncate_content(f.read())
elif ext == ".csv":
df = pd.read_csv(file_path)
return truncate_content(
f"CSV Content:\n{df.to_string(index=False)}\n\nColumn names: {', '.join(df.columns)}"
)
elif ext in [".xlsx", ".xls"]:
df = pd.read_excel(file_path)
return truncate_content(
f"Excel Content:\n{df.to_string(index=False)}\n\nColumn names: {', '.join(df.columns)}"
)
elif ext == ".pdf":
doc = fitz.open(file_path)
text = "".join([page.get_text() for page in doc])
doc.close()
return truncate_content(
text.strip() or "⚠️ PDF contains no readable text."
)
elif ext == ".json":
with open(file_path, "r", encoding="utf-8") as f:
return truncate_content(f.read())
elif ext == ".py":
with open(file_path, "r", encoding="utf-8") as f:
return truncate_content(f.read())
elif ext in [".html", ".htm"]:
with open(file_path, "r", encoding="utf-8") as f:
                    html_content = f.read()
                try:
                    markdown = markdownify(html_content).strip()
                    markdown = re.sub(r"\n{3,}", "\n\n", markdown)
                    return f"📄 HTML content (converted to Markdown):\n\n{truncate_content(markdown)}"
                except Exception:
                    soup = BeautifulSoup(html_content, "html.parser")
text = soup.get_text(separator="\n").strip()
return f"📄 HTML content (raw text fallback):\n\n{truncate_content(text)}"
elif ext in [".mp3", ".wav"]:
return f"ℹ️ Audio file detected: {os.path.basename(file_path)}. Use transcribe_audio tool to process the audio content."
elif ext in [".mp4", ".mov", ".avi"]:
return f"ℹ️ Video file detected: {os.path.basename(file_path)}. Use transcribe_video tool to process the video content."
else:
return f"ℹ️ Unsupported file type: {ext}. File saved at {file_path}"
except Exception as e:
return f"❌ Could not read {file_path}: {e}"
class WikipediaSearchTool(Tool):
name = "wikipedia_search"
description = """Searches Wikipedia for a specific topic and returns a concise summary. Useful for background information on subjects, concepts, historical events, or scientific topics."""
inputs = {
"query": {
"type": "string",
"description": "The query or subject to search for on Wikipedia.",
}
}
output_type = "string"
def forward(self, query: str) -> str:
print(f"EXECUTING TOOL: wikipedia_search(query='{query}')")
try:
search_link = f"https://en.wikipedia.org/w/api.php?action=query&list=search&srsearch={query}&format=json"
search_response = requests.get(search_link, timeout=10)
search_response.raise_for_status()
search_data = search_response.json()
if not search_data.get("query", {}).get("search", []):
return f"No Wikipedia info for '{query}'."
page_id = search_data["query"]["search"][0]["pageid"]
content_link = (
f"https://en.wikipedia.org/w/api.php?action=query&prop=extracts&"
f"exintro=1&explaintext=1&pageids={page_id}&format=json"
)
content_response = requests.get(content_link, timeout=10)
content_response.raise_for_status()
content_data = content_response.json()
extract = content_data["query"]["pages"][str(page_id)]["extract"]
if len(extract) > 1500:
extract = extract[:1500] + "..."
result = f"Wikipedia summary for '{query}':\n{extract}"
print(f"-> Tool Result (Wikipedia): {result[:100]}...")
return result
except Exception as e:
print(f"❌ Error in wikipedia_search: {e}")
traceback.print_exc()
return f"Error wiki: {e}"
class TranscribeAudioTool(Tool):
name = "transcribe_audio"
description = """Converts spoken content in audio files to text. Handles various audio formats and produces a transcript of the spoken content for analysis."""
inputs = {
"file_path": {
"type": "string",
"description": "The full path to the audio file that needs to be transcribed.",
}
}
output_type = "string"
def forward(self, file_path: str) -> str:
try:
import os
import tempfile
import speech_recognition as sr
from pydub import AudioSegment
# Verify file exists
if not os.path.exists(file_path):
return (
f"❌ Audio file not found at: {file_path}. Download the file first."
)
# Initialize recognizer
recognizer = sr.Recognizer()
# Convert to WAV if not already (needed for speech_recognition)
file_ext = os.path.splitext(file_path)[1].lower()
if file_ext != ".wav":
# Create temp WAV file
temp_wav = tempfile.NamedTemporaryFile(suffix=".wav", delete=False).name
# Convert to WAV using pydub
audio = AudioSegment.from_file(file_path)
audio.export(temp_wav, format="wav")
audio_path = temp_wav
else:
audio_path = file_path
# Transcribe audio using Google's speech recognition
with sr.AudioFile(audio_path) as source:
audio_data = recognizer.record(source)
transcript = recognizer.recognize_google(audio_data)
# Clean up temp file if created
if file_ext != ".wav" and os.path.exists(temp_wav):
os.remove(temp_wav)
return transcript.strip()
except Exception as e:
return f"❌ Transcription failed: {str(e)}"
class TranscribeVideoFileTool(Tool):
name = "transcribe_video"
description = """Extracts and transcribes speech from video files. Converts the audio portion of videos into readable text for analysis or reference."""
inputs = {
"file_path": {
"type": "string",
"description": "The full path to the video file that needs to be transcribed.",
}
}
output_type = "string"
def forward(self, file_path: str) -> str:
try:
# Verify file exists
if not os.path.exists(file_path):
return (
f"❌ Video file not found at: {file_path}. Download the file first."
)
import os
import tempfile
import moviepy.editor as mp
import speech_recognition as sr
# Extract audio from video
video = mp.VideoFileClip(file_path)
# Create temporary audio file
temp_audio = tempfile.NamedTemporaryFile(suffix=".wav", delete=False).name
# Extract audio to WAV format (required for speech_recognition)
video.audio.write_audiofile(temp_audio, verbose=False, logger=None)
video.close()
# Initialize recognizer
recognizer = sr.Recognizer()
# Transcribe audio
with sr.AudioFile(temp_audio) as source:
audio_data = recognizer.record(source)
transcript = recognizer.recognize_google(audio_data)
# Clean up temp file
if os.path.exists(temp_audio):
os.remove(temp_audio)
return transcript.strip()
except Exception as e:
return f"❌ Video processing failed: {str(e)}"
class BraveWebSearchTool(Tool):
name = "web_search"
description = """Performs web searches and returns content from top results. Provides real-time information from across the internet including current events, facts, and website content relevant to your query."""
inputs = {
"query": {
"type": "string",
"description": "A web search query string (e.g., a question or query).",
}
}
output_type = "string"
    api_key = os.getenv("BRAVE_SEARCH_API_KEY")
count = 3
char_limit = 4000 # Adjust based on LLM context window
tool = BraveSearch.from_api_key(api_key=api_key, search_kwargs={"count": count})
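    # LangChain's BraveSearch wrapper returns a JSON list of results (title, link, snippet).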
def extract_main_text(self, url: str, char_limit: int) -> str:
try:
headers = {"User-Agent": "Mozilla/5.0"}
response = requests.get(url, headers=headers, timeout=10)
soup = BeautifulSoup(response.text, "html.parser")
# Remove scripts/styles
for tag in soup(["script", "style", "noscript"]):
tag.extract()
# Heuristic: extract visible text from body
body = soup.body
if not body:
return "⚠️ Could not extract content."
text = " ".join(t.strip() for t in body.stripped_strings)
return text[:char_limit].strip()
except Exception as e:
return f"⚠️ Failed to extract article: {e}"
def forward(self, query: str) -> str:
try:
results_json = self.tool.run(query)
results = (
json.loads(results_json)
if isinstance(results_json, str)
else results_json
)
output_parts = []
for i, r in enumerate(results[: self.count], start=1):
title = html.unescape(r.get("title", "").strip())
link = r.get("link", "").strip()
article_text = self.extract_main_text(link, self.char_limit)
result_block = (
f"Result {i}:\n"
f"Title: {title}\n"
f"URL: {link}\n"
f"Extracted Content:\n{article_text}\n"
)
output_parts.append(result_block)
return "\n\n".join(output_parts).strip()
except Exception as e:
return f"Search failed: {str(e)}"
class DescribeImageTool(Tool):
name = "describe_image"
description = """Analyzes images and generates detailed text descriptions. Identifies objects, scenes, text, and visual elements within the image to provide context or understanding."""
inputs = {
"image_path": {
"type": "string",
"description": "The full path to the image file to describe.",
}
}
output_type = "string"
def forward(self, image_path: str) -> str:
import os
from PIL import Image
from transformers import BlipForConditionalGeneration, BlipProcessor
if not os.path.exists(image_path):
return f"❌ Image file does not exist: {image_path}"
try:
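            # Load the BLIP image-captioning model (fetched from the Hugging Face Hub on first use).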
processor = BlipProcessor.from_pretrained(
"Salesforce/blip-image-captioning-base", use_fast=True
)
model = BlipForConditionalGeneration.from_pretrained(
"Salesforce/blip-image-captioning-base"
)
image = Image.open(image_path).convert("RGB")
inputs = processor(images=image, return_tensors="pt")
output_ids = model.generate(**inputs)
caption = processor.decode(output_ids[0], skip_special_tokens=True)
return caption.strip() or "⚠️ No caption could be generated."
except Exception as e:
return f"❌ Failed to describe image: {e}"
class DownloadFileFromLinkTool(Tool):
name = "download_file_from_link"
description = "Downloads files from a URL and saves them locally. Supports various formats including PDFs, documents, images, and data files. Returns the local file path for further processing."
inputs = {
"link": {"type": "string", "description": "The URL to download the file from."},
"file_name": {
"type": "string",
"description": "Desired name of the saved file, without extension.",
"nullable": True,
},
}
output_type = "string"
SUPPORTED_EXTENSIONS = {
".xlsx",
".pdf",
".txt",
".csv",
".json",
".xml",
".html",
".jpg",
".jpeg",
".png",
".mp4",
".mp3",
".wav",
".zip",
}
    def forward(self, link: str, file_name: str = "taskfile") -> str:
        print(f"⬇️ Downloading file from: {link}")
        # file_name is nullable, so fall back to the default when None is passed.
        file_name = file_name or "taskfile"
        dir_path = "./downloads"
os.makedirs(dir_path, exist_ok=True)
try:
response = requests.get(link, stream=True, timeout=30)
except requests.RequestException as e:
return f"❌ Error: Request failed - {e}"
if response.status_code != 200:
return (
f"❌ Error: Unable to fetch file. Status code: {response.status_code}"
)
# Step 1: Try extracting extension from provided filename
base_name, provided_ext = os.path.splitext(file_name)
provided_ext = provided_ext.lower()
# Step 2: Check if provided extension is supported
if provided_ext and provided_ext in self.SUPPORTED_EXTENSIONS:
ext = provided_ext
else:
# Step 3: Try to infer from Content-Type
content_type = (
response.headers.get("Content-Type", "").split(";")[0].strip()
)
guessed_ext = mimetypes.guess_extension(content_type or "") or ""
# Step 4: If mimetype returned .bin or nothing useful, try to fallback to URL
if guessed_ext in ("", ".bin"):
parsed_link = urlparse(link)
_, url_ext = os.path.splitext(parsed_link.path)
if url_ext.lower() in self.SUPPORTED_EXTENSIONS:
ext = url_ext.lower()
else:
return f"⚠️ Warning: Cannot determine a valid file extension from '{content_type}' or URL. Please retry with an explicit valid filename and extension."
else:
ext = guessed_ext
# Step 5: Final path and save
file_path = os.path.join(dir_path, base_name + ext)
downloaded = 0
with open(file_path, "wb") as f:
for chunk in response.iter_content(chunk_size=1024):
if chunk:
f.write(chunk)
downloaded += len(chunk)
return file_path
class DuckDuckGoSearchTool(Tool):
name = "web_search"
description = """Performs web searches and returns content from top results. Provides real-time information from across the internet including current events, facts, and website content relevant to your query."""
inputs = {
"query": {
"type": "string",
"description": "The search query to run on DuckDuckGo",
},
}
output_type = "string"
def _configure(self, max_retries: int = 5, retry_sleep: int = 2):
self._max_retries = max_retries
self._retry_sleep = retry_sleep
def forward(self, query: str) -> str:
self._configure()
top_results = 5
retries = 0
max_retries = getattr(self, "_max_retries", 3)
retry_sleep = getattr(self, "_retry_sleep", 2)
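        # Retry transient DuckDuckGo failures, sleeping a little longer after each attempt.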
while retries < max_retries:
try:
results = DDGS().text(
keywords=query,
region="wt-wt",
safesearch="moderate",
max_results=top_results,
)
if not results:
return "No results found."
output_lines = []
for idx, res in enumerate(results[:top_results], start=1):
title = res.get("title", "N/A")
url = res.get("href", "N/A")
snippet = res.get("body", "N/A")
output_lines.append(
f"Result {idx}:\n"
f"Title: {title}\n"
f"URL: {url}\n"
f"Snippet: {snippet}\n"
)
output = "\n".join(output_lines)
print(f"-> Tool Result (DuckDuckGo): {output[:1500]}...")
return output
except (
DuckDuckGoSearchException,
TimeoutException,
RatelimitException,
ConversationLimitException,
) as e:
retries += 1
                retry_sleep += 2
print(
f"⚠️ DuckDuckGo Exception (Attempt {retries}/{max_retries}): {type(e).__name__}: {e}"
)
traceback.print_exc()
time.sleep(retry_sleep)
except Exception as e:
print(f"❌ Unexpected Error: {e}")
traceback.print_exc()
return f"Unhandled exception during DuckDuckGo search: {e}"
return f"❌ Failed to retrieve results after {max_retries} retries."
huggingface_ef = embedding_functions.HuggingFaceEmbeddingFunction(
model_name="sentence-transformers/all-mpnet-base-v2"
)
SUPPORTED_EXTENSIONS = [
".txt",
".md",
".py",
".pdf",
".json",
".jsonl",
".html",
".htm",
]
class AddDocumentToVectorStoreTool(Tool):
name = "add_document_to_vector_store"
description = "Processes a document and adds it to the vector database for semantic search. Automatically chunks files and creates text embeddings to enable powerful content retrieval."
inputs = {
"file_path": {
"type": "string",
"description": "Absolute path to the file to be indexed.",
}
}
output_type = "string"
def _load_file(self, path: Path):
"""Select the right loader for the file extension."""
if path.suffix == ".pdf":
return PyPDFLoader(str(path)).load()
elif path.suffix == ".json":
return JSONLoader(str(path), jq_schema=".").load()
elif path.suffix in [".md"]:
return UnstructuredFileLoader(str(path)).load()
elif path.suffix in [".html", ".htm"]:
return BSHTMLLoader(str(path)).load()
else: # fallback for .txt, .py, etc.
return TextLoader(str(path)).load()
def forward(self, file_path: str) -> str:
print(f"📄 Adding document to vector store: {file_path}")
try:
collection_name = "vectorstore"
path = Path(file_path)
            if not path.exists() or path.suffix.lower() not in SUPPORTED_EXTENSIONS:
return f"Unsupported or missing file: {file_path}"
docs = self._load_file(path)
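            # Chunk the document with overlap so semantic search returns focused passages.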
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=500, chunk_overlap=50
)
split_docs = text_splitter.split_documents(docs)
client = chromadb.Client(
chromadb.config.Settings(
persist_directory="./chroma_store",
)
)
collection = client.get_or_create_collection(
name=collection_name,
configuration={"embedding_function": huggingface_ef},
)
texts = [doc.page_content for doc in split_docs]
metadatas = [doc.metadata for doc in split_docs]
collection.add(
documents=texts,
metadatas=metadatas,
ids=[f"{path.stem}_{i}" for i in range(len(texts))],
)
return f"✅ Successfully added {len(texts)} chunks from '{file_path}' to collection '{collection_name}'."
except Exception as e:
print(f"❌ Error in add_to_vector_store: {e}")
traceback.print_exc()
return f"Error: {e}"
class QueryVectorStoreTool(Tool):
name = "query_downloaded_documents"
description = "Performs semantic searches across your downloaded documents. Use detailed queries to find specific information, concepts, or answers from your collected resources."
inputs = {
"query": {
"type": "string",
"description": "The search query. Ensure this is constructed intelligently so to retrieve the most relevant outputs.",
}
}
output_type = "string"
def forward(self, query: str) -> str:
collection_name = "vectorstore"
k = 5
print(f"🔎 Querying vector store '{collection_name}' with: '{query}'")
try:
client = chromadb.Client(
chromadb.config.Settings(
persist_directory="./chroma_store",
)
)
collection = client.get_collection(name=collection_name)
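            # Return the k chunks most semantically similar to the query.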
results = collection.query(
query_texts=[query],
n_results=k,
)
formatted = []
for i in range(len(results["documents"][0])):
doc = results["documents"][0][i]
metadata = results["metadatas"][0][i]
formatted.append(
f"Result {i+1}:\n" f"Content: {doc}\n" f"Metadata: {metadata}\n"
)
return "\n".join(formatted) or "No relevant documents found."
except Exception as e:
print(f"❌ Error in query_vector_store: {e}")
traceback.print_exc()
return f"Error querying vector store: {e}"
@tool
def image_question_answering(image_path: str, prompt: str) -> str:
"""
Analyzes images and answers specific questions about their content. Can identify objects, read text, describe scenes, or interpret visual information based on your questions.
Args:
image_path: The path to the image file
prompt: The question to ask about the image
Returns:
A string answer generated by the local Ollama model
"""
# Check for supported file types
file_extension = image_path.lower().split(".")[-1]
if file_extension not in ["jpg", "jpeg", "png", "bmp", "gif", "webp"]:
return "Unsupported file type. Please provide an image."
path = Path(image_path)
if not path.exists():
return f"File not found at: {image_path}"
# Send the image and prompt to Ollama's local model
response = chat(
model="llava", # Assuming your model is named 'lava'
messages=[
{
"role": "user",
"content": prompt,
"images": [path],
},
],
options={"temperature": 0.2}, # Slight randomness for naturalness
)
return response.message.content.strip()
class VisitWebpageTool(Tool):
name = "visit_webpage"
description = "Loads a webpage from a URL and converts its content to markdown format. Use this to browse websites, extract information, or identify downloadable resources from a specific web address."
inputs = {
"url": {
"type": "string",
"description": "The url of the webpage to visit.",
}
}
output_type = "string"
def forward(self, url: str) -> str:
try:
from urllib.parse import urlparse
import requests
from bs4 import BeautifulSoup
from markdownify import markdownify
from requests.exceptions import RequestException
from smolagents.utils import truncate_content
except ImportError as e:
raise ImportError(
"You must install packages `markdownify`, `requests`, and `beautifulsoup4` to run this tool: for instance run `pip install markdownify requests beautifulsoup4`."
) from e
try:
# Get the webpage content
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
}
response = requests.get(url, headers=headers, timeout=20)
response.raise_for_status()
# Parse the HTML with BeautifulSoup
soup = BeautifulSoup(response.text, "html.parser")
# Extract domain name for context
domain = urlparse(url).netloc
# Remove common clutter elements
self._remove_clutter(soup)
# Try to identify and prioritize main content
main_content = self._extract_main_content(soup)
if main_content:
# Convert the cleaned HTML to markdown
markdown_content = markdownify(str(main_content)).strip()
else:
# Fallback to full page content if main content extraction fails
markdown_content = markdownify(str(soup)).strip()
# Post-process the markdown content
markdown_content = self._clean_markdown(markdown_content)
# Add source information
result = f"Content from {domain}:\n\n{markdown_content}"
return truncate_content(result, 40000)
except requests.exceptions.Timeout:
return "The request timed out. Please try again later or check the URL."
except RequestException as e:
return f"Error fetching the webpage: {str(e)}"
except Exception as e:
return f"An unexpected error occurred: {str(e)}"
def _remove_clutter(self, soup):
"""Remove common elements that clutter web pages."""
# Common non-content elements to remove
clutter_selectors = [
"header",
"footer",
"nav",
".nav",
".navigation",
".menu",
".sidebar",
".footer",
".header",
"#footer",
"#header",
"#nav",
"#sidebar",
".widget",
".cookie",
".cookies",
".ad",
".ads",
".advertisement",
"script",
"style",
"noscript",
"iframe",
".social",
".share",
".comment",
".comments",
".subscription",
".newsletter",
'[role="banner"]',
'[role="navigation"]',
'[role="complementary"]',
]
for selector in clutter_selectors:
for element in soup.select(selector):
element.decompose()
# Remove hidden elements
for hidden in soup.select(
'[style*="display: none"], [style*="display:none"], [style*="visibility: hidden"], [style*="visibility:hidden"], [hidden]'
):
hidden.decompose()
def _extract_main_content(self, soup):
"""Try to identify and extract the main content of the page."""
# Priority order for common main content containers
main_content_selectors = [
"main",
'[role="main"]',
"article",
".content",
".main-content",
".post-content",
"#content",
"#main",
"#main-content",
".article",
".post",
".entry",
".page-content",
".entry-content",
]
# Try to find the main content container
for selector in main_content_selectors:
main_content = soup.select(selector)
if main_content:
# If multiple matches, find the one with the most text content
if len(main_content) > 1:
return max(main_content, key=lambda x: len(x.get_text()))
return main_content[0]
# If no main content container found, look for the largest text block
paragraphs = soup.find_all("p")
if paragraphs:
# Find the parent that contains the most paragraphs
parents = {}
for p in paragraphs:
if p.parent:
if p.parent not in parents:
parents[p.parent] = 0
parents[p.parent] += 1
if parents:
# Return the parent with the most paragraphs
return max(parents.items(), key=lambda x: x[1])[0]
# Return None if we can't identify main content
return None
def _clean_markdown(self, content):
"""Clean up the markdown content."""
# Normalize whitespace
content = re.sub(r"\n{3,}", "\n\n", content)
# Remove consecutive duplicate links
content = re.sub(r"(\[.*?\]\(.*?\))\s*\1+", r"\1", content)
# Remove very short lines that are likely menu items
lines = content.split("\n")
filtered_lines = []
# Skip consecutive short lines (likely menus)
short_line_threshold = 40 # characters
consecutive_short_lines = 0
max_consecutive_short_lines = 3
for line in lines:
stripped_line = line.strip()
            if (
                len(stripped_line) < short_line_threshold
                and not stripped_line.startswith("#")
            ):
consecutive_short_lines += 1
if consecutive_short_lines > max_consecutive_short_lines:
continue
else:
consecutive_short_lines = 0
filtered_lines.append(line)
content = "\n".join(filtered_lines)
# Remove duplicate headers
seen_headers = set()
lines = content.split("\n")
filtered_lines = []
for line in lines:
if line.startswith("#"):
header_text = line.strip()
if header_text in seen_headers:
continue
seen_headers.add(header_text)
filtered_lines.append(line)
content = "\n".join(filtered_lines)
# Remove lines containing common footer patterns
footer_patterns = [
r"^copyright",
r"^©",
r"^all rights reserved",
r"^terms",
r"^privacy policy",
r"^contact us",
r"^follow us",
r"^social media",
r"^disclaimer",
]
footer_pattern = "|".join(footer_patterns)
lines = content.split("\n")
filtered_lines = []
for line in lines:
if not re.search(footer_pattern, line.lower()):
filtered_lines.append(line)
content = "\n".join(filtered_lines)
return content
class ArxivSearchTool(Tool):
name = "arxiv_search"
description = """Searches arXiv for academic papers and returns structured information including titles, authors, publication dates, abstracts, and download links."""
inputs = {
"query": {
"type": "string",
"description": "A research-related query (e.g., 'AI regulation')",
},
"from_date": {
"type": "string",
"description": "Optional search start date in format (YYYY or YYYY-MM or YYYY-MM-DD) (e.g., '2022-06' or '2022' or '2022-04-12')",
"nullable": True,
},
"to_date": {
"type": "string",
"description": "Optional search end date in (YYYY or YYYY-MM or YYYY-MM-DD) (e.g., '2022-06' or '2022' or '2022-04-12')",
"nullable": True,
},
}
output_type = "string"
def forward(
self,
query: str,
from_date: str = None,
to_date: str = None,
) -> str:
# 1) build URL
url = build_arxiv_url(query, from_date, to_date, size=50)
# 2) fetch & parse
try:
papers = fetch_and_parse_arxiv(url)
except Exception as e:
return f"❌ Failed to fetch or parse arXiv results: {e}"
if not papers:
return "No results found for your query."
# 3) format into a single string
output_lines = []
for idx, p in enumerate(papers, start=1):
output_lines += [
f"🔍 RESULT {idx}",
f"Title : {p['title']}",
f"Authors : {p['authors']}",
f"Published : {p['published']}",
f"Summary : {p['abstract'][:500]}{'...' if len(p['abstract'])>500 else ''}",
f"Entry ID : {p['entry_link']}",
f"Download link: {p['download_link']}",
"",
]
return "\n".join(output_lines).strip()
def fetch_and_parse_arxiv(url: str) -> List[Dict[str, str]]:
"""
Fetches the given arXiv advanced‐search URL, parses the HTML,
and returns a list of results. Each result is a dict containing:
- title
- authors
- published
- abstract
- entry_link
      - download_link
      - doi (or "[N/A]" if none)
"""
    resp = requests.get(url, timeout=30)
resp.raise_for_status()
soup = BeautifulSoup(resp.text, "html.parser")
results = []
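    # Each hit on the advanced-search page is rendered as an <li class="arxiv-result"> element.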
for li in soup.find_all("li", class_="arxiv-result"):
# Title
t = li.find("p", class_="title")
title = t.get_text(strip=True) if t else ""
# Authors
a = li.find("p", class_="authors")
authors = a.get_text(strip=True).replace("Authors:", "").strip() if a else ""
# Abstract
ab = li.find("span", class_="abstract-full")
abstract = (
ab.get_text(strip=True).replace("Abstract:", "").strip() if ab else ""
)
# Published date
d = li.find("p", class_="is-size-7")
published = d.get_text(strip=True) if d else ""
# Entry link
lt = li.find("p", class_="list-title")
entry_link = lt.find("a")["href"] if lt and lt.find("a") else ""
        # DOI (default to "[N/A]" when the result has no DOI link)
        doi = "[N/A]"
        idblock = li.find("p", class_="list-identifier")
        if idblock:
            for a_tag in idblock.find_all("a", href=True):
                if "doi.org" in a_tag["href"]:
                    doi = a_tag["href"]
                    break
results.append(
{
"title": title,
"authors": authors,
"published": published,
"abstract": abstract,
"entry_link": entry_link,
"download_link": (
entry_link.replace("abs", "pdf") if "abs" in entry_link else "N/A"
),
}
)
return results
def build_arxiv_url(
query: str, from_date: str = None, to_date: str = None, size: int = 50
) -> str:
"""
Build an arXiv advanced-search URL matching the exact segment order:
1) ?advanced
2) terms-0-operator=AND
3) terms-0-term=…
4) terms-0-field=all
5) classification-physics_archives=all
6) classification-include_cross_list=include
[ optional date‐range block ]
7) abstracts=show
8) size=…
9) order=-announced_date_first
If from_date or to_date is None, the date-range block is omitted.
"""
base = "https://arxiv.org/search/advanced?advanced="
parts = [
"&terms-0-operator=AND",
f"&terms-0-term={quote_plus(query)}",
"&terms-0-field=all",
"&classification-physics_archives=all",
"&classification-include_cross_list=include",
]
# optional date-range filtering
if from_date and to_date:
parts += [
"&date-year=",
"&date-filter_by=date_range",
f"&date-from_date={from_date}",
f"&date-to_date={to_date}",
"&date-date_type=submitted_date",
]
parts += [
"&abstracts=show",
f"&size={size}",
"&order=-announced_date_first",
]
return base + "".join(parts)