Luigi D'Addona
the get_youtube_transcript() tool now returns a dict with the fields "transcript" and "metadata"
ffe7776
import os, sys
from dotenv import load_dotenv
import requests
import pandas as pd
import base64
from langchain_community.tools import DuckDuckGoSearchRun
from langchain_community.utilities import WikipediaAPIWrapper
from langchain_community.tools import WikipediaQueryRun
from langchain_community.document_loaders import WikipediaLoader
import wikipedia
from langchain_tavily import TavilySearch
from langchain_community.document_loaders import ArxivLoader
from langchain_community.document_loaders import YoutubeLoader
from langchain_core.tools import tool
from langchain.tools import Tool
from langchain_core.messages import HumanMessage
# for handling the execution of Python code
import subprocess

DATASET_API_URL = 'https://agents-course-unit4-scoring.hf.space'

load_dotenv()
# The fallback values ("3" and "4000") are assumed defaults, used only when the .env file does not define these variables.
WIKIPEDIA_TOP_K_RESULTS = int(os.environ.get("WIKIPEDIA_TOP_K_RESULTS", "3"))
WIKIPEDIA_DOC_CONTENT_CHARS_MAX = int(os.environ.get("WIKIPEDIA_DOC_CONTENT_CHARS_MAX", "4000"))

def get_search_tool():
    search_tool = DuckDuckGoSearchRun()
    return search_tool

def get_tavily_search_tool():
    tavily_search_tool = TavilySearch(
        max_results=3,
        topic="general",
        # include_answer=False,
        # include_raw_content=False,
        # include_images=False,
        # include_image_descriptions=False,
        # search_depth="basic",
        # time_range="day",
        # include_domains=None,
        # exclude_domains=None
    )
    return tavily_search_tool
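
# Usage sketch, not part of the original code. It assumes a TAVILY_API_KEY entry in the
# .env file loaded above; the query string is illustrative:
#   tavily_tool = get_tavily_search_tool()
#   results = tavily_tool.invoke({"query": "latest AI research news"})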

# Wikipedia tool 1: uses WikipediaQueryRun from the 'langchain_community.tools' package
# issue: it seems to return only article summaries
def get_wikipedia_tool():
    # Create an instance of the Wikipedia API wrapper; top_k_results controls how many results are fetched from Wikipedia.
    wikipedia_api_wrapper = WikipediaAPIWrapper(top_k_results=WIKIPEDIA_TOP_K_RESULTS, doc_content_chars_max=WIKIPEDIA_DOC_CONTENT_CHARS_MAX)
    # Wrap the WikipediaAPIWrapper in a LangChain tool.
    wikipedia_tool = WikipediaQueryRun(api_wrapper=wikipedia_api_wrapper)
    return wikipedia_tool
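
# Usage sketch, not part of the original code. WikipediaQueryRun exposes the standard
# LangChain tool interface; the query is illustrative:
#   wiki_tool = get_wikipedia_tool()
#   summary = wiki_tool.run("Python (programming language)")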

# Wikipedia tool 2: uses the 'wikipedia' package directly
def wikipedia_search(query: str) -> str:
    """
    Search Wikipedia and return the full content of the most relevant article.
    """
    try:
        results = wikipedia.search(query)
        if not results:
            return f"No results found for '{query}'."
        page = wikipedia.page(results[0])
        content = page.content
        # Truncate content if it's too long
        if len(content) > WIKIPEDIA_DOC_CONTENT_CHARS_MAX:
            content = content[:WIKIPEDIA_DOC_CONTENT_CHARS_MAX] + "..."
        return content
    except wikipedia.exceptions.DisambiguationError as e:
        return f"Ambiguous query. Possible options: {', '.join(e.options[:5])}..."
    except wikipedia.exceptions.PageError:
        return f"Page not found for '{query}'."
    except Exception as e:
        return f"Error occurred: {str(e)}"

# Wikipedia tool 3: uses WikipediaLoader from the 'langchain_community.document_loaders' package
def wikipedia_search_3(query: str) -> dict:
    """
    Search Wikipedia and return the full content of the most relevant articles.
    Args:
        query: The search query.
    Returns:
        A dict with a "wiki_results" key containing the formatted documents.
    """
    search_docs = WikipediaLoader(query=query,
                                  load_max_docs=WIKIPEDIA_TOP_K_RESULTS,
                                  doc_content_chars_max=WIKIPEDIA_DOC_CONTENT_CHARS_MAX,
                                  load_all_available_meta=True).load()
    formatted_search_docs = "\n\n---\n\n".join(
        [
            f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>'
            for doc in search_docs
        ])
    return {"wiki_results": formatted_search_docs}

def execute_python_code_from_file(file_path: str) -> str:
    """
    Reads a Python file from the given path, executes its code, and returns the combined stdout and stderr.
    WARNING: Executing arbitrary code from files is a significant security risk.
    Only use this tool with trusted code in a controlled environment.
    """
    if not os.path.exists(file_path):
        return f"Error: File not found at '{file_path}'."
    if not file_path.endswith(".py"):
        return f"Error: Provided file '{file_path}' is not a Python (.py) file."
    try:
        # Use subprocess to run the Python file in a new process.
        # This provides some isolation compared to 'exec()' but is still dangerous for untrusted code.
        result = subprocess.run(
            [sys.executable, file_path],  # sys.executable ensures it uses the current Python interpreter
            capture_output=True,          # Capture stdout and stderr
            text=True,                    # Capture output as text (strings)
            check=False                   # Do not raise an exception for non-zero exit codes (handle errors manually)
        )
        stdout_output = result.stdout.strip()
        stderr_output = result.stderr.strip()
        output_lines = []
        if stdout_output:
            output_lines.append(f"STDOUT:\n{stdout_output}")
        if stderr_output:
            output_lines.append(f"STDERR:\n{stderr_output}")
        if result.returncode != 0:
            output_lines.append(f"Process exited with code {result.returncode}. This usually indicates an error.")
        if not output_lines:
            return "Execution completed with no output."
        return "\n".join(output_lines)
    except Exception as e:
        return f"An unexpected error occurred during code execution: {e}"

def download_taskid_file(task_id: str, file_name: str) -> str:
    """
    Downloads the file associated with the given task_id (if any).
    Returns the absolute path of the downloaded file, or an error message on failure.
    """
    try:
        response = requests.get(f"{DATASET_API_URL}/files/{task_id}", timeout=20)
        response.raise_for_status()
        with open(file_name, 'wb') as file:
            file.write(response.content)
        return os.path.abspath(file_name)
    except Exception as e:
        return "Error occurred: {}".format(e)

def analyze_excel_file(file_path: str, query: str) -> str:
    """
    Analyzes an Excel (.xlsx) file using pandas.
    Loads the specified Excel file into a pandas DataFrame and executes a Python query against it.
    The query should be a valid pandas DataFrame operation (e.g., df.head(), df.describe(),
    df[df['column_name'] > 10], df.groupby('category')['value'].mean()).
    Returns the result of the query as a string (JSON or string representation).
    """
    if not os.path.exists(file_path):
        return f"Error: File not found at {file_path}"
    try:
        df = pd.read_excel(file_path)
        # Make the DataFrame accessible for the query
        local_vars = {"df": df}
        # Execute the query
        # IMPORTANT: Be extremely cautious with eval/exec for user-provided input in a production system.
        # For a ReAct agent, the LLM generates this query, so it's generally safer
        # if the LLM is well-constrained and reliable.
        # For sensitive applications, consider a safer parsing mechanism or a restricted set of operations.
        result = eval(query, {}, local_vars)
        return str(result)  # Convert result to string for the LLM
    except Exception as e:
        return f"Error analyzing Excel file: {e}"

def get_analyze_mp3_tool(llm):
    def analyze_mp3_file(audio_path: str) -> str:
        """
        Extract text from an mp3 audio file.
        """
        all_text = ""
        try:
            # Read audio and encode as base64
            with open(audio_path, "rb") as audio_file:
                audio_bytes = audio_file.read()
            audio_base64 = base64.b64encode(audio_bytes).decode("utf-8")
            # Determine the MIME type for MP3
            audio_mime_type = "audio/mpeg"  # Or "audio/mp3", "audio/wav" etc. for other formats
            # Prepare the prompt including the base64 audio data
            message = [
                HumanMessage(
                    content=[
                        {
                            "type": "text",
                            "text": (
                                "Extract all the text from this audio. "
                                "Return only the extracted text, no explanations."
                            ),
                        },
                        {
                            "type": "media",               # content block type for raw media
                            "data": audio_base64,          # base64-encoded audio payload
                            "mime_type": audio_mime_type,  # MIME type of the payload
                        }
                    ]
                )
            ]
            # Call the audio-capable model
            response = llm.invoke(message)
            # Append extracted text
            all_text += response.content + "\n\n"
            return all_text.strip()
        except Exception as e:
            print("Error extracting text from audio file:{} - {}".format(audio_path, e))
            return ""
    return analyze_mp3_file
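
# Sketch of binding the closure to a multimodal model and wrapping it as a Tool,
# not part of the original code; llm construction is omitted, and the tool
# name/description are illustrative:
#   analyze_mp3_file = get_analyze_mp3_tool(llm)
#   mp3_tool = Tool(
#       name="analyze_mp3_file",
#       func=analyze_mp3_file,
#       description="Transcribe the speech contained in an mp3 file, given its path.",
#   )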

def get_analyze_image_tool(llm):
    def analyze_png_image(image_path: str) -> str:
        """
        Analyzes a PNG image and returns a detailed description of its content.
        This tool requires an LLM capable of processing images, such as Gemini 1.5 Pro or Gemini 2.0 Flash.
        """
        try:
            # Read image and encode as base64
            with open(image_path, "rb") as image_file:
                image_bytes = image_file.read()
            image_base64 = base64.b64encode(image_bytes).decode("utf-8")
            # Prepare the prompt including the base64 image data
            message = [
                HumanMessage(
                    content=[
                        {
                            "type": "text",
                            "text": (
                                "Provide a very detailed description of the content of this image. "
                                "Focus on objects, people, actions, text, and overall scene context. "
                                "Be as comprehensive as possible."
                            ),
                        },
                        {
                            "type": "image_url",
                            "image_url": {"url": f"data:image/png;base64,{image_base64}"},
                        },
                    ]
                )
            ]
            # Call the vision-capable model
            response = llm.invoke(message)
            return response.content.strip()
        except Exception as e:
            print("Error analyzing image file:{} - {}".format(image_path, e))
            return ""
    return analyze_png_image

def arxiv_search(query: str) -> dict:
    """Search Arxiv for a query and return at most 3 results.
    Args:
        query: The search query.
    Returns:
        A dict with an "arxiv_results" key containing the formatted documents.
    """
    search_docs = ArxivLoader(query=query, load_max_docs=3).load()
    formatted_search_docs = "\n\n---\n\n".join(
        [
            f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content[:1000]}\n</Document>'
            for doc in search_docs
        ]
    )
    return {"arxiv_results": formatted_search_docs}

def get_youtube_transcript(url: str) -> dict:
    """Fetches the transcript from a YouTube video URL.
    Args:
        url: The URL of the YouTube video.
    Returns:
        A dictionary with keys:
            "transcript" (str): the video transcript, or an error message.
            "metadata" (dict): the video title and other information if available, otherwise empty.
    """
    try:
        loader = YoutubeLoader.from_youtube_url(url, add_video_info=True)
        docs = loader.load()
        # Combine all transcript chunks into a single string
        transcript = "\n".join(doc.page_content for doc in docs)
        metadata = docs[0].metadata if docs else {}
        return {"transcript": transcript, "metadata": metadata}
    except Exception as e:
        if "Could not retrieve transcript" in str(e):
            return {"transcript": "No transcript available for this video.", "metadata": {}}
        else:
            return {"transcript": f"Error fetching transcript: {e}", "metadata": {}}