# Uploaded by: saranyapotturi
# Commit message: Update agent.py
# Commit: edc4deb (verified)
import os
from dotenv import load_dotenv
from typing import List, Dict, Any, Optional
from langgraph.graph import START, StateGraph, MessagesState
from langgraph.graph.message import add_messages
from langchain_core.messages import AnyMessage, HumanMessage, AIMessage, SystemMessage
from langgraph.prebuilt import ToolNode
from langgraph.graph import START, StateGraph
from langgraph.prebuilt import tools_condition
from langchain_huggingface import HuggingFaceEndpoint, ChatHuggingFace
from langchain_core.tools import tool
from langchain_community.document_loaders import WikipediaLoader
from youtube_transcript_api import YouTubeTranscriptApi
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_tavily import TavilySearch
import tempfile
import pandas as pd
import numpy as np
import requests
from urllib.parse import urlparse
import uuid
from PIL import Image, ImageDraw, ImageFont, ImageEnhance, ImageFilter
import base64
import io
# Load variables from a local .env file into the process environment
# (build_graph reads GOOGLE_API_KEY via os.getenv).
load_dotenv()
# ReAct system prompt.
# The agent node in build_graph prepends this text as a SystemMessage whenever
# the conversation does not already start with one. It instructs the model to
# end every reply with a "FINAL ANSWER: ..." line.
REACT_SYSTEM_PROMPT = """You are a research assistant that uses ReAct (Reasoning + Acting) methodology. For each question, follow this systematic approach:
**THINK**: First, analyze the question carefully. What type of information do you need? What tools might help?
**ACT**: Use available tools to gather information. Search thoroughly and verify facts from multiple sources when possible.
**OBSERVE**: Analyze the results from your tools. Are they complete and reliable? Do you need more information?
**REASON**: Synthesize all information gathered. Check for consistency and identify any gaps or uncertainties.
**VERIFY**: Before providing your final answer, double-check your reasoning and ensure you have sufficient evidence.
For each question:
1. Break down what you're looking for
2. Use tools systematically to gather comprehensive information
3. Cross-reference information when possible
4. Be honest about limitations - if you cannot find reliable information, say so
5. Only provide confident answers when you have verified evidence
When you cannot access certain content (videos, audio, images without tools), clearly state this limitation.
Always finish with: FINAL ANSWER: [YOUR FINAL ANSWER]
Your final answer should be:
- A number (without commas or units unless specified)
- As few words as possible for strings (no articles, no abbreviations for cities, spell out digits)
- A comma-separated list following the above rules for each element
Be thorough in your research but honest about uncertainty. Quality and accuracy are more important than speed.
"""
@tool
def multiply(a: int, b: int) -> int:
    """
    Return the product of two integers.
    Args:
        a: The first factor.
        b: The second factor.
    """
    product = a * b
    return product
@tool
def add(a: int, b: int) -> int:
    """
    Return the sum of two integers.
    Args:
        a: The first addend.
        b: The second addend.
    """
    total = a + b
    return total
@tool
def subtract(a: int, b: int) -> int:
    """
    Return the difference ``a - b`` of two integers.
    Args:
        a: The value to subtract from.
        b: The value to subtract.
    """
    difference = a - b
    return difference
@tool
def divide(a: int, b: int) -> float:
    """
    Divide ``a`` by ``b`` using true division.
    Args:
        a: The dividend.
        b: The divisor. Must not be zero.
    Returns:
        The quotient as a float (``/`` always yields a float, e.g. 4 / 2 == 2.0).
    Raises:
        ZeroDivisionError: If ``b`` is zero.
    """
    if b == 0:
        # Same exception type Python would raise, but with a message the
        # calling model can act on.
        raise ZeroDivisionError("Cannot divide by zero.")
    return a / b
@tool
def wikidata_search(query: str) -> Dict[str, str]:
    """
    Search Wikipedia and return the text of at most 2 matching articles.

    Despite its name, this tool queries Wikipedia through ``WikipediaLoader``.
    Args:
        query: The search query.
    Returns:
        A dict with the single key ``"wiki_results"`` whose value is the
        articles rendered as ``<Document source=... page=...>`` blocks,
        separated by ``---`` lines. The value is an empty string when nothing
        was found. Errors raised by the loader are not caught here.
    """
    docs = WikipediaLoader(query=query, load_max_docs=2).load()
    rendered_docs = []
    for doc in docs:
        # Use .get() for both fields so a document without a "source" entry
        # does not abort the whole search with a KeyError.
        source = doc.metadata.get("source", "")
        page = doc.metadata.get("page", "")
        # The opening tag must not be self-closing: it is paired with the
        # closing </Document> tag after the content.
        rendered_docs.append(
            f'<Document source="{source}" page="{page}">\n{doc.page_content}\n</Document>'
        )
    return {"wiki_results": "\n\n---\n\n".join(rendered_docs)}
# Initialize Tavily Search Tool
# Web-search tool instance registered in the `tools` list below; configured to
# return at most 3 results per query for the "general" topic.
tavily_search_tool = TavilySearch(
    max_results=3,
    topic="general",
)
@tool
def load_youtube_transcript(
    url: str,
    add_video_info: bool = True,
    language: Optional[List[str]] = None,
    translation: str = "en",
) -> Any:
    """
    Load transcript from a YouTube video URL.
    Args:
        url: YouTube video URL. Supported shapes: ``watch?v=ID`` (extra query
            parameters such as ``&t=10s`` are ignored), ``youtu.be/ID`` and
            ``/embed/ID``, ``/shorts/ID``, ``/live/ID``, ``/v/ID``.
        add_video_info: Currently unused; kept for interface compatibility.
        language: Preferred transcript language codes in priority order.
            Defaults to ``["en"]``.
        translation: Currently unused; kept for interface compatibility.
    Returns:
        ``{"youtube_transcript": <fetched transcript>}`` on success, or an
        error message string starting with "Error loading YouTube transcript:".
    """
    # Local import so this block does not depend on the file-level import list.
    from urllib.parse import parse_qs, urlparse

    try:
        parsed = urlparse(url)
        host = (parsed.hostname or "").lower()
        path_segments = [segment for segment in parsed.path.split("/") if segment]

        video_id = ""
        if host.endswith("youtu.be"):
            # Short links carry the id as the first path segment.
            video_id = path_segments[0] if path_segments else ""
        else:
            video_id = parse_qs(parsed.query).get("v", [""])[0]
            if (
                not video_id
                and len(path_segments) >= 2
                and path_segments[0] in ("embed", "shorts", "live", "v")
            ):
                video_id = path_segments[1]
        if not video_id and "v=" in url:
            # Last resort: the original string-splitting heuristic, minus any
            # trailing query parameters.
            video_id = url.split("v=")[1].split("&")[0]
        if not video_id:
            return (
                "Error loading YouTube transcript: "
                f"could not find a video id in URL {url!r}"
            )

        ytt_api = YouTubeTranscriptApi()
        docs = ytt_api.fetch(video_id, languages=language or ["en"])
        return {"youtube_transcript": docs}
    except Exception as e:
        return f"Error loading YouTube transcript: {str(e)}"
@tool
def save_and_read_file(content: str, filename: Optional[str] = None) -> str:
    """
    Save content to a file in the system temp directory and return the path.
    Args:
        content (str): the content to save to the file
        filename (str, optional): the name of the file. Only its final path
            component is used, so the file always lands inside the temp
            directory. If not provided (or empty), a random name file will be
            created.
    Returns:
        A message containing the path of the written file.
    """
    temp_dir = tempfile.gettempdir()
    # Drop any directory part: os.path.join would otherwise let an absolute
    # path or "../" in the supplied name escape the temp directory.
    safe_name = os.path.basename(filename) if filename else ""
    if safe_name:
        filepath = os.path.join(temp_dir, safe_name)
    else:
        # Reserve a unique name and close the handle right away so it is not
        # leaked; the file is reopened below for writing.
        with tempfile.NamedTemporaryFile(delete=False, dir=temp_dir) as temp_file:
            filepath = temp_file.name
    # Explicit encoding so the result does not depend on the host locale.
    with open(filepath, "w", encoding="utf-8") as f:
        f.write(content)
    return f"File saved to {filepath}. You can read this file to process its contents."
@tool
def download_file_from_url(url: str, filename: Optional[str] = None) -> str:
    """
    Download a file from a URL and save it to a temporary location.
    Args:
        url (str): the URL of the file to download.
        filename (str, optional): the name of the file. Only its final path
            component is used. If not provided, the name is taken from the URL
            path, or a random name is generated when the URL has none.
    Returns:
        A message with the path of the downloaded file, or a message starting
        with "Error downloading file:" if anything fails.
    """
    try:
        # Prefer the caller's name, then the last component of the URL path.
        # basename() also keeps a supplied name from escaping the temp dir.
        candidate = filename or urlparse(url).path
        safe_name = os.path.basename(candidate)
        if not safe_name:
            safe_name = f"downloaded_{uuid.uuid4().hex[:8]}"
        filepath = os.path.join(tempfile.gettempdir(), safe_name)

        # A timeout keeps an unresponsive server from hanging the agent; the
        # context manager releases the streamed connection when done.
        with requests.get(url, stream=True, timeout=30) as response:
            response.raise_for_status()
            with open(filepath, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # skip keep-alive chunks
                        f.write(chunk)
        return f"File downloaded to {filepath}. You can read this file to process its contents."
    except Exception as e:
        return f"Error downloading file: {str(e)}"
@tool
def extract_text_from_image(image_path: str) -> str:
    """
    Extract text from an image using OCR library pytesseract (if available).
    Args:
        image_path (str): the path to the image file.
    Returns:
        The recognised text prefixed with "Extracted text from image:", or a
        message starting with "Error extracting text from image:" when
        pytesseract is missing or OCR fails.
    """
    # pytesseract is an optional dependency that the module never imports at
    # file level, so import it here and report its absence explicitly.
    try:
        import pytesseract
    except ImportError:
        return "Error extracting text from image: pytesseract is not installed."

    try:
        # Context manager closes the underlying file handle after OCR.
        with Image.open(image_path) as image:
            text = pytesseract.image_to_string(image)
        return f"Extracted text from image:\n\n{text}"
    except Exception as e:
        return f"Error extracting text from image: {str(e)}"
@tool
def analyze_csv_file(file_path: str, query: str) -> str:
    """
    Load a CSV file with pandas and report its size, columns and summary statistics.
    Args:
        file_path (str): the path to the CSV file.
        query (str): Question about the data (accepted but not used by the
            current implementation).
    Returns:
        A text report, or a message starting with "Error analyzing CSV file:".
    """
    try:
        frame = pd.read_csv(file_path)
        row_count, column_count = frame.shape
        # Report sections are assembled in order, then concatenated once.
        sections = [
            f"CSV file loaded with {row_count} rows and {column_count} columns.\n",
            f"Columns: {', '.join(frame.columns)}\n\n",
            "Summary statistics:\n",
            str(frame.describe()),
        ]
        return "".join(sections)
    except Exception as e:
        return f"Error analyzing CSV file: {str(e)}"
@tool
def analyze_excel_file(file_path: str, query: str) -> str:
    """
    Load an Excel file with pandas and report its size, columns and summary statistics.
    Args:
        file_path (str): the path to the Excel file.
        query (str): Question about the data (accepted but not used by the
            current implementation).
    Returns:
        A text report, or a message starting with "Error analyzing Excel file:".
    """
    try:
        # Read the Excel file (first sheet, pandas defaults)
        df = pd.read_excel(file_path)
        result = (
            f"Excel file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
        )
        # Excel header cells may be numbers or dates; str.join only accepts
        # strings, so convert every label first.
        result += f"Columns: {', '.join(map(str, df.columns))}\n\n"
        # Add summary statistics
        result += "Summary statistics:\n"
        result += str(df.describe())
        return result
    except Exception as e:
        return f"Error analyzing Excel file: {str(e)}"
### ============== IMAGE PROCESSING AND GENERATION TOOLS =============== ###
import os
import io
import base64
import uuid
from PIL import Image, ImageDraw, ImageFont, ImageEnhance, ImageFilter
# Helper functions for image processing
def encode_image(image_path: str) -> str:
    """Read the file at ``image_path`` and return its bytes as a base64 text string."""
    with open(image_path, "rb") as handle:
        raw_bytes = handle.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode("utf-8")
def decode_image(base64_string: str) -> Image.Image:
    """Decode a base64 string and open the resulting bytes as a PIL Image."""
    buffer = io.BytesIO(base64.b64decode(base64_string))
    return Image.open(buffer)
def save_image(image: Image.Image, directory: str = "image_outputs") -> str:
    """Write ``image`` as a PNG with a random UUID name inside ``directory`` and return its path.

    The directory is created if it does not exist yet.
    """
    os.makedirs(directory, exist_ok=True)
    file_name = f"{str(uuid.uuid4())}.png"
    destination = os.path.join(directory, file_name)
    image.save(destination)
    return destination
@tool
def analyze_image(image_base64: str) -> Dict[str, Any]:
    """
    Analyze basic properties of an image (size, mode, color analysis, thumbnail preview).
    Args:
        image_base64 (str): Base64 encoded image string
    Returns:
        Dictionary with keys ``dimensions`` (width, height), ``mode``,
        ``color_analysis`` and ``thumbnail`` (base64 PNG, at most 100x100).
        For RGB/RGBA images ``color_analysis`` holds ``average_rgb``,
        ``brightness`` and ``dominant_color``; other modes get a note instead.
        On failure the dictionary is ``{"error": <message>}``.
    """
    try:
        img = decode_image(image_base64)
        width, height = img.size
        mode = img.mode
        if mode in ("RGB", "RGBA"):
            pixels = np.array(img)
            # Use only the R, G, B channels: for RGBA images the alpha channel
            # must not be averaged into the brightness or reported as "rgb".
            avg_rgb = pixels[..., :3].mean(axis=(0, 1))
            dominant = ["Red", "Green", "Blue"][int(np.argmax(avg_rgb))]
            color_analysis = {
                "average_rgb": avg_rgb.tolist(),
                # Plain float rather than a NumPy scalar so the result
                # serialises cleanly.
                "brightness": float(avg_rgb.mean()),
                "dominant_color": dominant,
            }
        else:
            color_analysis = {"note": f"No color analysis for mode {mode}"}
        # Work on a copy: thumbnail() resizes in place.
        thumbnail = img.copy()
        thumbnail.thumbnail((100, 100))
        thumb_path = save_image(thumbnail, "thumbnails")
        thumbnail_base64 = encode_image(thumb_path)
        return {
            "dimensions": (width, height),
            "mode": mode,
            "color_analysis": color_analysis,
            "thumbnail": thumbnail_base64,
        }
    except Exception as e:
        return {"error": str(e)}
@tool
def transform_image(
    image_base64: str, operation: str, params: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Apply one transformation to an image and return the result as base64.

    Supported operations and the ``params`` keys each one reads:
        resize: width, height (default: half the current size, at least 1 px)
        rotate: angle (default 90)
        crop: left, top, right, bottom (default: the full image)
        flip: direction ("horizontal" by default; any other value flips vertically)
        adjust_brightness (alias: brightness): factor (default 1.5)
        adjust_contrast (alias: contrast): factor (default 1.5)
        blur: radius (default 2)
        sharpen: no parameters
        grayscale: no parameters
    Args:
        image_base64 (str): Base64 encoded input image
        operation (str): Transformation operation
        params (Dict[str, Any], optional): Parameters for the operation
    Returns:
        ``{"transformed_image": <base64 PNG>}`` on success, otherwise
        ``{"error": <message>}``.
    """
    try:
        img = decode_image(image_base64)
        params = params or {}
        if operation == "resize":
            # max(1, ...) keeps the default valid for 1-pixel-wide/high images,
            # where halving would otherwise produce a zero dimension.
            img = img.resize(
                (
                    params.get("width", max(1, img.width // 2)),
                    params.get("height", max(1, img.height // 2)),
                )
            )
        elif operation == "rotate":
            img = img.rotate(params.get("angle", 90), expand=True)
        elif operation == "crop":
            img = img.crop(
                (
                    params.get("left", 0),
                    params.get("top", 0),
                    params.get("right", img.width),
                    params.get("bottom", img.height),
                )
            )
        elif operation == "flip":
            if params.get("direction", "horizontal") == "horizontal":
                img = img.transpose(Image.FLIP_LEFT_RIGHT)
            else:
                img = img.transpose(Image.FLIP_TOP_BOTTOM)
        # The short names were advertised in the tool description, so accept
        # them alongside the original "adjust_*" spellings.
        elif operation in ("adjust_brightness", "brightness"):
            img = ImageEnhance.Brightness(img).enhance(params.get("factor", 1.5))
        elif operation in ("adjust_contrast", "contrast"):
            img = ImageEnhance.Contrast(img).enhance(params.get("factor", 1.5))
        elif operation == "blur":
            img = img.filter(ImageFilter.GaussianBlur(params.get("radius", 2)))
        elif operation == "sharpen":
            img = img.filter(ImageFilter.SHARPEN)
        elif operation == "grayscale":
            img = img.convert("L")
        else:
            return {"error": f"Unknown operation: {operation}"}
        result_path = save_image(img)
        result_base64 = encode_image(result_path)
        return {"transformed_image": result_base64}
    except Exception as e:
        return {"error": str(e)}
@tool
def draw_on_image(
    image_base64: str, drawing_type: str, params: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Draw a rectangle, circle, line or text onto an image.

    ``params`` keys by drawing type (``color`` defaults to "red", stroke
    ``width`` defaults to 2):
        rectangle: left, top, right, bottom
        circle: x, y, radius
        line: start_x, start_y, end_x, end_y
        text: x, y, text (default "Text"), font_size (default 20)
    Args:
        image_base64 (str): Base64 encoded input image
        drawing_type (str): Drawing type
        params (Dict[str, Any]): Drawing parameters
    Returns:
        ``{"result_image": <base64 PNG>}`` on success, otherwise
        ``{"error": <message>}``.
    """
    try:
        canvas = decode_image(image_base64)
        pen = ImageDraw.Draw(canvas)
        stroke_color = params.get("color", "red")

        if drawing_type == "rectangle":
            box = [params["left"], params["top"], params["right"], params["bottom"]]
            pen.rectangle(box, outline=stroke_color, width=params.get("width", 2))
        elif drawing_type == "circle":
            center_x, center_y, radius = params["x"], params["y"], params["radius"]
            # Bounding box of the circle around its center.
            bounds = (
                center_x - radius,
                center_y - radius,
                center_x + radius,
                center_y + radius,
            )
            pen.ellipse(bounds, outline=stroke_color, width=params.get("width", 2))
        elif drawing_type == "line":
            endpoints = (
                params["start_x"],
                params["start_y"],
                params["end_x"],
                params["end_y"],
            )
            pen.line(endpoints, fill=stroke_color, width=params.get("width", 2))
        elif drawing_type == "text":
            size = params.get("font_size", 20)
            try:
                typeface = ImageFont.truetype("arial.ttf", size)
            except IOError:
                # Font file could not be loaded; use PIL's built-in font.
                typeface = ImageFont.load_default()
            position = (params["x"], params["y"])
            pen.text(
                position,
                params.get("text", "Text"),
                fill=stroke_color,
                font=typeface,
            )
        else:
            return {"error": f"Unknown drawing type: {drawing_type}"}

        saved_path = save_image(canvas)
        return {"result_image": encode_image(saved_path)}
    except Exception as e:
        return {"error": str(e)}
@tool
def generate_simple_image(
    image_type: str,
    width: int = 500,
    height: int = 500,
    params: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """
    Generate a simple image. Supported image types: "gradient" and "noise".
    Args:
        image_type (str): "gradient" (linear two-color gradient) or "noise"
            (uniform random RGB pixels)
        width (int), height (int): Image size in pixels
        params (Dict[str, Any], optional): For "gradient": ``direction``
            ("horizontal" by default; any other value is vertical),
            ``start_color`` (default (255, 0, 0)) and ``end_color``
            (default (0, 0, 255)) as RGB sequences. "noise" takes none.
    Returns:
        ``{"generated_image": <base64 PNG>}`` on success, otherwise
        ``{"error": <message>}``.
    """
    try:
        params = params or {}
        if image_type == "gradient":
            direction = params.get("direction", "horizontal")
            start_color = params.get("start_color", (255, 0, 0))
            end_color = params.get("end_color", (0, 0, 255))
            img = Image.new("RGB", (width, height))
            draw = ImageDraw.Draw(img)
            horizontal = direction == "horizontal"
            # One line is drawn per column (horizontal) or per row (vertical).
            steps = width if horizontal else height
            for i in range(steps):
                # Linear interpolation of each RGB channel at position i.
                color = tuple(
                    int(
                        start_color[channel]
                        + (end_color[channel] - start_color[channel]) * i / steps
                    )
                    for channel in range(3)
                )
                if horizontal:
                    draw.line([(i, 0), (i, height)], fill=color)
                else:
                    draw.line([(0, i), (width, i)], fill=color)
        elif image_type == "noise":
            noise_array = np.random.randint(0, 256, (height, width, 3), dtype=np.uint8)
            # A (height, width, 3) uint8 array is interpreted as RGB, so no
            # explicit mode argument is needed.
            img = Image.fromarray(noise_array)
        else:
            return {"error": f"Unsupported image_type {image_type}"}
        result_path = save_image(img)
        result_base64 = encode_image(result_path)
        return {"generated_image": result_base64}
    except Exception as e:
        return {"error": str(e)}
@tool
def combine_images(
    images_base64: List[str], operation: str, params: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Combine multiple images into one. The only supported operation is "stack".
    Args:
        images_base64 (List[str]): List of base64 images (at least one)
        operation (str): Combination type; must be "stack"
        params (Dict[str, Any], optional): ``direction`` - "horizontal"
            (default, images placed left to right) or any other value for
            vertical (top to bottom)
    Returns:
        ``{"combined_image": <base64 PNG>}`` on success, otherwise
        ``{"error": <message>}``.
    """
    try:
        # Fail with a clear message instead of the "max() arg is an empty
        # sequence" error an empty list would otherwise trigger.
        if not images_base64:
            return {"error": "No images provided to combine"}
        images = [decode_image(b64) for b64 in images_base64]
        params = params or {}
        if operation != "stack":
            return {"error": f"Unsupported combination operation {operation}"}

        if params.get("direction", "horizontal") == "horizontal":
            total_width = sum(img.width for img in images)
            max_height = max(img.height for img in images)
            new_img = Image.new("RGB", (total_width, max_height))
            x = 0
            for img in images:
                new_img.paste(img, (x, 0))
                x += img.width
        else:
            max_width = max(img.width for img in images)
            total_height = sum(img.height for img in images)
            new_img = Image.new("RGB", (max_width, total_height))
            y = 0
            for img in images:
                new_img.paste(img, (0, y))
                y += img.height

        result_path = save_image(new_img)
        result_base64 = encode_image(result_path)
        return {"combined_image": result_base64}
    except Exception as e:
        return {"error": str(e)}
@tool
def download_task_file(task_id: str, api_url: str = "https://agents-course-unit4-scoring.hf.space") -> str:
    """
    Download a file associated with a task from the evaluation API.
    Args:
        task_id (str): The task ID to download the file for
        api_url (str): The base API URL (defaults to the evaluation server)
    Returns:
        A message with the local path of the downloaded file, or a message
        starting with "Error downloading task file:" if anything fails.
    """
    # Local import: only needed to parse the Content-Disposition header.
    from email.message import Message

    try:
        # Construct the file download URL
        file_url = f"{api_url}/files/{task_id}"
        # Timeout so an unresponsive server cannot hang the agent; the context
        # manager releases the streamed connection afterwards.
        with requests.get(file_url, stream=True, timeout=30) as response:
            response.raise_for_status()

            # Task files are not always images (spreadsheets, audio, ...), so
            # take the extension from the server-provided filename when there
            # is one and fall back to ".png" otherwise.
            extension = ".png"
            disposition = response.headers.get("content-disposition")
            if disposition:
                header = Message()
                header["content-disposition"] = disposition
                remote_name = header.get_filename()
                if remote_name:
                    extension = os.path.splitext(os.path.basename(remote_name))[1] or extension

            # Keep path separators in the id from redirecting the output path.
            safe_id = task_id.replace("/", "_").replace("\\", "_")
            filepath = os.path.join(tempfile.gettempdir(), f"task_{safe_id}{extension}")

            # Save the file
            with open(filepath, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # skip keep-alive chunks
                        f.write(chunk)
        return f"Task file downloaded to {filepath}. You can now analyze this file."
    except Exception as e:
        return f"Error downloading task file: {str(e)}"
# Every tool the agent may call. build_graph binds this list to the LLM and
# also passes it to the ToolNode that executes the calls.
tools = [multiply, add, subtract, divide, wikidata_search, tavily_search_tool, load_youtube_transcript, combine_images, analyze_image, transform_image, draw_on_image, generate_simple_image, analyze_csv_file, analyze_excel_file, save_and_read_file, download_file_from_url, extract_text_from_image, download_task_file]
def build_graph():
    """Build and compile the agent graph: START -> agent <-> tools.

    The "agent" node calls the Gemini chat model with the tools bound; the
    conditional edge from ``tools_condition`` decides what follows each agent
    turn, and the "tools" node always hands control back to the agent.
    """
    chat_model = ChatGoogleGenerativeAI(
        model="gemini-2.0-flash", api_key=os.getenv("GOOGLE_API_KEY")
    )
    tool_aware_model = chat_model.bind_tools(tools)

    def agent_node(state: MessagesState) -> MessagesState:
        """Run one model turn, making sure the ReAct system prompt leads the history."""
        history = state["messages"]
        starts_with_system = bool(history) and isinstance(history[0], SystemMessage)
        if not starts_with_system:
            # Only the model reply is returned below, so the system prompt is
            # prepended to the model input here rather than stored in state.
            history = [SystemMessage(content=REACT_SYSTEM_PROMPT)] + history
        reply = tool_aware_model.invoke(history)
        return {"messages": [reply]}

    graph_builder = StateGraph(MessagesState)
    graph_builder.add_node("agent", agent_node)
    graph_builder.add_node("tools", ToolNode(tools))
    graph_builder.add_edge(START, "agent")
    graph_builder.add_conditional_edges("agent", tools_condition)
    graph_builder.add_edge("tools", "agent")
    compiled_graph = graph_builder.compile()
    return compiled_graph
class LangGraphAgent:
    """Callable wrapper around the compiled agent graph."""

    def __init__(self):
        """Compile the graph once and keep it for all later questions."""
        self.graph = build_graph()
        print("LangGraphAgent initialized with tools.")

    def __call__(self, question: str) -> str:
        """Answer ``question`` and return the content of the last message.

        Every message of the run is pretty-printed. Any exception is caught
        and returned as an "Error: ..." string instead of being raised.
        """
        try:
            initial_state = {"messages": [HumanMessage(content=question)]}
            final_state = self.graph.invoke(initial_state)
            for message in final_state["messages"]:
                message.pretty_print()
            last_message = final_state["messages"][-1]
            return last_message.content
        except Exception as e:
            return f"Error: {str(e)}"
if __name__ == "__main__":
    # Manual smoke test: run the agent once on a sample question. The messages
    # of the run are printed by LangGraphAgent.__call__.
    question = (
        "The attached Excel file contains the sales of menu items for a local fast-food chain. "
        "What were the total sales that the chain made from food (not including drinks)? "
        "Express your answer in USD with two decimal places. "
        "task_id: 1234567890"
    )
    agent = LangGraphAgent()
    answer = agent(question)