Princekumar
Fixing lost tool code
94f2387
import os
from typing import ClassVar
from smolagents import Tool
import datetime
from PIL import Image
import pandas as pd
from prompts import SYSTEM_PROMPT
from PIL import Image
from smolagents import WikipediaSearchTool
from dotenv import load_dotenv
import time
from langchain_google_community import GoogleSearchAPIWrapper
from google import genai
from langchain_community.document_loaders import WikipediaLoader
from langchain_community.document_loaders import ArxivLoader
load_dotenv()
class AnyTypeFileAnalyzerTool(Tool):
name: ClassVar[str] = "any_type_file_analyzer_tool"
description: ClassVar[str] = (
"Analyze an image or audio mp3 file using Gemini. Supports jpg, png, and mp3."
)
inputs: ClassVar[dict] = {
"analysis_description": {
"type": "string",
"description": "Describe what you want to analyze from the file",
},
"file_path": {"type": "string", "description": "Path to image/audio file"},
}
output_type: ClassVar[str] = "string"
def forward(self, analysis_description: str, file_path: str):
client = genai.Client(
api_key=os.getenv("GEMINI_API_KEY"),
)
try:
file = client.files.upload(file=file_path)
except Exception as e:
return f"Error uploading file: {e}"
try:
full_description = SYSTEM_PROMPT + f"\n\n{analysis_description}"
GENAI_MODEL = os.getenv("GENAI_MODEL")
response = client.models.generate_content(
model=GENAI_MODEL,
contents=[full_description, file],
)
return response.text
except Exception as e:
return f"Error from Gemini: {e}"
def _get_mime_type(self, file_path: str) -> str:
if file_path.endswith(".png"):
return "image/png"
elif file_path.endswith(".jpg") or file_path.endswith(".jpeg"):
return "image/jpeg"
elif file_path.endswith(".mp3"):
return "audio/mpeg"
else:
raise ValueError(
"Unsupported file type: only .png, .jpg, .jpeg, .mp3 are supported"
)
class CodeFileReadTool(Tool):
name: ClassVar[str] = "read_code_file"
description: ClassVar[str] = (
"Read a code or text file (Python, JavaScript, Java, HTML, CSS, etc.) and return its content as a formatted code block with the correct language extension."
)
inputs: ClassVar[dict] = {
"file_path": {"type": "string", "description": "Path to the code or text file"},
}
output_type: ClassVar[str] = "string"
def forward(self, file_path: str):
try:
# Detect extension and map to language
ext = os.path.splitext(file_path)[-1].lower()
ext_to_lang = {
".py": "python",
".js": "javascript",
".java": "java",
".html": "html",
".css": "css",
".json": "json",
".txt": "",
".md": "markdown",
".c": "c",
".cpp": "cpp",
".ts": "typescript",
".sh": "bash",
".xml": "xml",
".yml": "yaml",
".yaml": "yaml",
}
lang = ext_to_lang.get(ext, "")
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
# Format as markdown code block
if lang:
return f"```{lang}\n{content}\n```"
else:
return f"```\n{content}\n```"
except Exception as e:
return f"Error reading file: {e}"
class ExcelAndCSVTableInspectorTool(Tool):
name: ClassVar[str] = "excel_csv_file_analyzer"
description: ClassVar[str] = (
"Load a CSV or Excel file and return table info and summary stats in Markdown format."
)
inputs: ClassVar[dict] = {
"file_path": {
"type": "string",
"description": "Path to CSV or Excel file (.csv, .xls, .xlsx)",
}
}
output_type: ClassVar[str] = "string"
def forward(self, file_path: str):
try:
if file_path.endswith(".csv"):
df = pd.read_csv(file_path)
elif file_path.endswith(".xls") or file_path.endswith(".xlsx"):
df = pd.read_excel(file_path)
else:
return "Unsupported file type. Only CSV and Excel (.xls/.xlsx) are supported."
md = df.to_markdown(index=False, tablefmt="pipe")
return md
except Exception as e:
return f"Error loading file: {str(e)}"
class YouTubeVideoAnalyzerTool(Tool):
name: ClassVar[str] = "youtube_video_analyzer"
description: ClassVar[str] = (
"Given a YouTube URL, analyzes the video content using Gemini (google-genai SDK) and answers user queries about it."
)
inputs: ClassVar[dict] = {
"url": {"type": "string", "description": "Full YouTube video URL"},
"user_prompt": {
"type": "string",
"description": "What you want to analyze from the video content",
},
}
output_type: ClassVar[str] = "string"
def forward(self, url: str, user_prompt: str):
api_key = os.getenv("GEMINI_API_KEY")
if not api_key:
return "Error: GEMINI_API_KEY environment variable is not set."
try:
genai.configure(api_key=api_key)
model_name = os.getenv("GENAI_MODEL", "gemini-1.5-pro")
model = genai.GenerativeModel(model_name)
prompt = f"""
You are an AI video analyzer. A user wants to analyze the following YouTube video.
### Video URL
{url}
### User Request
{user_prompt}
### Instructions:
- Analyze the video content based on the user's request.
- Identify the main key thing needed to be analyzed.
- Provide the answer as per system prompt.
"""
response = model.generate_content([prompt])
return response.text if hasattr(response, "text") else str(response)
except Exception as e:
return f"Error analyzing video with Gemini: {e}"
# --- Math Tools ---
class CalculatorTool(Tool):
name: ClassVar[str] = "calculator"
description: ClassVar[str] = (
"Evaluate a basic mathematical expression (supports +, -, *, /, **, %, etc.)."
)
inputs: ClassVar[dict] = {
"expression": {
"type": "string",
"description": "A mathematical expression to evaluate",
}
}
output_type: ClassVar[str] = "number"
def forward(self, expression: str):
# Safely evaluate the expression using ast
import ast, operator
# Allowed node types
allowed_nodes = {
ast.Expression,
ast.BinOp,
ast.UnaryOp,
ast.Num,
ast.Constant,
ast.Add,
ast.Sub,
ast.Mult,
ast.Div,
ast.Pow,
ast.Mod,
ast.USub,
ast.UAdd,
}
node = ast.parse(expression, mode="eval")
for subnode in ast.walk(node):
if type(subnode) not in allowed_nodes:
raise ValueError(f"Unsafe or unsupported expression: {expression}")
return eval(compile(node, "<string>", "eval"))
# Optionally, separate basic operations could be defined (e.g., add, subtract).
class AddTool(Tool):
name: ClassVar[str] = "add"
description: ClassVar[str] = "Add two numbers together."
inputs: ClassVar[dict] = {
"a": {"type": "number", "description": "First number"},
"b": {"type": "number", "description": "Second number"},
}
output_type: ClassVar[str] = "number"
def forward(self, a: float, b: float):
return a + b
class MultiplyTool(Tool):
name: ClassVar[str] = "multiply"
description: ClassVar[str] = "Multiply two numbers."
inputs: ClassVar[dict] = {
"a": {"type": "number", "description": "First number"},
"b": {"type": "number", "description": "Second number"},
}
output_type: ClassVar[str] = "number"
def forward(self, a: float, b: float):
return a * b
# --- Date/Time Tools ---
class DayOfWeekTool(Tool):
name: ClassVar[str] = "day_of_week"
description: ClassVar[str] = "Return the day of week for a given date (YYYY-MM-DD)."
inputs: ClassVar[dict] = {
"date": {"type": "string", "description": "Date in format YYYY-MM-DD"}
}
output_type: ClassVar[str] = "string"
def forward(self, date: str):
year, month, day = map(int, date.split("-"))
dow = datetime.date(year, month, day).strftime("%A")
return dow
class AddDaysTool(Tool):
name: ClassVar[str] = "add_days"
description: ClassVar[str] = "Add a number of days to a date (YYYY-MM-DD)."
inputs: ClassVar[dict] = {
"date": {"type": "string", "description": "Start date (YYYY-MM-DD)"},
"days": {"type": "integer", "description": "Number of days to add"},
}
output_type: ClassVar[str] = "string"
def forward(self, date: str, days: int):
year, month, day = map(int, date.split("-"))
new_date = datetime.date(year, month, day) + datetime.timedelta(days=days)
return new_date.isoformat()
class DateDiffTool(Tool):
name: ClassVar[str] = "date_diff"
description: ClassVar[str] = (
"Compute difference in days between two dates (YYYY-MM-DD)."
)
inputs: ClassVar[dict] = {
"start_date": {"type": "string", "description": "First date (YYYY-MM-DD)"},
"end_date": {"type": "string", "description": "Second date (YYYY-MM-DD)"},
}
output_type: ClassVar[str] = "integer"
def forward(self, start_date: str, end_date: str):
y1, m1, d1 = map(int, start_date.split("-"))
y2, m2, d2 = map(int, end_date.split("-"))
d0 = datetime.date(y1, m1, d1)
d1 = datetime.date(y2, m2, d2)
return abs((d1 - d0).days)
# --- Unit Conversion Tools ---
class TempConvertTool(Tool):
name: ClassVar[str] = "convert_temperature"
description: ClassVar[str] = "Convert temperature between Celsius and Fahrenheit."
inputs: ClassVar[dict] = {
"value": {"type": "number", "description": "Temperature value to convert"},
"from_unit": {"type": "string", "description": "Unit of input ('C' or 'F')"},
}
output_type: ClassVar[str] = "number"
def forward(self, value: float, from_unit: str):
unit = from_unit.strip().upper()
if unit == "C":
# Celsius to Fahrenheit
return value * 9 / 5 + 32
elif unit == "F":
# Fahrenheit to Celsius
return (value - 32) * 5 / 9
else:
raise ValueError("Unit must be 'C' or 'F'.")
class LengthConvertTool(Tool):
name: ClassVar[str] = "convert_length"
description: ClassVar[str] = (
"Convert length between kilometers, miles, meters, and feet."
)
inputs: ClassVar[dict] = {
"value": {"type": "number", "description": "Length value to convert"},
"from_unit": {
"type": "string",
"description": "Original unit ('km','mi','m','ft')",
},
"to_unit": {
"type": "string",
"description": "Target unit ('km','mi','m','ft')",
},
}
output_type: ClassVar[str] = "number"
def forward(self, value: float, from_unit: str, to_unit: str):
u1 = from_unit.lower()
u2 = to_unit.lower()
# Convert input to meters first
if u1 == "km":
meters = value * 1000
elif u1 == "m":
meters = value
elif u1 == "mi":
meters = value * 1609.34
elif u1 == "ft":
meters = value * 0.3048
else:
raise ValueError("Unsupported from_unit")
# Convert meters to target unit
if u2 == "km":
return meters / 1000
if u2 == "m":
return meters
if u2 == "mi":
return meters / 1609.34
if u2 == "ft":
return meters / 0.3048
raise ValueError("Unsupported to_unit")
# --- Text Tools ---
class WordCountTool(Tool):
name: ClassVar[str] = "word_count"
description: ClassVar[str] = "Count the number of words in a text string."
inputs: ClassVar[dict] = {"text": {"type": "string", "description": "Input text"}}
output_type: ClassVar[str] = "integer"
def forward(self, text: str):
return len(text.split())
class FindTextTool(Tool):
name: ClassVar[str] = "find_text"
description: ClassVar[str] = (
"Find occurrences of a substring in a text; returns count."
)
inputs: ClassVar[dict] = {
"text": {"type": "string", "description": "Text to search in"},
"query": {"type": "string", "description": "Substring to search for"},
}
output_type: ClassVar[str] = "integer"
def forward(self, text: str, query: str):
return text.count(query)
# --- List/Sequence Tools ---
class SortListTool(Tool):
name: ClassVar[str] = "sort_list"
description: ClassVar[str] = "Sort a list of items (numbers or strings)."
inputs: ClassVar[dict] = {
"items": {"type": "array", "description": "List of items to sort"}
}
output_type: ClassVar[str] = "array"
def forward(self, items):
return sorted(items)
class UniqueListTool(Tool):
name: ClassVar[str] = "unique_list"
description: ClassVar[str] = (
"Return a list with duplicate items removed (preserving order)."
)
inputs: ClassVar[dict] = {
"items": {"type": "array", "description": "List of items"}
}
output_type: ClassVar[str] = "array"
def forward(self, items):
seen = []
for x in items:
if x not in seen:
seen.append(x)
return seen
# --- File I/O Tools ---
class ReadFileTool(Tool):
name: ClassVar[str] = "read_file"
description: ClassVar[str] = "Read and return the contents of a text file."
inputs: ClassVar[dict] = {
"file_path": {"type": "string", "description": "Path to a text file"}
}
output_type: ClassVar[str] = "string"
def forward(self, file_path: str):
try:
with open(file_path, "r") as f:
return f.read()
except FileNotFoundError:
return f"Error: File not found: {file_path}"
class WriteFileTool(Tool):
name: ClassVar[str] = "write_file"
description: ClassVar[str] = "Write a string to a text file (overwrites if exists)."
inputs: ClassVar[dict] = {
"file_path": {"type": "string", "description": "Path to write the file"},
"content": {"type": "string", "description": "Content to write"},
}
output_type: ClassVar[str] = "string"
def forward(self, file_path: str, content: str):
with open(file_path, "w") as f:
f.write(content)
return f"Wrote to {file_path}"
# --- Image Tool (stub) ---
class ImageInfoTool(Tool):
name: ClassVar[str] = "image_info"
description: ClassVar[str] = "Load an image and report basic info (size and mode)."
inputs: ClassVar[dict] = {
"image_path": {"type": "string", "description": "Path to an image file"}
}
output_type: ClassVar[str] = "string"
def forward(self, image_path: str):
try:
img = Image.open(image_path)
return f"Image {image_path}: size={img.size}, mode={img.mode}"
except Exception as e:
return f"Error loading image: {e}"
class WikipediaCustomTool(Tool):
name: ClassVar[str] = "wikipedia_search_summary"
description: ClassVar[str] = "Search Wikipedia and return max 3 results"
inputs: ClassVar[dict] = {
"query": {
"type": "string",
"description": "Search Query for Wikipedia",
}
}
output_type: ClassVar[str] = "string"
def forward(self, query: str) -> str:
try:
search_docs = WikipediaLoader(query=query, load_max_docs=3).load()
formatted_search_docs = "\n\n---\n\n".join(
[
f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>'
for doc in search_docs
]
)
return {"wiki_results": formatted_search_docs}
except Exception as e:
return f"Error using Wikipedia API: {e}"
class ArvixSearchTool(Tool):
name: ClassVar[str] = "arvix_search"
description: ClassVar[str] = "Search Arvix for a query and return maximum 3 result"
inputs: ClassVar[dict] = {
"query": {
"type": "string",
"description": "Search Query for Arvix",
}
}
output_type: ClassVar[str] = "string"
def forward(self, query: str) -> str:
try:
search_docs = ArxivLoader(query=query, load_max_docs=3).load()
formatted_search_docs = "\n\n---\n\n".join(
[
f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content[:1000]}\n</Document>'
for doc in search_docs
]
)
return {"arvix_results": formatted_search_docs}
except Exception as e:
return f"Error using Arvix Tool: {e}"
class GoogleSearchTool(Tool):
name: ClassVar[str] = "google_search"
description: ClassVar[str] = (
"Search the web using Google Search Engine and return results"
)
inputs: ClassVar[dict] = {
"query": {
"type": "string",
"description": "Search term to find information on Web",
}
}
output_type: ClassVar[str] = "string"
def forward(self, query: str) -> str:
try:
# Initialize Google Search API Wrapper
google_search = GoogleSearchAPIWrapper(
google_api_key=os.getenv("GOOGLE_API_KEY"),
google_cse_id=os.getenv("GOOGLE_CSE_ID"),
)
# Perform the search
results = google_search.results(query, num_results=5)
if not results:
return f"No results found for: '{query}'"
formatted = "\n\n".join(
f"{i+1}. **{r['title']}**\n{r['link']}\n{r['snippet']}"
for i, r in enumerate(results)
)
return f"**Search Results for '{query}':**\n\n{formatted}"
except Exception as e:
return f"Error using Google Search API: {e}"
class AdvanceGoogleAISearchTool(Tool):
name: ClassVar[str] = "google_ai_search"
description: ClassVar[str] = (
"Search the web using Google AI Search Engine and return results"
)
inputs: ClassVar[dict] = {
"query": {
"type": "string",
"description": "Search term to find information on Web",
}
}
output_type: ClassVar[str] = "string"
def forward(self, query: str) -> str:
try:
client = genai.Client(api_key=os.getenv("GEMINI_API_KEY"))
response = client.models.generate_content(
model=os.getenv("GENAI_MODEL"),
contents=[query],
)
if not response:
return f"No results found for: '{query}'"
return f"**Search Results for '{query}':**\n\n{response.text}"
except Exception as e:
return f"Error using Google AI Search API: {e}"
# List of all available tools
agent_tools = [
AnyTypeFileAnalyzerTool(),
ExcelAndCSVTableInspectorTool(),
YouTubeVideoAnalyzerTool(),
CalculatorTool(),
AddTool(),
MultiplyTool(),
DayOfWeekTool(),
AddDaysTool(),
DateDiffTool(),
TempConvertTool(),
LengthConvertTool(),
WordCountTool(),
FindTextTool(),
SortListTool(),
UniqueListTool(),
GoogleSearchTool(),
WikipediaCustomTool(),
ArvixSearchTool(),
AdvanceGoogleAISearchTool(),
]