from langchain_core.tools import tool
from langchain_community.document_loaders import WebBaseLoader, WikipediaLoader, ArxivLoader
from youtube_transcript_api import YouTubeTranscriptApi
from langchain_community.tools import DuckDuckGoSearchResults
from langchain_community.document_loaders import YoutubeLoader
from langchain_community.tools import TavilySearchResults
import json
import sys
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def add(values: list[int]) -> int:
    """
    Add all numbers in a list together.

    Args:
        values: A list of numbers to sum
    Returns:
        The sum of all numbers in the list
    """
    logger.info(f"Adding numbers: {values}")
    return sum(values)

def subtract(a: int, b: int) -> int:
    """
    Subtract two numbers.

    Args:
        a: The first number
        b: The second number
    Returns:
        The difference between the two numbers
    """
    logger.info(f"Subtracting {a} - {b}")
    return a - b

def multiply(a: int, b: int) -> int:
    """
    Multiply two numbers.

    Args:
        a: The first number
        b: The second number
    Returns:
        The product of the two numbers
    """
    logger.info(f"Multiplying {a} * {b}")
    return a * b

def divide(a: int, b: int) -> float:
    """
    Divide two numbers.

    Args:
        a: The first number
        b: The second number
    Returns:
        The quotient of the two numbers
    """
    logger.info(f"Dividing {a} / {b}")
    return a / b

def modulo(a: int, b: int) -> int:
    """
    Calculate the modulo of two numbers.

    Args:
        a: The first number
        b: The second number
    Returns:
        The remainder of the division of the two numbers
    """
    logger.info(f"Calculating modulo of {a} % {b}")
    return a % b

def wikipedia_search(query: str) -> str:
    """
    Search Wikipedia for information.

    Args:
        query: The query to search for
    Returns:
        The search results
    """
    logger.info(f"Searching Wikipedia for: {query}")
    docs_found = WikipediaLoader(query=query, load_max_docs=3).load()
    # Format each document found into a delimited block
    formatted_results = []
    for i, doc in enumerate(docs_found, 1):
        source = doc.metadata.get('source', 'Unknown source')
        title = doc.metadata.get('title', 'Untitled')
        content = doc.page_content.strip() or "No content available"
        formatted_doc = f"""--- DOCUMENT {i} START ---
Source: {source}
Title: {title}
Content: {content}
--- DOCUMENT {i} END ---"""
        formatted_results.append(formatted_doc)
    return "\n\n".join(formatted_results)

def arxiv_search(query: str) -> str:
    """
    Search ArXiv for research papers.

    Args:
        query: The query to search for
    Returns:
        The search results with abstracts
    """
    logger.info(f"Searching ArXiv for: {query}")
    docs_found = ArxivLoader(query=query, load_max_docs=3).load()
    formatted_results = []
    for i, doc in enumerate(docs_found, 1):
        source = doc.metadata.get('source', 'Unknown source')
        # ArxivLoader stores metadata under capitalized keys, e.g. 'Title'
        title = doc.metadata.get('Title', 'Untitled')
        # For ArXiv, the abstract is typically in the page_content
        abstract = doc.page_content.strip() if doc.page_content else "No abstract available"
        formatted_doc = f"""--- DOCUMENT {i} START ---
Source: {source}
Title: {title}
Abstract: {abstract}
--- DOCUMENT {i} END ---"""
        formatted_results.append(formatted_doc)
    return "\n\n".join(formatted_results)

def web_search(query: str) -> str:
    """
    Fetch web pages and return their content.

    Args:
        query: A URL string (or a list of URLs) to fetch
    Returns:
        The fetched page content
    """
    logger.info(f"Searching the web for: {query}")
    # Note: WebBaseLoader requires URLs, so this assumes query contains URLs.
    # For a general keyword web search, use web_search_duckduckgo or
    # web_search_tavily below.
    try:
        if isinstance(query, str):
            urls = [query] if query.startswith('http') else []
        else:
            urls = query
        if not urls:
            return "No valid URLs provided for web search."
        # Limit to 2 URLs maximum
        urls = urls[:2]
        docs_found = WebBaseLoader(urls).load()
        formatted_results = []
        for i, doc in enumerate(docs_found, 1):
            source = doc.metadata.get('source', 'Unknown source')
            title = doc.metadata.get('title', 'Untitled')
            content = doc.page_content.strip() or "No content available"
            formatted_doc = f"""--- DOCUMENT {i} START ---
Source: {source}
Title: {title}
Content: {content}
--- DOCUMENT {i} END ---"""
            formatted_results.append(formatted_doc)
        return "\n\n".join(formatted_results)
    except Exception as e:
        return f"Error during web search: {str(e)}"
def youtube_transcript(url: str) -> dict:
    """
    Get the transcript of a YouTube video.

    Args:
        url: The YouTube video URL, e.g. "https://www.youtube.com/watch?v=..."
    Returns:
        A dict with the transcript text under the "youtube_transcript" key
    """
    logger.info(f"Getting transcript of YouTube video: {url}")
    # Extract the video id from a standard watch URL, dropping extra query params
    video_id = url.partition("https://www.youtube.com/watch?v=")[2].split("&")[0]
    # get_transcript is the pre-1.0 youtube_transcript_api interface
    transcript = YouTubeTranscriptApi.get_transcript(video_id)
    transcript_text = " ".join([item["text"] for item in transcript])
    return {"youtube_transcript": transcript_text}

def python_interpreter(code: str) -> str:
    """
    Execute Python code and return the result.

    Args:
        code: Python code to execute
    Returns:
        The output of the executed code or an error message
    """
    from io import StringIO
    from contextlib import redirect_stdout

    # Reject code that references obviously dangerous operations.
    # This is a lightweight keyword filter, not a real sandbox.
    dangerous_keywords = ['import os', 'import sys', 'import subprocess',
                          'exec(', 'eval(', '__import__', 'open(', 'file(']
    lowered = code.lower()
    for keyword in dangerous_keywords:
        if keyword in lowered:
            return f"Error: Potentially dangerous operation detected: {keyword}"

    # Allow imports of a small set of safe standard-library modules only
    allowed_modules = {
        'math', 'random', 'datetime', 'json', 'collections',
        'itertools', 'functools', 're', 'statistics'
    }

    def safe_import(name, *args, **kwargs):
        if name.split('.')[0] not in allowed_modules:
            raise ImportError(f"Import of '{name}' is not allowed")
        return __import__(name, *args, **kwargs)

    # Expose only a curated subset of builtins to the executed code
    safe_builtins = {
        'print': print, 'len': len, 'str': str, 'int': int, 'float': float,
        'list': list, 'dict': dict, 'tuple': tuple, 'set': set,
        'range': range, 'enumerate': enumerate, 'zip': zip, 'map': map,
        'filter': filter, 'sum': sum, 'max': max, 'min': min, 'abs': abs,
        'round': round, 'sorted': sorted, 'reversed': reversed,
        'any': any, 'all': all, 'isinstance': isinstance, 'type': type,
        'hasattr': hasattr, 'getattr': getattr, 'setattr': setattr,
        'dir': dir, 'help': help, '__import__': safe_import,
    }
    # Execute in a single namespace so functions defined in the snippet can
    # still resolve builtins
    namespace = {'__builtins__': safe_builtins}

    # Capture stdout produced by the executed code; redirect_stdout restores
    # sys.stdout even if exec raises
    captured_output = StringIO()
    try:
        with redirect_stdout(captured_output):
            exec(code, namespace)
        output = captured_output.getvalue()
        return output if output else "Code executed successfully (no output)"
    except Exception as e:
        return f"Error executing code: {str(e)}"
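
# Illustrative calls for python_interpreter (expected outputs are
# assumptions, not asserted tests): a whitelisted import runs normally,
# a blocked one is rejected up front.
#   python_interpreter("import math\nprint(math.sqrt(16))")
#       -> "4.0\n"
#   python_interpreter("import os\nprint(os.getcwd())")
#       -> "Error: Potentially dangerous operation detected: import os"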
def web_search_duckduckgo(query: str) -> dict:
    """Search DuckDuckGo for a query and return a maximum of 3 results.

    Args:
        query: The search query."""
    search_docs = DuckDuckGoSearchResults(output_format="list", max_results=3).invoke(input=query)
    # With output_format="list", each result is a dict whose text lives
    # under the "snippet" key (not "content")
    formatted_search_docs = "\n\n---\n\n".join(
        [
            f'<Document>\n{doc.get("snippet", "")}\n</Document>'
            for doc in search_docs
        ])
    return {"web_results": formatted_search_docs}

def youtube_loader(youtube_url: str) -> dict:
    """Load a YouTube video and return its transcript.

    Args:
        youtube_url: The YouTube video URL."""
    loader = YoutubeLoader.from_youtube_url(
        youtube_url,
        add_video_info=True
    )
    # Join the loaded documents into a single transcript string
    docs = loader.load()
    return {"youtube_transcript": " ".join(doc.page_content for doc in docs)}

def web_search_tavily(query: str) -> dict:
    """Search Tavily for a query and return a maximum of 3 results.

    Args:
        query: The search query."""
    search_docs = TavilySearchResults(max_results=3).invoke({'query': query})
    formatted_search_docs = "\n\n---\n\n".join(
        [
            f'<Document>\n{doc.get("content", "")}\n</Document>'
            for doc in search_docs
        ])
    return {"web_results": formatted_search_docs}