import requests
from dotenv import load_dotenv
from openai import OpenAI
from utils import process_image_for_gpt
import pandas as pd
import tempfile
import os
import io
import yt_dlp
import re
import html2text
from requests.exceptions import RequestException
from bs4 import BeautifulSoup
from pydub import AudioSegment
def add_numbers(*nums: list[int]) -> int:
    """Add numbers and return their sum.

    Values may be passed individually (``add_numbers(1, 2, 3)``) or grouped
    in lists/tuples (``add_numbers([1, 2, 3])``). Both spellings are accepted
    because the signature is variadic while the argument is documented as a
    list of numbers.

    Args:
        nums: numbers, or lists/tuples of numbers, to add together.

    Returns:
        The total of every number supplied; 0 when called without arguments.
    """
    total = 0
    for item in nums:
        # A list/tuple argument contributes the sum of its members.
        total += sum(item) if isinstance(item, (list, tuple)) else item
    return total
def transcribe_image_from_url(image_url: str) -> str:
    """Transcribe the text visible in an image referenced by URL.

    Only works with full http urls.

    Args:
        image_url: full http(s) URL of the image to transcribe.

    Returns:
        The model's reply with surrounding whitespace stripped, or
        'No text found in image' when the reply carries no text content.
    """
    # Load variables from a .env file before building the client, as the
    # other OpenAI-backed helpers in this module do.
    load_dotenv()
    client = OpenAI()

    prompt = (
        "Please transcribe all text visible in this image.\n"
        "Extract the text exactly as it appears, maintaining formatting when possible.\n"
        "If there's no readable text, respond with 'No text found in image'."
    )
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": prompt},
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": image_url,
                            "detail": "high",
                        },
                    },
                ],
            }
        ],
        max_tokens=1000,
        temperature=0,
    )

    # The message content is optional in the API response; calling .strip()
    # on None would raise AttributeError, so fall back to the same sentinel
    # the prompt asks the model to use.
    content = response.choices[0].message.content
    if content is None:
        return "No text found in image"
    return content.strip()
def truncate_content(content: str, max_length: int = 10000) -> str:
    """Return ``content`` cut down to its first ``max_length`` characters.

    Args:
        content: text to shorten.
        max_length: length threshold; text at or below it is returned as is.

    Returns:
        ``content`` itself when it is no longer than ``max_length``,
        otherwise the slice ``content[:max_length]``.
    """
    fits = len(content) <= max_length
    return content if fits else content[:max_length]
class WebPageTranscription:
    """Fetch web pages and convert their paragraph/table content to Markdown.

    Each instance counts the transcription attempts it has served and
    answers with a fixed refusal message once two attempts have been made.
    """

    def __init__(self):
        # Number of transcription attempts made so far (successful or not).
        self.counter = 0

    def transcribe_webpage(self, website_url: str) -> str:
        """Visits website url and returns markdown of contents

        Args:
            website_url: URL of the page to fetch.

        Returns:
            Markdown built from the page's <p> and <table> elements, truncated
            to 20000 characters; a refusal message once the attempt budget is
            used up; or a human-readable error message if fetching or
            converting the page fails.
        """
        if self.counter > 1:
            return "No more transcriptions, move on"
        self.counter += 1

        try:
            # Send a GET request to the URL with a 20-second timeout
            response = requests.get(website_url, timeout=20)
            response.raise_for_status()  # Raise an exception for bad status codes

            soup = BeautifulSoup(response.text, "html.parser")
            # Prefer the "mw-content-text" container, then the first <div>,
            # and finally the whole document so pages without any <div>
            # are still transcribed instead of failing on None.
            content_root = (
                soup.find("div", id="mw-content-text") or soup.find("div") or soup
            )

            # Only extract <p> and <table> tags
            elements = content_root.find_all(["p", "table"])
            # Join selected HTML chunks
            html_subset = "".join(str(el) for el in elements)

            # Convert the HTML content to Markdown
            markdown_content = html2text.HTML2Text().handle(html_subset)
            # Remove multiple line breaks
            markdown_content = re.sub(r"\n{3,}", "\n\n", markdown_content)
            return truncate_content(markdown_content, 20000)
        except requests.exceptions.Timeout:
            return "The request timed out. Please try again later or check the URL."
        except RequestException as e:
            return f"Error fetching the webpage: {str(e)}"
        except Exception as e:
            return f"An unexpected error occurred: {str(e)}"
def parse_youtube_video(youtube_url: str) -> str:
    """Returns text transcript of a youtube video

    The audio track is downloaded with yt-dlp and converted to mp3 inside a
    temporary directory, split into five-minute chunks, and each chunk is
    transcribed with the "whisper-1" model. The chunk transcripts are joined
    with single spaces.

    Args:
        youtube_url: full url linking to the video to transcribe

    Returns:
        The concatenated transcript with surrounding whitespace stripped.

    Raises:
        Exception: if no mp3 file is present after the download step.
    """
    load_dotenv()
    client = OpenAI()
    chunk_length_ms = 5 * 1000 * 60

    with tempfile.TemporaryDirectory() as temp_dir:
        # Configure yt-dlp to extract audio straight into the temp directory
        ydl_opts = {
            "format": "bestaudio/best",
            "postprocessors": [
                {
                    "key": "FFmpegExtractAudio",
                    "preferredcodec": "mp3",
                    "preferredquality": "64",
                }
            ],
            "outtmpl": os.path.join(temp_dir, "%(title)s.%(ext)s"),
        }

        # Download audio
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.extract_info(youtube_url, download=True)

        # Find the downloaded audio file (first .mp3 in the directory)
        audio_file = next(
            (
                os.path.join(temp_dir, name)
                for name in os.listdir(temp_dir)
                if name.endswith(".mp3")
            ),
            None,
        )
        if not audio_file:
            raise Exception("Audio file not found")

        # Split the audio into fixed-length pieces, presumably to keep each
        # upload within the transcription endpoint's size limit.
        audio = AudioSegment.from_mp3(audio_file)
        chunk_paths = []
        for index, start_ms in enumerate(range(0, len(audio), chunk_length_ms)):
            chunk_path = os.path.join(temp_dir, f"chunk_{index}.mp3")
            audio[start_ms : start_ms + chunk_length_ms].export(
                chunk_path, format="mp3"
            )
            chunk_paths.append(chunk_path)

        # Transcribe each chunk
        texts = []
        for chunk_path in chunk_paths:
            with open(chunk_path, "rb") as audio_chunk:
                transcript = client.audio.transcriptions.create(
                    model="whisper-1",
                    file=audio_chunk,
                )
            texts.append(transcript.text)

    return " ".join(texts).strip()
class APIProcessor:
    """Download a question's attached file and turn it into text.

    The handler is chosen from the file name's extension: mp3 files are
    transcribed, xlsx workbooks are rendered sheet by sheet, png images are
    described by a vision model, and anything else is returned as the
    response's decoded text.
    """

    def __init__(self, file_url: str, file_name: str):
        """Store the attachment location and create the OpenAI client.

        Args:
            file_url: URL the attachment is downloaded from.
            file_name: attachment name; its extension selects the handler.
                A falsy value means there is no attachment.
        """
        load_dotenv()
        self.file_url = file_url
        self.file_name = file_name
        self.client = OpenAI()

    def _transcribe_mp3(self, response: "requests.Response") -> str:
        """Transcribe an mp3 download with the "gpt-4o-transcribe" model.

        The body is written to a temporary .mp3 file because the
        transcription call is given an open file handle; the file is removed
        again whether or not transcription succeeds.

        Returns:
            The transcription text, or an error message if transcription fails.
        """
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".mp3")
        temp_file_path = temp_file.name
        try:
            with temp_file:
                for chunk in response.iter_content(chunk_size=8192):
                    temp_file.write(chunk)
            try:
                with open(temp_file_path, "rb") as audio_file:
                    transcription = self.client.audio.transcriptions.create(
                        model="gpt-4o-transcribe",
                        file=audio_file,
                    )
                return transcription.text
            except Exception as e:
                # Report the failure to the caller instead of printing it and
                # implicitly returning None from a method declared -> str.
                return f"Error transcribing audio: {e}"
        finally:
            os.unlink(temp_file_path)

    def _transcribe_image(self, response: "requests.Response") -> str:
        """Ask the "gpt-4o" model for a textual description of an image.

        Returns:
            The model's reply, or an empty string if the reply has no content.
        """
        image_bytes = response.content
        base64_image = process_image_for_gpt(image_bytes)
        TRANSCRIPTION_PROMPT = """Please in detail transcribe as much of the output information you can via text. Feel free to use ASCII."""
        image_message = [
            {"type": "text", "text": TRANSCRIPTION_PROMPT},
            {
                "type": "image_url",
                "image_url": {
                    "url": f"data:image/jpeg;base64,{base64_image}",
                },
            },
        ]
        completion = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": image_message}],
            max_tokens=1000,
        )
        content = completion.choices[0].message.content
        # The message content is optional; keep the declared str return type.
        return content if content is not None else ""

    def _transcribe_spreadsheet(self, response: "requests.Response") -> str:
        """Render every sheet of an Excel workbook as text.

        Returns:
            The string form of a dict mapping sheet name to that sheet's
            ``DataFrame.to_string()`` output, or an error message if the
            workbook cannot be read.
        """
        try:
            excel_file = pd.ExcelFile(io.BytesIO(response.content))
            all_sheets_data = {
                sheet: excel_file.parse(sheet_name=sheet).to_string()
                for sheet in excel_file.sheet_names
            }
            return str(all_sheets_data)
        except Exception as e:
            return f"Error processing spreadsheet: {e}"

    def get_and_process_attachment(self) -> str:
        """For current question, download and process the file associated if it exists.

        Returns:
            Parsed text output of the attachment
        """
        if not self.file_name:
            return "No attached file for this question"

        response = requests.get(self.file_url, timeout=15)
        # Compare extensions case-insensitively so "FILE.PNG" is handled too.
        file_extension = self.file_name.split(".")[-1].lower()

        if file_extension == "mp3":
            return self._transcribe_mp3(response)
        if file_extension == "xlsx":
            return self._transcribe_spreadsheet(response)
        if file_extension == "png":
            return self._transcribe_image(response)
        # Unknown types: return decoded text rather than raw bytes so the
        # declared str return type holds.
        return response.text
if __name__ == "__main__":
    # Manual smoke test: transcribe one YouTube video and print the result.
    #
    # Other manual checks, kept disabled for reference:
    #
    # attempt to process file examples from API
    # def get_file_api_url(task_id: str) -> str:
    #     return "https://agents-course-unit4-scoring.hf.space" + "/files/" + task_id
    # audio_task_processor = APIProcessor(
    #     file_name="",
    #     file_url=get_file_api_url("8e867cd7-cff9-4e6c-867a-ff5ddc2550be"),
    # )
    # response = audio_task_processor.get_and_process_attachment()
    # print(response)
    #
    # text = transcribe_webpage(
    #     "https://en.wikipedia.org/wiki/Mercedes_Sosa#Studio_albums"
    # )
    # print(text)
    print(parse_youtube_video("https://www.youtube.com/watch?v=1htKBjuUWec"))