import base64
import contextlib
import functools
import io
import itertools
import os
import sys
import tempfile
import time
from typing import List, Optional

import av
import openai
import requests
import wikipediaapi
from openai import OpenAI
from PIL import Image
from pytube import YouTube
from smolagents import tool
from tqdm import tqdm
from yt_dlp import YoutubeDL

model_id = "gpt-4.1"

def read_image(query: str, img_url: str) -> str:
    """
    Use a visual question answering (VQA) model to generate a response to a query based on an image.

    Args:
        query (str): A natural language question about the image.
        img_url (str): The URL of the image to analyze.

    Returns:
        str: A response generated by the VQA model based on the provided image and question.
    """
    client = OpenAI()
    response = client.responses.create(
        model=model_id,
        input=[
            {
                "role": "user",
                "content": [
                    {"type": "input_text", "text": query},
                    {
                        "type": "input_image",
                        "image_url": img_url,
                    },
                ],
            }
        ],
    )
    return response.output_text

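# Illustrative usage sketch (not part of the original tool set). Assumes
# OPENAI_API_KEY is set in the environment; the image URL below is a placeholder.
# answer = read_image(
#     query="What text appears on the sign in this photo?",
#     img_url="https://example.com/sign.jpg",
# )
# print(answer)
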
def read_code(file_url: str) -> str:
    """
    Read the contents of a code file (e.g., a .py file) instead of executing it. Use this tool to analyze a code snippet.

    Args:
        file_url (str): The URL of the code file to retrieve.

    Returns:
        str: The content of the file as a string.
    """
    response = requests.get(file_url)
    response.raise_for_status()
    return response.text

def transcribe_audio(file_url: str, file_name: str) -> str:
    """
    Download and transcribe an audio file using a transcription model.

    Args:
        file_url (str): Direct URL to the audio file (e.g., .mp3, .wav).
        file_name (str): Filename including extension, used to determine the audio format.

    Returns:
        str: The transcribed text from the audio file.
    """
    response = requests.get(file_url)
    response.raise_for_status()
    extension = file_name.split(".")[-1].lower() or "mp3"
    audio_file = io.BytesIO(response.content)
    audio_file.name = f"audio.{extension}"
    client = OpenAI()
    transcription = client.audio.transcriptions.create(
        model="gpt-4o-transcribe", file=audio_file
    )
    return transcription.text

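# Illustrative usage sketch (assumption: a publicly reachable audio URL and a
# valid OPENAI_API_KEY; the URL below is a placeholder, not a real asset).
# text = transcribe_audio(
#     file_url="https://example.com/interview.mp3",
#     file_name="interview.mp3",
# )
# print(text)
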
def _pytube_buffer(url: str) -> Optional[io.BytesIO]:
    """Download a progressive MP4 into memory with PyTube; return None on failure."""
    try:
        from pytube import YouTube

        yt = YouTube(url)
        stream = (
            yt.streams.filter(progressive=True, file_extension="mp4")
            .order_by("resolution")
            .desc()
            .first()
        )
        if stream is None:
            raise RuntimeError("No MP4 with audio found")
        buf = io.BytesIO()
        stream.stream_to_buffer(buf)
        buf.seek(0)
        return buf
    except Exception as e:
        print(f"[youtube_to_buffer] PyTube failed → {e}", file=sys.stderr)
        return None

def _ytdlp_buffer(url: str) -> io.BytesIO:
    """
    Return a BytesIO containing some MP4 video stream for `url`.
    Works whether YouTube serves a progressive file or separate A/V.
    """
    ydl_opts = {
        "quiet": True,
        "skip_download": True,
        "format": "bestvideo[ext=mp4]/best[ext=mp4]/best",
    }
    with YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=False)
        if "entries" in info:
            info = info["entries"][0]

    if "url" in info:
        video_urls = [info["url"]]
    elif "requested_formats" in info:
        video_urls = [
            fmt["url"]
            for fmt in info["requested_formats"]
            if fmt.get("vcodec") != "none"
        ]
        if not video_urls:
            raise RuntimeError("yt-dlp returned audio-only formats")
    else:
        raise RuntimeError("yt-dlp could not extract a stream URL")

    buf = io.BytesIO()
    for direct_url in video_urls:
        with requests.get(direct_url, stream=True) as r:
            r.raise_for_status()
            for chunk in r.iter_content(chunk_size=1 << 16):
                buf.write(chunk)
    buf.seek(0)
    return buf

def youtube_to_buffer(url: str) -> io.BytesIO:
    """
    Return a BytesIO containing a single progressive MP4
    (H.264 + AAC), the safest thing PyAV can open everywhere.
    """
    ydl_opts = {
        "quiet": True,
        "skip_download": True,
        "format": (
            "best[ext=mp4][vcodec^=avc1][acodec!=none]"
            "/best[ext=mp4][acodec!=none]"
        ),
    }
    with YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=False)
        if "entries" in info:
            info = info["entries"][0]
    direct_url = info.get("url")
    if not direct_url:
        raise RuntimeError("yt-dlp could not find a progressive MP4 track")
    buf = io.BytesIO()
    with requests.get(direct_url, stream=True) as r:
        r.raise_for_status()
        for chunk in r.iter_content(chunk_size=1 << 17):
            buf.write(chunk)
    buf.seek(0)
    return buf

def sample_frames(video_bytes: io.BytesIO, n_frames: int = 6) -> List[Image.Image]:
    """Decode `n_frames` uniformly spaced RGB frames as PIL images."""
    container = av.open(video_bytes, metadata_errors="ignore")
    video = container.streams.video[0]
    total = video.frames or 0
    step = max(1, total // n_frames) if total else 30
    frames: List[Image.Image] = []
    for i, frame in enumerate(container.decode(video=0)):
        if i % step == 0:
            frames.append(frame.to_image())
        if len(frames) >= n_frames:
            break
    container.close()
    return frames

def pil_to_data_url(img: Image.Image, quality: int = 80) -> str:
    """Encode a PIL image as a base64 JPEG data URL."""
    buf = io.BytesIO()
    img.save(buf, format="JPEG", quality=quality, optimize=True)
    b64 = base64.b64encode(buf.getvalue()).decode()
    return f"data:image/jpeg;base64,{b64}"

def save_audio_stream_to_temp_wav_file(video_bytes: io.BytesIO) -> Optional[str]:
    """
    Extract the audio stream from video_bytes, save it as a temporary WAV file,
    and return the path to the file.
    Returns None if no audio stream is found or an error occurs.
    """
    try:
        video_bytes.seek(0)
        input_container = av.open(video_bytes, metadata_errors="ignore")
        if not input_container.streams.audio:
            print("No audio streams found in the video.", file=sys.stderr)
            return None
        input_audio_stream = input_container.streams.audio[0]
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file:
            temp_audio_file_path = tmp_file.name
        output_container = av.open(temp_audio_file_path, mode="w", format="wav")
        # Preserve the source channel layout when it is known; default to stereo.
        channel_layout = "stereo"
        if (
            hasattr(input_audio_stream.codec_context, "layout")
            and input_audio_stream.codec_context.layout
        ):
            channel_layout = input_audio_stream.codec_context.layout.name
        elif (
            hasattr(input_audio_stream.codec_context, "channels")
            and input_audio_stream.codec_context.channels == 1
        ):
            channel_layout = "mono"
        output_audio_stream = output_container.add_stream(
            "pcm_s16le",
            rate=input_audio_stream.codec_context.sample_rate,
            layout=channel_layout,
        )
        for frame in input_container.decode(input_audio_stream):
            for packet in output_audio_stream.encode(frame):
                output_container.mux(packet)
        # Flush any packets still buffered in the encoder.
        for packet in output_audio_stream.encode():
            output_container.mux(packet)
        output_container.close()
        input_container.close()
        return temp_audio_file_path
    except Exception as e:
        print(f"Error extracting audio to temp WAV file: {e}", file=sys.stderr)
        if "temp_audio_file_path" in locals() and os.path.exists(temp_audio_file_path):
            os.remove(temp_audio_file_path)
        return None

def run_video(query: str, url: str) -> str:
    """
    Get a YouTube video from url and return an answer to a natural-language query using the video.

    Args:
        query (str): A natural-language question whose answer is expected to be found in the visual content of the video.
        url (str): Fully qualified URL of the YouTube video to analyze.

    Returns:
        str: A response generated by the VQA model based on the provided video and question.
    """
    n_frames = 4
    buff = youtube_to_buffer(url)
    if buff is None:
        return "Error: Could not download or buffer the video."
    frames = sample_frames(buff, n_frames=n_frames)
    buff.seek(0)

    transcript = "[Audio could not be processed]"
    audio_file_path = None
    try:
        audio_file_path = save_audio_stream_to_temp_wav_file(buff)
        if audio_file_path:
            with open(audio_file_path, "rb") as audio_data:
                transcription_response = openai.audio.transcriptions.create(
                    model="gpt-4o-transcribe", file=audio_data
                )
                transcript = transcription_response.text
        else:
            transcript = "[No audio stream found or error during extraction]"
            print(
                "No audio file path returned, skipping transcription.", file=sys.stderr
            )
    except Exception as e:
        print(f"Error during audio transcription: {e}", file=sys.stderr)
        transcript = f"[Error during audio transcription: {e}]"
    finally:
        if audio_file_path and os.path.exists(audio_file_path):
            os.remove(audio_file_path)

    prompt_text = (
        f"Original Query: {query}\n\n"
        f"Video Transcript:\n{transcript}\n\n"
        "Key Visual Frames (analyze these along with the transcript to answer the query):"
    )
    content = [{"type": "text", "text": prompt_text}]
    for img in frames:
        content.append(
            {
                "type": "image_url",
                "image_url": {"url": pil_to_data_url(img)},
            }
        )
    try:
        resp = openai.chat.completions.create(
            model=model_id,
            messages=[{"role": "user", "content": content}],
            temperature=0.1,
        )
        result = resp.choices[0].message.content.strip()
    except Exception as e:
        print(f"Error calling OpenAI API: {e}", file=sys.stderr)
        result = f"[Error processing with AI model: {e}]"
    return result

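# Illustrative usage sketch (placeholder URL; requires network access for yt-dlp
# and a valid OPENAI_API_KEY):
# answer = run_video(
#     query="What object does the presenter hold up at the start of the video?",
#     url="https://www.youtube.com/watch?v=VIDEO_ID",
# )
# print(answer)
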
def search_wikipedia(query: str) -> str:
    """
    Get the text content of the Wikipedia page retrieved for a search query.

    Args:
        query (str): A term to search for on Wikipedia. Ideally a single word or a short phrase.

    Returns:
        str: The text content of the Wikipedia page.
    """
    get_wiki = wikipediaapi.Wikipedia(
        language="en",
        user_agent="test_tokki",
        extract_format=wikipediaapi.ExtractFormat.WIKI,
    )
    page_content = get_wiki.page(query)
    text_content = page_content.text
    # Truncate very long articles to roughly the first 25,000 whitespace-separated tokens.
    cutoff = 25000
    text_content = " ".join(text_content.split(" ")[:cutoff])
    return text_content

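if __name__ == "__main__":
    # Minimal smoke-test sketch, not part of the original file: exercises the one
    # tool that needs neither an API key nor a media download. The query string is
    # an arbitrary example chosen for illustration.
    print(search_wikipedia("Python (programming language)")[:500])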