# Agent_Gaia/langgraph_dir/custom_tools.py
from langchain_core.tools import tool
import math
@tool
def calculator_tool(expression: str) -> str:
"""
Evaluate a mathematical expression.
"""
# Define the restricted global and local namespace
safe_globals = {"__builtins__": {}}
safe_locals = {
# Math functions
'sqrt': math.sqrt,
'sin': math.sin,
'cos': math.cos,
'tan': math.tan,
'log': math.log10, # log base 10
'ln': math.log, # natural log
'exp': math.exp,
'pow': pow,
# Constants
'pi': math.pi,
'e': math.e,
# Built-in math utilities
'abs': abs,
'round': round,
'max': max,
'min': min,
'sum': sum,
}
try:
# Evaluate the expression in a restricted environment
result = eval(expression, safe_globals, safe_locals)
# Handle None explicitly
if result is None:
return "calculator tool produced no valid result"
        # Round near-zero floats (|x| < 1e-9) to 10 decimal places to suppress floating-point noise
if isinstance(result, float) and abs(result) < 1e-9:
result = round(result, 10)
return str(result)
except SyntaxError as se:
return f"Syntax error in expression: {str(se)}"
except NameError as ne:
return f"Undefined variable or function used: {str(ne)}"
except ZeroDivisionError:
return "Error: Division by zero"
except Exception as e:
return f"Evaluation error: {str(e)}"
from langchain_tavily import TavilySearch
@tool
def web_search(query: str) -> str:
"""
Searches the web and returns a list of the most relevant URLs.
Use this FIRST for complex queries, metadata questions, or to find the right sources.
Then follow up with get_webdoc_content or get_website_content on the most promising URL.
"""
try:
tavily_search = TavilySearch(
max_results=5,
topic="general",
search_depth="advanced",
include_raw_content=False, # Just URLs and snippets
)
results = tavily_search.invoke(query)
# Format results to show URLs and brief descriptions
web_search_results = "Search Results:\n"
for i, result in enumerate(results["results"], 1):
web_search_results += f"{i}. {result['title']}: {result['url']}\n {result['content'][:150]}...\n\n"
return web_search_results
except Exception as e:
return f"web_search tool error: {str(e)}"
import os
import tempfile
import requests
import easyocr
from io import BytesIO
from PIL import Image
from openai import OpenAI
@tool
def query_image(query: str, source: str, need_ocr: bool = True, need_reasoning: bool = False) -> str:
"""Use ONLY to answer question about an image using a Vision Language Model.
NOT used to perform image processing or other tasks EXCEPT asking question about an image.
Args:
query (str): The question about the image, e.g. how many persons are on the image?
source (str): URL to the image
need_reasoning (bool): Set to True for complex query that require a reasoning model to answer properly. Set to False otherwise.
need_ocr (bool): If True, also extract visible text from the image. Set to False otherwise.
"""
try:
        # OCR Extraction (optional)
        ocr_text = ""
        file_to_use = None  # path of the temporary image file, if one is created; checked in the finally block
        if need_ocr:
            try:
# Download image from URL
response = requests.get(source, stream=True, timeout=10)
response.raise_for_status()
# Load image into PIL
image = Image.open(BytesIO(response.content))
# Save to temporary file
with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as tmpfile:
image.save(tmpfile, format=image.format)
file_to_use = tmpfile.name
# Perform OCR
reader = easyocr.Reader(['en'])
results = reader.readtext(file_to_use)
ocr_text = "\n".join([res[1] for res in results])
ocr_text = f"\n\n[OCR Extracted Text]:\n{ocr_text}"
except Exception as ocr_error:
ocr_text = f"\n\n[OCR Error]: {str(ocr_error)}"
finally:
# Clean up temporary file
if file_to_use and os.path.exists(file_to_use):
os.unlink(file_to_use)
# Query Vision Language Model
client = OpenAI()
if need_reasoning:
model_name = "o4-mini"
else:
model_name = "gpt-4o-mini"
response = client.chat.completions.create(
model=model_name,
messages=[
{
"role": "user",
"content": [
{"type": "text", "text": query},
{"type": "image_url", "image_url": {"url": source}},
],
}
],
            max_completion_tokens=512,  # reasoning models such as o4-mini reject the older max_tokens parameter
)
content = response.choices[0].message.content
# Combine OCR and VLM output
final_response = content
if need_ocr and ocr_text:
final_response += ocr_text
return final_response
except Exception as e:
return f"Image query failed: {str(e)}"
from pydantic import BaseModel, Field
from e2b import Sandbox
import re
import os
class PythonCodeInput(BaseModel):
code: str = Field(description="The Python code string to execute.")
@tool(args_schema=PythonCodeInput)
def python_repl(code: str) -> str:
"""
Use this to execute single or multi-line Python commands to perform tasks like:
sort a list in ascending or descending order, reverse input string, draw a table, photo processing, etc.
Input should be syntactically valid Python code.
Make sure to include required imports in the code.
Always include in your code `print(...)` or `image.save(...)` to return outputs that can be seen.
You are allowed to access internet and download files from URLs via code (e.g., using requests)
Avoid using any system-level commands or libraries that could harm the host system.
Avoid commands that require user input or block indefinitely (e.g., `input()`).
"""
    # List of forbidden patterns in code
    FORBIDDEN_PATTERNS = [
        r'\bimport\s+(os|sys|subprocess|shutil|socket)\b',
        # Block bare eval/exec/input/open calls; the lookbehind lets method calls like Image.open() through
        r'(?<!\.)\b(eval|exec|input|open)\s*\(',
        r'\b__import__',
        r'\bos\.',
        r'\bsys\.',
        r'\bsubprocess\.',
    ]
    # Step 1: Keyword-based security check
    for pattern in FORBIDDEN_PATTERNS:
        match = re.search(pattern, code)
        if match:
            return f"Blocked unsafe operation: {match.group()}"
# Step 2: Create E2B sandbox
try:
with Sandbox(api_key=os.getenv("E2B_API_KEY")) as sandbox:
# Known mismatches: import name -> pip package name
import_to_pip = {
"PIL": "pillow",
"cv2": "opencv-python",
"yaml": "PyYAML",
"bs4": "beautifulsoup4",
"tkinter": "tk",
}
# Built-in modules that don't need installation
built_in_modules = {
"math", "re", "json", "csv", "os", "sys", "time", "datetime", "random",
"itertools", "functools", "__future__", "collections", "pathlib", "io",
}
# Step 1: Extract import statements
            import_matches = re.findall(
                r'^\s*(?:import\s+([a-zA-Z0-9_]+)|from\s+([a-zA-Z0-9_]+)\s+import\b)',
                code,
                flags=re.MULTILINE,
            )
            # match[0] = 'import X ...', match[1] = 'from X import ...'; dotted imports resolve to their base package
            base_imports = set(match[0] or match[1] for match in import_matches)
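            # Illustrative example (not from the original code): a snippet containing
            #   "import numpy as np" and "from PIL import Image"
            # yields base_imports == {"numpy", "PIL"}; "PIL" is then mapped to the pip
            # package "pillow" via the import_to_pip mapping above.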
# Step 2: Determine which packages to install
packages_to_install = set()
for imp in base_imports:
# Skip known built-ins
if imp in built_in_modules:
continue
# Use mapped name if exists, else use import name
package_name = import_to_pip.get(imp, imp)
# Avoid installing system-specific modules like __pycache__
if imp.startswith("__"):
continue
packages_to_install.add(package_name)
# Step 3: Install necessary packages
if packages_to_install:
install_cmd = f"pip install {' '.join(packages_to_install)}"
result = sandbox.commands.run(install_cmd)
if result.stderr:
return f"Failed to install packages:\n{result.stderr}"
# Step 4: Write and run the user code
CODE_FILE_PATH = "/tmp/code.py"
sandbox.files.write(CODE_FILE_PATH, code)
# Step 5: Execute the code using the new API
result = sandbox.commands.run(f"python {CODE_FILE_PATH}")
stdout = result.stdout.strip()
stderr = result.stderr.strip()
# Step 6: Return output
if stderr:
return f"Execution error:\n{stderr}"
return stdout or "No output"
except Exception as e:
return f"Sandbox error: {str(e)}"
import requests
from bs4 import BeautifulSoup
from PyPDF2 import PdfReader
from io import BytesIO
from markdownify import markdownify
@tool
def get_webdoc_content(url: str) -> str:
"""
Extracts content from PDFs or document-like URLs (academic papers, reports)
Can be used after web_search to get detailed information.
Args:
url (str): the URL of web page to extract the content from
"""
try:
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
}
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
content_type = response.headers.get('Content-Type', '')
# PDF Handling
if 'application/pdf' in content_type:
            pdf_file = BytesIO(response.content)
            reader = PdfReader(pdf_file)
            # extract_text() can return None for image-only pages
            text = "\n".join(page.extract_text() or "" for page in reader.pages)
            return f"## PDF Content from {url}\n\n```\n{text[:15000]}\n```"
# HTML Document Handling
elif 'text/html' in content_type:
soup = BeautifulSoup(response.text, 'html.parser')
cleaned_html = soup.body or soup # Fallback to full document
            return markdownify(str(cleaned_html), strip=['a'])[:15000]  # Limit length, consistent with the other branches
# Fallback: Raw text extraction
else:
return f"## Raw Content from {url}\n\n{response.text[:15000]}"
    except requests.exceptions.RequestException as e:
        return f"HTTP error in get_webdoc_content: {str(e)}"
    except Exception as e:
        return f"Unexpected error in get_webdoc_content: {str(e)}"
import requests
from bs4 import BeautifulSoup
from markdownify import markdownify
@tool
def get_website_content(url: str) -> str:
"""
Extracts contents from HTML-based URLs.
Specializes in Wikipedia, technical documentation, and discussion pages.
NOT used for document-based URLs (academic papers, reports).
Used after web_search to get detailed information.
Args:
url (str): The URL of the web page to extract content from
"""
try:
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
}
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
# Remove non-content elements
for element in soup.select('script, style, footer, nav, header, aside'):
element.decompose()
# Convert cleaned HTML to markdown
cleaned_html = str(soup.body) if soup.body else str(soup)
markdown_content = markdownify(cleaned_html, strip=['a']) # Optional: strip links
return f"## Extracted Content from {url}\n\n{markdown_content[:15000]}" # Limit length
    except requests.exceptions.RequestException as e:
        return f"HTTP error in get_website_content: {str(e)}"
    except Exception as e:
        return f"Unexpected error in get_website_content: {str(e)}"
import os
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_text_splitters import RecursiveCharacterTextSplitter
@tool
def extract_answer_from_content(content: str | dict, query: str) -> str:
"""
Extract relevant information from content based on user query.
Args:
        content (str/dict): Raw text, transcribed text from audio, or structured content from any source
query (str): Natural language question to answer
Returns:
str: Concise answer extracted from content
"""
try:
# Normalize content format
if isinstance(content, dict):
text_content = ""
if "summary" in content:
text_content += f"SUMMARY: {content['summary']}\n\n"
if "infobox" in content:
text_content += "INFOBOX:\n"
for k, v in content["infobox"].items():
text_content += f"{k}: {v}\n"
text_content += "\n"
if "sections" in content:
for section, text in content["sections"].items():
text_content += f"{section}:\n{text}\n\n"
else:
text_content = content
# Initialize OpenAI embeddings
embeddings = OpenAIEmbeddings(
openai_api_key=os.getenv("OPENAI_API_KEY"),
model="text-embedding-3-large"
)
# Split content into manageable chunks
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=500,
chunk_overlap=100
)
chunks = text_splitter.split_text(text_content)
# Create vector store
vectorstore = FAISS.from_texts(chunks, embeddings)
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
# Get most relevant content
relevant_docs = retriever.invoke(query)
combined_text = " ".join([doc.page_content for doc in relevant_docs])
# Return relevant content with context
return f"Relevant information found:\n{combined_text[:1500]}"
except Exception as e:
return f"Content extraction failed: {str(e)}"
import os
import requests
from openai import OpenAI
@tool
def transcribe_audio(source: str, file_extension: str) -> str:
"""
Transcribes an audio to text from local path or URL.
Args:
source (str): URL to an audio file.
Returns:
str: The transcribed text, or error message.
"""
# If file is not existing use download_file_from_url tool to download the file first.
client = OpenAI()
try:
        # download the audio file
        response = requests.get(source, timeout=30)
        response.raise_for_status()
        # write to disk
        file_extension = file_extension.replace('.', '')
        tmp_path = f'tmp.{file_extension}'
        with open(tmp_path, 'wb') as file:
            file.write(response.content)
        # transcribe with the OpenAI Whisper API, closing the file handle afterwards
        with open(tmp_path, 'rb') as audio_file:
            transcription = client.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file
            )
        return transcription.text
except Exception as e:
return f"Transcription error: {str(e)}"
from youtube_transcript_api import YouTubeTranscriptApi
from pytube import extract
@tool
def get_youtube_transcript(page_url: str) -> str:
"""Get the transcript of audio component of YouTube video.
Use this for Youtube videos with available transcripts
Args:
page_url (str): YouTube URL of the video
"""
try:
# Get video ID from URL
video_id = extract.video_id(page_url)
# Get transcript using correct method
transcript = YouTubeTranscriptApi.get_transcript(video_id)
# Return concatenated text
return '\n'.join([s['text'] for s in transcript])
except Exception as e:
return f"get_youtube_transcript failed: {str(e)}"
from tabulate import tabulate
from typing import Dict, Any, List
@tool
def generate_table_from_data(data: List[Dict[str, Any]]) -> str:
"""
Convert list of dictionaries to markdown table
Args:
data (List[Dict]): List of objects with common keys
Returns:
str: Markdown-formatted table
"""
if not data:
return "No data available"
headers = data[0].keys()
rows = [list(item.values()) for item in data]
return tabulate(rows, headers=headers, tablefmt="pipe")
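# Illustrative usage sketch (made-up data):
#   generate_table_from_data.invoke({"data": [
#       {"name": "Alice", "score": 10},
#       {"name": "Bob", "score": 7},
#   ]})
#   # -> a markdown ("pipe") table with headers "name" and "score" and one row per dict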
from pydantic import BaseModel, Field
from typing import List, Dict
class CommutativeCheckInput(BaseModel):
table_str: str = Field(..., description="Markdown-formatted string of the operation table (e.g., |*|a|b|c|...)")
elements: List[str] = Field(..., description="List of elements in the set S")
@tool(args_schema=CommutativeCheckInput)
def check_commutative(table_str: str, elements: List[str]) -> str:
"""
Analyzes a binary operation table for commutativity.
Args:
table_str (str): Markdown-formatted string of the operation table.
elements (List[str]): List of elements in the set S.
Returns:
        str: Newline-separated list of element pairs (e.g., "b,e") where x*y ≠ y*x, or a note that the operation is commutative.
"""
# Parse the table string into a 2D list
lines = [line.strip() for line in table_str.strip().split('\n') if line.strip()]
header = [cell.strip() for cell in lines[0].split('|') if cell.strip()][1:] # Skip the first cell (operator)
rows = []
for line in lines[2:]:
cells = [cell.strip() for cell in line.split('|') if cell.strip()] # Remove empty cells
if cells:
rows.append(cells)
# Validate that all rows have the correct number of cells
    expected_length = len(header) + 1  # one cell for the row label plus one per header element
for row in rows:
if len(row) < expected_length:
return f"Error: Row '{row[0]}' has {len(row)} cells, but expected {expected_length}."
# Build a dictionary for the operation: op[x][y] = result
operation: Dict[str, Dict[str, str]] = {}
for row in rows:
x = row[0]
operation[x] = {}
for i, y in enumerate(header):
operation[x][y] = row[i + 1]
# Check all pairs (x, y) for x*y == y*x
counterexamples = []
for x in elements:
for y in elements:
if x < y: # Avoid redundant checks and self-comparison
try:
xy = operation[x][y]
yx = operation[y][x]
if xy != yx:
counterexamples.append(f"{x},{y}")
                except KeyError:
return f"Error: Missing data for pair ({x}, {y}) in table."
return "\n".join(counterexamples) if counterexamples else "The operation is commutative."