import html
import json
import mimetypes
import os
import re
import time
import traceback
from pathlib import Path
from typing import Dict, List
from urllib.parse import quote_plus, urlparse

import chromadb
import chromadb.utils.embedding_functions as embedding_functions
import fitz
import pandas as pd
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from duckduckgo_search import DDGS
from duckduckgo_search.exceptions import (
    ConversationLimitException,
    DuckDuckGoSearchException,
    RatelimitException,
    TimeoutException,
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import (
    BSHTMLLoader,
    JSONLoader,
    PyPDFLoader,
    TextLoader,
    UnstructuredFileLoader,
)
from langchain_community.tools import BraveSearch
from markdownify import markdownify
from ollama import chat
from PIL import Image
from smolagents import Tool, tool
from smolagents.utils import truncate_content

load_dotenv()


class ReadFileContentTool(Tool):
    name = "read_file_content"
    description = """Reads local files in various formats (text, CSV, Excel, PDF, HTML, etc.) and returns their content as readable text. Automatically detects and processes the appropriate file format."""

    inputs = {
        "file_path": {
            "type": "string",
            "description": "The full path to the file from which the content should be read.",
        }
    }
    output_type = "string"

    def forward(self, file_path: str) -> str:
        if not os.path.exists(file_path):
            return f"❌ File does not exist: {file_path}"

        ext = os.path.splitext(file_path)[1].lower()

        try:
            if ext == ".txt":
                with open(file_path, "r", encoding="utf-8") as f:
                    return truncate_content(f.read())

            elif ext == ".csv":
                df = pd.read_csv(file_path)
                return truncate_content(
                    f"CSV Content:\n{df.to_string(index=False)}\n\nColumn names: {', '.join(df.columns)}"
                )

            elif ext in [".xlsx", ".xls"]:
                df = pd.read_excel(file_path)
                return truncate_content(
                    f"Excel Content:\n{df.to_string(index=False)}\n\nColumn names: {', '.join(df.columns)}"
                )

            elif ext == ".pdf":
                doc = fitz.open(file_path)
                text = "".join([page.get_text() for page in doc])
                doc.close()
                return truncate_content(
                    text.strip() or "⚠️ PDF contains no readable text."
                )

            elif ext == ".json":
                with open(file_path, "r", encoding="utf-8") as f:
                    return truncate_content(f.read())

            elif ext == ".py":
                with open(file_path, "r", encoding="utf-8") as f:
                    return truncate_content(f.read())

            elif ext in [".html", ".htm"]:
                # Renamed from `html` to avoid shadowing the imported html module.
                with open(file_path, "r", encoding="utf-8") as f:
                    html_content = f.read()
                try:
                    markdown = markdownify(html_content).strip()
                    markdown = re.sub(r"\n{3,}", "\n\n", markdown)
                    return f"📄 HTML content (converted to Markdown):\n\n{truncate_content(markdown)}"
                except Exception:
                    soup = BeautifulSoup(html_content, "html.parser")
                    text = soup.get_text(separator="\n").strip()
                    return f"📄 HTML content (raw text fallback):\n\n{truncate_content(text)}"

            elif ext in [".mp3", ".wav"]:
                return f"ℹ️ Audio file detected: {os.path.basename(file_path)}. Use transcribe_audio tool to process the audio content."

            elif ext in [".mp4", ".mov", ".avi"]:
                return f"ℹ️ Video file detected: {os.path.basename(file_path)}. Use transcribe_video tool to process the video content."

            else:
                return f"ℹ️ Unsupported file type: {ext}. File saved at {file_path}"

        except Exception as e:
            return f"❌ Could not read {file_path}: {e}"


class WikipediaSearchTool(Tool):
    name = "wikipedia_search"
    description = """Searches Wikipedia for a specific topic and returns a concise summary. Useful for background information on subjects, concepts, historical events, or scientific topics."""

    inputs = {
        "query": {
            "type": "string",
            "description": "The query or subject to search for on Wikipedia.",
        }
    }
    output_type = "string"

    def forward(self, query: str) -> str:
        print(f"EXECUTING TOOL: wikipedia_search(query='{query}')")
        try:
            # URL-encode the query so spaces and special characters do not break the request.
            search_link = f"https://en.wikipedia.org/w/api.php?action=query&list=search&srsearch={quote_plus(query)}&format=json"
            search_response = requests.get(search_link, timeout=10)
            search_response.raise_for_status()
            search_data = search_response.json()

            if not search_data.get("query", {}).get("search", []):
                return f"No Wikipedia info for '{query}'."

            page_id = search_data["query"]["search"][0]["pageid"]

            content_link = (
                f"https://en.wikipedia.org/w/api.php?action=query&prop=extracts&"
                f"exintro=1&explaintext=1&pageids={page_id}&format=json"
            )
            content_response = requests.get(content_link, timeout=10)
            content_response.raise_for_status()
            content_data = content_response.json()

            extract = content_data["query"]["pages"][str(page_id)]["extract"]
            if len(extract) > 1500:
                extract = extract[:1500] + "..."

            result = f"Wikipedia summary for '{query}':\n{extract}"
            print(f"-> Tool Result (Wikipedia): {result[:100]}...")
            return result

        except Exception as e:
            print(f"❌ Error in wikipedia_search: {e}")
            traceback.print_exc()
            return f"Error searching Wikipedia: {e}"


class TranscribeAudioTool(Tool):
    name = "transcribe_audio"
    description = """Converts spoken content in audio files to text. Handles various audio formats and produces a transcript of the spoken content for analysis."""

    inputs = {
        "file_path": {
            "type": "string",
            "description": "The full path to the audio file that needs to be transcribed.",
        }
    }
    output_type = "string"

    def forward(self, file_path: str) -> str:
        try:
            import os
            import tempfile

            import speech_recognition as sr
            from pydub import AudioSegment

            if not os.path.exists(file_path):
                return (
                    f"❌ Audio file not found at: {file_path}. Download the file first."
                )

            recognizer = sr.Recognizer()

            file_ext = os.path.splitext(file_path)[1].lower()

            if file_ext != ".wav":
                # Convert non-WAV audio to a temporary WAV file for speech_recognition.
                temp_wav = tempfile.NamedTemporaryFile(suffix=".wav", delete=False).name
                audio = AudioSegment.from_file(file_path)
                audio.export(temp_wav, format="wav")
                audio_path = temp_wav
            else:
                audio_path = file_path

            with sr.AudioFile(audio_path) as source:
                audio_data = recognizer.record(source)
                transcript = recognizer.recognize_google(audio_data)

            # Clean up the temporary WAV file if one was created.
            if file_ext != ".wav" and os.path.exists(temp_wav):
                os.remove(temp_wav)

            return transcript.strip()

        except Exception as e:
            return f"❌ Transcription failed: {str(e)}"


class TranscibeVideoFileTool(Tool):
    name = "transcribe_video"
    description = """Extracts and transcribes speech from video files. Converts the audio portion of videos into readable text for analysis or reference."""

    inputs = {
        "file_path": {
            "type": "string",
            "description": "The full path to the video file that needs to be transcribed.",
        }
    }
    output_type = "string"

    def forward(self, file_path: str) -> str:
        try:
            if not os.path.exists(file_path):
                return (
                    f"❌ Video file not found at: {file_path}. Download the file first."
                )

            import os
            import tempfile

            import moviepy.editor as mp
            import speech_recognition as sr

            video = mp.VideoFileClip(file_path)

            # Extract the audio track to a temporary WAV file before transcription.
            temp_audio = tempfile.NamedTemporaryFile(suffix=".wav", delete=False).name
            video.audio.write_audiofile(temp_audio, verbose=False, logger=None)
            video.close()

            recognizer = sr.Recognizer()

            with sr.AudioFile(temp_audio) as source:
                audio_data = recognizer.record(source)
                transcript = recognizer.recognize_google(audio_data)

            if os.path.exists(temp_audio):
                os.remove(temp_audio)

            return transcript.strip()

        except Exception as e:
            return f"❌ Video processing failed: {str(e)}"


class BraveWebSearchTool(Tool):
    name = "web_search"
    description = """Performs web searches and returns content from top results. Provides real-time information from across the internet including current events, facts, and website content relevant to your query."""

    inputs = {
        "query": {
            "type": "string",
            "description": "A web search query string (e.g., a question or keyword phrase).",
        }
    }
    output_type = "string"

    # Read the API key from the environment (loaded via load_dotenv above) instead of
    # hardcoding a placeholder; BRAVE_SEARCH_API_KEY is an assumed variable name.
    api_key = os.getenv("BRAVE_SEARCH_API_KEY", "")
    count = 3
    char_limit = 4000
    tool = BraveSearch.from_api_key(api_key=api_key, search_kwargs={"count": count})

    def extract_main_text(self, url: str, char_limit: int) -> str:
        try:
            headers = {"User-Agent": "Mozilla/5.0"}
            response = requests.get(url, headers=headers, timeout=10)
            soup = BeautifulSoup(response.text, "html.parser")

            # Drop non-content tags before extracting text.
            for tag in soup(["script", "style", "noscript"]):
                tag.extract()

            body = soup.body
            if not body:
                return "⚠️ Could not extract content."

            text = " ".join(t.strip() for t in body.stripped_strings)
            return text[:char_limit].strip()
        except Exception as e:
            return f"⚠️ Failed to extract article: {e}"

    def forward(self, query: str) -> str:
        try:
            results_json = self.tool.run(query)
            results = (
                json.loads(results_json)
                if isinstance(results_json, str)
                else results_json
            )

            output_parts = []
            for i, r in enumerate(results[: self.count], start=1):
                title = html.unescape(r.get("title", "").strip())
                link = r.get("link", "").strip()

                article_text = self.extract_main_text(link, self.char_limit)

                result_block = (
                    f"Result {i}:\n"
                    f"Title: {title}\n"
                    f"URL: {link}\n"
                    f"Extracted Content:\n{article_text}\n"
                )
                output_parts.append(result_block)

            return "\n\n".join(output_parts).strip()

        except Exception as e:
            return f"Search failed: {str(e)}"


class DescribeImageTool(Tool):
    name = "describe_image"
    description = """Analyzes images and generates detailed text descriptions. Identifies objects, scenes, text, and visual elements within the image to provide context or understanding."""

    inputs = {
        "image_path": {
            "type": "string",
            "description": "The full path to the image file to describe.",
        }
    }
    output_type = "string"

    def forward(self, image_path: str) -> str:
        import os

        from PIL import Image
        from transformers import BlipForConditionalGeneration, BlipProcessor

        if not os.path.exists(image_path):
            return f"❌ Image file does not exist: {image_path}"

        try:
            processor = BlipProcessor.from_pretrained(
                "Salesforce/blip-image-captioning-base", use_fast=True
            )
            model = BlipForConditionalGeneration.from_pretrained(
                "Salesforce/blip-image-captioning-base"
            )

            image = Image.open(image_path).convert("RGB")
            inputs = processor(images=image, return_tensors="pt")
            output_ids = model.generate(**inputs)

            caption = processor.decode(output_ids[0], skip_special_tokens=True)
            return caption.strip() or "⚠️ No caption could be generated."
        except Exception as e:
            return f"❌ Failed to describe image: {e}"


class DownloadFileFromLinkTool(Tool):
    name = "download_file_from_link"
    description = "Downloads files from a URL and saves them locally. Supports various formats including PDFs, documents, images, and data files. Returns the local file path for further processing."

    inputs = {
        "link": {"type": "string", "description": "The URL to download the file from."},
        "file_name": {
            "type": "string",
            "description": "Desired name of the saved file, without extension.",
            "nullable": True,
        },
    }

    output_type = "string"
    SUPPORTED_EXTENSIONS = {
        ".xlsx",
        ".pdf",
        ".txt",
        ".csv",
        ".json",
        ".xml",
        ".html",
        ".jpg",
        ".jpeg",
        ".png",
        ".mp4",
        ".mp3",
        ".wav",
        ".zip",
    }

    def forward(self, link: str, file_name: str = "taskfile") -> str:
        print(f"⬇️ Downloading file from: {link}")
        dir_path = "./downloads"
        os.makedirs(dir_path, exist_ok=True)

        try:
            response = requests.get(link, stream=True, timeout=30)
        except requests.RequestException as e:
            return f"❌ Error: Request failed - {e}"

        if response.status_code != 200:
            return (
                f"❌ Error: Unable to fetch file. Status code: {response.status_code}"
            )

        base_name, provided_ext = os.path.splitext(file_name)
        provided_ext = provided_ext.lower()

        # Prefer an explicit, supported extension in the requested file name.
        if provided_ext and provided_ext in self.SUPPORTED_EXTENSIONS:
            ext = provided_ext
        else:
            # Otherwise infer the extension from the Content-Type header.
            content_type = (
                response.headers.get("Content-Type", "").split(";")[0].strip()
            )
            guessed_ext = mimetypes.guess_extension(content_type or "") or ""

            # Fall back to the extension in the URL path if the header is unhelpful.
            if guessed_ext in ("", ".bin"):
                parsed_link = urlparse(link)
                _, url_ext = os.path.splitext(parsed_link.path)
                if url_ext.lower() in self.SUPPORTED_EXTENSIONS:
                    ext = url_ext.lower()
                else:
                    return f"⚠️ Warning: Cannot determine a valid file extension from '{content_type}' or URL. Please retry with an explicit valid filename and extension."
            else:
                ext = guessed_ext

        file_path = os.path.join(dir_path, base_name + ext)
        downloaded = 0

        with open(file_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=1024):
                if chunk:
                    f.write(chunk)
                    downloaded += len(chunk)

        return file_path
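# Illustrative usage sketch (comment only; the URL is a placeholder):
#   downloader = DownloadFileFromLinkTool()
#   local_path = downloader.forward("https://example.com/data.csv", file_name="dataset")
#   # local_path -> "./downloads/dataset.csv", ready for read_file_content or the vector store tools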


class DuckDuckGoSearchTool(Tool):
    name = "web_search"
    description = """Performs web searches and returns content from top results. Provides real-time information from across the internet including current events, facts, and website content relevant to your query."""

    inputs = {
        "query": {
            "type": "string",
            "description": "The search query to run on DuckDuckGo.",
        },
    }
    output_type = "string"

    def _configure(self, max_retries: int = 5, retry_sleep: int = 2):
        self._max_retries = max_retries
        self._retry_sleep = retry_sleep

    def forward(self, query: str) -> str:
        self._configure()

        top_results = 5

        retries = 0
        max_retries = getattr(self, "_max_retries", 3)
        retry_sleep = getattr(self, "_retry_sleep", 2)

        while retries < max_retries:
            try:
                results = DDGS().text(
                    keywords=query,
                    region="wt-wt",
                    safesearch="moderate",
                    max_results=top_results,
                )

                if not results:
                    return "No results found."

                output_lines = []
                for idx, res in enumerate(results[:top_results], start=1):
                    title = res.get("title", "N/A")
                    url = res.get("href", "N/A")
                    snippet = res.get("body", "N/A")

                    output_lines.append(
                        f"Result {idx}:\n"
                        f"Title: {title}\n"
                        f"URL: {url}\n"
                        f"Snippet: {snippet}\n"
                    )

                output = "\n".join(output_lines)

                print(f"-> Tool Result (DuckDuckGo): {output[:1500]}...")
                return output

            except (
                DuckDuckGoSearchException,
                TimeoutException,
                RatelimitException,
                ConversationLimitException,
            ) as e:
                retries += 1
                # Back off a little longer on each failed attempt (the original incremented
                # self._retry_sleep but slept on the unchanged local value).
                retry_sleep += 2
                print(
                    f"⚠️ DuckDuckGo Exception (Attempt {retries}/{max_retries}): {type(e).__name__}: {e}"
                )
                traceback.print_exc()
                time.sleep(retry_sleep)

            except Exception as e:
                print(f"❌ Unexpected Error: {e}")
                traceback.print_exc()
                return f"Unhandled exception during DuckDuckGo search: {e}"

        return f"❌ Failed to retrieve results after {max_retries} retries."


huggingface_ef = embedding_functions.HuggingFaceEmbeddingFunction(
    model_name="sentence-transformers/all-mpnet-base-v2"
)
SUPPORTED_EXTENSIONS = [
    ".txt",
    ".md",
    ".py",
    ".pdf",
    ".json",
    ".jsonl",
    ".html",
    ".htm",
]


class AddDocumentToVectorStoreTool(Tool):
    name = "add_document_to_vector_store"
    description = "Processes a document and adds it to the vector database for semantic search. Automatically chunks files and creates text embeddings to enable powerful content retrieval."

    inputs = {
        "file_path": {
            "type": "string",
            "description": "Absolute path to the file to be indexed.",
        }
    }

    output_type = "string"

    def _load_file(self, path: Path):
        """Select the right loader for the file extension."""
        if path.suffix == ".pdf":
            return PyPDFLoader(str(path)).load()
        elif path.suffix == ".json":
            return JSONLoader(str(path), jq_schema=".").load()
        elif path.suffix in [".md"]:
            return UnstructuredFileLoader(str(path)).load()
        elif path.suffix in [".html", ".htm"]:
            return BSHTMLLoader(str(path)).load()
        else:
            return TextLoader(str(path)).load()

    def forward(self, file_path: str) -> str:
        print(f"📄 Adding document to vector store: {file_path}")
        try:
            collection_name = "vectorstore"
            path = Path(file_path)
            if not path.exists() or path.suffix not in SUPPORTED_EXTENSIONS:
                return f"Unsupported or missing file: {file_path}"

            docs = self._load_file(path)
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=500, chunk_overlap=50
            )
            split_docs = text_splitter.split_documents(docs)

            client = chromadb.Client(
                chromadb.config.Settings(
                    persist_directory="./chroma_store",
                )
            )

            collection = client.get_or_create_collection(
                name=collection_name,
                configuration={"embedding_function": huggingface_ef},
            )

            texts = [doc.page_content for doc in split_docs]
            metadatas = [doc.metadata for doc in split_docs]

            collection.add(
                documents=texts,
                metadatas=metadatas,
                ids=[f"{path.stem}_{i}" for i in range(len(texts))],
            )

            return f"✅ Successfully added {len(texts)} chunks from '{file_path}' to collection '{collection_name}'."

        except Exception as e:
            print(f"❌ Error in add_to_vector_store: {e}")
            traceback.print_exc()
            return f"Error: {e}"


class QueryVectorStoreTool(Tool):
    name = "query_downloaded_documents"
    description = "Performs semantic searches across your downloaded documents. Use detailed queries to find specific information, concepts, or answers from your collected resources."

    inputs = {
        "query": {
            "type": "string",
            "description": "The search query. Ensure this is constructed intelligently so as to retrieve the most relevant outputs.",
        }
    }
    output_type = "string"

    def forward(self, query: str) -> str:
        collection_name = "vectorstore"

        k = 5

        print(f"🔎 Querying vector store '{collection_name}' with: '{query}'")
        try:
            client = chromadb.Client(
                chromadb.config.Settings(
                    persist_directory="./chroma_store",
                )
            )
            collection = client.get_collection(name=collection_name)

            results = collection.query(
                query_texts=[query],
                n_results=k,
            )

            formatted = []
            for i in range(len(results["documents"][0])):
                doc = results["documents"][0][i]
                metadata = results["metadatas"][0][i]
                formatted.append(
                    f"Result {i + 1}:\n"
                    f"Content: {doc}\n"
                    f"Metadata: {metadata}\n"
                )

            return "\n".join(formatted) or "No relevant documents found."

        except Exception as e:
            print(f"❌ Error in query_vector_store: {e}")
            traceback.print_exc()
            return f"Error querying vector store: {e}"


@tool
def image_question_answering(image_path: str, prompt: str) -> str:
    """
    Analyzes images and answers specific questions about their content. Can identify objects, read text, describe scenes, or interpret visual information based on your questions.

    Args:
        image_path: The path to the image file
        prompt: The question to ask about the image

    Returns:
        A string answer generated by the local Ollama model
    """
    file_extension = image_path.lower().split(".")[-1]
    if file_extension not in ["jpg", "jpeg", "png", "bmp", "gif", "webp"]:
        return "Unsupported file type. Please provide an image."

    path = Path(image_path)
    if not path.exists():
        return f"File not found at: {image_path}"

    response = chat(
        model="llava",
        messages=[
            {
                "role": "user",
                "content": prompt,
                "images": [path],
            },
        ],
        options={"temperature": 0.2},
    )

    return response.message.content.strip()
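# Illustrative usage sketch (comment only; requires a running Ollama server with the llava model
# pulled, and the image path is hypothetical):
#   print(image_question_answering("./downloads/receipt.jpg", "What is the total amount?"))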


class VisitWebpageTool(Tool):
    name = "visit_webpage"
    description = "Loads a webpage from a URL and converts its content to markdown format. Use this to browse websites, extract information, or identify downloadable resources from a specific web address."
    inputs = {
        "url": {
            "type": "string",
            "description": "The url of the webpage to visit.",
        }
    }
    output_type = "string"

    def forward(self, url: str) -> str:
        try:
            from urllib.parse import urlparse

            import requests
            from bs4 import BeautifulSoup
            from markdownify import markdownify
            from requests.exceptions import RequestException
            from smolagents.utils import truncate_content
        except ImportError as e:
            raise ImportError(
                "You must install packages `markdownify`, `requests`, and `beautifulsoup4` to run this tool: for instance run `pip install markdownify requests beautifulsoup4`."
            ) from e

        try:
            headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
            }
            response = requests.get(url, headers=headers, timeout=20)
            response.raise_for_status()

            soup = BeautifulSoup(response.text, "html.parser")

            domain = urlparse(url).netloc

            # Strip navigation, ads, scripts, and other clutter before conversion.
            self._remove_clutter(soup)

            main_content = self._extract_main_content(soup)

            if main_content:
                markdown_content = markdownify(str(main_content)).strip()
            else:
                # Fall back to converting the whole page if no main content block is found.
                markdown_content = markdownify(str(soup)).strip()

            markdown_content = self._clean_markdown(markdown_content)

            result = f"Content from {domain}:\n\n{markdown_content}"

            return truncate_content(result, 40000)

        except requests.exceptions.Timeout:
            return "The request timed out. Please try again later or check the URL."
        except RequestException as e:
            return f"Error fetching the webpage: {str(e)}"
        except Exception as e:
            return f"An unexpected error occurred: {str(e)}"

    def _remove_clutter(self, soup):
        """Remove common elements that clutter web pages."""
        clutter_selectors = [
            "header",
            "footer",
            "nav",
            ".nav",
            ".navigation",
            ".menu",
            ".sidebar",
            ".footer",
            ".header",
            "#footer",
            "#header",
            "#nav",
            "#sidebar",
            ".widget",
            ".cookie",
            ".cookies",
            ".ad",
            ".ads",
            ".advertisement",
            "script",
            "style",
            "noscript",
            "iframe",
            ".social",
            ".share",
            ".comment",
            ".comments",
            ".subscription",
            ".newsletter",
            '[role="banner"]',
            '[role="navigation"]',
            '[role="complementary"]',
        ]

        for selector in clutter_selectors:
            for element in soup.select(selector):
                element.decompose()

        # Also drop elements hidden via inline styles or the `hidden` attribute.
        for hidden in soup.select(
            '[style*="display: none"], [style*="display:none"], [style*="visibility: hidden"], [style*="visibility:hidden"], [hidden]'
        ):
            hidden.decompose()

    def _extract_main_content(self, soup):
        """Try to identify and extract the main content of the page."""
        main_content_selectors = [
            "main",
            '[role="main"]',
            "article",
            ".content",
            ".main-content",
            ".post-content",
            "#content",
            "#main",
            "#main-content",
            ".article",
            ".post",
            ".entry",
            ".page-content",
            ".entry-content",
        ]

        for selector in main_content_selectors:
            main_content = soup.select(selector)
            if main_content:
                # If several candidates match, keep the one with the most text.
                if len(main_content) > 1:
                    return max(main_content, key=lambda x: len(x.get_text()))
                return main_content[0]

        # Fall back to the element that contains the most <p> tags.
        paragraphs = soup.find_all("p")
        if paragraphs:
            parents = {}
            for p in paragraphs:
                if p.parent:
                    if p.parent not in parents:
                        parents[p.parent] = 0
                    parents[p.parent] += 1

            if parents:
                return max(parents.items(), key=lambda x: x[1])[0]

        return None

    def _clean_markdown(self, content):
        """Clean up the markdown content."""
        # Collapse runs of blank lines.
        content = re.sub(r"\n{3,}", "\n\n", content)

        # Drop immediately repeated markdown links.
        content = re.sub(r"(\[.*?\]\(.*?\))\s*\1+", r"\1", content)

        lines = content.split("\n")
        filtered_lines = []

        # Drop long runs of short, non-heading lines (typically menus and link lists).
        short_line_threshold = 40
        consecutive_short_lines = 0
        max_consecutive_short_lines = 3

        for line in lines:
            stripped_line = line.strip()
            if len(
                stripped_line
            ) < short_line_threshold and not stripped_line.startswith("#"):
                consecutive_short_lines += 1
                if consecutive_short_lines > max_consecutive_short_lines:
                    continue
            else:
                consecutive_short_lines = 0

            filtered_lines.append(line)

        content = "\n".join(filtered_lines)

        # Remove duplicate headers.
        seen_headers = set()
        lines = content.split("\n")
        filtered_lines = []

        for line in lines:
            if line.startswith("#"):
                header_text = line.strip()
                if header_text in seen_headers:
                    continue
                seen_headers.add(header_text)
            filtered_lines.append(line)

        content = "\n".join(filtered_lines)

        # Drop typical footer lines (copyright, terms, social links, etc.).
        footer_patterns = [
            r"^copyright",
            r"^©",
            r"^all rights reserved",
            r"^terms",
            r"^privacy policy",
            r"^contact us",
            r"^follow us",
            r"^social media",
            r"^disclaimer",
        ]

        footer_pattern = "|".join(footer_patterns)
        lines = content.split("\n")
        filtered_lines = []

        for line in lines:
            if not re.search(footer_pattern, line.lower()):
                filtered_lines.append(line)

        content = "\n".join(filtered_lines)

        return content
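# Illustrative usage sketch (comment only; the URL is a placeholder):
#   page = VisitWebpageTool()
#   print(page.forward("https://example.com/article"))  # markdown of the de-cluttered main content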


class ArxivSearchTool(Tool):
    name = "arxiv_search"
    description = """Searches arXiv for academic papers and returns structured information including titles, authors, publication dates, abstracts, and download links."""

    inputs = {
        "query": {
            "type": "string",
            "description": "A research-related query (e.g., 'AI regulation')",
        },
        "from_date": {
            "type": "string",
            "description": "Optional search start date in format (YYYY or YYYY-MM or YYYY-MM-DD) (e.g., '2022-06' or '2022' or '2022-04-12')",
            "nullable": True,
        },
        "to_date": {
            "type": "string",
            "description": "Optional search end date in format (YYYY or YYYY-MM or YYYY-MM-DD) (e.g., '2022-06' or '2022' or '2022-04-12')",
            "nullable": True,
        },
    }

    output_type = "string"

    def forward(
        self,
        query: str,
        from_date: str = None,
        to_date: str = None,
    ) -> str:
        url = build_arxiv_url(query, from_date, to_date, size=50)

        try:
            papers = fetch_and_parse_arxiv(url)
        except Exception as e:
            return f"❌ Failed to fetch or parse arXiv results: {e}"

        if not papers:
            return "No results found for your query."

        output_lines = []
        for idx, p in enumerate(papers, start=1):
            output_lines += [
                f"🔍 RESULT {idx}",
                f"Title : {p['title']}",
                f"Authors : {p['authors']}",
                f"Published : {p['published']}",
                f"Summary : {p['abstract'][:500]}{'...' if len(p['abstract']) > 500 else ''}",
                f"Entry ID : {p['entry_link']}",
                f"Download link: {p['download_link']}",
                "",
            ]

        return "\n".join(output_lines).strip()


def fetch_and_parse_arxiv(url: str) -> List[Dict[str, str]]:
    """
    Fetches the given arXiv advanced-search URL, parses the HTML,
    and returns a list of results. Each result is a dict containing:
      - title
      - authors
      - published
      - abstract
      - entry_link
      - download_link
      - doi (or "[N/A]" if none)
    """
    resp = requests.get(url, timeout=30)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, "html.parser")

    results = []
    for li in soup.find_all("li", class_="arxiv-result"):
        t = li.find("p", class_="title")
        title = t.get_text(strip=True) if t else ""

        a = li.find("p", class_="authors")
        authors = a.get_text(strip=True).replace("Authors:", "").strip() if a else ""

        ab = li.find("span", class_="abstract-full")
        abstract = (
            ab.get_text(strip=True).replace("Abstract:", "").strip() if ab else ""
        )

        d = li.find("p", class_="is-size-7")
        published = d.get_text(strip=True) if d else ""

        lt = li.find("p", class_="list-title")
        entry_link = lt.find("a")["href"] if lt and lt.find("a") else ""

        # Pull a DOI link out of the identifier block when one is present
        # (default to "[N/A]" so the key is always defined, as documented above).
        doi = "[N/A]"
        idblock = li.find("p", class_="list-identifier")
        if idblock:
            for a_tag in idblock.find_all("a", href=True):
                if "doi.org" in a_tag["href"]:
                    doi = a_tag["href"]
                    break

        results.append(
            {
                "title": title,
                "authors": authors,
                "published": published,
                "abstract": abstract,
                "entry_link": entry_link,
                "download_link": (
                    entry_link.replace("abs", "pdf") if "abs" in entry_link else "N/A"
                ),
                "doi": doi,
            }
        )

    return results


def build_arxiv_url(
    query: str, from_date: str = None, to_date: str = None, size: int = 50
) -> str:
    """
    Build an arXiv advanced-search URL matching the exact segment order:
      1) ?advanced
      2) terms-0-operator=AND
      3) terms-0-term=…
      4) terms-0-field=all
      5) classification-physics_archives=all
      6) classification-include_cross_list=include
      [ optional date-range block ]
      7) abstracts=show
      8) size=…
      9) order=-announced_date_first
    If from_date or to_date is None, the date-range block is omitted.
    """
    base = "https://arxiv.org/search/advanced?advanced="
    parts = [
        "&terms-0-operator=AND",
        f"&terms-0-term={quote_plus(query)}",
        "&terms-0-field=all",
        "&classification-physics_archives=all",
        "&classification-include_cross_list=include",
    ]

    if from_date and to_date:
        parts += [
            "&date-year=",
            "&date-filter_by=date_range",
            f"&date-from_date={from_date}",
            f"&date-to_date={to_date}",
            "&date-date_type=submitted_date",
        ]

    parts += [
        "&abstracts=show",
        f"&size={size}",
        "&order=-announced_date_first",
    ]

    return base + "".join(parts)
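# Illustrative usage sketch (comment only; the query and dates are arbitrary examples):
#   url = build_arxiv_url("AI regulation", from_date="2022-01", to_date="2022-06")
#   papers = fetch_and_parse_arxiv(url)  # list of dicts: title, authors, published, abstract, ...
#   print(ArxivSearchTool().forward("AI regulation", "2022-01", "2022-06"))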