import os, sys from dotenv import load_dotenv import requests import pandas as pd import base64 from langchain_community.tools import DuckDuckGoSearchRun from langchain_community.utilities import WikipediaAPIWrapper from langchain_community.tools import WikipediaQueryRun from langchain_community.document_loaders import WikipediaLoader import wikipedia from langchain_tavily import TavilySearch from langchain_community.document_loaders import ArxivLoader from langchain_community.document_loaders import YoutubeLoader from langchain_core.tools import tool from langchain.tools import Tool from langchain_core.messages import HumanMessage # per gestire esecuzione di codice python import subprocess DATASET_API_URL = 'https://agents-course-unit4-scoring.hf.space' load_dotenv() WIKIPEDIA_TOP_K_RESULTS = int(os.environ.get("WIKIPEDIA_TOP_K_RESULTS")) WIKIPEDIA_DOC_CONTENT_CHARS_MAX = int(os.environ.get("WIKIPEDIA_DOC_CONTENT_CHARS_MAX")) def get_search_tool(): search_tool = DuckDuckGoSearchRun() return search_tool def get_tavily_search_tool(): tavily_search_tool = TavilySearch( max_results=3, topic="general", # include_answer=False, # include_raw_content=False, # include_images=False, # include_image_descriptions=False, # search_depth="basic", # time_range="day", # include_domains=None, # exclude_domains=None ) return tavily_search_tool # Wikipedia tool 1: usa WikipediaQueryRun dal package 'langchain_community.tools' # problema: sembra ottenere solo i summary def get_wikipedia_tool(): #print("WIKIPEDIA_TOP_K_RESULTS:{}, WIKIPEDIA_DOC_CONTENT_CHARS_MAX:{}".format(WIKIPEDIA_TOP_K_RESULTS, WIKIPEDIA_DOC_CONTENT_CHARS_MAX)) # creates an instance of the Wikipedia API wrapper. top_k_results=1 means it will only fetch the top result from Wikipedia wikipedia_api_wrapper = WikipediaAPIWrapper(top_k_results=WIKIPEDIA_TOP_K_RESULTS, doc_content_chars_max=WIKIPEDIA_DOC_CONTENT_CHARS_MAX) # converts the WikipediaAPIWrapper into a LangChain tool. wikipedia_tool = WikipediaQueryRun(api_wrapper=wikipedia_api_wrapper) return wikipedia_tool # Wikipedia tool 2: utilizza direttamente il package 'wikipedia' @tool def wikipedia_search(query: str) -> str: """ Search Wikipedia and return the full content of the most relevant article. """ try: results = wikipedia.search(query) if not results: return f"No results found for '{query}'." page = wikipedia.page(results[0]) content = page.content # Truncate content if it's too long if len(content) > WIKIPEDIA_DOC_CONTENT_CHARS_MAX: content = content[:WIKIPEDIA_DOC_CONTENT_CHARS_MAX] + "..." return content except wikipedia.exceptions.DisambiguationError as e: return f"Ambiguous query. Possible options: {', '.join(e.options[:5])}..." except wikipedia.exceptions.PageError: return f"Page not found for '{query}'." except Exception as e: return f"Error occurred: {str(e)}" # Wikipedia tool 3: utilizza WikipediaLoader dla package 'langchain_community.document_loaders' @tool def wikipedia_search_3(query: str) -> str: """ Search Wikipedia and return the full content of the most relevant articles. Args: query: The search query. """ search_docs = WikipediaLoader(query=query, load_max_docs=WIKIPEDIA_TOP_K_RESULTS, doc_content_chars_max=WIKIPEDIA_DOC_CONTENT_CHARS_MAX, load_all_available_meta=True).load() formatted_search_docs = "\n\n---\n\n".join( [ f'\n{doc.page_content}\n' for doc in search_docs ]) return {"wiki_results": formatted_search_docs} @tool def execute_python_code_from_file(file_path: str) -> str: """ Reads a Python file from the given path, executes its code, and returns the combined stdout and stderr. WARNING: Executing arbitrary code from files is a significant security risk. Only use this tool with trusted code in a controlled environment. """ if not os.path.exists(file_path): return f"Error: File not found at '{file_path}'." if not file_path.endswith(".py"): return f"Error: Provided file '{file_path}' is not a Python (.py) file." try: # Use subprocess to run the Python file in a new process. # This provides some isolation compared to 'exec()' but is still dangerous for untrusted code. result = subprocess.run( [sys.executable, file_path], # sys.executable ensures it uses the current Python interpreter capture_output=True, # Capture stdout and stderr text=True, # Capture output as text (strings) check=False # Do not raise an exception for non-zero exit codes (handle errors manually) ) stdout_output = result.stdout.strip() stderr_output = result.stderr.strip() output_lines = [] if stdout_output: output_lines.append(f"STDOUT:\n{stdout_output}") if stderr_output: output_lines.append(f"STDERR:\n{stderr_output}") if result.returncode != 0: output_lines.append(f"Process exited with code {result.returncode}. This usually indicates an error.") if not output_lines: return "Execution completed with no output." return "\n".join(output_lines) except Exception as e: return f"An unexpected error occurred during code execution: {e}" @tool def download_taskid_file(task_id: str, file_name: str) -> str: """ Downloads the file associated with the given task_id (if any). Returns the absolute path of the file """ try: response = requests.get(f"{DATASET_API_URL}/files/{task_id}", timeout=20) response.raise_for_status() with open(file_name, 'wb') as file: file.write(response.content) return os.path.abspath(file_name) except Exception as e: return "Error occurred: {}".format(e) @tool def analyze_excel_file(file_path: str, query: str) -> str: """ Analyzes an Excel (.xlsx) file using pandas. Loads the specified Excel file into a pandas DataFrame and executes a Python query against it. The query should be a valid pandas DataFrame operation (e.g., df.head(), df.describe(), df[df['column_name'] > 10], df.groupby('category')['value'].mean()). Returns the result of the query as a string (JSON or string representation). """ if not os.path.exists(file_path): return f"Error: File not found at {file_path}" try: df = pd.read_excel(file_path) # Make the DataFrame accessible for the query local_vars = {"df": df} # Execute the query # IMPORTANT: Be extremely cautious with eval/exec for user-provided input in a production system. # For a ReAct agent, the LLM generates this query, so it's generally safer # if the LLM is well-constrained and reliable. # For sensitive applications, consider a safer parsing mechanism or a restricted set of operations. result = eval(query, {}, local_vars) return str(result) # Convert result to string for the LLM except Exception as e: return f"Error analyzing Excel file: {e}" def get_analyze_mp3_tool(llm): @tool def analyze_mp3_file(audio_path: str) -> str: """ Extract text from an mp3 audio file. """ all_text = "" try: # Read audio and encode as base64 with open(audio_path, "rb") as audio_file: audio_bytes = audio_file.read() audio_base64 = base64.b64encode(audio_bytes).decode("utf-8") # Determine the MIME type for MP3 audio_mime_type = "audio/mpeg" # Or "audio/mp3", "audio/wav" etc. for other formats # Prepare the prompt including the base64 image data message = [ HumanMessage( content=[ { "type": "text", "text": ( "Extract all the text from this audio. " "Return only the extracted text, no explanations." ), }, { "type": "media", # <--- CORRECTED: Use 'media' type "data": audio_base64, # <--- Use 'data' for the base64 content "mime_type": audio_mime_type, # <--- Specify the MIME type } ] ) ] # Call the vision-capable model response = llm.invoke(message) # Append extracted text all_text += response.content + "\n\n" return all_text.strip() except Exception as e: print("Error extracting text from audio file:{} - {}".format(audio_path, e)) return "" return analyze_mp3_file def get_analyze_image_tool(llm): @tool def analyze_png_image(image_path: str) -> str: """ Analyzes a PNG image and returns a detailed description of its content. This tool requires an LLM capable of processing images, such as Gemini 1.5 Pro or Gemini 2.0 Flash. """ try: # Read image and encode as base64 with open(image_path, "rb") as image_file: image_bytes = image_file.read() image_base64 = base64.b64encode(image_bytes).decode("utf-8") # Prepare the prompt including the base64 image data message = [ HumanMessage( content=[ { "type": "text", "text": ( "Provide a very detailed description of the content of this image. " "Focus on objects, people, actions, text, and overall scene context. " "Be as comprehensive as possible." ), }, { "type": "image_url", "image_url": {"url": f"data:image/png;base64,{image_base64}"}, }, ] ) ] # Call the vision-capable model response = llm.invoke(message) return response.content.strip() except Exception as e: print("Error analyzing image file:{} - {}".format(image_path, e)) return "" return analyze_png_image @tool def arxiv_search(query: str) -> str: """Search Arxiv for a query and return maximum 3 result. Args: query: The search query.""" search_docs = ArxivLoader(query=query, load_max_docs=3).load() formatted_search_docs = "\n\n---\n\n".join( [ f'\n{doc.page_content[:1000]}\n' for doc in search_docs ] ) return {"arxiv_results": formatted_search_docs} @tool def get_youtube_transcript(url: str) -> dict: """Fetches the transcript from a YouTube video URL. Args: url: The URL of the YouTube video. Returns: A dictionary containing the transcript and metadata. The dictionary will have keys "transcript" (string, the video transcript or an error message) and "metadata" (dictionary, containing video title and other information, if available, otherwise empty). """ try: loader = YoutubeLoader.from_youtube_url(url, add_video_info=True) docs = loader.load() # Combine all transcript chunks into a single string transcript = "\n".join(doc.page_content for doc in docs) metadata = docs[0].metadata if docs else {} return {"transcript": transcript, "metadata": metadata} except Exception as e: if "Could not retrieve transcript" in str(e): return {"transcript": "No transcript available for this video.", "metadata": {}} else: return {"transcript": f"Error fetching transcript: {e}", "metadata": {}}