# Quiz_Solver / tools.py
# Author: Vjay15
# Commit ff23105 (verified) — "TRYING IF IT WORKS"
import requests
import base64
import logging
from playwright.sync_api import sync_playwright
import threading
import speech_recognition as sr
from pydub import AudioSegment
import io
import tempfile
import sys
import os
import pandas as pd
import numpy as np
import speech_recognition as sr
from bs4 import BeautifulSoup
import pydub
from pydub import AudioSegment
import pypdf
import zipfile
import duckdb
from PIL import Image
import json
logger = logging.getLogger(__name__)
def run_python_code(code: str) -> str:
    """
    Execute a snippet of Python code and return whatever it printed to stdout.

    The snippet may arrive wrapped in markdown code fences (```python ... ```);
    those are stripped before execution. Commonly needed libraries (pandas,
    numpy, requests, BeautifulSoup, ...) are pre-injected into the execution
    namespace so generated code can use them without importing.

    Args:
        code: Python source, optionally inside a markdown code block.

    Returns:
        Captured stdout, a reminder message if nothing was printed, or an
        error string if execution failed.

    WARNING: exec() on model-generated code is inherently unsafe; acceptable
    here only because this tool runs in a sandboxed quiz-solving context.
    """
    try:
        logger.info("Executing Python code...")
        # Robustly extract code from markdown blocks if present
        if "```python" in code:
            code = code.split("```python")[1].split("```")[0].strip()
        elif "```" in code:
            code = code.split("```")[1].split("```")[0].strip()
        logger.info(f"Code:\n{code}")
        # Capture stdout while the snippet runs
        old_stdout = sys.stdout
        redirected_output = io.StringIO()
        sys.stdout = redirected_output
        # Single namespace used as BOTH globals and locals. With separate
        # dicts (exec(code, {}, local_scope)), any function defined by the
        # snippet got the empty dict as its globals and therefore could not
        # see the injected libraries or other names the snippet defined.
        scope = {
            "pd": pd,
            "np": np,
            "requests": requests,
            "io": io,
            "sr": sr,
            "pydub": pydub,
            "sys": sys,
            "os": os,
            "BeautifulSoup": BeautifulSoup,
            "pypdf": pypdf,
            "zipfile": zipfile,
            "duckdb": duckdb,
            "Image": Image,
        }
        try:
            exec(code, scope)
        except Exception as exec_error:
            return f"Error executing code: {exec_error}"
        finally:
            # Always restore stdout, even when the snippet raised
            sys.stdout = old_stdout
        output = redirected_output.getvalue()
        logger.info(f"Code Output:\n{output}")
        return output if output.strip() else "Code executed successfully but produced no output. Did you forget to print the result?"
    except Exception as e:
        logger.error(f"System error during code execution: {e}")
        return f"System error: {e}"
def execute_python(code: str) -> str:
    """
    Run a Python snippet and return its printed output.

    Thin alias around run_python_code, exposed under this name for the
    tool-calling interface. Use it for math, data analysis (pandas, numpy,
    duckdb), and file processing.
    """
    return run_python_code(code)
def read_pdf(url: str) -> str:
    """
    Download a PDF from a URL and extract its text content.

    Args:
        url: Absolute http(s) URL of the PDF.

    Returns:
        Up to the first 5000 characters of the extracted text, or an error
        string describing what went wrong.
    """
    try:
        logger.info(f"Reading PDF from: {url}")
        if not url.startswith("http"):
            return f"Error: URL must be absolute. Received: {url}"
        # Timeout so a dead host cannot hang the agent forever
        response = requests.get(url, timeout=60)
        response.raise_for_status()
        with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_pdf:
            temp_pdf.write(response.content)
            temp_pdf_path = temp_pdf.name
        text = ""
        try:
            reader = pypdf.PdfReader(temp_pdf_path)
            for page in reader.pages:
                # extract_text() may return None (e.g. image-only pages);
                # guard so concatenation cannot raise TypeError
                text += (page.extract_text() or "") + "\n"
        except Exception as e:
            return f"Error reading PDF: {e}"
        finally:
            os.remove(temp_pdf_path)
        return text[:5000]  # Truncate if too long to avoid context overflow
    except Exception as e:
        logger.error(f"Error downloading PDF: {e}")
        return f"Error downloading PDF: {e}"
def read_zip(url: str) -> str:
    """
    Download a ZIP file from a URL, list its contents, and inline the text
    of small non-image members.

    Args:
        url: Absolute http(s) URL of the ZIP archive.

    Returns:
        A listing of archive members (with up to 500 chars of content for
        small, non-image files), or an error string.
    """
    try:
        logger.info(f"Reading ZIP from: {url}")
        if not url.startswith("http"):
            return f"Error: URL must be absolute. Received: {url}"
        # Timeout so a dead host cannot hang the agent forever
        response = requests.get(url, timeout=60)
        response.raise_for_status()
        with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as temp_zip:
            temp_zip.write(response.content)
            temp_zip_path = temp_zip.name
        result = "ZIP Contents:\n"
        try:
            with zipfile.ZipFile(temp_zip_path, 'r') as zip_ref:
                for file_info in zip_ref.infolist():
                    result += f"- {file_info.filename} ({file_info.file_size} bytes)\n"
                    # If it's a small text file, try to read it
                    if file_info.file_size < 10000 and not file_info.filename.endswith(('.png', '.jpg', '.jpeg', '.gif')):
                        try:
                            with zip_ref.open(file_info) as f:
                                content = f.read().decode('utf-8', errors='ignore')
                            result += f" Content: {content[:500]}\n"
                        except Exception as member_error:
                            # Deliberate best-effort: one unreadable member
                            # should not abort the whole listing, but do log
                            # it instead of silently swallowing everything.
                            logger.debug(f"Could not read {file_info.filename}: {member_error}")
        except Exception as e:
            return f"Error reading ZIP: {e}"
        finally:
            os.remove(temp_zip_path)
        return result
    except Exception as e:
        logger.error(f"Error downloading ZIP: {e}")
        return f"Error downloading ZIP: {e}"
def search_history(query: str) -> str:
    """
    Search the history of solved quizzes (history.jsonl in the working
    directory) for entries containing the query string.

    Args:
        query: Substring to look for in each stored history entry
               (e.g. a previously-seen quiz URL).

    Returns:
        Matching entries joined by newlines, "No matching history found."
        when nothing matches, or "No history found." when the file is absent.
    """
    try:
        history_file = "history.jsonl"
        if not os.path.exists(history_file):
            return "No history found."
        results = []
        with open(history_file, "r") as f:
            for line in f:
                try:
                    entry = json.loads(line)
                except json.JSONDecodeError:
                    # Skip corrupt/partial lines instead of aborting the
                    # whole search (was a bare except that hid all errors)
                    continue
                if query in str(entry):
                    results.append(str(entry))
        if results:
            return "\n".join(results)
        return "No matching history found."
    except Exception as e:
        return f"Error searching history: {e}"
def transcribe_audio(url: str) -> str:
    """
    Download an audio file from a URL and transcribe it with a local
    Whisper model (tiny). Supports any format pydub/ffmpeg can decode
    (MP3, WAV, ...).

    Note: the transcription runs entirely on-device (CPU/GPU); no external
    speech API is called.

    Args:
        url: Absolute http(s) URL of the audio file.

    Returns:
        The transcribed text, or an error string.
    """
    try:
        logger.info(f"Transcribing audio from: {url}")
        if not url.startswith("http"):
            return f"Error: URL must be absolute. Received: {url}"
        # Timeout so a dead host cannot hang the agent forever
        response = requests.get(url, timeout=60)
        response.raise_for_status()
        # Reserve a temporary path for the converted WAV
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_wav:
            temp_wav_path = temp_wav.name
        try:
            # Convert whatever we downloaded to WAV (using pydub)
            try:
                audio_content = io.BytesIO(response.content)
                audio = AudioSegment.from_file(audio_content)
                audio.export(temp_wav_path, format="wav")
            except Exception as e:
                logger.error(f"Error converting audio: {e}")
                return f"Error converting audio: {e}"
            # Transcribe using local Whisper (tiny model)
            import whisper
            # Load model (downloads once if not cached)
            model = whisper.load_model("tiny")
            result = model.transcribe(temp_wav_path)
            text = result["text"]
            logger.info(f"WHISPER OUTPUT: {text}")
            return text
        finally:
            # Clean up even when conversion/transcription fails (previously
            # the temp file leaked on any error path)
            os.remove(temp_wav_path)
    except Exception as e:
        logger.error(f"Error transcribing audio: {e}")
        return f"Error transcribing audio: {e}"
def understand_image(url: str, prompt: str = "Describe this image in detail") -> str:
    """
    Describe an image using a vision-capable model via an OpenRouter-style API.

    Accepts an http(s) URL, a data: URI, or a local file path (which gets
    inlined as a base64 data URI before sending).

    Args:
        url: Image location — remote URL, data URI, or local path.
        prompt: Instruction sent alongside the image.

    Returns:
        The model's textual description, or an error string.
    """
    try:
        logger.info(f"Analyzing image from: {url}")
        if os.path.exists(url):
            # Local file: embed it as a base64 data URI
            try:
                with open(url, "rb") as image_file:
                    encoded_string = base64.b64encode(image_file.read()).decode('utf-8')
                lowered = url.lower()
                mime_type = "image/jpeg"  # default when extension is unrecognized
                for ext, mime in ((".png", "image/png"), (".gif", "image/gif"), (".webp", "image/webp")):
                    if lowered.endswith(ext):
                        mime_type = mime
                        break
                url = f"data:{mime_type};base64,{encoded_string}"
            except Exception as e:
                return f"Error reading local image file: {e}"
        if not url.startswith("http") and not url.startswith("data:"):
            return f"Error: URL must be absolute (http/https), a data URI, or a valid local file path. Received: {url[:50]}..."
        api_key = os.getenv("AI_TOKEN") or os.getenv("OPENROUTER_API_KEY")
        if not api_key:
            return "Error: AI_TOKEN not found."
        # Ask the vision model through the OpenRouter-compatible endpoint
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        user_content = [
            {"type": "text", "text": prompt},
            {"type": "image_url", "image_url": {"url": url}}
        ]
        payload = {
            "model": "google/gemini-2.0-flash-lite-001",
            "messages": [{"role": "user", "content": user_content}]
        }
        response = requests.post("https://aipipe.org/openrouter/v1/chat/completions", headers=headers, json=payload)
        response.raise_for_status()
        description = response.json()['choices'][0]['message']['content']
        logger.info(f"IMAGE ANALYSIS OUTPUT: {description}")
        return description
    except Exception as e:
        logger.error(f"Error analyzing image: {e}")
        return f"Error analyzing image: {e}"
def call_api(url: str, method: str = "GET", headers: dict = None, json_data: dict = None) -> str:
    """
    Make an HTTP request to an external API.

    Useful for sourcing data from APIs as required by the quiz.

    Args:
        url: Absolute http(s) URL to call.
        method: HTTP verb (default "GET").
        headers: Optional request headers.
        json_data: Optional JSON request body.

    Returns:
        Pretty-printed JSON when the response body parses as JSON, the raw
        response text otherwise, or an error string.
    """
    try:
        logger.info(f"Calling API: {method} {url}")
        if not url.startswith("http"):
            return f"Error: URL must be absolute. Received: {url}"
        # Timeout so a dead host cannot hang the agent forever
        response = requests.request(method, url, headers=headers, json=json_data, timeout=60)
        try:
            return json.dumps(response.json(), indent=2)
        except ValueError:
            # response.json() raises ValueError (JSONDecodeError) for
            # non-JSON bodies; fall back to raw text. Was a bare except
            # that would also have hidden unrelated bugs.
            return response.text
    except Exception as e:
        logger.error(f"Error calling API: {e}")
        return f"Error calling API: {e}"
def fetch_page_text(url: str) -> str:
    """
    Fetch the rendered text content of a web page using Playwright.

    Also extracts anchor links and audio sources, and saves a full-page
    screenshot to /tmp when the page contains visual elements (<canvas>
    or <img>). Playwright's sync API is driven from a dedicated thread;
    results are passed back through the shared `result` dict.

    Args:
        url: Absolute http(s) URL of the page to fetch.

    Returns:
        str: The page text followed by an "Extracted Links & Media"
        section, or an error string. (Previously annotated as dict, but
        every path returns a string; the bad-URL branch even returned the
        raw dict — both inconsistencies are fixed here.)
    """
    result = {}
    try:
        logger.info(f"Fetching page text from: {url}")
        if not url.startswith("http"):
            # Return a string like every other exit path
            return f"Error: URL must be absolute. Received: {url}"

        def run_playwright():
            try:
                with sync_playwright() as p:
                    browser = p.chromium.launch(headless=True)
                    page = browser.new_page()
                    page.goto(url)
                    page.wait_for_load_state("networkidle")
                    html_content = page.content()
                    # Screenshot Logic: only when visual elements exist
                    images = []
                    try:
                        has_visuals = page.evaluate("() => document.querySelectorAll('canvas, img').length > 0")
                        if has_visuals:
                            import uuid
                            unique_id = str(uuid.uuid4())[:8]
                            screenshot_path = f"/tmp/screenshot_{unique_id}.jpg"
                            page.screenshot(path=screenshot_path, full_page=True, type='jpeg', quality=50)
                            images.append(f"Image: [Page Screenshot]({screenshot_path})")
                            logger.info(f"Visual elements detected. Saved screenshot to {screenshot_path}")
                    except Exception as e:
                        logger.warning(f"Error handling screenshot in fetch_page_text: {e}")
                    browser.close()
                    # Parse with BS4
                    soup = BeautifulSoup(html_content, 'html.parser')
                    text_content = soup.get_text(separator='\n', strip=True)
                    links = []
                    for a in soup.find_all('a', href=True):
                        href = a['href']
                        links.append(f"Link: [{a.get_text(strip=True)}]({href})")
                    audio_sources = []
                    for audio in soup.find_all('audio'):
                        if audio.get('src'):
                            audio_sources.append(f"Audio: {audio['src']}")
                        for source in audio.find_all('source', src=True):
                            audio_sources.append(f"Audio: {source['src']}")
                    result["content"] = text_content + "\n\n--- Extracted Links & Media ---\n" + "\n".join(links + audio_sources + images)
            except Exception as e:
                result["error"] = str(e)

        # Run Playwright in its own thread so its sync API does not clash
        # with any asyncio event loop already running in the caller thread
        thread = threading.Thread(target=run_playwright)
        thread.start()
        thread.join()
        if "error" in result:
            logger.error(f"Error fetching page: {result['error']}")
            return f"Error fetching page: {result['error']}"
        content = result.get("content", "")
        logger.info(f"Fetched content (first 100 chars): {content[:100]}")
        return content
    except Exception as e:
        logger.error(f"Error fetching page: {e}")
        return f"Error fetching page: {e}"
def fetch_page_scripts(url: str) -> str:
    """
    Fetch only the <script> elements (inline bodies and src references)
    from a web page. Useful when the page mentions embedded logic or
    hidden code.

    Args:
        url: Absolute http(s) URL of the page.

    Returns:
        A formatted listing of script sources and inline script bodies
        (each inline body truncated to 2000 chars), or an error string.
    """
    result = {}
    try:
        logger.info(f"Fetching page scripts from: {url}")
        if not url.startswith("http"):
            return f"Error: URL must be absolute. Received: {url}"

        def scrape_scripts():
            try:
                with sync_playwright() as p:
                    browser = p.chromium.launch(headless=True)
                    page = browser.new_page()
                    page.goto(url)
                    page.wait_for_load_state("networkidle")
                    html_content = page.content()
                    browser.close()
                    soup = BeautifulSoup(html_content, 'html.parser')
                    found = []
                    for script in soup.find_all('script'):
                        src = script.get('src')
                        if src:
                            found.append(f"Script Source: {src}")
                        else:
                            body = script.string
                            if body and body.strip():
                                found.append(f"Inline Script: {body.strip()[:2000]}")
                    result["content"] = "--- Extracted Scripts ---\n" + "\n".join(found)
            except Exception as e:
                result["error"] = str(e)

        # Isolate Playwright's sync API in its own thread
        worker = threading.Thread(target=scrape_scripts)
        worker.start()
        worker.join()
        if "error" in result:
            return f"Error fetching scripts: {result['error']}"
        return result.get("content", "No scripts found.")
    except Exception as e:
        return f"Error fetching scripts: {e}"