import os, sys
from dotenv import load_dotenv
import requests
import pandas as pd
import base64
from langchain_community.tools import DuckDuckGoSearchRun
from langchain_community.utilities import WikipediaAPIWrapper
from langchain_community.tools import WikipediaQueryRun
from langchain_community.document_loaders import WikipediaLoader
import wikipedia
from langchain_tavily import TavilySearch
from langchain_community.document_loaders import ArxivLoader
from langchain_community.document_loaders import YoutubeLoader
from langchain_core.tools import tool
from langchain.tools import Tool
from langchain_core.messages import HumanMessage
# per gestire esecuzione di codice python
import subprocess
DATASET_API_URL = 'https://agents-course-unit4-scoring.hf.space'
load_dotenv()
WIKIPEDIA_TOP_K_RESULTS = int(os.environ.get("WIKIPEDIA_TOP_K_RESULTS"))
WIKIPEDIA_DOC_CONTENT_CHARS_MAX = int(os.environ.get("WIKIPEDIA_DOC_CONTENT_CHARS_MAX"))
def get_search_tool():
search_tool = DuckDuckGoSearchRun()
return search_tool
def get_tavily_search_tool():
tavily_search_tool = TavilySearch(
max_results=3,
topic="general",
# include_answer=False,
# include_raw_content=False,
# include_images=False,
# include_image_descriptions=False,
# search_depth="basic",
# time_range="day",
# include_domains=None,
# exclude_domains=None
)
return tavily_search_tool
# Wikipedia tool 1: usa WikipediaQueryRun dal package 'langchain_community.tools'
# problema: sembra ottenere solo i summary
def get_wikipedia_tool():
#print("WIKIPEDIA_TOP_K_RESULTS:{}, WIKIPEDIA_DOC_CONTENT_CHARS_MAX:{}".format(WIKIPEDIA_TOP_K_RESULTS, WIKIPEDIA_DOC_CONTENT_CHARS_MAX))
# creates an instance of the Wikipedia API wrapper. top_k_results=1 means it will only fetch the top result from Wikipedia
wikipedia_api_wrapper = WikipediaAPIWrapper(top_k_results=WIKIPEDIA_TOP_K_RESULTS, doc_content_chars_max=WIKIPEDIA_DOC_CONTENT_CHARS_MAX)
# converts the WikipediaAPIWrapper into a LangChain tool.
wikipedia_tool = WikipediaQueryRun(api_wrapper=wikipedia_api_wrapper)
return wikipedia_tool
# Wikipedia tool 2: utilizza direttamente il package 'wikipedia'
@tool
def wikipedia_search(query: str) -> str:
"""
Search Wikipedia and return the full content of the most relevant article.
"""
try:
results = wikipedia.search(query)
if not results:
return f"No results found for '{query}'."
page = wikipedia.page(results[0])
content = page.content
# Truncate content if it's too long
if len(content) > WIKIPEDIA_DOC_CONTENT_CHARS_MAX:
content = content[:WIKIPEDIA_DOC_CONTENT_CHARS_MAX] + "..."
return content
except wikipedia.exceptions.DisambiguationError as e:
return f"Ambiguous query. Possible options: {', '.join(e.options[:5])}..."
except wikipedia.exceptions.PageError:
return f"Page not found for '{query}'."
except Exception as e:
return f"Error occurred: {str(e)}"
# Wikipedia tool 3: utilizza WikipediaLoader dla package 'langchain_community.document_loaders'
@tool
def wikipedia_search_3(query: str) -> str:
"""
Search Wikipedia and return the full content of the most relevant articles.
Args:
query: The search query.
"""
search_docs = WikipediaLoader(query=query,
load_max_docs=WIKIPEDIA_TOP_K_RESULTS,
doc_content_chars_max=WIKIPEDIA_DOC_CONTENT_CHARS_MAX,
load_all_available_meta=True).load()
formatted_search_docs = "\n\n---\n\n".join(
[
f'\n{doc.page_content}\n'
for doc in search_docs
])
return {"wiki_results": formatted_search_docs}
@tool
def execute_python_code_from_file(file_path: str) -> str:
"""
Reads a Python file from the given path, executes its code, and returns the combined stdout and stderr.
WARNING: Executing arbitrary code from files is a significant security risk.
Only use this tool with trusted code in a controlled environment.
"""
if not os.path.exists(file_path):
return f"Error: File not found at '{file_path}'."
if not file_path.endswith(".py"):
return f"Error: Provided file '{file_path}' is not a Python (.py) file."
try:
# Use subprocess to run the Python file in a new process.
# This provides some isolation compared to 'exec()' but is still dangerous for untrusted code.
result = subprocess.run(
[sys.executable, file_path], # sys.executable ensures it uses the current Python interpreter
capture_output=True, # Capture stdout and stderr
text=True, # Capture output as text (strings)
check=False # Do not raise an exception for non-zero exit codes (handle errors manually)
)
stdout_output = result.stdout.strip()
stderr_output = result.stderr.strip()
output_lines = []
if stdout_output:
output_lines.append(f"STDOUT:\n{stdout_output}")
if stderr_output:
output_lines.append(f"STDERR:\n{stderr_output}")
if result.returncode != 0:
output_lines.append(f"Process exited with code {result.returncode}. This usually indicates an error.")
if not output_lines:
return "Execution completed with no output."
return "\n".join(output_lines)
except Exception as e:
return f"An unexpected error occurred during code execution: {e}"
@tool
def download_taskid_file(task_id: str, file_name: str) -> str:
"""
Downloads the file associated with the given task_id (if any). Returns the absolute path of the file
"""
try:
response = requests.get(f"{DATASET_API_URL}/files/{task_id}", timeout=20)
response.raise_for_status()
with open(file_name, 'wb') as file:
file.write(response.content)
return os.path.abspath(file_name)
except Exception as e:
return "Error occurred: {}".format(e)
@tool
def analyze_excel_file(file_path: str, query: str) -> str:
"""
Analyzes an Excel (.xlsx) file using pandas.
Loads the specified Excel file into a pandas DataFrame and executes a Python query against it.
The query should be a valid pandas DataFrame operation (e.g., df.head(), df.describe(),
df[df['column_name'] > 10], df.groupby('category')['value'].mean()).
Returns the result of the query as a string (JSON or string representation).
"""
if not os.path.exists(file_path):
return f"Error: File not found at {file_path}"
try:
df = pd.read_excel(file_path)
# Make the DataFrame accessible for the query
local_vars = {"df": df}
# Execute the query
# IMPORTANT: Be extremely cautious with eval/exec for user-provided input in a production system.
# For a ReAct agent, the LLM generates this query, so it's generally safer
# if the LLM is well-constrained and reliable.
# For sensitive applications, consider a safer parsing mechanism or a restricted set of operations.
result = eval(query, {}, local_vars)
return str(result) # Convert result to string for the LLM
except Exception as e:
return f"Error analyzing Excel file: {e}"
def get_analyze_mp3_tool(llm):
@tool
def analyze_mp3_file(audio_path: str) -> str:
"""
Extract text from an mp3 audio file.
"""
all_text = ""
try:
# Read audio and encode as base64
with open(audio_path, "rb") as audio_file:
audio_bytes = audio_file.read()
audio_base64 = base64.b64encode(audio_bytes).decode("utf-8")
# Determine the MIME type for MP3
audio_mime_type = "audio/mpeg" # Or "audio/mp3", "audio/wav" etc. for other formats
# Prepare the prompt including the base64 image data
message = [
HumanMessage(
content=[
{
"type": "text",
"text": (
"Extract all the text from this audio. "
"Return only the extracted text, no explanations."
),
},
{
"type": "media", # <--- CORRECTED: Use 'media' type
"data": audio_base64, # <--- Use 'data' for the base64 content
"mime_type": audio_mime_type, # <--- Specify the MIME type
}
]
)
]
# Call the vision-capable model
response = llm.invoke(message)
# Append extracted text
all_text += response.content + "\n\n"
return all_text.strip()
except Exception as e:
print("Error extracting text from audio file:{} - {}".format(audio_path, e))
return ""
return analyze_mp3_file
def get_analyze_image_tool(llm):
@tool
def analyze_png_image(image_path: str) -> str:
"""
Analyzes a PNG image and returns a detailed description of its content.
This tool requires an LLM capable of processing images, such as Gemini 1.5 Pro or Gemini 2.0 Flash.
"""
try:
# Read image and encode as base64
with open(image_path, "rb") as image_file:
image_bytes = image_file.read()
image_base64 = base64.b64encode(image_bytes).decode("utf-8")
# Prepare the prompt including the base64 image data
message = [
HumanMessage(
content=[
{
"type": "text",
"text": (
"Provide a very detailed description of the content of this image. "
"Focus on objects, people, actions, text, and overall scene context. "
"Be as comprehensive as possible."
),
},
{
"type": "image_url",
"image_url": {"url": f"data:image/png;base64,{image_base64}"},
},
]
)
]
# Call the vision-capable model
response = llm.invoke(message)
return response.content.strip()
except Exception as e:
print("Error analyzing image file:{} - {}".format(image_path, e))
return ""
return analyze_png_image
@tool
def arxiv_search(query: str) -> str:
"""Search Arxiv for a query and return maximum 3 result.
Args:
query: The search query."""
search_docs = ArxivLoader(query=query, load_max_docs=3).load()
formatted_search_docs = "\n\n---\n\n".join(
[
f'\n{doc.page_content[:1000]}\n'
for doc in search_docs
]
)
return {"arxiv_results": formatted_search_docs}
@tool
def get_youtube_transcript(url: str) -> dict:
"""Fetches the transcript from a YouTube video URL.
Args:
url: The URL of the YouTube video.
Returns:
A dictionary containing the transcript and metadata.
The dictionary will have keys "transcript" (string, the video transcript or an error message) and "metadata" (dictionary, containing video title and other information, if available, otherwise empty).
"""
try:
loader = YoutubeLoader.from_youtube_url(url, add_video_info=True)
docs = loader.load()
# Combine all transcript chunks into a single string
transcript = "\n".join(doc.page_content for doc in docs)
metadata = docs[0].metadata if docs else {}
return {"transcript": transcript, "metadata": metadata}
except Exception as e:
if "Could not retrieve transcript" in str(e):
return {"transcript": "No transcript available for this video.", "metadata": {}}
else:
return {"transcript": f"Error fetching transcript: {e}", "metadata": {}}