naman1102's picture
simplify
2871b51
raw
history blame
14.6 kB
# tools.py
import pandas as pd
from pathlib import Path
import requests
import regex as re
import time
import os
from duckduckgo_search import DDGS
from langchain_core.tools import tool
from langchain_community.document_loaders import WikipediaLoader, ArxivLoader
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# Removed complex safety wrapper - keeping things simple
def _download_file_for_task(task_id: str, ext: str) -> str:
"""
Helper: attempt to GET the remote file for a given task_id.
Saves under ./hf_files/{task_id}.{ext}. Returns the local path if successful,
or an empty string if no file / download failed.
"""
print("reached _download_file_for_task")
os.makedirs("hf_files", exist_ok=True)
local_path = os.path.join("hf_files", f"{task_id}.{ext}")
url = f"{DEFAULT_API_URL}/files/{task_id}"
try:
resp = requests.get(url, timeout=10)
if resp.status_code == 200 and resp.content:
print(f"Downloaded file from {url} to {local_path}")
with open(local_path, "wb") as f:
f.write(resp.content)
return local_path
except Exception:
print(f"Error downloading file from {url} to {local_path}")
pass
# If we get here, either 404 or download error
return ""
@tool
def image_tool(task_id: str) -> str:
"""
Expects: task_id (str) — a valid image task ID.
Returns: image caption from Hugging Face API or error message.
"""
import requests, os
# Try downloading image with one of the allowed extensions
for ext in ("png", "jpg", "jpeg"):
file_path = _download_file_for_task(task_id, ext)
if file_path and os.path.exists(file_path):
break
else:
return f"Error: Image file for task_id '{task_id}' not found."
# Read the image bytes
try:
with open(file_path, "rb") as f:
image_bytes = f.read()
except Exception as e:
return f"Error reading image: {str(e)}"
# Load HF token
hf_token = os.getenv("HF_TOKEN")
if not hf_token:
return "Error: HF_TOKEN not set in environment."
# Use a single reliable model
model = "Salesforce/blip-image-captioning-base"
headers = {"Authorization": f"Bearer {hf_token}"}
try:
response = requests.post(
f"https://api-inference.huggingface.co/models/{model}",
headers=headers,
files={"file": image_bytes},
timeout=30
)
except Exception as e:
return f"Error calling HuggingFace API: {e}"
# Parse response
if response.status_code != 200:
return f"Error from model ({model}): {response.status_code} - {response.text}"
try:
result = response.json()
if isinstance(result, list) and result:
caption = result[0].get("generated_text", "").strip()
elif isinstance(result, dict):
caption = result.get("generated_text", "").strip()
else:
caption = ""
except Exception as e:
return f"Error parsing response: {e}"
if not caption:
return "No caption generated by model."
return f"Image Caption:\n{caption}"
@tool
def excel_tool(task_id: str) -> str:
"""
Downloads <task_id>.xlsx (if any) and returns a stringified list of
records from the specified sheet. No fallback to user-supplied tables.
Expected keys in `task_id`:
• task_id – required (used to download the file)
returns: stringified list of records from the specified sheet
"""
print("reached excel_tool")
sheet = "Sheet1"
local_xlsx = _download_file_for_task(task_id, "xlsx")
if not local_xlsx or not os.path.exists(local_xlsx):
return "Error: Excel file not found for this task."
try:
xls = pd.ExcelFile(local_xlsx)
df = pd.read_excel(
xls,
sheet_name=sheet if sheet and sheet in xls.sheet_names else xls.sheet_names[0]
)
print(f"Excel file read successfully: {str(df.to_dict(orient='records'))}")
return str(df.to_dict(orient="records"))
except Exception as e:
return f"Error reading Excel file: {e}"
import openai
@tool
def audio_transcriber_tool(task_id: str) -> str:
"""
LangGraph tool for transcribing audio via OpenAI's Whisper API.
Expects: task_id is a string
Returns:
"<text or error message>"
Always attempts to download the file for the given path or task ID.
"""
print("reached audio_transcriber_tool")
# Always attempt to download the file, regardless of local existence
local_audio = ""
for ext in ("mp3", "wav", "m4a"):
candidate = _download_file_for_task(task_id, ext)
if candidate:
local_audio = candidate
break
if not local_audio or not os.path.exists(local_audio):
return "Error: No audio file found (download failed)."
# Send to OpenAI Whisper
try:
openai.api_key = os.getenv("OPENAI_API_KEY")
if not openai.api_key:
raise RuntimeError("OPENAI_API_KEY is not set in environment.")
with open(local_audio, "rb") as audio_file:
print("reached openai.audio.transcriptions.create")
response = openai.audio.transcriptions.create(
model="whisper-1",
file=audio_file,
)
print("reached response")
text = response.text.strip()
except Exception as e:
text = f"Error during transcription: {e}"
print(f"Transcripted as transcript: {text}")
return text
# tools.py
import re
import requests
@tool
def wikipedia_search_tool(wiki_query: str) -> str:
"""
Searches Wikipedia for the given query and returns the first 5 pages.
Expects: wiki_query is a non‐empty string.
Returns: text summary of first matching page or an error message>"
If no valid wiki_query is provided, returns {}.
"""
print(f"DEBUG: reached wikipedia_search_tool with query: {wiki_query}")
try:
docs = WikipediaLoader(query=wiki_query, load_max_docs=3).load() # Reduced from 5 to 3
print(f"DEBUG: WikipediaLoader returned {len(docs)} documents")
result = ""
counter = 1
for doc in docs:
print(f"DEBUG: Processing Wikipedia document {counter}")
print(f"DEBUG: Document metadata: {doc.metadata}")
print(f"DEBUG: Document content length: {len(doc.page_content)}")
# Handle different metadata structures
title = "Unknown Title"
if hasattr(doc, 'metadata') and doc.metadata:
# Try different possible title keys
if 'title' in doc.metadata:
title = doc.metadata['title']
elif 'Title' in doc.metadata:
title = doc.metadata['Title']
elif 'source' in doc.metadata:
title = doc.metadata['source']
else:
# Use first available key as title
if doc.metadata:
first_key = list(doc.metadata.keys())[0]
title = f"Wikipedia: {doc.metadata[first_key]}"
print(f"DEBUG: Using Wikipedia title: {title}")
# Trim content to key information only (reduced from 2000 to 800 characters)
content = doc.page_content[:800] if len(doc.page_content) > 800 else doc.page_content
# Add document but keep it concise
result += f"\n\nWikipedia Result {counter}: {title}\nSummary: {content}..."
counter += 1
# Stop after 2 documents to keep response manageable
if counter > 2:
break
if not result.strip():
return "No Wikipedia results found for the given query. [END_OF_SEARCH]"
# Add clear end marker
result += "\n\n[END_OF_SEARCH] - Wikipedia search complete. Use this information to answer the question."
print(f"DEBUG: Final Wikipedia result length: {len(result)}")
return result
except Exception as e:
error_msg = f"Error during Wikipedia search: {str(e)} [END_OF_SEARCH]"
print(f"DEBUG: {error_msg}")
return error_msg
@tool
def arxiv_search_tool(arxiv_query: str) -> str:
"""
Searches Arxiv for the given query and returns the first 5 pages.
Expects: arxiv_query is a non‐empty string.
Returns: text summary of first matching page or an error message>"
"""
print(f"DEBUG: reached arxiv_search_tool with query: {arxiv_query}")
try:
docs = ArxivLoader(query=arxiv_query, load_max_docs=3).load() # Reduced from 5 to 3
print(f"DEBUG: ArxivLoader returned {len(docs)} documents")
result = ""
counter = 1
for doc in docs:
print(f"DEBUG: Processing document {counter}")
print(f"DEBUG: Document metadata: {doc.metadata}")
print(f"DEBUG: Document content length: {len(doc.page_content)}")
# Handle different metadata structures
title = "Unknown Title"
if hasattr(doc, 'metadata') and doc.metadata:
# Try different possible title keys
if 'title' in doc.metadata:
title = doc.metadata['title']
elif 'Title' in doc.metadata:
title = doc.metadata['Title']
elif 'entry_id' in doc.metadata:
title = doc.metadata['entry_id']
elif 'summary' in doc.metadata:
title = f"ArXiv Paper {counter}"
else:
# Use first available key as title
if doc.metadata:
first_key = list(doc.metadata.keys())[0]
title = f"{first_key}: {doc.metadata[first_key]}"
print(f"DEBUG: Using title: {title}")
# Trim content to key information only (reduced from 2000 to 800 characters)
content = doc.page_content[:800] if len(doc.page_content) > 800 else doc.page_content
# Add document but keep it concise
result += f"\n\nArXiv Result {counter}: {title}\nAbstract/Summary: {content}..."
counter += 1
# Stop after 2 documents to keep response manageable
if counter > 2:
break
if not result.strip():
return "No ArXiv results found for the given query. [END_OF_SEARCH]"
# Add clear end marker
result += "\n\n[END_OF_SEARCH] - ArXiv search complete. Use this information to answer the question."
print(f"DEBUG: Final ArXiv result length: {len(result)}")
return result
except Exception as e:
error_msg = f"Error during Arxiv search: {str(e)} [END_OF_SEARCH]"
print(f"DEBUG: {error_msg}")
return error_msg
from langchain_openai import ChatOpenAI
from langchain.schema import SystemMessage, HumanMessage
LLM = ChatOpenAI(model_name="gpt-4.1-mini", temperature=0.2)
@tool
def analyze_code_tool(task_id: str) -> str:
"""
Either task_id OR (file + task_id)
Reads the code (max 400 lines / 10 kB) and asks the LLM for:
• plain-language summary
• list of key functions/classes
• obvious bugs or style smells
Returns that analysis as a string.
"""
print("reached analyze_code_tool")
code_txt = ""
if not task_id:
code_txt = "No code provided."
else:
path = _download_file_for_task(task_id, "py")
if not path:
return "Error: .py file not found for this task."
code_txt = Path(path).read_text(encoding="utf-8", errors="ignore")
# else:
# return "Error: neither snippet nor file provided."
# Truncate for safety
lines = code_txt.splitlines()[:400]
code_sample = "\n".join(lines)[:10_000]
prompt = [
SystemMessage(content="You are a senior Python code reviewer."),
HumanMessage(content=(
"Please analyse the following code. "
"Summarise what it does, list key functions/classes, "
"and point out any obvious bugs, performance issues or style problems.\n\n"
f"```python\n{code_sample}\n```"
"If you can then find the output of the code and return it in the output."
))
]
return LLM.invoke(prompt).content.strip()
# def web_search_tool(state: AgentState) -> AgentState:
# """
# Expects: state["web_search_query"] is a non‐empty string.
# Returns: {"web_search_query": None, "web_search_result": <string>}.
# Retries up to 5 times on either a DuckDuckGo "202 Ratelimit" response or any exception (e.g. timeout).
# """
# print("reached web_search_tool")
# query = state.get("web_search_query", "")
# if not query:
# return {} # nothing to do
# ddg = DDGS()
# max_retries = 5
# result_text = ""
# for attempt in range(1, max_retries + 1):
# try:
# result_text = str(ddg.text(query, max_results=5))
# except Exception as e:
# # Network error or timeout—retry up to max_retries
# if attempt < max_retries:
# print(f"web_search_tool: exception '{e}', retrying in 4 seconds ({attempt}/{max_retries})")
# time.sleep(4)
# continue
# else:
# # Final attempt failed
# return {
# "web_search_query": None,
# "web_search_result": f"Error during DuckDuckGo search: {e}"
# }
# # Check for DuckDuckGo rate‐limit indicator
# if "202 Ratelimit" in result_text:
# if attempt < max_retries:
# print(f"web_search_tool: received '202 Ratelimit', retrying in 4 seconds ({attempt}/{max_retries})")
# time.sleep(4)
# continue
# else:
# # Final attempt still rate‐limited
# break
# # Successful response (no exception and no rate‐limit text)
# break
# return {
# "web_search_query": None,
# "web_search_result": result_text
# }