# cat / tools.py
# DenviWorking's picture
# Create tools.py
# c5ed555 verified
import requests
import io
import base64
import openai
from openai import OpenAI
from smolagents import tool
import os
import io, time, itertools, functools
from typing import List, Optional
import sys, contextlib
import av
from pytube import YouTube
from yt_dlp import YoutubeDL
from PIL import Image
from tqdm import tqdm
import wikipediaapi
import tempfile
# OpenAI model name passed as `model=` by read_image and run_video.
model_id = "gpt-4.1"
@tool
def read_image(query: str, img_url: str) -> str:
    """
    Use a visual question answering (VQA) model to generate a response to a query based on an image.
    Args:
        query (str): A natural language question about the image.
        img_url (str): The URL of the image to analyze.
    Returns:
        str: A response generated by the VQA model based on the provided image and question.
    """
    # NOTE: smolagents' @tool builds the tool description and argument schema
    # from the docstring above, so its Args entries must match the signature.

    # A single user turn: the question text followed by the image reference.
    question_part = {"type": "input_text", "text": query}
    image_part = {"type": "input_image", "image_url": img_url}
    user_turn = {"role": "user", "content": [question_part, image_part]}

    vqa_response = OpenAI().responses.create(model=model_id, input=[user_turn])
    return vqa_response.output_text
@tool
def read_code(file_url: str) -> str:
    """
    Read the contents of a code file such as py file instead of executing it. Use this tool to analyze a code snippet.
    Args:
        file_url (str): The URL of the code file to retrieve.
    Returns:
        str: The content of the file as a string.
    """
    # Without a timeout `requests` waits indefinitely on an unresponsive
    # server, which would stall the calling agent. The value bounds the
    # connect phase and each read, not the total transfer time.
    response = requests.get(file_url, timeout=30)
    # Raise on 4xx/5xx so an HTTP error page is never returned as "code".
    response.raise_for_status()
    return response.text
@tool
def transcribe_audio(file_url: str, file_name: str) -> str:
    """
    Download and transcribe an audio file using transcription model.
    Args:
        file_url (str): Direct URL to the audio file (e.g., .mp3, .wav).
        file_name (str): Filename including extension, used to determine format.
    Returns:
        str: The transcribed text from the audio file.
    """
    # Bound the wait on an unresponsive host (connect and per-read timeout).
    response = requests.get(file_url, timeout=30)
    response.raise_for_status()

    # Take the text after the last dot of the final path component. When the
    # name has no dot at all (e.g. "recording"), fall back to "mp3" instead of
    # treating the whole name as the extension.
    _, dot, suffix = os.path.basename(file_name).rpartition(".")
    extension = (suffix.lower() if dot else "") or "mp3"

    # Wrap the payload in a file-like object and give it a name carrying the
    # extension, since the upload has no real filename of its own.
    audio_file = io.BytesIO(response.content)
    audio_file.name = f"audio.{extension}"

    client = OpenAI()
    transcription = client.audio.transcriptions.create(
        model="gpt-4o-transcribe", file=audio_file
    )
    return transcription.text
def _pytube_buffer(url: str) -> Optional[io.BytesIO]:
    """Download `url` into memory with PyTube, or return None on any failure.

    Selects the highest-resolution progressive MP4 stream and writes it into
    a BytesIO rewound to position 0. Any exception (including the import of
    pytube failing or no matching stream existing) is reported on stderr and
    converted into a None return.
    """
    try:
        from pytube import YouTube

        progressive_mp4 = YouTube(url).streams.filter(
            progressive=True, file_extension="mp4"
        )
        best = progressive_mp4.order_by("resolution").desc().first()
        if best is None:
            raise RuntimeError("No MP4 with audio found")

        video_buffer = io.BytesIO()
        best.stream_to_buffer(video_buffer)
        video_buffer.seek(0)
    except Exception as e:
        print(f"[youtube_to_buffer] PyTube failed → {e}", file=sys.stderr)
        return None
    return video_buffer
def _ytdlp_buffer(url: str) -> io.BytesIO:
    """
    Resolve `url` with yt-dlp and download the selected video stream(s) into memory.

    yt-dlp is used for metadata extraction only (`download=False`); the bytes
    are then fetched with `requests` from the direct URL(s) it reports. When
    the result contains "entries", the first entry is used.

    Returns:
        A BytesIO rewound to position 0 holding the downloaded data.
    Raises:
        RuntimeError: if no direct stream URL is reported, or every requested
            format is audio-only.
    """
    options = {
        "quiet": True,
        "skip_download": True,
        "format": "bestvideo[ext=mp4]/best[ext=mp4]/best",
    }
    with YoutubeDL(options) as downloader:
        info = downloader.extract_info(url, download=False)
        if "entries" in info:
            info = info["entries"][0]

    # Guard clause first: neither a single URL nor a format list is available.
    if "url" not in info and "requested_formats" not in info:
        raise RuntimeError("yt-dlp could not extract a stream URL")

    if "url" in info:
        video_urls = [info["url"]]
    else:
        # Separate A/V: keep only the formats that actually carry video.
        video_urls = [
            fmt["url"]
            for fmt in info["requested_formats"]
            if fmt.get("vcodec") != "none"
        ]
        if not video_urls:
            raise RuntimeError("yt-dlp returned audio-only formats")

    downloaded = io.BytesIO()
    for direct_url in video_urls:
        with requests.get(direct_url, stream=True) as resp:
            resp.raise_for_status()
            for piece in resp.iter_content(chunk_size=1 << 16):
                downloaded.write(piece)
    downloaded.seek(0)
    return downloaded
@functools.lru_cache(maxsize=8)
def _youtube_mp4_bytes(url: str) -> bytes:
    """Download the progressive MP4 for `url` and return its raw bytes.

    The result is cached per URL. Caching immutable bytes (rather than a
    BytesIO) means no caller can disturb what later callers receive.
    """
    ydl_opts = {
        "quiet": True,
        "skip_download": True,
        "format": (
            "best[ext=mp4][vcodec^=avc1][acodec!=none]" "/best[ext=mp4][acodec!=none]"
        ),
    }
    with YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=False)
        if "entries" in info:
            info = info["entries"][0]

    direct_url = info.get("url")
    if not direct_url:
        raise RuntimeError("yt-dlp could not find a progressive MP4 track")

    # The timeout bounds connect and each read so a stalled CDN cannot hang
    # the caller forever; it does not limit total download time.
    with requests.get(direct_url, stream=True, timeout=30) as r:
        r.raise_for_status()
        return b"".join(r.iter_content(chunk_size=1 << 17))


def youtube_to_buffer(url: str) -> io.BytesIO:
    """
    Return a BytesIO containing a single progressive MP4
    (H.264 + AAC) – the safest thing PyAV can open everywhere.

    Downloads are cached per URL (up to 8 entries), but every call returns a
    *fresh* BytesIO positioned at 0. Previously the cached BytesIO object
    itself was shared, so a repeated call got a buffer whose read position
    had been left wherever the previous consumer stopped.

    Raises:
        RuntimeError: if yt-dlp reports no direct URL for a progressive MP4.
        requests.RequestException: if the download fails or times out.
    """
    return io.BytesIO(_youtube_mp4_bytes(url))


# Keep the lru_cache introspection helpers reachable under the public name
# for any caller that used them on the previously cached function.
youtube_to_buffer.cache_info = _youtube_mp4_bytes.cache_info
youtube_to_buffer.cache_clear = _youtube_mp4_bytes.cache_clear
def sample_frames(video_bytes: io.BytesIO, n_frames: int = 6) -> List[Image.Image]:
    """Decode up to `n_frames` evenly spaced RGB frames as PIL images.

    Args:
        video_bytes: In-memory video; it is rewound before decoding, so the
            caller's current read position does not matter.
        n_frames: Maximum number of frames to return. Values <= 0 yield [].

    Returns:
        A list of at most `n_frames` PIL images. When the stream reports its
        frame count, every (count // n_frames)-th frame is taken; otherwise
        every 30th frame is taken.
    """
    if n_frames <= 0:
        # Nothing requested; also avoids dividing by zero below.
        return []

    # Start from the beginning even if a previous consumer advanced the
    # buffer (mirrors save_audio_stream_to_temp_wav_file).
    video_bytes.seek(0)
    container = av.open(video_bytes, metadata_errors="ignore")
    try:
        video = container.streams.video[0]
        total = video.frames or 0  # falsy when the frame count is not reported
        step = max(1, total // n_frames) if total else 30

        frames: List[Image.Image] = []
        for i, frame in enumerate(container.decode(video=0)):
            if i % step == 0:
                frames.append(frame.to_image())
                if len(frames) >= n_frames:
                    break
        return frames
    finally:
        # Release the demuxer even when decoding raises part-way through.
        container.close()
def pil_to_data_url(img: Image.Image, quality: int = 80) -> str:
    """Encode `img` as an optimized JPEG and return it as a base64 data URL.

    Args:
        img: Image to serialize; it is saved with format "JPEG".
        quality: JPEG quality setting forwarded to `img.save`.

    Returns:
        A string of the form "data:image/jpeg;base64,<payload>".
    """
    with io.BytesIO() as jpeg_buffer:
        img.save(jpeg_buffer, format="JPEG", quality=quality, optimize=True)
        jpeg_bytes = jpeg_buffer.getvalue()
    b64 = base64.b64encode(jpeg_bytes).decode()
    return f"data:image/jpeg;base64,{b64}"
def save_audio_stream_to_temp_wav_file(video_bytes: io.BytesIO) -> Optional[str]:
    """
    Extracts the audio stream from video_bytes, saves it as a temporary WAV file,
    and returns the path to the file.
    Returns None if no audio stream is found or an error occurs.

    The caller owns the returned file and is responsible for deleting it.
    Both PyAV containers are closed on every path (success, "no audio" and
    error), and a partially written temp file is removed on error.
    """
    # Initialised up front so the cleanup code can tell what was created.
    temp_audio_file_path: Optional[str] = None
    input_container = None
    output_container = None
    try:
        video_bytes.seek(0)
        input_container = av.open(video_bytes, metadata_errors="ignore")
        if not input_container.streams.audio:
            print("No audio streams found in the video.", file=sys.stderr)
            return None
        input_audio_stream = input_container.streams.audio[0]

        # Only reserve a unique path here; the handle is closed again before
        # PyAV opens the same path for writing.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file:
            temp_audio_file_path = tmp_file.name
        output_container = av.open(temp_audio_file_path, mode="w", format="wav")

        # Prefer the decoder's reported layout; fall back to mono when only a
        # channel count of 1 is available, otherwise assume stereo.
        codec_context = input_audio_stream.codec_context
        channel_layout = "stereo"
        if getattr(codec_context, "layout", None):
            channel_layout = codec_context.layout.name
        elif getattr(codec_context, "channels", None) == 1:
            channel_layout = "mono"

        output_audio_stream = output_container.add_stream(
            "pcm_s16le",
            rate=codec_context.sample_rate,
            layout=channel_layout,
        )
        for frame in input_container.decode(input_audio_stream):
            for packet in output_audio_stream.encode(frame):
                output_container.mux(packet)
        # Calling encode() without a frame drains the encoder's remaining packets.
        for packet in output_audio_stream.encode():
            output_container.mux(packet)

        output_container.close()
        output_container = None  # already closed; skip it in error cleanup
        return temp_audio_file_path
    except Exception as e:
        print(f"Error extracting audio to temp WAV file: {e}", file=sys.stderr)
        # Close the writer before deleting its file; a failure while closing
        # a half-written container is not interesting at this point.
        if output_container is not None:
            with contextlib.suppress(Exception):
                output_container.close()
        if temp_audio_file_path and os.path.exists(temp_audio_file_path):
            os.remove(temp_audio_file_path)
        return None
    finally:
        # Runs on every path, including the early "no audio" return.
        if input_container is not None:
            with contextlib.suppress(Exception):
                input_container.close()
@tool
def run_video(query: str, url: str) -> str:
    """
    Get a YouTube video from url and return an answer to a natural-language query using the video.
    Args:
        query (str): A natural-language question whose answer is expected to be found in the visual content of the video.
        url (str): Fully qualified URL of the YouTube video to analyze.
    Returns:
        str: A response generated by the VQA model based on the provided video and question.
    """
    n_frames = 4

    # youtube_to_buffer reports failure by raising (RuntimeError / HTTP
    # errors), never by returning None, so the error message below was
    # unreachable. Convert a failed download into that message here.
    try:
        buff = youtube_to_buffer(url)
    except Exception as e:
        print(f"Error downloading video: {e}", file=sys.stderr)
        buff = None
    if buff is None:
        return "Error: Could not download or buffer the video."

    frames = sample_frames(buff, n_frames=n_frames)
    buff.seek(0)  # rewind so audio extraction reads from the start

    # Audio is best-effort: any failure is recorded in the transcript text
    # and the visual analysis still runs.
    transcript = "[Audio could not be processed]"
    audio_file_path = None
    try:
        audio_file_path = save_audio_stream_to_temp_wav_file(buff)
        if audio_file_path:
            with open(audio_file_path, "rb") as audio_data:
                transcription_response = openai.audio.transcriptions.create(
                    model="gpt-4o-transcribe", file=audio_data
                )
            transcript = transcription_response.text
        else:
            transcript = "[No audio stream found or error during extraction]"
            print(
                "No audio file path returned, skipping transcription.", file=sys.stderr
            )
    except Exception as e:
        print(f"Error during audio transcription: {e}", file=sys.stderr)
        transcript = f"[Error during audio transcription: {e}]"
    finally:
        # The temp WAV belongs to this function; always remove it.
        if audio_file_path and os.path.exists(audio_file_path):
            os.remove(audio_file_path)

    # One user message: the text prompt first, then each sampled frame as an
    # inline base64 data URL.
    prompt_text = f"Original Query: {query}\n\nVideo Transcript:\n{transcript}\n\nKey Visual Frames (analyze these along with the transcript to answer the query):"
    content = [{"type": "text", "text": prompt_text}]
    for img in frames:
        content.append(
            {
                "type": "image_url",
                "image_url": {"url": pil_to_data_url(img)},
            }
        )

    try:
        resp = openai.chat.completions.create(
            model=model_id,
            messages=[{"role": "user", "content": content}],
            temperature=0.1,
        )
        result = resp.choices[0].message.content.strip()
    except Exception as e:
        print(f"Error calling OpenAI API: {e}", file=sys.stderr)
        result = f"[Error processing with AI model: {e}]"
    return result
@tool
def search_wikipedia(query: str) -> str:
    """
    get the contents of wikipedia page retrieved by search query.
    Args:
        query (str): A search term to search within wikipedia. Ideally it should be one word or a group of few words.
    Returns:
        str: The text content of wikipedia page, or a short notice when no page content is found for the query.
    """
    get_wiki = wikipediaapi.Wikipedia(
        language="en",
        user_agent="test_tokki",
        extract_format=wikipediaapi.ExtractFormat.WIKI,
    )
    # `query` is passed straight to `page()`, i.e. it is looked up as a page
    # title rather than run through a full-text search.
    page_content = get_wiki.page(query)
    text_content = page_content.text

    # An empty result gives the calling agent nothing to act on, so say
    # explicitly that the lookup produced no content.
    if not text_content.strip():
        return (
            f"No Wikipedia page content found for '{query}'. "
            "Try a different or more specific page title."
        )

    # Cap the result at the first 25000 space-separated words.
    cutoff = 25000
    return " ".join(text_content.split(" ")[:cutoff])