"""Helpers that turn images, web pages, YouTube videos and file attachments into text."""

import io
import os
import re
import tempfile

import html2text
import pandas as pd
import requests
import yt_dlp
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from openai import OpenAI
from pydub import AudioSegment
from requests.exceptions import RequestException

from utils import process_image_for_gpt


def add_numbers(*nums: int) -> int:
    """Add a list of numbers

    Args:
        nums: numbers to add; passing none yields 0
    """
    return sum(nums)


def transcribe_image_from_url(image_url: str) -> str:
    """Only works with full http urls

    Sends the image URL to the ``gpt-4o`` chat model with a transcription
    prompt and returns the model's reply with surrounding whitespace removed.

    Args:
        image_url: URL of the image to transcribe

    Returns:
        The transcribed text, or an empty string if the model returned no
        content.
    """
    client = OpenAI()
    prompt = (
        "Please transcribe all text visible in this image. "
        "Extract the text exactly as it appears, maintaining formatting when possible. "
        "If there's no readable text, respond with 'No text found in image'."
    )
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": prompt},
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": image_url,
                            "detail": "high",
                        },
                    },
                ],
            }
        ],
        max_tokens=1000,
        temperature=0,
    )
    # The message content can be None; guard so .strip() cannot raise.
    transcribed_text = response.choices[0].message.content or ""
    return transcribed_text.strip()


def truncate_content(content: str, max_length: int = 10000) -> str:
    """Return ``content`` cut down to at most ``max_length`` characters.

    Args:
        content: text to truncate
        max_length: maximum number of characters to keep

    Returns:
        ``content`` unchanged when it is short enough, otherwise its prefix.
    """
    # Slicing already returns the whole string when it is shorter than the limit.
    return content[:max_length]


class WebPageTranscription:
    """Fetches web pages as Markdown, refusing once two fetches were attempted."""

    def __init__(self):
        # Number of transcription attempts made through this instance.
        self.counter = 0

    def transcribe_webpage(self, website_url: str) -> str:
        """Visits website url and returns markdown of contents

        Only ``<p>`` and ``<table>`` elements are kept, and the result is
        truncated to 20000 characters. Failures are reported as a message
        string rather than raised.

        Args:
            website_url: url of the page to fetch

        Returns:
            Markdown text of the page, or a message describing why nothing
            was transcribed.
        """
        if self.counter > 1:
            return "No more transcriptions, move on"
        # Counted before the request, so failed attempts use up the budget too.
        self.counter += 1

        try:
            # Send a GET request to the URL with a 20-second timeout
            response = requests.get(website_url, timeout=20)
            response.raise_for_status()  # Raise an exception for bad status codes

            soup = BeautifulSoup(response.text, "html.parser")
            # Prefer the element with id "mw-content-text", then the first
            # <div>, and finally the whole document so that a page without
            # any <div> is still transcribed instead of failing on None.
            content_div = (
                soup.find("div", id="mw-content-text") or soup.find("div") or soup
            )

            # Only extract <p> and <table> tags
            elements = content_div.find_all(["p", "table"])

            # Join selected HTML chunks
            html_subset = "".join(str(el) for el in elements)

            # Convert the HTML content to Markdown
            markdown_content = html2text.HTML2Text().handle(html_subset)

            # Remove multiple line breaks
            markdown_content = re.sub(r"\n{3,}", "\n\n", markdown_content)

            return truncate_content(markdown_content, 20000)

        except requests.exceptions.Timeout:
            return "The request timed out. Please try again later or check the URL."
        except RequestException as e:
            return f"Error fetching the webpage: {str(e)}"
        except Exception as e:
            return f"An unexpected error occurred: {str(e)}"


def parse_youtube_video(youtube_url: str) -> str:
    """Returns text transcript of a youtube video

    The audio track is downloaded as mp3 into a temporary directory, split
    into five-minute chunks, and each chunk is transcribed with ``whisper-1``.

    Args:
        youtube_url: full url linking to the video to transcribe

    Returns:
        The chunk transcripts joined with single spaces.

    Raises:
        FileNotFoundError: if no mp3 file is present after the download.
    """
    load_dotenv()
    client = OpenAI()

    with tempfile.TemporaryDirectory() as temp_dir:
        # Configure yt-dlp to extract audio into the temporary directory
        ydl_opts = {
            "format": "bestaudio/best",
            "postprocessors": [
                {
                    "key": "FFmpegExtractAudio",
                    "preferredcodec": "mp3",
                    "preferredquality": "64",
                }
            ],
            "outtmpl": os.path.join(temp_dir, "%(title)s.%(ext)s"),
        }

        # Download audio
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.extract_info(youtube_url, download=True)

        # Find the downloaded audio file
        audio_file = next(
            (
                os.path.join(temp_dir, name)
                for name in os.listdir(temp_dir)
                if name.endswith(".mp3")
            ),
            None,
        )
        if not audio_file:
            raise FileNotFoundError("Audio file not found")

        audio = AudioSegment.from_mp3(audio_file)
        chunk_length_ms = 5 * 1000 * 60

        # Export the audio as consecutive chunk files
        chunks = []
        for index, start in enumerate(range(0, len(audio), chunk_length_ms)):
            chunk_path = os.path.join(temp_dir, f"chunk_{index}.mp3")
            audio[start : start + chunk_length_ms].export(chunk_path, format="mp3")
            chunks.append(chunk_path)

        # Transcribe each chunk
        texts = []
        for chunk_path in chunks:
            with open(chunk_path, "rb") as audio_chunk:
                transcript = client.audio.transcriptions.create(
                    model="whisper-1",
                    file=audio_chunk,
                )
            texts.append(transcript.text)

        return " ".join(texts).strip()


class APIProcessor:
    """Downloads a question's attachment and converts it to text."""

    def __init__(self, file_url: str, file_name: str):
        """Store the attachment location and create the OpenAI client.

        Args:
            file_url: url the attachment is downloaded from
            file_name: attachment file name; its extension selects the parser
        """
        load_dotenv()
        self.file_url = file_url
        self.file_name = file_name
        self.client = OpenAI()

    def _transcribe_mp3(self, response: requests.Response) -> str:
        """Transcribe the audio in ``response`` with ``gpt-4o-transcribe``.

        The body is written to a temporary ``.mp3`` file that is always
        removed afterwards.

        Returns:
            The transcription text, or an error message if transcription
            failed.
        """
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".mp3")
        temp_file_path = temp_file.name
        try:
            with temp_file:
                for chunk in response.iter_content(chunk_size=8192):
                    temp_file.write(chunk)

            try:
                with open(temp_file_path, "rb") as audio_file:
                    transcription = self.client.audio.transcriptions.create(
                        model="gpt-4o-transcribe",
                        file=audio_file,
                    )
                return transcription.text
            except Exception as e:
                print(str(e))
                # Report the failure as text instead of implicitly returning None.
                return f"Error transcribing audio: {e}"
        finally:
            # Runs even when writing the temp file fails, so nothing is leaked.
            os.unlink(temp_file_path)

    def _transcribe_image(self, response: requests.Response) -> str:
        """Describe the image in ``response`` using the ``gpt-4o`` chat model.

        Returns:
            The model's reply, or an empty string if it returned no content.
        """
        # NOTE(review): presumably returns base64-encoded JPEG data, given the
        # data URL built below; confirm against utils.process_image_for_gpt.
        base64_image = process_image_for_gpt(response.content)

        TRANSCRIPTION_PROMPT = (
            "Please in detail transcribe as much of the output information "
            "you can via text. Feel free to use ASCII."
        )

        image_message = [
            {"type": "text", "text": TRANSCRIPTION_PROMPT},
            {
                "type": "image_url",
                "image_url": {
                    "url": f"data:image/jpeg;base64,{base64_image}",
                },
            },
        ]

        completion = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": image_message}],
            max_tokens=1000,
        )
        return completion.choices[0].message.content or ""

    def _transcribe_spreadsheet(self, response: requests.Response) -> str:
        """Render every sheet of the Excel workbook in ``response`` as text.

        Returns:
            The string form of a dict mapping sheet name to the sheet's
            ``DataFrame.to_string()`` output, or an error message if the
            workbook could not be processed.
        """
        try:
            # The context manager closes the workbook once all sheets are read.
            with pd.ExcelFile(io.BytesIO(response.content)) as excel_file:
                all_sheets_data = {
                    sheet: excel_file.parse(sheet_name=sheet).to_string()
                    for sheet in excel_file.sheet_names
                }
            return str(all_sheets_data)
        except Exception as e:
            return f"Error processing spreadsheet: {e}"

    def get_and_process_attachment(self) -> str:
        """For current question, download and process the file associated if it exists.

        ``mp3``, ``xlsx`` and ``png`` attachments go to the matching parser
        (the extension is matched case-insensitively); anything else is
        returned as the decoded response text.

        Returns:
            Parsed text output of the attachment
        """
        if not self.file_name:
            return "No attached file for this question"

        response = requests.get(self.file_url, timeout=15)

        handlers = {
            "mp3": self._transcribe_mp3,
            "xlsx": self._transcribe_spreadsheet,
            "png": self._transcribe_image,
        }
        file_extension = self.file_name.split(".")[-1].lower()
        handler = handlers.get(file_extension)
        if handler is None:
            # Decode the body so the declared ``str`` return type holds.
            return response.text
        return handler(response)


if __name__ == "__main__":
    # attempt to process file examples from API

    # def get_file_api_url(task_id: str) -> str:
    #     return "https://agents-course-unit4-scoring.hf.space" + "/files/" + task_id

    # audio_task_processor = APIProcessor(
    #     file_name="",
    #     file_url=get_file_api_url("8e867cd7-cff9-4e6c-867a-ff5ddc2550be"),
    # )

    # response = audio_task_processor.get_and_process_attachment()
    # print(response)

    result = parse_youtube_video("https://www.youtube.com/watch?v=1htKBjuUWec")
    print(result)

    # text = WebPageTranscription().transcribe_webpage(
    #     "https://en.wikipedia.org/wiki/Mercedes_Sosa#Studio_albums"
    # )
    # print(text)