santimber's picture
improed flow
362f128
# =========================
# Imports and Environment
# =========================
import os
import requests
import subprocess
import tempfile
import base64
import io
from pathlib import Path
from dotenv import load_dotenv
from typing import TypedDict, Annotated
from huggingface_hub import list_models
from langchain.tools import Tool
from langchain_community.utilities import SerpAPIWrapper
from langchain_core.messages import HumanMessage
from langchain_huggingface import ChatHuggingFace
from langchain_openai import ChatOpenAI
import openai
from pydub import AudioSegment
import pandas as pd
from PIL import Image
from langchain_community.document_loaders import WikipediaLoader
from langchain_experimental.tools.python.tool import PythonREPLTool
import uuid
import pytesseract
from urllib.parse import urlparse
# Load environment variables
print("Current working directory:", os.getcwd())
load_dotenv(dotenv_path=os.path.join(os.path.dirname(__file__), ".env"))
# =========================
# 1. Web Search Tools
# =========================
def serp_search(query: str) -> str:
"""
Searches the web using SerpAPI and returns the top result snippet.
Args:
query (str): The search query.
Returns:
str: The top result snippet or an error message.
"""
try:
search = SerpAPIWrapper()
results = search.run(query)
return results
except Exception as e:
return f"Search failed: {e}"
serp_search_tool = Tool(
name="serp_search_tool",
func=serp_search,
description="Searches the web using SerpAPI and returns the top result."
)
# =========================
# 2. File Download/Handling Tools
# =========================
# Note: File downloading is now handled in app.py via process_question_with_files()
# This section is kept for reference but the download_file_tool is not exported
def download_file(url: str, save_path: str) -> str:
"""
Downloads a file from a URL and saves it to the given path.
Args:
url (str): The URL from which to download the file.
save_path (str): The local file path where the downloaded file will be saved.
Returns:
str: A message indicating the result of the download operation.
"""
try:
# Reduced from 30 to 15 seconds
response = requests.get(url, timeout=15)
response.raise_for_status()
with open(save_path, "wb") as f:
f.write(response.content)
return f"File downloaded to {save_path}"
except Exception as e:
return f"Failed to download: {e}"
# download_file_tool is now used internally by process_question_with_files() in app.py
# and is not exported as a standalone tool for the agent
# =========================
# 3. Python Execution Tools
# =========================
def RunPythonFileTool(file_path: str) -> str:
"""
Executes a Python script loaded from the specified path using the PythonInterpreterTool if available, otherwise subprocess.
Args:
file_path (str): The full path to the python (.py) file containing the Python code.
Returns:
str: The output produced by the code execution, or an error message if it fails.
"""
try:
if not os.path.exists(file_path):
return f"File not found: {file_path}"
with open(file_path, "r") as f:
code = f.read()
try:
from langchain.tools.python.tool import PythonInterpreterTool
interpreter = PythonInterpreterTool()
result = interpreter.run({"code": code})
return result.get("output", "No output returned.")
except ImportError:
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as temp:
temp.write(code)
temp_path = temp.name
result = subprocess.run(
["python", temp_path],
capture_output=True,
text=True,
timeout=15
)
os.unlink(temp_path)
if result.returncode == 0:
return result.stdout.strip() or "No output returned."
else:
return f"Error: {result.stderr.strip()}"
except subprocess.TimeoutExpired:
return "Error: Code execution timed out"
except Exception as e:
return f"Execution failed: {e}"
python_execution_tool = Tool(
name="python_execution_tool",
func=RunPythonFileTool,
description="Executes Python code and returns the output. Use this when you need to run Python scripts or calculate values."
)
# =========================
# 4. Text Utilities
# =========================
def ReverseTextTool(text: str) -> str:
"""
Reverses the order of characters in a given text string.
Args:
text (str): The text to reverse.
Returns:
str: The reversed text or an error message.
"""
try:
return text[::-1]
except Exception as e:
return f"Error reversing text: {str(e)}"
reverse_text_tool = Tool(
name="reverse_text_tool",
func=ReverseTextTool,
description="Reverses the order of characters in a given text string. Use this when you need to reverse text."
)
# =========================
# 5. Audio, Video, and Image Tools
# =========================
def process_audio(audio_file_path: str) -> str:
"""
Processes audio files to extract information and transcribe speech content.
Args:
audio_file_path (str): Path to the audio file.
Returns:
str: Transcription result or file info with error message.
"""
try:
if not os.path.exists(audio_file_path):
return f"Audio file not found: {audio_file_path}"
file_extension = Path(audio_file_path).suffix.lower()
# Check if it's an audio file we can process
if file_extension not in ['.mp3', '.wav', '.m4a', '.flac', '.ogg']:
file_size = os.path.getsize(audio_file_path)
return f"Audio file: {audio_file_path}, Size: {file_size} bytes, Type: {file_extension}. Unsupported audio format for transcription."
# Try to transcribe the audio
try:
# Initialize OpenAI client
client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# Convert MP3 to WAV if needed (Whisper works better with WAV)
if file_extension == '.mp3':
audio = AudioSegment.from_mp3(audio_file_path)
# Export as WAV to a temporary buffer
wav_buffer = io.BytesIO()
audio.export(wav_buffer, format="wav")
wav_buffer.seek(0)
# Use the WAV buffer for transcription
transcription = client.audio.transcriptions.create(
model="whisper-1",
file=wav_buffer,
response_format="text"
)
else:
# For other formats, try direct transcription
with open(audio_file_path, "rb") as audio_file:
transcription = client.audio.transcriptions.create(
model="whisper-1",
file=audio_file,
response_format="text"
)
file_size = os.path.getsize(audio_file_path)
return f"Transcription successful!\nFile: {audio_file_path}\nSize: {file_size} bytes\nType: {file_extension}\n\nTranscription:\n{transcription}"
except openai.AuthenticationError:
file_size = os.path.getsize(audio_file_path)
return f"Audio file: {audio_file_path}, Size: {file_size} bytes, Type: {file_extension}. OpenAI API key not found or invalid. Please set OPENAI_API_KEY in your environment variables."
except openai.BadRequestError as e:
file_size = os.path.getsize(audio_file_path)
return f"Audio file: {audio_file_path}, Size: {file_size} bytes, Type: {file_extension}. Audio format not supported or file too large: {str(e)}"
except Exception as e:
file_size = os.path.getsize(audio_file_path)
return f"Audio file: {audio_file_path}, Size: {file_size} bytes, Type: {file_extension}. Transcription error: {str(e)}"
except Exception as e:
return f"Error processing audio: {str(e)}"
audio_processing_tool = Tool(
name="audio_processing_tool",
func=process_audio,
description="Transcribes audio files (MP3, WAV, M4A, FLAC, OGG) to text using speech recognition. Use this when you need to convert speech in audio files to text."
)
def analyze_video(video_url: str) -> str:
"""
Analyzes video content from YouTube or other video URLs.
Args:
video_url (str): The video URL.
Returns:
str: Video analysis or an error message.
"""
try:
if 'youtube.com' in video_url or 'youtu.be' in video_url:
video_id = None
if 'youtube.com/watch?v=' in video_url:
video_id = video_url.split('watch?v=')[1].split('&')[0]
elif 'youtu.be/' in video_url:
video_id = video_url.split('youtu.be/')[1].split('?')[0]
if video_id:
search_result = serp_search(
f"youtube video {video_id} title description")
return f"Video analysis for {video_id}: {search_result}"
else:
return "Could not extract video ID from URL"
else:
return "Video analysis currently supports YouTube videos only"
except Exception as e:
return f"Error analyzing video: {str(e)}"
video_analysis_tool = Tool(
name="video_analysis_tool",
func=analyze_video,
description="Analyzes video content from URLs. Use this when questions involve video content or YouTube links."
)
# =========================
# 6. Image Recognition Tools
# =========================
def image_recognition(img_path: str) -> str:
"""
Analyzes and describes the content of images using AI vision.
Args:
img_path (str): Path to the image file.
Returns:
str: Description or extracted text, or an error message.
"""
try:
if not os.path.exists(img_path):
return f"Error: Image file not found at {img_path}"
if not os.getenv("OPENAI_API_KEY"):
return "OpenAI API key not found. Please set OPENAI_API_KEY in your environment variables."
# Get image info first
try:
img = Image.open(img_path)
image_info = f"Image: {img.size[0]}x{img.size[1]} pixels, mode: {img.mode}"
except Exception as e:
image_info = f"Image info error: {str(e)}"
# Try vision model
try:
vision_llm = ChatOpenAI(model="gpt-4o", temperature=0)
with open(img_path, "rb") as image_file:
image_bytes = image_file.read()
image_base64 = base64.b64encode(image_bytes).decode("utf-8")
message = [
HumanMessage(
content=[
{"type": "text", "text": "Describe what you see in this image in detail. If there's text, extract it. If it's a chess position, describe the board state and pieces."},
{"type": "image_url", "image_url": {
"url": f"data:image/png;base64,{image_base64}"}},
]
)
]
response = vision_llm.invoke(message)
vision_result = response.content.strip()
# Check if we got a content policy response
if "sorry" in vision_result.lower() and "can't assist" in vision_result.lower():
# Fallback to OCR
try:
import pytesseract
text = pytesseract.image_to_string(img).strip()
if text:
return f"{image_info}\n\nOCR extracted text:\n{text}"
else:
return f"{image_info}\n\nVision model blocked. OCR found no text."
except ImportError:
return f"{image_info}\n\nVision model blocked. OCR not available."
else:
return f"{image_info}\n\nVision analysis:\n{vision_result}"
except Exception as vision_error:
# Fallback to OCR if vision fails
try:
import pytesseract
text = pytesseract.image_to_string(img).strip()
if text:
return f"{image_info}\n\nVision failed, OCR extracted text:\n{text}"
else:
return f"{image_info}\n\nVision failed: {str(vision_error)}. OCR found no text."
except ImportError:
return f"{image_info}\n\nVision failed: {str(vision_error)}. OCR not available."
except Exception as e:
return f"Error analyzing image: {str(e)}"
image_recognition_tool = Tool(
name="image_recognition_tool",
func=image_recognition,
description="Analyzes and describes the content of images using AI vision. Use this when you need to understand what's in an image."
)
# =========================
# 7. File Type Detection
# =========================
def detect_file_type(file_path: str) -> str:
"""
Detects the type of file and provides appropriate handling suggestions.
Args:
file_path (str): Path to the file.
Returns:
str: File type info or an error message.
"""
try:
if not os.path.exists(file_path):
return f"File not found: {file_path}"
file_extension = Path(file_path).suffix.lower()
file_size = os.path.getsize(file_path)
file_types = {
'.py': 'Python script',
'.mp3': 'Audio file',
'.mp4': 'Video file',
'.jpg': 'Image file',
'.jpeg': 'Image file',
'.png': 'Image file',
'.txt': 'Text file',
'.pdf': 'PDF document',
'.doc': 'Word document',
'.docx': 'Word document',
'.xls': 'Excel spreadsheet',
'.xlsx': 'Excel spreadsheet'
}
file_type = file_types.get(file_extension, 'Unknown file type')
return f"File: {file_path}, Type: {file_type}, Size: {file_size} bytes"
except Exception as e:
return f"Error detecting file type: {str(e)}"
file_type_detection_tool = Tool(
name="file_type_detection_tool",
func=detect_file_type,
description="Detects file types and provides information about files. Use this when you need to understand what type of file you're working with."
)
# =========================
# 8. Enhanced File Reading Tools
# =========================
def read_file(file_name: str) -> str:
"""
Read and process different file types (text, CSV, images).
"""
if not file_name or not os.path.exists(file_name):
return "File not found"
try:
file_extension = os.path.splitext(file_name)[1].lower()
if file_extension == ".csv":
return _read_csv_file(file_name)
elif file_extension in [".png", ".jpg", ".jpeg", ".gif", ".bmp"]:
return _read_image_file(file_name)
elif file_extension in [".txt", ".md", ".py", ".js", ".html", ".json"]:
return _read_text_file(file_name)
else:
# Try to read as text file
return _read_text_file(file_name)
except Exception as e:
return f"Error reading file: {str(e)}"
def _read_text_file(file_name: str) -> str:
"""Read a text file."""
try:
with open(file_name, "r", encoding="utf-8") as f:
content = f.read()
return content[:5000] # Limit to first 5000 characters
except UnicodeDecodeError:
# Try with different encoding
try:
with open(file_name, "r", encoding="latin-1") as f:
content = f.read()
return content[:5000]
except Exception as e:
return f"Text file reading error: {str(e)}"
def _read_csv_file(file_name: str) -> str:
"""Read and summarize a CSV file."""
try:
df = pd.read_csv(file_name)
# Create a summary
summary = []
summary.append(
f"CSV file shape: {df.shape[0]} rows, {df.shape[1]} columns")
summary.append(f"Columns: {', '.join(df.columns.tolist())}")
# Show first few rows
summary.append("\nFirst 5 rows:")
summary.append(df.head().to_string())
# Show basic statistics for numeric columns
numeric_columns = df.select_dtypes(include=['number']).columns
if len(numeric_columns) > 0:
summary.append(f"\nNumeric column statistics:")
summary.append(df[numeric_columns].describe().to_string())
return "\n".join(summary)
except Exception as e:
return f"CSV reading error: {str(e)}"
def _read_image_file(file_name: str) -> str:
"""Read and analyze an image file."""
try:
# Try OCR first
try:
import pytesseract
img = Image.open(file_name)
# Get image info
info = f"Image: {img.size[0]}x{img.size[1]} pixels, mode: {img.mode}"
# Try OCR
text = pytesseract.image_to_string(img).strip()
if text:
return f"{info}\n\nExtracted text:\n{text}"
else:
return f"{info}\n\nNo text detected in image."
except ImportError:
# OCR not available, just return image info
img = Image.open(file_name)
return f"Image: {img.size[0]}x{img.size[1]} pixels, mode: {img.mode}\n(OCR not available - install pytesseract for text extraction)"
except Exception as e:
return f"Image reading error: {str(e)}"
read_file_tool = Tool(
name="read_file_tool",
func=read_file,
description="Reads and processes different file types including text files, CSV files, and images. Use this when you need to extract content from files."
)
# =========================
# 9. Code Execution and Math Tools
# =========================
def execute_code(code: str, timeout: int = 5) -> str:
"""
Execute Python code safely with timeout.
"""
try:
# Basic security check - prevent dangerous operations
dangerous_keywords = [
"import os", "import subprocess", "__import__", "exec", "eval", "open("]
if any(keyword in code.lower() for keyword in dangerous_keywords):
return "Code execution blocked: potentially unsafe operations detected"
result = subprocess.run(
["python3", "-c", code],
capture_output=True,
text=True,
timeout=timeout,
cwd="/tmp" # Run in safe directory
)
if result.returncode == 0:
return result.stdout.strip() if result.stdout else "Code executed successfully (no output)"
else:
return f"Code execution error: {result.stderr.strip()}"
except subprocess.TimeoutExpired:
return "Code execution timeout"
except Exception as e:
return f"Code execution error: {str(e)}"
def calculate_simple_math(expression: str) -> str:
"""
Safely evaluate simple mathematical expressions.
"""
try:
# Only allow basic math characters
allowed_chars = set("0123456789+-*/.() ")
if not all(c in allowed_chars for c in expression):
return "Invalid mathematical expression"
# Use eval safely for basic math
result = eval(expression)
return str(result)
except Exception as e:
return f"Math calculation error: {str(e)}"
code_execution_tool = Tool(
name="code_execution_tool",
func=execute_code,
description="Executes Python code safely with timeout and security checks. Use this when you need to run small Python code snippets."
)
math_calculation_tool = Tool(
name="math_calculation_tool",
func=calculate_simple_math,
description="Safely evaluates simple mathematical expressions. Use this when you need to perform basic math calculations."
)
def wiki_search(query: str) -> str:
"""Search Wikipedia for a query and return maximum 2 results."""
search_docs = WikipediaLoader(query=query, load_max_docs=2).load()
formatted_search_docs = "\n\n---\n\n".join(
[
f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>'
f"\n{doc.page_content}\n</Document>"
for doc in search_docs
])
return formatted_search_docs
wiki_search_tool = Tool(
name="wiki_search_tool",
func=wiki_search,
description="Search Wikipedia for a query and return up to 2 results. Use this for factual or historical questions."
)
python_tool = PythonREPLTool()
python_repl_tool = Tool(
name="python_repl_tool",
func=python_tool,
description="Executes Python code in a REPL environment. Use this for running Python code snippets interactively."
)
# --- New Tools ---
def extract_text_from_image(file_path: str) -> str:
try:
image = Image.open(file_path)
text = pytesseract.image_to_string(image)
return f"Extracted text from image:\n\n{text}"
except Exception as e:
return f"Error extracting text from image: {str(e)}"
extract_text_from_image_tool = Tool(
name="extract_text_from_image_tool",
func=extract_text_from_image,
description="Extract text from an image using OCR (pytesseract)."
)
def analyze_csv_file_simple(file_path: str) -> str:
"""Analyze a CSV file using pandas."""
try:
df = pd.read_csv(file_path)
result = f"CSV file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
result += f"Columns: {', '.join(df.columns)}\n\n"
result += "Summary statistics:\n"
result += str(df.describe())
return result
except Exception as e:
return f"Error analyzing CSV file: {str(e)}"
analyze_csv_file_tool = Tool(
name="analyze_csv_file_tool",
func=analyze_csv_file_simple,
description="Analyze a CSV file using pandas and answer a question about it."
)
def analyze_excel_file_simple(file_path: str) -> str:
"""Analyze an Excel file using pandas."""
try:
df = pd.read_excel(file_path)
result = f"Excel file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
result += f"Columns: {', '.join(df.columns)}\n\n"
result += "Summary statistics:\n"
result += str(df.describe())
return result
except Exception as e:
return f"Error analyzing Excel file: {str(e)}"
analyze_excel_file_tool = Tool(
name="analyze_excel_file_tool",
func=analyze_excel_file_simple,
description="Analyze an Excel file using pandas and answer a question about it."
)
#