Spaces:
Sleeping
Sleeping
# ========================= | |
# Imports and Environment | |
# ========================= | |
import os | |
import requests | |
import subprocess | |
import tempfile | |
import base64 | |
import io | |
from pathlib import Path | |
from dotenv import load_dotenv | |
from typing import TypedDict, Annotated | |
from huggingface_hub import list_models | |
from langchain.tools import Tool | |
from langchain_community.utilities import SerpAPIWrapper | |
from langchain_core.messages import HumanMessage | |
from langchain_huggingface import ChatHuggingFace | |
from langchain_openai import ChatOpenAI | |
import openai | |
from pydub import AudioSegment | |
import pandas as pd | |
from PIL import Image | |
from langchain_community.document_loaders import WikipediaLoader | |
from langchain_experimental.tools.python.tool import PythonREPLTool | |
import uuid | |
import pytesseract | |
from urllib.parse import urlparse | |
# Load environment variables | |
print("Current working directory:", os.getcwd()) | |
load_dotenv(dotenv_path=os.path.join(os.path.dirname(__file__), ".env")) | |
# ========================= | |
# 1. Web Search Tools | |
# ========================= | |
def serp_search(query: str) -> str: | |
""" | |
Searches the web using SerpAPI and returns the top result snippet. | |
Args: | |
query (str): The search query. | |
Returns: | |
str: The top result snippet or an error message. | |
""" | |
try: | |
search = SerpAPIWrapper() | |
results = search.run(query) | |
return results | |
except Exception as e: | |
return f"Search failed: {e}" | |
serp_search_tool = Tool( | |
name="serp_search_tool", | |
func=serp_search, | |
description="Searches the web using SerpAPI and returns the top result." | |
) | |
# ========================= | |
# 2. File Download/Handling Tools | |
# ========================= | |
# Note: File downloading is now handled in app.py via process_question_with_files() | |
# This section is kept for reference but the download_file_tool is not exported | |
def download_file(url: str, save_path: str) -> str: | |
""" | |
Downloads a file from a URL and saves it to the given path. | |
Args: | |
url (str): The URL from which to download the file. | |
save_path (str): The local file path where the downloaded file will be saved. | |
Returns: | |
str: A message indicating the result of the download operation. | |
""" | |
try: | |
# Reduced from 30 to 15 seconds | |
response = requests.get(url, timeout=15) | |
response.raise_for_status() | |
with open(save_path, "wb") as f: | |
f.write(response.content) | |
return f"File downloaded to {save_path}" | |
except Exception as e: | |
return f"Failed to download: {e}" | |
# download_file_tool is now used internally by process_question_with_files() in app.py | |
# and is not exported as a standalone tool for the agent | |
# ========================= | |
# 3. Python Execution Tools | |
# ========================= | |
def RunPythonFileTool(file_path: str) -> str: | |
""" | |
Executes a Python script loaded from the specified path using the PythonInterpreterTool if available, otherwise subprocess. | |
Args: | |
file_path (str): The full path to the python (.py) file containing the Python code. | |
Returns: | |
str: The output produced by the code execution, or an error message if it fails. | |
""" | |
try: | |
if not os.path.exists(file_path): | |
return f"File not found: {file_path}" | |
with open(file_path, "r") as f: | |
code = f.read() | |
try: | |
from langchain.tools.python.tool import PythonInterpreterTool | |
interpreter = PythonInterpreterTool() | |
result = interpreter.run({"code": code}) | |
return result.get("output", "No output returned.") | |
except ImportError: | |
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as temp: | |
temp.write(code) | |
temp_path = temp.name | |
result = subprocess.run( | |
["python", temp_path], | |
capture_output=True, | |
text=True, | |
timeout=15 | |
) | |
os.unlink(temp_path) | |
if result.returncode == 0: | |
return result.stdout.strip() or "No output returned." | |
else: | |
return f"Error: {result.stderr.strip()}" | |
except subprocess.TimeoutExpired: | |
return "Error: Code execution timed out" | |
except Exception as e: | |
return f"Execution failed: {e}" | |
python_execution_tool = Tool( | |
name="python_execution_tool", | |
func=RunPythonFileTool, | |
description="Executes Python code and returns the output. Use this when you need to run Python scripts or calculate values." | |
) | |
# ========================= | |
# 4. Text Utilities | |
# ========================= | |
def ReverseTextTool(text: str) -> str: | |
""" | |
Reverses the order of characters in a given text string. | |
Args: | |
text (str): The text to reverse. | |
Returns: | |
str: The reversed text or an error message. | |
""" | |
try: | |
return text[::-1] | |
except Exception as e: | |
return f"Error reversing text: {str(e)}" | |
reverse_text_tool = Tool( | |
name="reverse_text_tool", | |
func=ReverseTextTool, | |
description="Reverses the order of characters in a given text string. Use this when you need to reverse text." | |
) | |
# ========================= | |
# 5. Audio, Video, and Image Tools | |
# ========================= | |
def process_audio(audio_file_path: str) -> str: | |
""" | |
Processes audio files to extract information and transcribe speech content. | |
Args: | |
audio_file_path (str): Path to the audio file. | |
Returns: | |
str: Transcription result or file info with error message. | |
""" | |
try: | |
if not os.path.exists(audio_file_path): | |
return f"Audio file not found: {audio_file_path}" | |
file_extension = Path(audio_file_path).suffix.lower() | |
# Check if it's an audio file we can process | |
if file_extension not in ['.mp3', '.wav', '.m4a', '.flac', '.ogg']: | |
file_size = os.path.getsize(audio_file_path) | |
return f"Audio file: {audio_file_path}, Size: {file_size} bytes, Type: {file_extension}. Unsupported audio format for transcription." | |
# Try to transcribe the audio | |
try: | |
# Initialize OpenAI client | |
client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY")) | |
# Convert MP3 to WAV if needed (Whisper works better with WAV) | |
if file_extension == '.mp3': | |
audio = AudioSegment.from_mp3(audio_file_path) | |
# Export as WAV to a temporary buffer | |
wav_buffer = io.BytesIO() | |
audio.export(wav_buffer, format="wav") | |
wav_buffer.seek(0) | |
# Use the WAV buffer for transcription | |
transcription = client.audio.transcriptions.create( | |
model="whisper-1", | |
file=wav_buffer, | |
response_format="text" | |
) | |
else: | |
# For other formats, try direct transcription | |
with open(audio_file_path, "rb") as audio_file: | |
transcription = client.audio.transcriptions.create( | |
model="whisper-1", | |
file=audio_file, | |
response_format="text" | |
) | |
file_size = os.path.getsize(audio_file_path) | |
return f"Transcription successful!\nFile: {audio_file_path}\nSize: {file_size} bytes\nType: {file_extension}\n\nTranscription:\n{transcription}" | |
except openai.AuthenticationError: | |
file_size = os.path.getsize(audio_file_path) | |
return f"Audio file: {audio_file_path}, Size: {file_size} bytes, Type: {file_extension}. OpenAI API key not found or invalid. Please set OPENAI_API_KEY in your environment variables." | |
except openai.BadRequestError as e: | |
file_size = os.path.getsize(audio_file_path) | |
return f"Audio file: {audio_file_path}, Size: {file_size} bytes, Type: {file_extension}. Audio format not supported or file too large: {str(e)}" | |
except Exception as e: | |
file_size = os.path.getsize(audio_file_path) | |
return f"Audio file: {audio_file_path}, Size: {file_size} bytes, Type: {file_extension}. Transcription error: {str(e)}" | |
except Exception as e: | |
return f"Error processing audio: {str(e)}" | |
audio_processing_tool = Tool( | |
name="audio_processing_tool", | |
func=process_audio, | |
description="Transcribes audio files (MP3, WAV, M4A, FLAC, OGG) to text using speech recognition. Use this when you need to convert speech in audio files to text." | |
) | |
def analyze_video(video_url: str) -> str: | |
""" | |
Analyzes video content from YouTube or other video URLs. | |
Args: | |
video_url (str): The video URL. | |
Returns: | |
str: Video analysis or an error message. | |
""" | |
try: | |
if 'youtube.com' in video_url or 'youtu.be' in video_url: | |
video_id = None | |
if 'youtube.com/watch?v=' in video_url: | |
video_id = video_url.split('watch?v=')[1].split('&')[0] | |
elif 'youtu.be/' in video_url: | |
video_id = video_url.split('youtu.be/')[1].split('?')[0] | |
if video_id: | |
search_result = serp_search( | |
f"youtube video {video_id} title description") | |
return f"Video analysis for {video_id}: {search_result}" | |
else: | |
return "Could not extract video ID from URL" | |
else: | |
return "Video analysis currently supports YouTube videos only" | |
except Exception as e: | |
return f"Error analyzing video: {str(e)}" | |
video_analysis_tool = Tool( | |
name="video_analysis_tool", | |
func=analyze_video, | |
description="Analyzes video content from URLs. Use this when questions involve video content or YouTube links." | |
) | |
# ========================= | |
# 6. Image Recognition Tools | |
# ========================= | |
def image_recognition(img_path: str) -> str: | |
""" | |
Analyzes and describes the content of images using AI vision. | |
Args: | |
img_path (str): Path to the image file. | |
Returns: | |
str: Description or extracted text, or an error message. | |
""" | |
try: | |
if not os.path.exists(img_path): | |
return f"Error: Image file not found at {img_path}" | |
if not os.getenv("OPENAI_API_KEY"): | |
return "OpenAI API key not found. Please set OPENAI_API_KEY in your environment variables." | |
# Get image info first | |
try: | |
img = Image.open(img_path) | |
image_info = f"Image: {img.size[0]}x{img.size[1]} pixels, mode: {img.mode}" | |
except Exception as e: | |
image_info = f"Image info error: {str(e)}" | |
# Try vision model | |
try: | |
vision_llm = ChatOpenAI(model="gpt-4o", temperature=0) | |
with open(img_path, "rb") as image_file: | |
image_bytes = image_file.read() | |
image_base64 = base64.b64encode(image_bytes).decode("utf-8") | |
message = [ | |
HumanMessage( | |
content=[ | |
{"type": "text", "text": "Describe what you see in this image in detail. If there's text, extract it. If it's a chess position, describe the board state and pieces."}, | |
{"type": "image_url", "image_url": { | |
"url": f"data:image/png;base64,{image_base64}"}}, | |
] | |
) | |
] | |
response = vision_llm.invoke(message) | |
vision_result = response.content.strip() | |
# Check if we got a content policy response | |
if "sorry" in vision_result.lower() and "can't assist" in vision_result.lower(): | |
# Fallback to OCR | |
try: | |
import pytesseract | |
text = pytesseract.image_to_string(img).strip() | |
if text: | |
return f"{image_info}\n\nOCR extracted text:\n{text}" | |
else: | |
return f"{image_info}\n\nVision model blocked. OCR found no text." | |
except ImportError: | |
return f"{image_info}\n\nVision model blocked. OCR not available." | |
else: | |
return f"{image_info}\n\nVision analysis:\n{vision_result}" | |
except Exception as vision_error: | |
# Fallback to OCR if vision fails | |
try: | |
import pytesseract | |
text = pytesseract.image_to_string(img).strip() | |
if text: | |
return f"{image_info}\n\nVision failed, OCR extracted text:\n{text}" | |
else: | |
return f"{image_info}\n\nVision failed: {str(vision_error)}. OCR found no text." | |
except ImportError: | |
return f"{image_info}\n\nVision failed: {str(vision_error)}. OCR not available." | |
except Exception as e: | |
return f"Error analyzing image: {str(e)}" | |
image_recognition_tool = Tool( | |
name="image_recognition_tool", | |
func=image_recognition, | |
description="Analyzes and describes the content of images using AI vision. Use this when you need to understand what's in an image." | |
) | |
# ========================= | |
# 7. File Type Detection | |
# ========================= | |
def detect_file_type(file_path: str) -> str: | |
""" | |
Detects the type of file and provides appropriate handling suggestions. | |
Args: | |
file_path (str): Path to the file. | |
Returns: | |
str: File type info or an error message. | |
""" | |
try: | |
if not os.path.exists(file_path): | |
return f"File not found: {file_path}" | |
file_extension = Path(file_path).suffix.lower() | |
file_size = os.path.getsize(file_path) | |
file_types = { | |
'.py': 'Python script', | |
'.mp3': 'Audio file', | |
'.mp4': 'Video file', | |
'.jpg': 'Image file', | |
'.jpeg': 'Image file', | |
'.png': 'Image file', | |
'.txt': 'Text file', | |
'.pdf': 'PDF document', | |
'.doc': 'Word document', | |
'.docx': 'Word document', | |
'.xls': 'Excel spreadsheet', | |
'.xlsx': 'Excel spreadsheet' | |
} | |
file_type = file_types.get(file_extension, 'Unknown file type') | |
return f"File: {file_path}, Type: {file_type}, Size: {file_size} bytes" | |
except Exception as e: | |
return f"Error detecting file type: {str(e)}" | |
file_type_detection_tool = Tool( | |
name="file_type_detection_tool", | |
func=detect_file_type, | |
description="Detects file types and provides information about files. Use this when you need to understand what type of file you're working with." | |
) | |
# ========================= | |
# 8. Enhanced File Reading Tools | |
# ========================= | |
def read_file(file_name: str) -> str: | |
""" | |
Read and process different file types (text, CSV, images). | |
""" | |
if not file_name or not os.path.exists(file_name): | |
return "File not found" | |
try: | |
file_extension = os.path.splitext(file_name)[1].lower() | |
if file_extension == ".csv": | |
return _read_csv_file(file_name) | |
elif file_extension in [".png", ".jpg", ".jpeg", ".gif", ".bmp"]: | |
return _read_image_file(file_name) | |
elif file_extension in [".txt", ".md", ".py", ".js", ".html", ".json"]: | |
return _read_text_file(file_name) | |
else: | |
# Try to read as text file | |
return _read_text_file(file_name) | |
except Exception as e: | |
return f"Error reading file: {str(e)}" | |
def _read_text_file(file_name: str) -> str: | |
"""Read a text file.""" | |
try: | |
with open(file_name, "r", encoding="utf-8") as f: | |
content = f.read() | |
return content[:5000] # Limit to first 5000 characters | |
except UnicodeDecodeError: | |
# Try with different encoding | |
try: | |
with open(file_name, "r", encoding="latin-1") as f: | |
content = f.read() | |
return content[:5000] | |
except Exception as e: | |
return f"Text file reading error: {str(e)}" | |
def _read_csv_file(file_name: str) -> str: | |
"""Read and summarize a CSV file.""" | |
try: | |
df = pd.read_csv(file_name) | |
# Create a summary | |
summary = [] | |
summary.append( | |
f"CSV file shape: {df.shape[0]} rows, {df.shape[1]} columns") | |
summary.append(f"Columns: {', '.join(df.columns.tolist())}") | |
# Show first few rows | |
summary.append("\nFirst 5 rows:") | |
summary.append(df.head().to_string()) | |
# Show basic statistics for numeric columns | |
numeric_columns = df.select_dtypes(include=['number']).columns | |
if len(numeric_columns) > 0: | |
summary.append(f"\nNumeric column statistics:") | |
summary.append(df[numeric_columns].describe().to_string()) | |
return "\n".join(summary) | |
except Exception as e: | |
return f"CSV reading error: {str(e)}" | |
def _read_image_file(file_name: str) -> str: | |
"""Read and analyze an image file.""" | |
try: | |
# Try OCR first | |
try: | |
import pytesseract | |
img = Image.open(file_name) | |
# Get image info | |
info = f"Image: {img.size[0]}x{img.size[1]} pixels, mode: {img.mode}" | |
# Try OCR | |
text = pytesseract.image_to_string(img).strip() | |
if text: | |
return f"{info}\n\nExtracted text:\n{text}" | |
else: | |
return f"{info}\n\nNo text detected in image." | |
except ImportError: | |
# OCR not available, just return image info | |
img = Image.open(file_name) | |
return f"Image: {img.size[0]}x{img.size[1]} pixels, mode: {img.mode}\n(OCR not available - install pytesseract for text extraction)" | |
except Exception as e: | |
return f"Image reading error: {str(e)}" | |
read_file_tool = Tool( | |
name="read_file_tool", | |
func=read_file, | |
description="Reads and processes different file types including text files, CSV files, and images. Use this when you need to extract content from files." | |
) | |
# ========================= | |
# 9. Code Execution and Math Tools | |
# ========================= | |
def execute_code(code: str, timeout: int = 5) -> str: | |
""" | |
Execute Python code safely with timeout. | |
""" | |
try: | |
# Basic security check - prevent dangerous operations | |
dangerous_keywords = [ | |
"import os", "import subprocess", "__import__", "exec", "eval", "open("] | |
if any(keyword in code.lower() for keyword in dangerous_keywords): | |
return "Code execution blocked: potentially unsafe operations detected" | |
result = subprocess.run( | |
["python3", "-c", code], | |
capture_output=True, | |
text=True, | |
timeout=timeout, | |
cwd="/tmp" # Run in safe directory | |
) | |
if result.returncode == 0: | |
return result.stdout.strip() if result.stdout else "Code executed successfully (no output)" | |
else: | |
return f"Code execution error: {result.stderr.strip()}" | |
except subprocess.TimeoutExpired: | |
return "Code execution timeout" | |
except Exception as e: | |
return f"Code execution error: {str(e)}" | |
def calculate_simple_math(expression: str) -> str: | |
""" | |
Safely evaluate simple mathematical expressions. | |
""" | |
try: | |
# Only allow basic math characters | |
allowed_chars = set("0123456789+-*/.() ") | |
if not all(c in allowed_chars for c in expression): | |
return "Invalid mathematical expression" | |
# Use eval safely for basic math | |
result = eval(expression) | |
return str(result) | |
except Exception as e: | |
return f"Math calculation error: {str(e)}" | |
code_execution_tool = Tool( | |
name="code_execution_tool", | |
func=execute_code, | |
description="Executes Python code safely with timeout and security checks. Use this when you need to run small Python code snippets." | |
) | |
math_calculation_tool = Tool( | |
name="math_calculation_tool", | |
func=calculate_simple_math, | |
description="Safely evaluates simple mathematical expressions. Use this when you need to perform basic math calculations." | |
) | |
def wiki_search(query: str) -> str: | |
"""Search Wikipedia for a query and return maximum 2 results.""" | |
search_docs = WikipediaLoader(query=query, load_max_docs=2).load() | |
formatted_search_docs = "\n\n---\n\n".join( | |
[ | |
f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>' | |
f"\n{doc.page_content}\n</Document>" | |
for doc in search_docs | |
]) | |
return formatted_search_docs | |
wiki_search_tool = Tool( | |
name="wiki_search_tool", | |
func=wiki_search, | |
description="Search Wikipedia for a query and return up to 2 results. Use this for factual or historical questions." | |
) | |
python_tool = PythonREPLTool() | |
python_repl_tool = Tool( | |
name="python_repl_tool", | |
func=python_tool, | |
description="Executes Python code in a REPL environment. Use this for running Python code snippets interactively." | |
) | |
# --- New Tools --- | |
def extract_text_from_image(file_path: str) -> str: | |
try: | |
image = Image.open(file_path) | |
text = pytesseract.image_to_string(image) | |
return f"Extracted text from image:\n\n{text}" | |
except Exception as e: | |
return f"Error extracting text from image: {str(e)}" | |
extract_text_from_image_tool = Tool( | |
name="extract_text_from_image_tool", | |
func=extract_text_from_image, | |
description="Extract text from an image using OCR (pytesseract)." | |
) | |
def analyze_csv_file_simple(file_path: str) -> str: | |
"""Analyze a CSV file using pandas.""" | |
try: | |
df = pd.read_csv(file_path) | |
result = f"CSV file loaded with {len(df)} rows and {len(df.columns)} columns.\n" | |
result += f"Columns: {', '.join(df.columns)}\n\n" | |
result += "Summary statistics:\n" | |
result += str(df.describe()) | |
return result | |
except Exception as e: | |
return f"Error analyzing CSV file: {str(e)}" | |
analyze_csv_file_tool = Tool( | |
name="analyze_csv_file_tool", | |
func=analyze_csv_file_simple, | |
description="Analyze a CSV file using pandas and answer a question about it." | |
) | |
def analyze_excel_file_simple(file_path: str) -> str: | |
"""Analyze an Excel file using pandas.""" | |
try: | |
df = pd.read_excel(file_path) | |
result = f"Excel file loaded with {len(df)} rows and {len(df.columns)} columns.\n" | |
result += f"Columns: {', '.join(df.columns)}\n\n" | |
result += "Summary statistics:\n" | |
result += str(df.describe()) | |
return result | |
except Exception as e: | |
return f"Error analyzing Excel file: {str(e)}" | |
analyze_excel_file_tool = Tool( | |
name="analyze_excel_file_tool", | |
func=analyze_excel_file_simple, | |
description="Analyze an Excel file using pandas and answer a question about it." | |
) | |
# | |