# =========================
# Imports and Environment
# =========================
import ast
import base64
import io
import mimetypes
import operator
import os
import re
import subprocess
import sys
import tempfile
import uuid
from pathlib import Path
from typing import TypedDict, Annotated
from urllib.parse import urlparse

import openai
import pandas as pd
import pytesseract
import requests
from dotenv import load_dotenv
from huggingface_hub import list_models
from langchain.tools import Tool
from langchain_community.document_loaders import WikipediaLoader
from langchain_community.utilities import SerpAPIWrapper
from langchain_core.messages import HumanMessage
from langchain_experimental.tools.python.tool import PythonREPLTool
from langchain_huggingface import ChatHuggingFace
from langchain_openai import ChatOpenAI
from PIL import Image
from pydub import AudioSegment

# Load environment variables
print("Current working directory:", os.getcwd())
load_dotenv(dotenv_path=os.path.join(os.path.dirname(__file__), ".env"))


# =========================
# 1. Web Search Tools
# =========================
def serp_search(query: str) -> str:
    """
    Searches the web using SerpAPI and returns the result text.

    Args:
        query (str): The search query.

    Returns:
        str: The text returned by SerpAPIWrapper.run, or an error message.
    """
    try:
        search = SerpAPIWrapper()
        return search.run(query)
    except Exception as e:
        return f"Search failed: {e}"


serp_search_tool = Tool(
    name="serp_search_tool",
    func=serp_search,
    description="Searches the web using SerpAPI and returns the top result."
)


# =========================
# 2. File Download/Handling Tools
# =========================
# Note: File downloading is now handled in app.py via process_question_with_files()
# This section is kept for reference but the download_file_tool is not exported
def download_file(url: str, save_path: str) -> str:
    """
    Downloads a file from a URL and saves it to the given path.

    Args:
        url (str): The URL from which to download the file.
        save_path (str): The local file path where the downloaded file will be saved.

    Returns:
        str: A message indicating the result of the download operation.
    """
    try:
        # Reduced from 30 to 15 seconds
        response = requests.get(url, timeout=15)
        response.raise_for_status()
        with open(save_path, "wb") as f:
            f.write(response.content)
        return f"File downloaded to {save_path}"
    except Exception as e:
        return f"Failed to download: {e}"


# download_file_tool is now used internally by process_question_with_files() in app.py
# and is not exported as a standalone tool for the agent


# =========================
# 3. Python Execution Tools
# =========================
def RunPythonFileTool(file_path: str) -> str:
    """
    Executes a Python script loaded from the specified path using the
    PythonInterpreterTool if available, otherwise a subprocess.

    Args:
        file_path (str): The full path to the python (.py) file containing the Python code.

    Returns:
        str: The output produced by the code execution, or an error message if it fails.
    """
    try:
        if not os.path.exists(file_path):
            return f"File not found: {file_path}"

        with open(file_path, "r", encoding="utf-8") as f:
            code = f.read()

        try:
            from langchain.tools.python.tool import PythonInterpreterTool
            interpreter = PythonInterpreterTool()
            result = interpreter.run({"code": code})
            if isinstance(result, dict):
                return result.get("output", "No output returned.")
            return str(result)
        except ImportError:
            # Fallback: run a temporary copy of the script in a child process.
            temp_path = None
            try:
                with tempfile.NamedTemporaryFile(
                    mode="w", suffix=".py", delete=False, encoding="utf-8"
                ) as temp:
                    temp_path = temp.name
                    temp.write(code)
                # sys.executable guarantees the same interpreter/environment
                # as the running process instead of whatever "python" is on PATH.
                result = subprocess.run(
                    [sys.executable, temp_path],
                    capture_output=True,
                    text=True,
                    timeout=15
                )
            finally:
                # Always remove the temporary copy, including on timeout.
                if temp_path is not None and os.path.exists(temp_path):
                    os.unlink(temp_path)

            if result.returncode == 0:
                return result.stdout.strip() or "No output returned."
            return f"Error: {result.stderr.strip()}"
    except subprocess.TimeoutExpired:
        return "Error: Code execution timed out"
    except Exception as e:
        return f"Execution failed: {e}"


python_execution_tool = Tool(
    name="python_execution_tool",
    func=RunPythonFileTool,
    description="Executes a Python (.py) file given its file path and returns the output. Use this when you need to run Python scripts or calculate values."
)


# =========================
# 4. Text Utilities
# =========================
def ReverseTextTool(text: str) -> str:
    """
    Reverses the order of characters in a given text string.

    Args:
        text (str): The text to reverse.

    Returns:
        str: The reversed text or an error message.
    """
    try:
        return text[::-1]
    except Exception as e:
        return f"Error reversing text: {str(e)}"


reverse_text_tool = Tool(
    name="reverse_text_tool",
    func=ReverseTextTool,
    description="Reverses the order of characters in a given text string. Use this when you need to reverse text."
)


# =========================
# 5. Audio, Video, and Image Tools
# =========================
# Extensions that process_audio attempts to transcribe.
_TRANSCRIBABLE_EXTENSIONS = ('.mp3', '.wav', '.m4a', '.flac', '.ogg')


def process_audio(audio_file_path: str) -> str:
    """
    Processes audio files to extract information and transcribe speech content.

    Args:
        audio_file_path (str): Path to the audio file.

    Returns:
        str: Transcription result or file info with error message.
    """
    try:
        if not os.path.exists(audio_file_path):
            return f"Audio file not found: {audio_file_path}"

        file_extension = Path(audio_file_path).suffix.lower()
        file_size = os.path.getsize(audio_file_path)
        # Shared prefix of every "file info" style message below.
        file_info = (
            f"Audio file: {audio_file_path}, Size: {file_size} bytes, "
            f"Type: {file_extension}."
        )

        # Check if it's an audio file we can process
        if file_extension not in _TRANSCRIBABLE_EXTENSIONS:
            return f"{file_info} Unsupported audio format for transcription."

        # Try to transcribe the audio
        try:
            client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

            if file_extension == '.mp3':
                # Convert MP3 to WAV in memory before uploading.
                audio = AudioSegment.from_mp3(audio_file_path)
                wav_buffer = io.BytesIO()
                audio.export(wav_buffer, format="wav")
                wav_buffer.seek(0)
                # An in-memory buffer has no filename, so pass one explicitly;
                # the API relies on the name's extension to detect the format.
                transcription = client.audio.transcriptions.create(
                    model="whisper-1",
                    file=("audio.wav", wav_buffer),
                    response_format="text"
                )
            else:
                # For other formats, try direct transcription
                with open(audio_file_path, "rb") as audio_file:
                    transcription = client.audio.transcriptions.create(
                        model="whisper-1",
                        file=audio_file,
                        response_format="text"
                    )

            return f"Transcription successful!\nFile: {audio_file_path}\nSize: {file_size} bytes\nType: {file_extension}\n\nTranscription:\n{transcription}"
        except openai.AuthenticationError:
            return f"{file_info} OpenAI API key not found or invalid. Please set OPENAI_API_KEY in your environment variables."
        except openai.BadRequestError as e:
            return f"{file_info} Audio format not supported or file too large: {str(e)}"
        except Exception as e:
            return f"{file_info} Transcription error: {str(e)}"
    except Exception as e:
        return f"Error processing audio: {str(e)}"


audio_processing_tool = Tool(
    name="audio_processing_tool",
    func=process_audio,
    description="Transcribes audio files (MP3, WAV, M4A, FLAC, OGG) to text using speech recognition. Use this when you need to convert speech in audio files to text."
)

# Matches the video ID in watch (v= anywhere in the query string), shorts,
# embed, live and youtu.be style links.
_YOUTUBE_ID_PATTERN = re.compile(
    r"(?:youtube\.com/(?:watch\?(?:[^\s#]*?&)?v=|shorts/|embed/|live/)"
    r"|youtu\.be/)([A-Za-z0-9_-]+)"
)


def analyze_video(video_url: str) -> str:
    """
    Analyzes video content from YouTube or other video URLs.

    The video itself is not downloaded; the YouTube video ID is extracted
    and used for a web search via serp_search.

    Args:
        video_url (str): The video URL.

    Returns:
        str: Video analysis or an error message.
    """
    try:
        if 'youtube.com' not in video_url and 'youtu.be' not in video_url:
            return "Video analysis currently supports YouTube videos only"

        match = _YOUTUBE_ID_PATTERN.search(video_url)
        if match is None:
            return "Could not extract video ID from URL"

        video_id = match.group(1)
        search_result = serp_search(
            f"youtube video {video_id} title description")
        return f"Video analysis for {video_id}: {search_result}"
    except Exception as e:
        return f"Error analyzing video: {str(e)}"


video_analysis_tool = Tool(
    name="video_analysis_tool",
    func=analyze_video,
    description="Analyzes video content from URLs. Use this when questions involve video content or YouTube links."
)


# =========================
# 6. Image Recognition Tools
# =========================
def _ocr_text_or_none(img_path: str):
    """Return OCR text for the image ('' when nothing is found), or None if OCR cannot run."""
    try:
        with Image.open(img_path) as img:
            return pytesseract.image_to_string(img).strip()
    except Exception:
        # Covers an unreadable image as well as a missing tesseract binary.
        return None


def _describe_image_with_vision(img_path: str) -> str:
    """Send the image to the gpt-4o vision model and return its stripped reply."""
    vision_llm = ChatOpenAI(model="gpt-4o", temperature=0)
    with open(img_path, "rb") as image_file:
        image_base64 = base64.b64encode(image_file.read()).decode("utf-8")

    # Label the payload with the real image type; fall back to PNG when the
    # extension is unknown.
    mime_type = mimetypes.guess_type(img_path)[0] or "image/png"
    message = [
        HumanMessage(
            content=[
                {"type": "text", "text": "Describe what you see in this image in detail. If there's text, extract it. If it's a chess position, describe the board state and pieces."},
                {"type": "image_url", "image_url": {
                    "url": f"data:{mime_type};base64,{image_base64}"}},
            ]
        )
    ]
    response = vision_llm.invoke(message)
    return response.content.strip()


def image_recognition(img_path: str) -> str:
    """
    Analyzes and describes the content of images using AI vision.

    Falls back to OCR when the vision call fails or the model refuses.

    Args:
        img_path (str): Path to the image file.

    Returns:
        str: Description or extracted text, or an error message.
    """
    try:
        if not os.path.exists(img_path):
            return f"Error: Image file not found at {img_path}"
        if not os.getenv("OPENAI_API_KEY"):
            return "OpenAI API key not found. Please set OPENAI_API_KEY in your environment variables."

        # Get image info first
        try:
            with Image.open(img_path) as img:
                image_info = f"Image: {img.size[0]}x{img.size[1]} pixels, mode: {img.mode}"
        except Exception as e:
            image_info = f"Image info error: {str(e)}"

        # Try vision model
        try:
            vision_result = _describe_image_with_vision(img_path)
        except Exception as vision_error:
            # Fallback to OCR if vision fails
            text = _ocr_text_or_none(img_path)
            if text is None:
                return f"{image_info}\n\nVision failed: {str(vision_error)}. OCR not available."
            if text:
                return f"{image_info}\n\nVision failed, OCR extracted text:\n{text}"
            return f"{image_info}\n\nVision failed: {str(vision_error)}. OCR found no text."

        # Check if we got a content policy response
        lowered = vision_result.lower()
        if "sorry" in lowered and "can't assist" in lowered:
            text = _ocr_text_or_none(img_path)
            if text is None:
                return f"{image_info}\n\nVision model blocked. OCR not available."
            if text:
                return f"{image_info}\n\nOCR extracted text:\n{text}"
            return f"{image_info}\n\nVision model blocked. OCR found no text."

        return f"{image_info}\n\nVision analysis:\n{vision_result}"
    except Exception as e:
        return f"Error analyzing image: {str(e)}"


image_recognition_tool = Tool(
    name="image_recognition_tool",
    func=image_recognition,
    description="Analyzes and describes the content of images using AI vision. Use this when you need to understand what's in an image."
)


# =========================
# 7. File Type Detection
# =========================
# Human-readable labels keyed by lower-cased file extension.
_FILE_TYPE_LABELS = {
    '.py': 'Python script',
    '.mp3': 'Audio file',
    '.mp4': 'Video file',
    '.jpg': 'Image file',
    '.jpeg': 'Image file',
    '.png': 'Image file',
    '.txt': 'Text file',
    '.pdf': 'PDF document',
    '.doc': 'Word document',
    '.docx': 'Word document',
    '.xls': 'Excel spreadsheet',
    '.xlsx': 'Excel spreadsheet',
}


def detect_file_type(file_path: str) -> str:
    """
    Detects the type of file from its extension and reports its size.

    Args:
        file_path (str): Path to the file.

    Returns:
        str: File type info or an error message.
    """
    try:
        if not os.path.exists(file_path):
            return f"File not found: {file_path}"

        file_extension = Path(file_path).suffix.lower()
        file_size = os.path.getsize(file_path)
        file_type = _FILE_TYPE_LABELS.get(file_extension, 'Unknown file type')
        return f"File: {file_path}, Type: {file_type}, Size: {file_size} bytes"
    except Exception as e:
        return f"Error detecting file type: {str(e)}"


file_type_detection_tool = Tool(
    name="file_type_detection_tool",
    func=detect_file_type,
    description="Detects file types and provides information about files. Use this when you need to understand what type of file you're working with."
)


# =========================
# 8. Enhanced File Reading Tools
# =========================
# Maximum number of characters returned from a text file.
_MAX_TEXT_CHARS = 5000


def read_file(file_name: str) -> str:
    """
    Read and process different file types (text, CSV, images).

    Args:
        file_name (str): Path to the file to read.

    Returns:
        str: The file content or summary, or an error message.
    """
    if not file_name or not os.path.exists(file_name):
        return "File not found"

    try:
        file_extension = os.path.splitext(file_name)[1].lower()
        if file_extension == ".csv":
            return _read_csv_file(file_name)
        if file_extension in [".png", ".jpg", ".jpeg", ".gif", ".bmp"]:
            return _read_image_file(file_name)
        # Known text extensions and anything unrecognised are read as text.
        return _read_text_file(file_name)
    except Exception as e:
        return f"Error reading file: {str(e)}"


def _read_text_file(file_name: str) -> str:
    """Read up to the first 5000 characters of a text file (UTF-8, then Latin-1)."""
    try:
        with open(file_name, "r", encoding="utf-8") as f:
            # Bounded read so a huge file is never loaded in full.
            return f.read(_MAX_TEXT_CHARS)
    except UnicodeDecodeError:
        # Try with different encoding
        try:
            with open(file_name, "r", encoding="latin-1") as f:
                return f.read(_MAX_TEXT_CHARS)
        except Exception as e:
            return f"Text file reading error: {str(e)}"


def _read_csv_file(file_name: str) -> str:
    """Read and summarize a CSV file."""
    try:
        df = pd.read_csv(file_name)

        # str() each label: column names are not guaranteed to be strings.
        column_names = ', '.join(map(str, df.columns))
        summary = [
            f"CSV file shape: {df.shape[0]} rows, {df.shape[1]} columns",
            f"Columns: {column_names}",
            "\nFirst 5 rows:",
            df.head().to_string(),
        ]

        # Show basic statistics for numeric columns
        numeric_columns = df.select_dtypes(include=['number']).columns
        if len(numeric_columns) > 0:
            summary.append("\nNumeric column statistics:")
            summary.append(df[numeric_columns].describe().to_string())

        return "\n".join(summary)
    except Exception as e:
        return f"CSV reading error: {str(e)}"


def _read_image_file(file_name: str) -> str:
    """Read an image file, reporting its dimensions and any OCR-extracted text."""
    try:
        with Image.open(file_name) as img:
            info = f"Image: {img.size[0]}x{img.size[1]} pixels, mode: {img.mode}"
            try:
                text = pytesseract.image_to_string(img).strip()
            except pytesseract.TesseractNotFoundError:
                # OCR not available, just return image info
                return f"{info}\n(OCR not available - install pytesseract for text extraction)"

        if text:
            return f"{info}\n\nExtracted text:\n{text}"
        return f"{info}\n\nNo text detected in image."
    except Exception as e:
        return f"Image reading error: {str(e)}"


read_file_tool = Tool(
    name="read_file_tool",
    func=read_file,
    description="Reads and processes different file types including text files, CSV files, and images. Use this when you need to extract content from files."
)


# =========================
# 9. Code Execution and Math Tools
# =========================
_BLOCKED_MODULES = frozenset({"os", "subprocess"})
_BLOCKED_NAMES = frozenset({"__import__", "exec", "eval", "open"})


def _uses_blocked_construct(code: str) -> bool:
    """Return True if the code imports a blocked module or references a blocked builtin."""
    if "__import__" in code:
        return True
    try:
        tree = ast.parse(code)
    except (SyntaxError, ValueError):
        # Not parseable: let the interpreter report the error to the caller.
        return False

    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            modules = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom):
            modules = [node.module or ""]
        else:
            modules = []
        if any(name.split(".")[0] in _BLOCKED_MODULES for name in modules):
            return True
        if isinstance(node, ast.Name) and node.id in _BLOCKED_NAMES:
            return True
        if isinstance(node, ast.Attribute) and node.attr in _BLOCKED_NAMES:
            return True
    return False


def execute_code(code: str, timeout: int = 5) -> str:
    """
    Execute a Python code snippet in a subprocess with a timeout.

    NOTE: the pre-check is a best-effort filter, not a sandbox. It must not
    be relied on to contain untrusted code.

    Args:
        code (str): The Python source to run.
        timeout (int): Maximum run time in seconds.

    Returns:
        str: Captured stdout, or an error/blocked/timeout message.
    """
    try:
        # Basic security check - prevent dangerous operations
        if _uses_blocked_construct(code):
            return "Code execution blocked: potentially unsafe operations detected"

        result = subprocess.run(
            [sys.executable, "-c", code],
            capture_output=True,
            text=True,
            timeout=timeout,
            cwd=tempfile.gettempdir()  # Run in the platform's temp directory
        )

        if result.returncode == 0:
            return result.stdout.strip() if result.stdout else "Code executed successfully (no output)"
        return f"Code execution error: {result.stderr.strip()}"
    except subprocess.TimeoutExpired:
        return "Code execution timeout"
    except Exception as e:
        return f"Code execution error: {str(e)}"


_MATH_BINARY_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.FloorDiv: operator.floordiv,
    ast.Pow: operator.pow,
}
_MATH_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}
# Upper bound on the estimated bit size of an integer power result.
_MAX_POW_RESULT_BITS = 100_000


def _eval_math_node(node):
    """Recursively evaluate an AST limited to numbers, + - * / // ** and unary signs."""
    if isinstance(node, ast.Expression):
        return _eval_math_node(node.body)
    if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
        return node.value
    if isinstance(node, ast.UnaryOp) and type(node.op) in _MATH_UNARY_OPS:
        return _MATH_UNARY_OPS[type(node.op)](_eval_math_node(node.operand))
    if isinstance(node, ast.BinOp) and type(node.op) in _MATH_BINARY_OPS:
        left = _eval_math_node(node.left)
        right = _eval_math_node(node.right)
        if (
            isinstance(node.op, ast.Pow)
            and isinstance(left, int)
            and isinstance(right, int)
            and max(left.bit_length(), 1) * abs(right) > _MAX_POW_RESULT_BITS
        ):
            # Guards against inputs such as 9**9**9**9 exhausting CPU/memory.
            raise ValueError("exponent too large")
        return _MATH_BINARY_OPS[type(node.op)](left, right)
    raise ValueError("unsupported expression")


def calculate_simple_math(expression: str) -> str:
    """
    Safely evaluate simple mathematical expressions.

    Supports numbers, parentheses and the operators + - * / // **.
    The expression is parsed and evaluated node by node rather than passed
    to eval().

    Args:
        expression (str): The arithmetic expression to evaluate.

    Returns:
        str: The result as a string, or an error message.
    """
    try:
        # Only allow basic math characters
        allowed_chars = set("0123456789+-*/.() ")
        if not all(c in allowed_chars for c in expression):
            return "Invalid mathematical expression"

        tree = ast.parse(expression.strip(), mode="eval")
        return str(_eval_math_node(tree))
    except Exception as e:
        return f"Math calculation error: {str(e)}"


code_execution_tool = Tool(
    name="code_execution_tool",
    func=execute_code,
    description="Executes Python code safely with timeout and security checks. Use this when you need to run small Python code snippets."
)

math_calculation_tool = Tool(
    name="math_calculation_tool",
    func=calculate_simple_math,
    description="Safely evaluates simple mathematical expressions. Use this when you need to perform basic math calculations."
)


def wiki_search(query: str) -> str:
    """
    Search Wikipedia for a query and return maximum 2 results.

    Args:
        query (str): The search query.

    Returns:
        str: The page contents joined by a separator, or an error message.
    """
    try:
        search_docs = WikipediaLoader(query=query, load_max_docs=2).load()
    except Exception as e:
        return f"Wikipedia search failed: {e}"

    if not search_docs:
        return "No Wikipedia results found."

    return "\n\n---\n\n".join(
        f"\n{doc.page_content}\n" for doc in search_docs
    )


wiki_search_tool = Tool(
    name="wiki_search_tool",
    func=wiki_search,
    description="Search Wikipedia for a query and return up to 2 results. Use this for factual or historical questions."
)

python_tool = PythonREPLTool()
python_repl_tool = Tool(
    name="python_repl_tool",
    # Pass the bound run method: calling a tool instance directly is deprecated.
    func=python_tool.run,
    description="Executes Python code in a REPL environment. Use this for running Python code snippets interactively."
)


# --- New Tools ---
def extract_text_from_image(file_path: str) -> str:
    """
    Extract text from an image using OCR (pytesseract).

    Args:
        file_path (str): Path to the image file.

    Returns:
        str: The extracted text, or an error message.
    """
    try:
        with Image.open(file_path) as image:
            text = pytesseract.image_to_string(image)
        return f"Extracted text from image:\n\n{text}"
    except Exception as e:
        return f"Error extracting text from image: {str(e)}"


extract_text_from_image_tool = Tool(
    name="extract_text_from_image_tool",
    func=extract_text_from_image,
    description="Extract text from an image using OCR (pytesseract)."
)


def _summarize_dataframe(df, label: str) -> str:
    """Build the row/column count, column list and describe() summary for a DataFrame."""
    # str() each label: column names are not guaranteed to be strings.
    column_names = ', '.join(map(str, df.columns))
    return (
        f"{label} file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
        f"Columns: {column_names}\n\n"
        "Summary statistics:\n"
        f"{df.describe()}"
    )


def analyze_csv_file_simple(file_path: str) -> str:
    """Analyze a CSV file using pandas."""
    try:
        return _summarize_dataframe(pd.read_csv(file_path), "CSV")
    except Exception as e:
        return f"Error analyzing CSV file: {str(e)}"


analyze_csv_file_tool = Tool(
    name="analyze_csv_file_tool",
    func=analyze_csv_file_simple,
    description="Analyze a CSV file using pandas and answer a question about it."
)


def analyze_excel_file_simple(file_path: str) -> str:
    """Analyze an Excel file using pandas."""
    try:
        return _summarize_dataframe(pd.read_excel(file_path), "Excel")
    except Exception as e:
        return f"Error analyzing Excel file: {str(e)}"


analyze_excel_file_tool = Tool(
    name="analyze_excel_file_tool",
    func=analyze_excel_file_simple,
    description="Analyze an Excel file using pandas and answer a question about it."
)

#