import html import json import mimetypes import os import re import time import traceback from pathlib import Path from typing import Dict, List from urllib.parse import quote_plus, urlparse import chromadb import chromadb.utils.embedding_functions as embedding_functions import fitz # PyMuPDF import pandas as pd import requests from bs4 import BeautifulSoup from dotenv import load_dotenv from duckduckgo_search import DDGS from duckduckgo_search.exceptions import ( ConversationLimitException, DuckDuckGoSearchException, RatelimitException, TimeoutException, ) from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_community.document_loaders import ( BSHTMLLoader, JSONLoader, PyPDFLoader, TextLoader, UnstructuredFileLoader, ) from langchain_community.tools import BraveSearch from markdownify import markdownify from ollama import chat from PIL import Image from smolagents import Tool, tool from smolagents.utils import truncate_content load_dotenv() class ReadFileContentTool(Tool): name = "read_file_content" description = """Reads local files in various formats (text, CSV, Excel, PDF, HTML, etc.) and returns their content as readable text. Automatically detects and processes the appropriate file format.""" inputs = { "file_path": { "type": "string", "description": "The full path to the file from which the content should be read.", } } output_type = "string" def forward(self, file_path: str) -> str: if not os.path.exists(file_path): return f"❌ File does not exist: {file_path}" ext = os.path.splitext(file_path)[1].lower() try: if ext == ".txt": with open(file_path, "r", encoding="utf-8") as f: return truncate_content(f.read()) elif ext == ".csv": df = pd.read_csv(file_path) return truncate_content( f"CSV Content:\n{df.to_string(index=False)}\n\nColumn names: {', '.join(df.columns)}" ) elif ext in [".xlsx", ".xls"]: df = pd.read_excel(file_path) return truncate_content( f"Excel Content:\n{df.to_string(index=False)}\n\nColumn names: {', '.join(df.columns)}" ) elif ext == ".pdf": doc = fitz.open(file_path) text = "".join([page.get_text() for page in doc]) doc.close() return truncate_content( text.strip() or "⚠️ PDF contains no readable text." ) elif ext == ".json": with open(file_path, "r", encoding="utf-8") as f: return truncate_content(f.read()) elif ext == ".py": with open(file_path, "r", encoding="utf-8") as f: return truncate_content(f.read()) elif ext in [".html", ".htm"]: with open(file_path, "r", encoding="utf-8") as f: html = f.read() try: markdown = markdownify(html).strip() markdown = re.sub(r"\n{3,}", "\n\n", markdown) return f"📄 HTML content (converted to Markdown):\n\n{truncate_content(markdown)}" except Exception: soup = BeautifulSoup(html, "html.parser") text = soup.get_text(separator="\n").strip() return f"📄 HTML content (raw text fallback):\n\n{truncate_content(text)}" elif ext in [".mp3", ".wav"]: return f"ℹ️ Audio file detected: {os.path.basename(file_path)}. Use transcribe_audio tool to process the audio content." elif ext in [".mp4", ".mov", ".avi"]: return f"ℹ️ Video file detected: {os.path.basename(file_path)}. Use transcribe_video tool to process the video content." else: return f"ℹ️ Unsupported file type: {ext}. File saved at {file_path}" except Exception as e: return f"❌ Could not read {file_path}: {e}" class WikipediaSearchTool(Tool): name = "wikipedia_search" description = """Searches Wikipedia for a specific topic and returns a concise summary. Useful for background information on subjects, concepts, historical events, or scientific topics.""" inputs = { "query": { "type": "string", "description": "The query or subject to search for on Wikipedia.", } } output_type = "string" def forward(self, query: str) -> str: print(f"EXECUTING TOOL: wikipedia_search(query='{query}')") try: search_link = f"https://en.wikipedia.org/w/api.php?action=query&list=search&srsearch={query}&format=json" search_response = requests.get(search_link, timeout=10) search_response.raise_for_status() search_data = search_response.json() if not search_data.get("query", {}).get("search", []): return f"No Wikipedia info for '{query}'." page_id = search_data["query"]["search"][0]["pageid"] content_link = ( f"https://en.wikipedia.org/w/api.php?action=query&prop=extracts&" f"exintro=1&explaintext=1&pageids={page_id}&format=json" ) content_response = requests.get(content_link, timeout=10) content_response.raise_for_status() content_data = content_response.json() extract = content_data["query"]["pages"][str(page_id)]["extract"] if len(extract) > 1500: extract = extract[:1500] + "..." result = f"Wikipedia summary for '{query}':\n{extract}" print(f"-> Tool Result (Wikipedia): {result[:100]}...") return result except Exception as e: print(f"❌ Error in wikipedia_search: {e}") traceback.print_exc() return f"Error wiki: {e}" class TranscribeAudioTool(Tool): name = "transcribe_audio" description = """Converts spoken content in audio files to text. Handles various audio formats and produces a transcript of the spoken content for analysis.""" inputs = { "file_path": { "type": "string", "description": "The full path to the audio file that needs to be transcribed.", } } output_type = "string" def forward(self, file_path: str) -> str: try: import os import tempfile import speech_recognition as sr from pydub import AudioSegment # Verify file exists if not os.path.exists(file_path): return ( f"❌ Audio file not found at: {file_path}. Download the file first." ) # Initialize recognizer recognizer = sr.Recognizer() # Convert to WAV if not already (needed for speech_recognition) file_ext = os.path.splitext(file_path)[1].lower() if file_ext != ".wav": # Create temp WAV file temp_wav = tempfile.NamedTemporaryFile(suffix=".wav", delete=False).name # Convert to WAV using pydub audio = AudioSegment.from_file(file_path) audio.export(temp_wav, format="wav") audio_path = temp_wav else: audio_path = file_path # Transcribe audio using Google's speech recognition with sr.AudioFile(audio_path) as source: audio_data = recognizer.record(source) transcript = recognizer.recognize_google(audio_data) # Clean up temp file if created if file_ext != ".wav" and os.path.exists(temp_wav): os.remove(temp_wav) return transcript.strip() except Exception as e: return f"❌ Transcription failed: {str(e)}" class TranscibeVideoFileTool(Tool): name = "transcribe_video" description = """Extracts and transcribes speech from video files. Converts the audio portion of videos into readable text for analysis or reference.""" inputs = { "file_path": { "type": "string", "description": "The full path to the video file that needs to be transcribed.", } } output_type = "string" def forward(self, file_path: str) -> str: try: # Verify file exists if not os.path.exists(file_path): return ( f"❌ Video file not found at: {file_path}. Download the file first." ) import os import tempfile import moviepy.editor as mp import speech_recognition as sr # Extract audio from video video = mp.VideoFileClip(file_path) # Create temporary audio file temp_audio = tempfile.NamedTemporaryFile(suffix=".wav", delete=False).name # Extract audio to WAV format (required for speech_recognition) video.audio.write_audiofile(temp_audio, verbose=False, logger=None) video.close() # Initialize recognizer recognizer = sr.Recognizer() # Transcribe audio with sr.AudioFile(temp_audio) as source: audio_data = recognizer.record(source) transcript = recognizer.recognize_google(audio_data) # Clean up temp file if os.path.exists(temp_audio): os.remove(temp_audio) return transcript.strip() except Exception as e: return f"❌ Video processing failed: {str(e)}" class BraveWebSearchTool(Tool): name = "web_search" description = """Performs web searches and returns content from top results. Provides real-time information from across the internet including current events, facts, and website content relevant to your query.""" inputs = { "query": { "type": "string", "description": "A web search query string (e.g., a question or query).", } } output_type = "string" # api_key = os.getenv("BRAVE_SEARCH_API_KEY") api_key = "asdasfd" count = 3 char_limit = 4000 # Adjust based on LLM context window tool = BraveSearch.from_api_key(api_key=api_key, search_kwargs={"count": count}) def extract_main_text(self, url: str, char_limit: int) -> str: try: headers = {"User-Agent": "Mozilla/5.0"} response = requests.get(url, headers=headers, timeout=10) soup = BeautifulSoup(response.text, "html.parser") # Remove scripts/styles for tag in soup(["script", "style", "noscript"]): tag.extract() # Heuristic: extract visible text from body body = soup.body if not body: return "⚠️ Could not extract content." text = " ".join(t.strip() for t in body.stripped_strings) return text[:char_limit].strip() except Exception as e: return f"⚠️ Failed to extract article: {e}" def forward(self, query: str) -> str: try: results_json = self.tool.run(query) results = ( json.loads(results_json) if isinstance(results_json, str) else results_json ) output_parts = [] for i, r in enumerate(results[: self.count], start=1): title = html.unescape(r.get("title", "").strip()) link = r.get("link", "").strip() article_text = self.extract_main_text(link, self.char_limit) result_block = ( f"Result {i}:\n" f"Title: {title}\n" f"URL: {link}\n" f"Extracted Content:\n{article_text}\n" ) output_parts.append(result_block) return "\n\n".join(output_parts).strip() except Exception as e: return f"Search failed: {str(e)}" class DescribeImageTool(Tool): name = "describe_image" description = """Analyzes images and generates detailed text descriptions. Identifies objects, scenes, text, and visual elements within the image to provide context or understanding.""" inputs = { "image_path": { "type": "string", "description": "The full path to the image file to describe.", } } output_type = "string" def forward(self, image_path: str) -> str: import os from PIL import Image from transformers import BlipForConditionalGeneration, BlipProcessor if not os.path.exists(image_path): return f"❌ Image file does not exist: {image_path}" try: processor = BlipProcessor.from_pretrained( "Salesforce/blip-image-captioning-base", use_fast=True ) model = BlipForConditionalGeneration.from_pretrained( "Salesforce/blip-image-captioning-base" ) image = Image.open(image_path).convert("RGB") inputs = processor(images=image, return_tensors="pt") output_ids = model.generate(**inputs) caption = processor.decode(output_ids[0], skip_special_tokens=True) return caption.strip() or "⚠️ No caption could be generated." except Exception as e: return f"❌ Failed to describe image: {e}" class DownloadFileFromLinkTool(Tool): name = "download_file_from_link" description = "Downloads files from a URL and saves them locally. Supports various formats including PDFs, documents, images, and data files. Returns the local file path for further processing." inputs = { "link": {"type": "string", "description": "The URL to download the file from."}, "file_name": { "type": "string", "description": "Desired name of the saved file, without extension.", "nullable": True, }, } output_type = "string" SUPPORTED_EXTENSIONS = { ".xlsx", ".pdf", ".txt", ".csv", ".json", ".xml", ".html", ".jpg", ".jpeg", ".png", ".mp4", ".mp3", ".wav", ".zip", } def forward(self, link: str, file_name: str = "taskfile") -> str: print(f"⬇️ Downloading file from: {link}") dir_path = "./downloads" os.makedirs(dir_path, exist_ok=True) try: response = requests.get(link, stream=True, timeout=30) except requests.RequestException as e: return f"❌ Error: Request failed - {e}" if response.status_code != 200: return ( f"❌ Error: Unable to fetch file. Status code: {response.status_code}" ) # Step 1: Try extracting extension from provided filename base_name, provided_ext = os.path.splitext(file_name) provided_ext = provided_ext.lower() # Step 2: Check if provided extension is supported if provided_ext and provided_ext in self.SUPPORTED_EXTENSIONS: ext = provided_ext else: # Step 3: Try to infer from Content-Type content_type = ( response.headers.get("Content-Type", "").split(";")[0].strip() ) guessed_ext = mimetypes.guess_extension(content_type or "") or "" # Step 4: If mimetype returned .bin or nothing useful, try to fallback to URL if guessed_ext in ("", ".bin"): parsed_link = urlparse(link) _, url_ext = os.path.splitext(parsed_link.path) if url_ext.lower() in self.SUPPORTED_EXTENSIONS: ext = url_ext.lower() else: return f"⚠️ Warning: Cannot determine a valid file extension from '{content_type}' or URL. Please retry with an explicit valid filename and extension." else: ext = guessed_ext # Step 5: Final path and save file_path = os.path.join(dir_path, base_name + ext) downloaded = 0 with open(file_path, "wb") as f: for chunk in response.iter_content(chunk_size=1024): if chunk: f.write(chunk) downloaded += len(chunk) return file_path class DuckDuckGoSearchTool(Tool): name = "web_search" description = """Performs web searches and returns content from top results. Provides real-time information from across the internet including current events, facts, and website content relevant to your query.""" inputs = { "query": { "type": "string", "description": "The search query to run on DuckDuckGo", }, } output_type = "string" def _configure(self, max_retries: int = 5, retry_sleep: int = 2): self._max_retries = max_retries self._retry_sleep = retry_sleep def forward(self, query: str) -> str: self._configure() top_results = 5 retries = 0 max_retries = getattr(self, "_max_retries", 3) retry_sleep = getattr(self, "_retry_sleep", 2) while retries < max_retries: try: results = DDGS().text( keywords=query, region="wt-wt", safesearch="moderate", max_results=top_results, ) if not results: return "No results found." output_lines = [] for idx, res in enumerate(results[:top_results], start=1): title = res.get("title", "N/A") url = res.get("href", "N/A") snippet = res.get("body", "N/A") output_lines.append( f"Result {idx}:\n" f"Title: {title}\n" f"URL: {url}\n" f"Snippet: {snippet}\n" ) output = "\n".join(output_lines) print(f"-> Tool Result (DuckDuckGo): {output[:1500]}...") return output except ( DuckDuckGoSearchException, TimeoutException, RatelimitException, ConversationLimitException, ) as e: retries += 1 self._retry_sleep +=2 print( f"⚠️ DuckDuckGo Exception (Attempt {retries}/{max_retries}): {type(e).__name__}: {e}" ) traceback.print_exc() time.sleep(retry_sleep) except Exception as e: print(f"❌ Unexpected Error: {e}") traceback.print_exc() return f"Unhandled exception during DuckDuckGo search: {e}" return f"❌ Failed to retrieve results after {max_retries} retries." huggingface_ef = embedding_functions.HuggingFaceEmbeddingFunction( model_name="sentence-transformers/all-mpnet-base-v2" ) SUPPORTED_EXTENSIONS = [ ".txt", ".md", ".py", ".pdf", ".json", ".jsonl", ".html", ".htm", ] class AddDocumentToVectorStoreTool(Tool): name = "add_document_to_vector_store" description = "Processes a document and adds it to the vector database for semantic search. Automatically chunks files and creates text embeddings to enable powerful content retrieval." inputs = { "file_path": { "type": "string", "description": "Absolute path to the file to be indexed.", } } output_type = "string" def _load_file(self, path: Path): """Select the right loader for the file extension.""" if path.suffix == ".pdf": return PyPDFLoader(str(path)).load() elif path.suffix == ".json": return JSONLoader(str(path), jq_schema=".").load() elif path.suffix in [".md"]: return UnstructuredFileLoader(str(path)).load() elif path.suffix in [".html", ".htm"]: return BSHTMLLoader(str(path)).load() else: # fallback for .txt, .py, etc. return TextLoader(str(path)).load() def forward(self, file_path: str) -> str: print(f"📄 Adding document to vector store: {file_path}") try: collection_name = "vectorstore" path = Path(file_path) if not path.exists() or path.suffix not in SUPPORTED_EXTENSIONS: return f"Unsupported or missing file: {file_path}" docs = self._load_file(path) text_splitter = RecursiveCharacterTextSplitter( chunk_size=500, chunk_overlap=50 ) split_docs = text_splitter.split_documents(docs) client = chromadb.Client( chromadb.config.Settings( persist_directory="./chroma_store", ) ) collection = client.get_or_create_collection( name=collection_name, configuration={"embedding_function": huggingface_ef}, ) texts = [doc.page_content for doc in split_docs] metadatas = [doc.metadata for doc in split_docs] collection.add( documents=texts, metadatas=metadatas, ids=[f"{path.stem}_{i}" for i in range(len(texts))], ) return f"✅ Successfully added {len(texts)} chunks from '{file_path}' to collection '{collection_name}'." except Exception as e: print(f"❌ Error in add_to_vector_store: {e}") traceback.print_exc() return f"Error: {e}" class QueryVectorStoreTool(Tool): name = "query_downloaded_documents" description = "Performs semantic searches across your downloaded documents. Use detailed queries to find specific information, concepts, or answers from your collected resources." inputs = { "query": { "type": "string", "description": "The search query. Ensure this is constructed intelligently so to retrieve the most relevant outputs.", } } output_type = "string" def forward(self, query: str) -> str: collection_name = "vectorstore" k = 5 print(f"🔎 Querying vector store '{collection_name}' with: '{query}'") try: client = chromadb.Client( chromadb.config.Settings( persist_directory="./chroma_store", ) ) collection = client.get_collection(name=collection_name) results = collection.query( query_texts=[query], n_results=k, ) formatted = [] for i in range(len(results["documents"][0])): doc = results["documents"][0][i] metadata = results["metadatas"][0][i] formatted.append( f"Result {i+1}:\n" f"Content: {doc}\n" f"Metadata: {metadata}\n" ) return "\n".join(formatted) or "No relevant documents found." except Exception as e: print(f"❌ Error in query_vector_store: {e}") traceback.print_exc() return f"Error querying vector store: {e}" @tool def image_question_answering(image_path: str, prompt: str) -> str: """ Analyzes images and answers specific questions about their content. Can identify objects, read text, describe scenes, or interpret visual information based on your questions. Args: image_path: The path to the image file prompt: The question to ask about the image Returns: A string answer generated by the local Ollama model """ # Check for supported file types file_extension = image_path.lower().split(".")[-1] if file_extension not in ["jpg", "jpeg", "png", "bmp", "gif", "webp"]: return "Unsupported file type. Please provide an image." path = Path(image_path) if not path.exists(): return f"File not found at: {image_path}" # Send the image and prompt to Ollama's local model response = chat( model="llava", # Assuming your model is named 'lava' messages=[ { "role": "user", "content": prompt, "images": [path], }, ], options={"temperature": 0.2}, # Slight randomness for naturalness ) return response.message.content.strip() class VisitWebpageTool(Tool): name = "visit_webpage" description = "Loads a webpage from a URL and converts its content to markdown format. Use this to browse websites, extract information, or identify downloadable resources from a specific web address." inputs = { "url": { "type": "string", "description": "The url of the webpage to visit.", } } output_type = "string" def forward(self, url: str) -> str: try: from urllib.parse import urlparse import requests from bs4 import BeautifulSoup from markdownify import markdownify from requests.exceptions import RequestException from smolagents.utils import truncate_content except ImportError as e: raise ImportError( "You must install packages `markdownify`, `requests`, and `beautifulsoup4` to run this tool: for instance run `pip install markdownify requests beautifulsoup4`." ) from e try: # Get the webpage content headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" } response = requests.get(url, headers=headers, timeout=20) response.raise_for_status() # Parse the HTML with BeautifulSoup soup = BeautifulSoup(response.text, "html.parser") # Extract domain name for context domain = urlparse(url).netloc # Remove common clutter elements self._remove_clutter(soup) # Try to identify and prioritize main content main_content = self._extract_main_content(soup) if main_content: # Convert the cleaned HTML to markdown markdown_content = markdownify(str(main_content)).strip() else: # Fallback to full page content if main content extraction fails markdown_content = markdownify(str(soup)).strip() # Post-process the markdown content markdown_content = self._clean_markdown(markdown_content) # Add source information result = f"Content from {domain}:\n\n{markdown_content}" return truncate_content(result, 40000) except requests.exceptions.Timeout: return "The request timed out. Please try again later or check the URL." except RequestException as e: return f"Error fetching the webpage: {str(e)}" except Exception as e: return f"An unexpected error occurred: {str(e)}" def _remove_clutter(self, soup): """Remove common elements that clutter web pages.""" # Common non-content elements to remove clutter_selectors = [ "header", "footer", "nav", ".nav", ".navigation", ".menu", ".sidebar", ".footer", ".header", "#footer", "#header", "#nav", "#sidebar", ".widget", ".cookie", ".cookies", ".ad", ".ads", ".advertisement", "script", "style", "noscript", "iframe", ".social", ".share", ".comment", ".comments", ".subscription", ".newsletter", '[role="banner"]', '[role="navigation"]', '[role="complementary"]', ] for selector in clutter_selectors: for element in soup.select(selector): element.decompose() # Remove hidden elements for hidden in soup.select( '[style*="display: none"], [style*="display:none"], [style*="visibility: hidden"], [style*="visibility:hidden"], [hidden]' ): hidden.decompose() def _extract_main_content(self, soup): """Try to identify and extract the main content of the page.""" # Priority order for common main content containers main_content_selectors = [ "main", '[role="main"]', "article", ".content", ".main-content", ".post-content", "#content", "#main", "#main-content", ".article", ".post", ".entry", ".page-content", ".entry-content", ] # Try to find the main content container for selector in main_content_selectors: main_content = soup.select(selector) if main_content: # If multiple matches, find the one with the most text content if len(main_content) > 1: return max(main_content, key=lambda x: len(x.get_text())) return main_content[0] # If no main content container found, look for the largest text block paragraphs = soup.find_all("p") if paragraphs: # Find the parent that contains the most paragraphs parents = {} for p in paragraphs: if p.parent: if p.parent not in parents: parents[p.parent] = 0 parents[p.parent] += 1 if parents: # Return the parent with the most paragraphs return max(parents.items(), key=lambda x: x[1])[0] # Return None if we can't identify main content return None def _clean_markdown(self, content): """Clean up the markdown content.""" # Normalize whitespace content = re.sub(r"\n{3,}", "\n\n", content) # Remove consecutive duplicate links content = re.sub(r"(\[.*?\]\(.*?\))\s*\1+", r"\1", content) # Remove very short lines that are likely menu items lines = content.split("\n") filtered_lines = [] # Skip consecutive short lines (likely menus) short_line_threshold = 40 # characters consecutive_short_lines = 0 max_consecutive_short_lines = 3 for line in lines: stripped_line = line.strip() if len( stripped_line ) < short_line_threshold and not stripped_line.startswith("#"): consecutive_short_lines += 1 if consecutive_short_lines > max_consecutive_short_lines: continue else: consecutive_short_lines = 0 filtered_lines.append(line) content = "\n".join(filtered_lines) # Remove duplicate headers seen_headers = set() lines = content.split("\n") filtered_lines = [] for line in lines: if line.startswith("#"): header_text = line.strip() if header_text in seen_headers: continue seen_headers.add(header_text) filtered_lines.append(line) content = "\n".join(filtered_lines) # Remove lines containing common footer patterns footer_patterns = [ r"^copyright", r"^©", r"^all rights reserved", r"^terms", r"^privacy policy", r"^contact us", r"^follow us", r"^social media", r"^disclaimer", ] footer_pattern = "|".join(footer_patterns) lines = content.split("\n") filtered_lines = [] for line in lines: if not re.search(footer_pattern, line.lower()): filtered_lines.append(line) content = "\n".join(filtered_lines) return content class ArxivSearchTool(Tool): name = "arxiv_search" description = """Searches arXiv for academic papers and returns structured information including titles, authors, publication dates, abstracts, and download links.""" inputs = { "query": { "type": "string", "description": "A research-related query (e.g., 'AI regulation')", }, "from_date": { "type": "string", "description": "Optional search start date in format (YYYY or YYYY-MM or YYYY-MM-DD) (e.g., '2022-06' or '2022' or '2022-04-12')", "nullable": True, }, "to_date": { "type": "string", "description": "Optional search end date in (YYYY or YYYY-MM or YYYY-MM-DD) (e.g., '2022-06' or '2022' or '2022-04-12')", "nullable": True, }, } output_type = "string" def forward( self, query: str, from_date: str = None, to_date: str = None, ) -> str: # 1) build URL url = build_arxiv_url(query, from_date, to_date, size=50) # 2) fetch & parse try: papers = fetch_and_parse_arxiv(url) except Exception as e: return f"❌ Failed to fetch or parse arXiv results: {e}" if not papers: return "No results found for your query." # 3) format into a single string output_lines = [] for idx, p in enumerate(papers, start=1): output_lines += [ f"🔍 RESULT {idx}", f"Title : {p['title']}", f"Authors : {p['authors']}", f"Published : {p['published']}", f"Summary : {p['abstract'][:500]}{'...' if len(p['abstract'])>500 else ''}", f"Entry ID : {p['entry_link']}", f"Download link: {p['download_link']}", "", ] return "\n".join(output_lines).strip() def fetch_and_parse_arxiv(url: str) -> List[Dict[str, str]]: """ Fetches the given arXiv advanced‐search URL, parses the HTML, and returns a list of results. Each result is a dict containing: - title - authors - published - abstract - entry_link - doi (or "[N/A]" if none) """ resp = requests.get(url) resp.raise_for_status() soup = BeautifulSoup(resp.text, "html.parser") results = [] for li in soup.find_all("li", class_="arxiv-result"): # Title t = li.find("p", class_="title") title = t.get_text(strip=True) if t else "" # Authors a = li.find("p", class_="authors") authors = a.get_text(strip=True).replace("Authors:", "").strip() if a else "" # Abstract ab = li.find("span", class_="abstract-full") abstract = ( ab.get_text(strip=True).replace("Abstract:", "").strip() if ab else "" ) # Published date d = li.find("p", class_="is-size-7") published = d.get_text(strip=True) if d else "" # Entry link lt = li.find("p", class_="list-title") entry_link = lt.find("a")["href"] if lt and lt.find("a") else "" # DOI idblock = li.find("p", class_="list-identifier") if idblock: for a_tag in idblock.find_all("a", href=True): if "doi.org" in a_tag["href"]: doi = a_tag["href"] break results.append( { "title": title, "authors": authors, "published": published, "abstract": abstract, "entry_link": entry_link, "download_link": ( entry_link.replace("abs", "pdf") if "abs" in entry_link else "N/A" ), } ) return results def build_arxiv_url( query: str, from_date: str = None, to_date: str = None, size: int = 50 ) -> str: """ Build an arXiv advanced-search URL matching the exact segment order: 1) ?advanced 2) terms-0-operator=AND 3) terms-0-term=… 4) terms-0-field=all 5) classification-physics_archives=all 6) classification-include_cross_list=include [ optional date‐range block ] 7) abstracts=show 8) size=… 9) order=-announced_date_first If from_date or to_date is None, the date-range block is omitted. """ base = "https://arxiv.org/search/advanced?advanced=" parts = [ "&terms-0-operator=AND", f"&terms-0-term={quote_plus(query)}", "&terms-0-field=all", "&classification-physics_archives=all", "&classification-include_cross_list=include", ] # optional date-range filtering if from_date and to_date: parts += [ "&date-year=", "&date-filter_by=date_range", f"&date-from_date={from_date}", f"&date-to_date={to_date}", "&date-date_type=submitted_date", ] parts += [ "&abstracts=show", f"&size={size}", "&order=-announced_date_first", ] return base + "".join(parts)