Spaces:
Sleeping
Sleeping
| import os | |
| from dotenv import load_dotenv | |
| from langchain_tavily import TavilySearch | |
| from langchain_community.document_loaders import WikipediaLoader | |
| from langchain_core.tools import tool # Consolidated import for @tool decorator | |
| from datetime import datetime | |
| from langchain_experimental.utilities import PythonREPL | |
| import pypdf | |
| from langchain_community.document_loaders import PyPDFLoader | |
| from langchain_community.embeddings import HuggingFaceEmbeddings | |
| from langchain_community.vectorstores import FAISS | |
| from langchain_core.tools import Tool, tool | |
| # Load environment variables | |
| load_dotenv() | |
| # === MATH TOOL === | |
def calculator(a: float, b: float, operation: str) -> float:
    """
    Performs a mathematical operation (addition, subtraction, multiplication, division) on two numbers.

    Input should be a dictionary with 'a' (float, first number), 'b' (float, second number),
    and 'operation' (string, e.g., 'add', 'subtract', 'multiply', 'divide') keys.
    The operation name is matched case-insensitively and surrounding whitespace is ignored.
    Example: {"a": 5.5, "b": 10.0, "operation": "add"}

    Returns:
        The numeric result of applying the operation to a and b.

    Raises:
        ValueError: If the operation is not one of the four supported names,
            or if a division by zero is requested.
    """
    # Callers (often an LLM) may send "Add" or " divide "; normalise before matching
    # so that harmless formatting differences do not turn into errors.
    op_name = str(operation).strip().lower()

    if op_name == "add":
        return a + b
    if op_name == "subtract":
        return a - b
    if op_name == "multiply":
        return a * b
    if op_name == "divide":
        if b == 0:
            raise ValueError("Cannot divide by zero.")
        return a / b
    raise ValueError("Invalid operation. Choose from 'add', 'subtract', 'multiply', 'divide'.")
| # === SEARCH TOOLS === | |
def wikipedia_search(query: str) -> dict:
    """
    Search Wikipedia for a given query and return the top matching document.
    Useful for factual questions about people, places, events, etc.
    Input should be a string representing the search query.
    Example: {"query": "Barack Obama"}
    The output is a dictionary with a 'wiki_results' key containing the formatted search results.

    At most one document is loaded (from the lang="es" edition of Wikipedia) and its
    content is truncated to 1500 characters. Failures are reported as a message under
    'wiki_results' rather than raised.
    """
    try:
        # Treat None the same as an empty / whitespace-only query instead of
        # surfacing an AttributeError message to the caller.
        if query is None or not query.strip():
            return {"wiki_results": "Error: Empty query provided."}
        # LangChain's WikipediaLoader returns Document objects.
        # NOTE(review): lang="es" and load_max_docs=1 are kept as found; confirm that
        # the Spanish edition and a single result are what the agent should use.
        loader = WikipediaLoader(query=query, load_max_docs=1, lang="es")
        search_docs = loader.load()
        if not search_docs:
            return {"wiki_results": "No results found on Wikipedia."}
        # Format results for the LLM, limiting content length. The opening tag must
        # not be self-closing because a matching </Document> follows the content.
        formatted = "\n\n---\n\n".join(
            f'<Document source="{doc.metadata.get("source", "unknown")}" page="{doc.metadata.get("page", "")}">\n{doc.page_content[:1500]}\n</Document>'
            for doc in search_docs
        )
        return {"wiki_results": formatted}
    except Exception as e:
        # Best-effort tool: hand the failure back to the agent as text.
        return {"wiki_results": f"Error during Wikipedia search: {str(e)}"}
def web_search(query: str) -> dict:
    """
    Search the web using Tavily for a given query and return up to 3 relevant snippets.
    Useful for up-to-date information, current events, or general web searches.
    Input should be a string representing the search query. Requires TAVILY_API_KEY environment variable.
    Example: {"query": "latest news on AI"}
    The output is a dictionary with a 'web_results' key containing the formatted search results.

    Failures (empty query, missing API key, search errors) are reported as a message
    under 'web_results' rather than raised.
    """
    try:
        # Treat None the same as an empty / whitespace-only query.
        if query is None or not query.strip():
            return {"web_results": "Error: Empty query provided."}
        if not os.getenv("TAVILY_API_KEY"):
            return {"web_results": "Error: Tavily API key is not configured."}
        # The tool's invoke expects 'input' as a keyword argument.
        response = TavilySearch(max_results=3).invoke(input=query)
        # langchain_tavily's TavilySearch hands back the Tavily payload as a dict whose
        # "results" entry holds the list of hits; iterating the dict itself would yield
        # its string keys. A bare list (the shape of the older results tool) is
        # accepted as well so either shape works.
        if isinstance(response, dict):
            # Presumably the tool reports its own failures under an "error" key;
            # surface it instead of claiming there were no results.
            if response.get("error"):
                return {"web_results": f"Error during web search: {response['error']}"}
            hits = response.get("results") or []
        elif isinstance(response, list):
            hits = response
        else:
            hits = []
        if not hits:
            return {"web_results": "No results found on the web."}
        # Format results for the LLM. The opening tag must not be self-closing
        # because a matching </Document> follows the content.
        formatted = "\n\n---\n\n".join(
            f'<Document source="{hit.get("url", "unknown")}" page="">\n{hit.get("content", "")}\n</Document>'
            for hit in hits
            if isinstance(hit, dict)
        )
        return {"web_results": formatted}
    except Exception as e:
        # Best-effort tool: hand the failure back to the agent as text.
        return {"web_results": f"Error during web search: {str(e)}"}
| # === UTILITY TOOLS === | |
def get_current_datetime(format_string: str = "%Y-%m-%d %H:%M:%S") -> str:
    """
    Returns the current date and time in a specified format.
    Useful for questions related to the current date, time, or for calculating durations.
    Input is an optional format_string (string, default: "%Y-%m-%d %H:%M:%S").
    Example: {"format_string": "%A, %B %d, %Y"} returns a value such as "Wednesday, July 16, 2025".

    The value is read from the system clock at call time (naive local time).
    If formatting fails, an error message string is returned instead of raising.
    """
    try:
        # Read the clock on every call; a fixed timestamp would silently go stale.
        current_time = datetime.now()
        return current_time.strftime(format_string)
    except Exception as e:
        return f"Error getting current datetime: {str(e)}"
def pdf_qa(pdf_path: str, query: str) -> str:
    """
    Answers a question by searching for information within a specific PDF file.

    The PDF is loaded and split, embedded into an in-memory FAISS index, and the two
    chunks most similar to the query are returned as context for the agent.

    Args:
        pdf_path: The file path to the PDF document.
        query: The question to answer.

    Returns:
        A string containing the retrieved context followed by the query, or an
        error message if the file is missing or processing fails (never raises).
    """
    try:
        # Check local paths up front so a missing file always yields the dedicated
        # message below. Anything that looks like a URL is left for the loader.
        # NOTE(review): the loader appears to report a bad path with an exception
        # other than FileNotFoundError, which would bypass the handler further down;
        # confirm against the installed langchain_community version.
        if "://" not in pdf_path and not os.path.isfile(os.path.expanduser(pdf_path)):
            return f"Error: The PDF file '{pdf_path}' was not found."
        # 1. Load the document
        loader = PyPDFLoader(pdf_path)
        documents = loader.load_and_split()
        # An empty document list cannot be indexed; report it clearly instead of
        # letting the vector store fail with an opaque error.
        if not documents:
            return f"Error: No text could be extracted from the PDF '{pdf_path}'."
        # 2. Create the embeddings and vector store
        embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
        vector_store = FAISS.from_documents(documents, embeddings)
        # 3. Create the retriever (top 2 chunks)
        retriever = vector_store.as_retriever(search_kwargs={"k": 2})
        # 4. Find relevant documents
        relevant_docs = retriever.invoke(query)
        # 5. Format the retrieved context for the LLM
        context = "\n\n".join(doc.page_content for doc in relevant_docs)
        # 6. Return the context and the query to the agent for final answer generation
        return f"Context from PDF:\n{context}\n\nUser Query: {query}"
    except FileNotFoundError:
        return f"Error: The PDF file '{pdf_path}' was not found."
    except Exception as e:
        return f"An error occurred while processing the PDF: {e}"
# === CODE EXECUTION TOOL ===
# Module-level PythonREPL instance reused by every call to execute_python_code.
python_repl_instance = PythonREPL()


# NOTE(security): this runs whatever code it is handed. When the code comes from an
# LLM or any other untrusted source, run the agent in a sandboxed environment.
def execute_python_code(code: str) -> str:
    """
    Executes Python code and returns the output.
    Useful for mathematical calculations, string manipulations, list operations,
    logic problems, and any task that can be solved with Python code.
    Input should be a string containing valid Python code to execute.
    Example: {"code": "print(2 + 2)"}

    If running the code raises, an error message string is returned instead.
    """
    try:
        # PythonREPL.run takes the source as a single string.
        output = python_repl_instance.run(code)
    except Exception as exc:
        return "Error executing Python code: " + str(exc)
    return output
# === TOOLSET EXPORT ===
# List of all available tools to be imported by agent.py.
# Every entry is one of the functions defined earlier in this module; add new tools
# here so they are exported together with the rest.
tools_for_llm = [
    calculator,
    wikipedia_search,
    web_search,
    get_current_datetime,
    pdf_qa,
    execute_python_code,
]
# For local testing of tools (optional)
if __name__ == "__main__":
    print("Testing tools.py functionalities...")

    def _run_tool(target, **arguments):
        """Call *target* with keyword arguments.

        Works both for plain functions (as defined in this module) and for
        LangChain tool objects, which expose ``invoke`` and take a dict of arguments.
        """
        invoke = getattr(target, "invoke", None)
        if callable(invoke):
            return invoke(arguments)
        return target(**arguments)

    # Set dummy API key for testing if not already set in .env
    # os.environ["TAVILY_API_KEY"] = "YOUR_TAVILY_API_KEY" # Replace with a real key for actual testing

    # Test Math Tool
    print("\n--- Calculator Tool Test ---")
    print(f"Calculator(5, 3, 'multiply'): {_run_tool(calculator, a=5, b=3, operation='multiply')}")
    print(f"Calculator(10.5, 2.3, 'add'): {_run_tool(calculator, a=10.5, b=2.3, operation='add')}")
    try:
        print(f"Calculator(7, 0, 'divide') (should error): {_run_tool(calculator, a=7, b=0, operation='divide')}")
    except ValueError as e:
        print(f" Error caught as expected: {e}")

    # Test Search Tools
    print("\n--- Search Tools Test ---")
    wiki_res = _run_tool(wikipedia_search, query='Artificial Intelligence')
    print(f"Wiki Search 'Artificial Intelligence': {wiki_res['wiki_results'][:200]}...")
    web_res = _run_tool(web_search, query='Hugging Face new features')
    print(f"Web Search 'Hugging Face new features': {web_res['web_results'][:200]}...")

    # Test Utility Tool
    print("\n--- Utility Tools Test ---")
    print(f"Current Datetime (default): {_run_tool(get_current_datetime)}")
    print(f"Current Datetime (custom format): {_run_tool(get_current_datetime, format_string='%A, %d %B %Y')}")

    # Test Python REPL Tool
    # The tool is exported as execute_python_code. The REPL returns what the code
    # prints, so the samples print their result rather than being bare expressions.
    print("\n--- Python REPL Tool Test ---")
    print(f"Python REPL 'print(2 + 2)': {_run_tool(execute_python_code, code='print(2 + 2)')}")
    test_code_len = 'print(len("hello"))'
    print(f"Python REPL '{test_code_len}': {_run_tool(execute_python_code, code=test_code_len)}")
    print(f"Python REPL error: {_run_tool(execute_python_code, code='10 / 0')}")