# NOTE: this file was extracted from a Hugging Face Space; the page's
# "Spaces: Sleeping" status banner has been replaced by this comment.
import calendar
import json
import os
import re
import tempfile
from datetime import datetime, timedelta
from urllib.parse import urlparse

import pandas as pd
import requests

from smolagents import Tool
class ReverseTextTool(Tool):
    """Tool that returns its input string with the characters in reverse order."""

    name = "reverse_text"
    description = "Reverses the text in a string."
    inputs = {
        "text": {
            "type": "string",
            "description": "The text to reverse."
        }
    }
    output_type = "string"

    def forward(self, text: str) -> str:
        # Walk the characters back-to-front and reassemble them.
        return "".join(reversed(text))
class ExtractTextFromImageTool(Tool):
    """Tool that runs Tesseract OCR over an image file and returns the text found."""

    name = "extract_text_from_image"
    description = "Extracts text from an image file using OCR."
    inputs = {
        "image_path": {
            "type": "string",
            "description": "Path to the image file."
        }
    }
    output_type = "string"

    def forward(self, image_path: str) -> str:
        try:
            # Imported lazily so the tool degrades gracefully when OCR
            # dependencies are absent.
            import pytesseract
            from PIL import Image

            img = Image.open(image_path)

            # Page-segmentation modes tried in order; a mode that errors out
            # is simply skipped.
            psm_configs = (
                '--psm 6',  # Assume a single uniform block of text
                '--psm 3',  # Automatic page segmentation, but no OSD
                '--psm 1',  # Automatic page segmentation with OSD
            )

            def run_ocr(cfg):
                # Best-effort OCR pass; an empty string marks a failed attempt.
                try:
                    return pytesseract.image_to_string(img, config=cfg)
                except Exception:
                    return ""

            candidates = [t for t in (run_ocr(c) for c in psm_configs) if t.strip()]

            if not candidates:
                return "No text could be extracted from the image."
            # The longest candidate is taken as the most complete read.
            best = max(candidates, key=len)
            return f"Extracted text from image:\n\n{best}"
        except ImportError:
            return "Error: pytesseract is not installed. Please install it with 'pip install pytesseract' and ensure Tesseract OCR is installed on your system."
        except Exception as e:
            return f"Error extracting text from image: {str(e)}"
class AnalyzeCSVTool(Tool):
    """Tool that summarizes a CSV file: shape, columns, and per-column statistics."""

    name = "analyze_csv_file"
    description = "Analyzes a CSV file and provides information about its contents."
    inputs = {
        "file_path": {
            "type": "string",
            "description": "Path to the CSV file."
        },
        "query": {
            "type": "string",
            "description": "Optional query about the data.",
            "default": "",
            "nullable": True
        }
    }
    output_type = "string"

    def forward(self, file_path: str, query: str = "") -> str:
        try:
            # Try a sequence of encodings until one decodes the file.
            df = None
            for enc in ('utf-8', 'latin1', 'iso-8859-1', 'cp1252'):
                try:
                    df = pd.read_csv(file_path, encoding=enc)
                except UnicodeDecodeError:
                    continue
                break
            if df is None:
                return "Error: Could not read the CSV file with any of the attempted encodings."

            # Assemble the report as parts, joined once at the end.
            parts = [
                f"CSV file has {len(df)} rows and {len(df.columns)} columns.\n",
                f"Columns: {', '.join(df.columns)}\n\n",
            ]

            if query:
                lowered = query.lower()
                if "count" in lowered:
                    parts.append(f"Row count: {len(df)}\n")
                # Report on every column mentioned in the query.
                for col in df.columns:
                    if col.lower() in lowered:
                        parts.append(f"\nColumn '{col}' information:\n")
                        if pd.api.types.is_numeric_dtype(df[col]):
                            parts.append(f"Min: {df[col].min()}\n")
                            parts.append(f"Max: {df[col].max()}\n")
                            parts.append(f"Mean: {df[col].mean()}\n")
                            parts.append(f"Median: {df[col].median()}\n")
                        else:
                            # Categorical column: cardinality plus top values.
                            top = df[col].value_counts().head(10)
                            parts.append(f"Unique values: {df[col].nunique()}\n")
                            parts.append(f"Top values:\n{top.to_string()}\n")
            else:
                # No query: general statistics for the whole frame.
                numeric = df.select_dtypes(include=['number']).columns
                if len(numeric) > 0:
                    parts.append("Numeric columns statistics:\n")
                    parts.append(df[numeric].describe().to_string())
                    parts.append("\n\n")
                categorical = df.select_dtypes(exclude=['number']).columns
                if len(categorical) > 0:
                    parts.append("Categorical columns:\n")
                    parts.extend(
                        f"- {col}: {df[col].nunique()} unique values\n"
                        for col in categorical[:5]  # Limit to first 5 columns
                    )
            return "".join(parts)
        except Exception as e:
            return f"Error analyzing CSV file: {str(e)}"
class AnalyzeExcelTool(Tool):
    """Tool that summarizes an Excel workbook: sheet inventory plus shape,
    columns, and per-column statistics for one selected sheet."""

    name = "analyze_excel_file"
    description = "Analyzes an Excel file and provides information about its contents."
    inputs = {
        "file_path": {
            "type": "string",
            "description": "Path to the Excel file."
        },
        "query": {
            "type": "string",
            "description": "Optional query about the data.",
            "default": "",
            "nullable": True
        },
        "sheet_name": {
            "type": "string",
            "description": "Name of the sheet to analyze (defaults to first sheet).",
            "default": None,
            "nullable": True
        }
    }
    output_type = "string"

    def forward(self, file_path: str, query: str = "", sheet_name: str = None) -> str:
        """Return a textual report for the workbook; never raises (errors are
        returned as strings)."""
        try:
            # Enumerate the sheets before reading any data.
            excel_file = pd.ExcelFile(file_path)
            sheet_names = excel_file.sheet_names
            result = f"Excel file contains {len(sheet_names)} sheets: {', '.join(sheet_names)}\n\n"

            # Resolve which sheet to analyze: explicit name or first sheet.
            if sheet_name is None:
                sheet_name = sheet_names[0]
            elif sheet_name not in sheet_names:
                return f"Error: Sheet '{sheet_name}' not found. Available sheets: {', '.join(sheet_names)}"

            df = pd.read_excel(file_path, sheet_name=sheet_name)

            result += f"Sheet '{sheet_name}' has {len(df)} rows and {len(df.columns)} columns.\n"
            # BUGFIX: Excel headers are often non-string (ints, dates);
            # joining them raw raised TypeError and aborted the whole report.
            result += f"Columns: {', '.join(map(str, df.columns))}\n\n"

            if query:
                if "count" in query.lower():
                    result += f"Row count: {len(df)}\n"
                # Report on every column mentioned in the query; str(col)
                # because non-string headers have no .lower().
                for col in df.columns:
                    if str(col).lower() in query.lower():
                        result += f"\nColumn '{col}' information:\n"
                        if pd.api.types.is_numeric_dtype(df[col]):
                            result += f"Min: {df[col].min()}\n"
                            result += f"Max: {df[col].max()}\n"
                            result += f"Mean: {df[col].mean()}\n"
                            result += f"Median: {df[col].median()}\n"
                        else:
                            # Categorical column: cardinality plus top values.
                            value_counts = df[col].value_counts().head(10)
                            result += f"Unique values: {df[col].nunique()}\n"
                            result += f"Top values:\n{value_counts.to_string()}\n"
            else:
                # No query: general statistics for the whole sheet.
                numeric_cols = df.select_dtypes(include=['number']).columns
                if len(numeric_cols) > 0:
                    result += "Numeric columns statistics:\n"
                    result += df[numeric_cols].describe().to_string()
                    result += "\n\n"
                cat_cols = df.select_dtypes(exclude=['number']).columns
                if len(cat_cols) > 0:
                    result += "Categorical columns:\n"
                    for col in cat_cols[:5]:  # Limit to first 5 columns
                        result += f"- {col}: {df[col].nunique()} unique values\n"
            return result
        except Exception as e:
            return f"Error analyzing Excel file: {str(e)}"
class DateCalculatorTool(Tool):
    """Tool that answers natural-language date queries: current date/time,
    adding days/weeks/months/years to a date, and reformatting dates."""

    name = "date_calculator"
    description = "Performs date calculations like adding days, formatting dates, etc."
    inputs = {
        "query": {
            "type": "string",
            "description": "The date calculation to perform (e.g., 'What day is 10 days from today?', 'Format 2023-05-15 as MM/DD/YYYY')"
        }
    }
    output_type = "string"

    # Input formats accepted when parsing a date out of the query text.
    _DATE_FORMATS = ('%Y-%m-%d', '%m/%d/%Y', '%d/%m/%Y', '%B %d, %Y')

    def _parse_date(self, date_text: str):
        """Parse date_text into a datetime, or return None when no known
        format matches. 'today'/'now' map to the current moment."""
        if date_text.lower() in ('today', 'now'):
            return datetime.now()
        for fmt in self._DATE_FORMATS:
            try:
                return datetime.strptime(date_text, fmt)
            except ValueError:
                continue
        return None

    def forward(self, query: str) -> str:
        """Answer a date-calculation query; never raises (errors are returned
        as strings)."""
        try:
            # BUGFIX: the arithmetic and formatting branches must be checked
            # BEFORE the current-date shortcut. Previously any query containing
            # 'today'/'now' (e.g. 'What day is 10 days from today?' — the
            # tool's own example) short-circuited to today's date.
            add_match = re.search(r'(what|when).+?(\d+)\s+(day|days|week|weeks|month|months|year|years)\s+(from|after)\s+(.+)', query, re.IGNORECASE)
            if add_match:
                amount = int(add_match.group(2))
                unit = add_match.group(3).lower()
                date_text = add_match.group(5).strip()
                base_date = self._parse_date(date_text)
                if base_date is None:
                    return f"Could not parse date: {date_text}"
                if 'day' in unit:
                    new_date = base_date + timedelta(days=amount)
                elif 'week' in unit:
                    new_date = base_date + timedelta(weeks=amount)
                elif 'month' in unit:
                    # Carry whole months into the year, then clamp the day so
                    # e.g. Jan 31 + 1 month -> Feb 28/29 instead of ValueError.
                    total = base_date.month - 1 + amount
                    new_year = base_date.year + total // 12
                    new_month = total % 12 + 1
                    last_day = calendar.monthrange(new_year, new_month)[1]
                    new_date = base_date.replace(year=new_year, month=new_month,
                                                 day=min(base_date.day, last_day))
                else:  # years
                    # Clamp Feb 29 to Feb 28 in non-leap target years.
                    new_year = base_date.year + amount
                    last_day = calendar.monthrange(new_year, base_date.month)[1]
                    new_date = base_date.replace(year=new_year,
                                                 day=min(base_date.day, last_day))
                return f"Date {amount} {unit} from {base_date.strftime('%Y-%m-%d')} is {new_date.strftime('%Y-%m-%d')}"

            # Reformat a date: 'Format <date> as <spec>'.
            format_match = re.search(r'format\s+(.+?)\s+as\s+(.+)', query, re.IGNORECASE)
            if format_match:
                date_text = format_match.group(1).strip()
                format_spec = format_match.group(2).strip()
                date_obj = self._parse_date(date_text)
                if date_obj is None:
                    return f"Could not parse date: {date_text}"
                # Translate the human spec (YYYY, MM, ...) to strftime codes.
                # YYYY is replaced before YY so the longer token wins.
                format_mapping = {
                    'YYYY': '%Y',
                    'YY': '%y',
                    'MM': '%m',
                    'DD': '%d',
                    'HH': '%H',
                    'mm': '%M',
                    'ss': '%S'
                }
                strftime_format = format_spec
                for key, value in format_mapping.items():
                    strftime_format = strftime_format.replace(key, value)
                return f"Formatted date: {date_obj.strftime(strftime_format)}"

            # Plain current date/time queries (checked last, see above).
            if re.search(r'(today|now|current date|current time)', query, re.IGNORECASE):
                now = datetime.now()
                if 'time' in query.lower():
                    return f"Current date and time: {now.strftime('%Y-%m-%d %H:%M:%S')}"
                return f"Today's date: {now.strftime('%Y-%m-%d')}"

            return "I couldn't understand the date calculation query."
        except Exception as e:
            return f"Error performing date calculation: {str(e)}"
class DownloadFileTool(Tool):
    """Tool that downloads a URL into the system temp directory and reports
    the local path."""

    name = "download_file"
    description = "Downloads a file from a URL and saves it locally."
    inputs = {
        "url": {
            "type": "string",
            "description": "The URL to download from."
        },
        "filename": {
            "type": "string",
            "description": "Optional filename to save as (default: derived from URL).",
            "default": None,
            "nullable": True
        }
    }
    output_type = "string"

    def forward(self, url: str, filename: str = None) -> str:
        """Download url to the temp dir; never raises (errors are returned
        as strings)."""
        try:
            # Derive a filename from the URL path when none is given.
            if not filename:
                filename = os.path.basename(urlparse(url).path)
                if not filename:
                    # URL has no path component; fall back to a random name.
                    import uuid
                    filename = f"downloaded_{uuid.uuid4().hex[:8]}"
            # NOTE(review): a caller-supplied filename is used verbatim — if it
            # can ever come from untrusted input, basename() it to prevent
            # writing outside the temp dir.
            filepath = os.path.join(tempfile.gettempdir(), filename)

            # BUGFIX: stream with a timeout so a stalled server cannot hang
            # the agent indefinitely, and close the connection deterministically
            # via the response context manager (previously it leaked).
            with requests.get(url, stream=True, timeout=30) as response:
                response.raise_for_status()
                with open(filepath, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)

            return f"File downloaded to {filepath}. You can now analyze this file."
        except Exception as e:
            return f"Error downloading file: {str(e)}"