# tools.py
import os
import time
import requests
import regex as re
import pandas as pd
from pathlib import Path
from duckduckgo_search import DDGS
from langchain_core.tools import tool
from langchain_community.document_loaders import WikipediaLoader, ArxivLoader

DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"

# Removed complex safety wrapper - keeping things simple
def _download_file_for_task(task_id: str, ext: str) -> str:
    """
    Helper: attempt to GET the remote file for a given task_id.
    Saves it under ./hf_files/{task_id}.{ext}. Returns the local path on success,
    or an empty string if there is no file or the download failed.
    """
    print("reached _download_file_for_task")
    os.makedirs("hf_files", exist_ok=True)
    local_path = os.path.join("hf_files", f"{task_id}.{ext}")
    url = f"{DEFAULT_API_URL}/files/{task_id}"
    try:
        resp = requests.get(url, timeout=10)
        if resp.status_code == 200 and resp.content:
            with open(local_path, "wb") as f:
                f.write(resp.content)
            print(f"Downloaded file from {url} to {local_path}")
            return local_path
    except Exception:
        print(f"Error downloading file from {url} to {local_path}")
    # If we get here, the server returned a non-200 status or the request raised.
    return ""
def image_tool(task_id: str) -> str:
    """
    Expects: task_id (str) — a valid image task ID.
    Returns: an image caption from the Hugging Face Inference API, or an error message.
    """
    # Try downloading the image with one of the allowed extensions
    for ext in ("png", "jpg", "jpeg"):
        file_path = _download_file_for_task(task_id, ext)
        if file_path and os.path.exists(file_path):
            break
    else:
        return f"Error: Image file for task_id '{task_id}' not found."
    # Read the image bytes
    try:
        with open(file_path, "rb") as f:
            image_bytes = f.read()
    except Exception as e:
        return f"Error reading image: {str(e)}"
    # Load HF token
    hf_token = os.getenv("HF_TOKEN")
    if not hf_token:
        return "Error: HF_TOKEN not set in environment."
    # Use a single reliable captioning model
    model = "Salesforce/blip-image-captioning-base"
    headers = {"Authorization": f"Bearer {hf_token}"}
    try:
        # The serverless Inference API expects the raw image bytes as the request body.
        response = requests.post(
            f"https://api-inference.huggingface.co/models/{model}",
            headers=headers,
            data=image_bytes,
            timeout=30
        )
    except Exception as e:
        return f"Error calling HuggingFace API: {e}"
    # Parse response
    if response.status_code != 200:
        return f"Error from model ({model}): {response.status_code} - {response.text}"
    try:
        result = response.json()
        if isinstance(result, list) and result:
            caption = result[0].get("generated_text", "").strip()
        elif isinstance(result, dict):
            caption = result.get("generated_text", "").strip()
        else:
            caption = ""
    except Exception as e:
        return f"Error parsing response: {e}"
    if not caption:
        return "No caption generated by model."
    return f"Image Caption:\n{caption}"
def excel_tool(task_id: str) -> str:
    """
    Downloads <task_id>.xlsx (if any) and returns a stringified list of
    records from the specified sheet. No fallback to user-supplied tables.
    Expects: task_id (str), required; used to download the file.
    Returns: stringified list of records (one dict per row) from that sheet.
    """
    print("reached excel_tool")
    sheet = "Sheet1"
    local_xlsx = _download_file_for_task(task_id, "xlsx")
    if not local_xlsx or not os.path.exists(local_xlsx):
        return "Error: Excel file not found for this task."
    try:
        xls = pd.ExcelFile(local_xlsx)
        # Use "Sheet1" if present, otherwise fall back to the first sheet in the workbook.
        df = pd.read_excel(
            xls,
            sheet_name=sheet if sheet in xls.sheet_names else xls.sheet_names[0]
        )
        print(f"Excel file read successfully: {str(df.to_dict(orient='records'))}")
        return str(df.to_dict(orient="records"))
    except Exception as e:
        return f"Error reading Excel file: {e}"
import openai

def audio_transcriber_tool(task_id: str) -> str:
    """
    LangGraph tool for transcribing audio via OpenAI's Whisper API.
    Expects: task_id (str).
    Returns: the transcript text, or an error message.
    Always attempts to download the audio file for the given task ID.
    """
    print("reached audio_transcriber_tool")
    # Always attempt to download the file, regardless of local existence
    local_audio = ""
    for ext in ("mp3", "wav", "m4a"):
        candidate = _download_file_for_task(task_id, ext)
        if candidate:
            local_audio = candidate
            break
    if not local_audio or not os.path.exists(local_audio):
        return "Error: No audio file found (download failed)."
    # Send to OpenAI Whisper
    try:
        openai.api_key = os.getenv("OPENAI_API_KEY")
        if not openai.api_key:
            raise RuntimeError("OPENAI_API_KEY is not set in environment.")
        with open(local_audio, "rb") as audio_file:
            print("reached openai.audio.transcriptions.create")
            response = openai.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file,
            )
        print("reached response")
        text = response.text.strip()
    except Exception as e:
        text = f"Error during transcription: {e}"
    print(f"Transcription result: {text}")
    return text
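# Equivalent sketch with the explicit openai>=1.0 client interface (an assumption about
# the installed SDK version; not part of the original module):
#
#     from openai import OpenAI
#     client = OpenAI()  # reads OPENAI_API_KEY from the environment
#     with open(local_audio, "rb") as f:
#         text = client.audio.transcriptions.create(model="whisper-1", file=f).text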
def wikipedia_search_tool(wiki_query: str) -> str:
    """
    Searches Wikipedia for the given query.
    Expects: wiki_query is a non-empty string.
    Returns: short summaries of up to two matching pages, or an error message.
    """
    print(f"DEBUG: reached wikipedia_search_tool with query: {wiki_query}")
    try:
        docs = WikipediaLoader(query=wiki_query, load_max_docs=3).load()  # Reduced from 5 to 3
        print(f"DEBUG: WikipediaLoader returned {len(docs)} documents")
        result = ""
        counter = 1
        for doc in docs:
            print(f"DEBUG: Processing Wikipedia document {counter}")
            print(f"DEBUG: Document metadata: {doc.metadata}")
            print(f"DEBUG: Document content length: {len(doc.page_content)}")
            # Handle different metadata structures
            title = "Unknown Title"
            if hasattr(doc, 'metadata') and doc.metadata:
                # Try different possible title keys
                if 'title' in doc.metadata:
                    title = doc.metadata['title']
                elif 'Title' in doc.metadata:
                    title = doc.metadata['Title']
                elif 'source' in doc.metadata:
                    title = doc.metadata['source']
                else:
                    # Fall back to the first available metadata key
                    first_key = list(doc.metadata.keys())[0]
                    title = f"Wikipedia: {doc.metadata[first_key]}"
            print(f"DEBUG: Using Wikipedia title: {title}")
            # Trim content to key information only (reduced from 2000 to 800 characters)
            content = doc.page_content[:800]
            # Add the document but keep it concise
            result += f"\n\nWikipedia Result {counter}: {title}\nSummary: {content}..."
            counter += 1
            # Stop after 2 documents to keep the response manageable
            if counter > 2:
                break
        if not result.strip():
            return "No Wikipedia results found for the given query. [END_OF_SEARCH]"
        # Add a clear end marker
        result += "\n\n[END_OF_SEARCH] - Wikipedia search complete. Use this information to answer the question."
        print(f"DEBUG: Final Wikipedia result length: {len(result)}")
        return result
    except Exception as e:
        error_msg = f"Error during Wikipedia search: {str(e)} [END_OF_SEARCH]"
        print(f"DEBUG: {error_msg}")
        return error_msg
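# Illustrative output shape (the titles and text below are invented, not real results):
#
#     Wikipedia Result 1: Alan Turing
#     Summary: Alan Mathison Turing was an English mathematician ...
#
#     Wikipedia Result 2: Turing machine
#     Summary: A Turing machine is a mathematical model of computation ...
#
#     [END_OF_SEARCH] - Wikipedia search complete. Use this information to answer the question.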
def arxiv_search_tool(arxiv_query: str) -> str:
    """
    Searches ArXiv for the given query.
    Expects: arxiv_query is a non-empty string.
    Returns: short summaries of up to two matching papers, or an error message.
    """
    print(f"DEBUG: reached arxiv_search_tool with query: {arxiv_query}")
    try:
        docs = ArxivLoader(query=arxiv_query, load_max_docs=3).load()  # Reduced from 5 to 3
        print(f"DEBUG: ArxivLoader returned {len(docs)} documents")
        result = ""
        counter = 1
        for doc in docs:
            print(f"DEBUG: Processing document {counter}")
            print(f"DEBUG: Document metadata: {doc.metadata}")
            print(f"DEBUG: Document content length: {len(doc.page_content)}")
            # Handle different metadata structures
            title = "Unknown Title"
            if hasattr(doc, 'metadata') and doc.metadata:
                # Try different possible title keys
                if 'title' in doc.metadata:
                    title = doc.metadata['title']
                elif 'Title' in doc.metadata:
                    title = doc.metadata['Title']
                elif 'entry_id' in doc.metadata:
                    title = doc.metadata['entry_id']
                elif 'summary' in doc.metadata:
                    title = f"ArXiv Paper {counter}"
                else:
                    # Fall back to the first available metadata key
                    first_key = list(doc.metadata.keys())[0]
                    title = f"{first_key}: {doc.metadata[first_key]}"
            print(f"DEBUG: Using title: {title}")
            # Trim content to key information only (reduced from 2000 to 800 characters)
            content = doc.page_content[:800]
            # Add the document but keep it concise
            result += f"\n\nArXiv Result {counter}: {title}\nAbstract/Summary: {content}..."
            counter += 1
            # Stop after 2 documents to keep the response manageable
            if counter > 2:
                break
        if not result.strip():
            return "No ArXiv results found for the given query. [END_OF_SEARCH]"
        # Add a clear end marker
        result += "\n\n[END_OF_SEARCH] - ArXiv search complete. Use this information to answer the question."
        print(f"DEBUG: Final ArXiv result length: {len(result)}")
        return result
    except Exception as e:
        error_msg = f"Error during Arxiv search: {str(e)} [END_OF_SEARCH]"
        print(f"DEBUG: {error_msg}")
        return error_msg
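# Illustrative usage (the query is just an example string):
#
#     papers = arxiv_search_tool("retrieval augmented generation")
#     # -> "ArXiv Result 1: <title> ..." entries plus the [END_OF_SEARCH] marker,
#     #    or an error message if the lookup fails.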
from langchain_openai import ChatOpenAI
from langchain.schema import SystemMessage, HumanMessage

# Shared LLM instance used by analyze_code_tool below; it is built at module import
# time, so OPENAI_API_KEY should already be set when this module is loaded.
LLM = ChatOpenAI(model_name="gpt-4.1-mini", temperature=0.2)
def analyze_code_tool(task_id: str) -> str:
    """
    Expects: task_id (str), used to download the .py file for this task.
    Reads the code (max 400 lines / 10 kB) and asks the LLM for:
      • plain-language summary
      • list of key functions/classes
      • obvious bugs or style smells
    Returns that analysis as a string.
    """
    print("reached analyze_code_tool")
    if not task_id:
        return "Error: no task_id provided."
    path = _download_file_for_task(task_id, "py")
    if not path:
        return "Error: .py file not found for this task."
    code_txt = Path(path).read_text(encoding="utf-8", errors="ignore")
    # Truncate for safety: at most 400 lines and 10 kB
    lines = code_txt.splitlines()[:400]
    code_sample = "\n".join(lines)[:10_000]
    prompt = [
        SystemMessage(content="You are a senior Python code reviewer."),
        HumanMessage(content=(
            "Please analyse the following code. "
            "Summarise what it does, list key functions/classes, "
            "and point out any obvious bugs, performance issues or style problems.\n\n"
            f"```python\n{code_sample}\n```\n\n"
            "If you can determine the output of the code, include it in your answer."
        )),
    ]
    return LLM.invoke(prompt).content.strip()
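# Illustrative call (the task id is a placeholder, not a real one from the course API):
#
#     review = analyze_code_tool("some-python-task-id")
#     # -> a prose review: summary, key functions/classes, and any obvious issues.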
# def web_search_tool(state: AgentState) -> AgentState:
#     """
#     Expects: state["web_search_query"] is a non-empty string.
#     Returns: {"web_search_query": None, "web_search_result": <string>}.
#     Retries up to 5 times on either a DuckDuckGo "202 Ratelimit" response or any exception (e.g. timeout).
#     """
#     print("reached web_search_tool")
#     query = state.get("web_search_query", "")
#     if not query:
#         return {}  # nothing to do
#     ddg = DDGS()
#     max_retries = 5
#     result_text = ""
#     for attempt in range(1, max_retries + 1):
#         try:
#             result_text = str(ddg.text(query, max_results=5))
#         except Exception as e:
#             # Network error or timeout - retry up to max_retries
#             if attempt < max_retries:
#                 print(f"web_search_tool: exception '{e}', retrying in 4 seconds ({attempt}/{max_retries})")
#                 time.sleep(4)
#                 continue
#             else:
#                 # Final attempt failed
#                 return {
#                     "web_search_query": None,
#                     "web_search_result": f"Error during DuckDuckGo search: {e}"
#                 }
#         # Check for DuckDuckGo rate-limit indicator
#         if "202 Ratelimit" in result_text:
#             if attempt < max_retries:
#                 print(f"web_search_tool: received '202 Ratelimit', retrying in 4 seconds ({attempt}/{max_retries})")
#                 time.sleep(4)
#                 continue
#             else:
#                 # Final attempt still rate-limited
#                 break
#         # Successful response (no exception and no rate-limit text)
#         break
#     return {
#         "web_search_query": None,
#         "web_search_result": result_text
#     }