import ast
import base64
import io
import json
import operator
import os
import re
import tempfile
from typing import Optional
from urllib.parse import quote

import PyPDF2
import requests
from langchain.tools import Tool
from langchain_community.tools import DuckDuckGoSearchRun
from PIL import Image


# Shared DuckDuckGo search runner: built once when the module is imported and
# reused by web_search_tool_func for every query.
search_tool = DuckDuckGoSearchRun()


def web_search_tool_func(query: str) -> str:
    """Search the web with DuckDuckGo and return the result text.

    Args:
        query: Free-text search query.

    Returns:
        Whatever ``search_tool.run`` returns for the query, or a message
        starting with "Web search failed:" when the query is blank or the
        search raises.
    """
    try:
        # A blank query cannot produce a useful answer, so reply locally
        # instead of spending a network round trip on it.
        if not query or not query.strip():
            return "Web search failed: empty query"
        return search_tool.run(query)
    except Exception as e:
        # Errors are reported as text so the calling agent can read them.
        return f"Web search failed: {e}"


# Tool wrapper that exposes web_search_tool_func to the agent as "web_search".
web_search_tool = Tool(
    name="web_search",
    func=web_search_tool_func,
    description="Searches the web for current information. Use this for factual questions, recent events, or when you need to find information not in your training data.",
)


def file_download_tool_func(task_id: str) -> str:
    """Download the file attached to a GAIA task and report on it.

    The response body is written to a temporary file and then handled
    according to the response's ``content-type`` header:

    * image: the file is kept and its path is returned for later analysis;
    * pdf: the file is passed to ``process_pdf_file`` and its result returned;
    * text: the file is read as UTF-8 and its content returned;
    * anything else: the file is kept and its path and content type returned.

    Args:
        task_id: Identifier of the task whose file should be fetched.

    Returns:
        A human-readable result string, or a message starting with
        "Failed to download file for task" if any step raises.
    """
    api_url = "https://agents-course-unit4-scoring.hf.space"
    temp_path = None
    # Set only when the path is handed back to the caller for later use;
    # every other outcome removes the temporary file in ``finally``.
    keep_file = False
    try:
        # Percent-encode the ID so stray "/", "?" or spaces cannot change
        # which URL is requested.
        file_url = f"{api_url}/files/{quote(task_id.strip(), safe='')}"

        response = requests.get(file_url, timeout=30)
        response.raise_for_status()
        content_type = response.headers.get('content-type', '').lower()

        with tempfile.NamedTemporaryFile(delete=False, suffix=".tmp") as temp_file:
            # Record the path before writing so a failed write is cleaned up too.
            temp_path = temp_file.name
            temp_file.write(response.content)

        if 'image' in content_type:
            keep_file = True
            # "image_analysis" is the name the image tool is registered under.
            return f"Image file downloaded to {temp_path}. Use the image_analysis tool to analyze it."
        if 'pdf' in content_type:
            # process_pdf_file removes the file itself on success; the
            # ``finally`` below covers the case where it did not.
            return process_pdf_file(temp_path)
        if 'text' in content_type:
            with open(temp_path, 'r', encoding='utf-8') as f:
                content = f.read()
            return f"Text file content:\n{content}"

        keep_file = True
        return f"File downloaded to {temp_path}. Content type: {content_type}"
    except Exception as e:
        return f"Failed to download file for task {task_id}: {e}"
    finally:
        if temp_path is not None and not keep_file:
            try:
                os.unlink(temp_path)
            except OSError:
                # Best-effort cleanup: the file may already have been removed.
                pass


def process_pdf_file(file_path: str) -> str:
    """Extract the text of every page of a PDF, then delete the file.

    The file is consumed by this call: it is removed whether extraction
    succeeds or fails, because a failure message does not include the path
    and the file could not be found again afterwards.

    Args:
        file_path: Path of the PDF file to read.

    Returns:
        "PDF content extracted:" followed by each page's text under a
        "--- Page N ---" header, or a message starting with
        "Failed to process PDF:" if reading or parsing raises.
    """
    try:
        with open(file_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            page_chunks = []
            for page_number, page in enumerate(pdf_reader.pages, start=1):
                page_chunks.append(f"\n--- Page {page_number} ---\n")
                # Guard against a page yielding no text object at all.
                page_chunks.append(page.extract_text() or "")

        text_content = "".join(page_chunks)
        return f"PDF content extracted:\n{text_content}"
    except Exception as e:
        return f"Failed to process PDF: {e}"
    finally:
        # Runs after the ``with`` block has closed the file handle.
        try:
            os.unlink(file_path)
        except (OSError, TypeError, ValueError):
            # Best-effort cleanup: the path may be missing or not a valid path.
            pass


# Tool wrapper that exposes file_download_tool_func to the agent as "file_download".
file_download_tool = Tool(
    name="file_download",
    func=file_download_tool_func,
    description="Downloads and processes files associated with GAIA task IDs. Can handle images, PDFs, and text files.",
)


def _remove_if_temporary(path: str) -> None:
    """Best-effort delete of *path*, but only if it lies in the system temp dir."""
    try:
        temp_root = os.path.realpath(tempfile.gettempdir())
        target = os.path.realpath(path)
        # Resolving symlinks first means a link inside the temp directory that
        # points elsewhere is not treated as a temporary file.
        if target != temp_root and os.path.commonpath([temp_root, target]) == temp_root:
            os.unlink(target)
    except (OSError, ValueError):
        # Cleanup is optional; a failure here must not affect the analysis.
        pass


def image_analysis_tool_func(image_path_or_description: str) -> str:
    """Report basic metadata for an image file.

    If the argument names an existing path, the image is opened with Pillow
    and its pixel size, mode and format are reported. The file is then
    removed when it lives in the system temporary directory, which is where
    ``tempfile.NamedTemporaryFile`` places files by default; files anywhere
    else are left untouched. If the argument is not an existing path it is
    treated as a description and a note is returned instead.

    Args:
        image_path_or_description: Path of an image file, or free text.

    Returns:
        A human-readable result string, or a message starting with
        "Image analysis failed:" if opening or reading the image raises.
    """
    try:
        if not os.path.exists(image_path_or_description):
            return f"Image analysis requested for: {image_path_or_description}. Note: Full image analysis requires a vision model integration."

        with Image.open(image_path_or_description) as img:
            width, height = img.size
            mode = img.mode
            format_info = img.format
    except Exception as e:
        return f"Image analysis failed: {e}"

    # Done after the image is closed, and outside the ``try`` above, so a
    # cleanup problem can never replace a successful analysis with an error.
    _remove_if_temporary(image_path_or_description)

    return f"Image analyzed: {width}x{height} pixels, mode: {mode}, format: {format_info}. Note: This is a basic analysis. For detailed image content analysis, a vision model would be needed."


# Tool wrapper that exposes image_analysis_tool_func to the agent as "image_analysis".
image_analysis_tool = Tool(
    name="image_analysis",
    func=image_analysis_tool_func,
    description="Analyzes images to extract information. Use this for questions involving visual content.",
)


# Characters an expression may contain; anything else is rejected before parsing.
_CALC_ALLOWED_CHARS = frozenset('0123456789+-*/().= ')

# Upper bound on the bit length of an integer power, so inputs such as
# "9**9**9**9" fail fast instead of exhausting CPU and memory.
_CALC_MAX_POWER_BITS = 10_000

# Arithmetic operators the evaluator accepts, keyed by their AST node type.
_CALC_BINARY_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.FloorDiv: operator.floordiv,
}
_CALC_UNARY_OPS = {
    ast.UAdd: operator.pos,
    ast.USub: operator.neg,
}


def _calc_power(base, exponent):
    """Return ``base ** exponent``, refusing integer powers that would be huge."""
    if isinstance(base, int) and isinstance(exponent, int) and exponent > 0 and abs(base) > 1:
        # |base| < 2**bit_length, so the result needs at most this many bits.
        if base.bit_length() * exponent > _CALC_MAX_POWER_BITS:
            raise ValueError("result of exponentiation is too large")
    return base ** exponent


def _calc_evaluate(node):
    """Recursively evaluate a parsed expression limited to plain arithmetic."""
    if isinstance(node, ast.Expression):
        return _calc_evaluate(node.body)
    if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
        return node.value
    if isinstance(node, ast.UnaryOp) and type(node.op) in _CALC_UNARY_OPS:
        return _CALC_UNARY_OPS[type(node.op)](_calc_evaluate(node.operand))
    if isinstance(node, ast.BinOp):
        left = _calc_evaluate(node.left)
        right = _calc_evaluate(node.right)
        if isinstance(node.op, ast.Pow):
            return _calc_power(left, right)
        if type(node.op) in _CALC_BINARY_OPS:
            return _CALC_BINARY_OPS[type(node.op)](left, right)
    if isinstance(node, ast.Compare) and all(isinstance(op, ast.Eq) for op in node.ops):
        # "==" is the only comparison the allowed characters can spell.
        left = _calc_evaluate(node.left)
        for comparator in node.comparators:
            right = _calc_evaluate(comparator)
            if left != right:
                return False
            left = right
        return True
    raise ValueError("unsupported expression")


def calculator_tool_func(expression: str) -> str:
    """Evaluate a basic arithmetic expression without using ``eval``.

    Supported syntax: integer and decimal literals, parentheses, unary
    ``+``/``-``, the binary operators ``+ - * / // **`` and ``==``.
    Surrounding spaces and a trailing ``=`` (as in "2+2=") are ignored.

    Args:
        expression: The expression text.

    Returns:
        "Calculation result: <expression> = <value>" on success,
        "Invalid characters in expression: ..." if the text contains a
        character outside the allowed set, or a message starting with
        "Calculation failed for" if parsing or evaluation raises.
    """
    try:
        if not all(c in _CALC_ALLOWED_CHARS for c in expression):
            return f"Invalid characters in expression: {expression}"

        # Leading spaces would be read as indentation by the parser, and a
        # trailing "=" is how calculator-style input is often written.
        normalized = expression.strip().rstrip('= ')
        if not normalized:
            raise ValueError("empty expression")

        result = _calc_evaluate(ast.parse(normalized, mode="eval"))
        return f"Calculation result: {normalized} = {result}"
    except Exception as e:
        return f"Calculation failed for '{expression}': {e}"


# Tool wrapper that exposes calculator_tool_func to the agent as "calculator".
calculator_tool = Tool(
    name="calculator",
    func=calculator_tool_func,
    description="Performs mathematical calculations. Use this for numerical computations and math problems.",
)


# Whitespace that follows sentence-ending punctuation. Splitting here keeps the
# punctuation attached to its sentence and leaves decimals such as "3.14" intact.
_SENTENCE_BREAK = re.compile(r'(?<=[.!?])\s+')

# Unsigned integers and decimals.
_NUMBER_PATTERN = re.compile(r'\d+(?:\.\d+)?')

# Date shapes recognised by "extract_dates", tried in this order.
_DATE_PATTERNS = (
    re.compile(r'\d{1,2}/\d{1,2}/\d{4}'),    # e.g. 12/25/2023
    re.compile(r'\d{4}-\d{1,2}-\d{1,2}'),    # e.g. 2023-12-25
    re.compile(r'\b\w+ \d{1,2}, \d{4}\b'),   # e.g. December 25, 2023
)


def text_processor_tool_func(text: str, operation: str = "summarize") -> str:
    """Run a simple text operation and describe the result.

    Operations:
        * "summarize": if the text has more than five sentences, return its
          first two and last two sentences; otherwise return it unchanged.
        * "extract_numbers": list every integer or decimal found in the text.
        * "extract_dates": list every match of the supported date shapes,
          grouped by shape rather than by position in the text.

    Args:
        text: The text to process.
        operation: Name of the operation to run.

    Returns:
        A human-readable result string, a "not supported" message for an
        unknown operation, or a message starting with
        "Text processing failed:" if processing raises.
    """
    try:
        if operation == "summarize":
            # Drop empty pieces so blank fragments are never counted as sentences.
            sentences = [s for s in _SENTENCE_BREAK.split(text.strip()) if s]
            if len(sentences) > 5:
                summary = ' '.join(sentences[:2] + sentences[-2:])
                return f"Text summary: {summary}"
            return f"Text (short enough to not need summarization): {text}"

        if operation == "extract_numbers":
            numbers = _NUMBER_PATTERN.findall(text)
            return f"Numbers found in text: {numbers}"

        if operation == "extract_dates":
            dates = []
            for pattern in _DATE_PATTERNS:
                dates.extend(pattern.findall(text))
            return f"Dates found in text: {dates}"

        return f"Text processing operation '{operation}' not supported. Available: summarize, extract_numbers, extract_dates"
    except Exception as e:
        return f"Text processing failed: {e}"


# Tool wrapper that exposes text_processor_tool_func to the agent as "text_processor".
# NOTE(review): the description advertises a second parameter, but this wrapper
# is built with Tool(func=...). Confirm the agent framework can actually pass
# `operation`; otherwise only the default "summarize" operation is reachable.
text_processor_tool = Tool(
    name="text_processor",
    func=text_processor_tool_func,
    description="Processes text for various operations like summarization, number extraction, date extraction. Specify operation as second parameter.",
)


# Every tool defined in this module, in the order they are offered to the agent.
agent_tools = [
    web_search_tool,
    file_download_tool,
    image_analysis_tool,
    calculator_tool,
    text_processor_tool,
]