# For downloading from youtube and transcribing audio
from pytube import YouTube
from moviepy.editor import *
from pydub import AudioSegment
from pydub.utils import make_chunks
import pydub
from yt_dlp import YoutubeDL
from pathlib import Path
import subprocess

# For getting text from PDF
from zipfile import ZipFile
from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter
from pdfminer.converter import TextConverter
from pdfminer.layout import LAParams
from pdfminer.pdfpage import PDFPage
from io import StringIO

# For transcription
import openai
import whisper
import torch
from faster_whisper import WhisperModel
import tiktoken
from nltk import tokenize

# For other stuff
import os
import re
import time
import math
from threading import Thread
from urllib.parse import parse_qs, urlparse

# USEFUL CONSTANTS
# Duration is set to 6 minutes = 360 seconds = 360000 milliseconds
DURATION = 360000
# Maximum audio file size is 18MB
MAX_FILE_SIZE_BYTES = 18000000
# The model to use for transcription
WHISPER_MODEL = "tiny"
MODEL_SIZE = "base"


def _extract_video_id(link: str) -> str:
    """Return the video id encoded in a YouTube URL.

    The ``v`` query parameter is used when present
    (``https://www.youtube.com/watch?v=<id>&t=10s``); otherwise the last path
    segment is used (``https://youtu.be/<id>``).

    Raises:
        ValueError: if no id can be found in ``link``.
    """
    parsed = urlparse(link)
    video_ids = parse_qs(parsed.query).get("v")
    if video_ids:
        return video_ids[0]
    last_segment = parsed.path.rstrip("/").rsplit("/", 1)[-1]
    if last_segment:
        return last_segment
    raise ValueError(f"Could not extract a video id from {link!r}")


def _build_segments(text: str, encoding) -> list:
    """Split ``text`` into sentences and return one segment dict per sentence.

    Each segment has the keys ``id`` (sentence index), ``text`` (the sentence)
    and ``tokens`` (the result of ``encoding.encode`` on the sentence).
    """
    return [
        {"id": index, "text": sentence, "tokens": encoding.encode(sentence)}
        for index, sentence in enumerate(tokenize.sent_tokenize(text))
    ]


def _folder_for_title(title: str) -> str:
    """Return ``./tests/<title>`` with spaces removed and the extension dropped."""
    return os.path.splitext(f"./tests/{title}".replace(" ", ""))[0]


def _join_segment_text(chunk_segments) -> str:
    """Concatenate the text of transcription segments, escaping ``$`` as ``\\$``.

    NOTE(review): the escaping is presumably there so a downstream Markdown
    renderer does not treat ``$`` as a math delimiter -- confirm with callers.
    """
    return "".join(segment.text.replace("$", "\\$") for segment in chunk_segments)


class DownloadAudio:
    """Downloads the audio from a youtube video and saves it to multiple .wav files in the specified folder"""

    def __init__(self, link) -> None:
        self.link = link
        # Only the metadata is fetched here; the audio is fetched in download().
        with YoutubeDL() as ydl:
            self.yt = ydl.extract_info(self.link, download=False)
        self.YOUTUBE_VIDEO_ID = _extract_video_id(link)
        self.WAV_FILE_NAME = f"{self.YOUTUBE_VIDEO_ID}.wav"

    def get_yt_title(self) -> str:
        """Returns the title of the youtube video"""
        return self.yt["title"]

    def download(self, pathname: str) -> list:
        """
        Download the audio from the youtube video and save it as 16 kHz mono
        .wav file(s) in ``pathname`` (created if missing).

        If the converted file is smaller than MAX_FILE_SIZE_BYTES it is
        returned as a single-element list; otherwise it is split into chunks
        of at most MAX_FILE_SIZE_BYTES of audio data each.

        Returns a list of the paths to the .wav files, or the string
        "FAILED" when the external downloader exits with a non-zero status
        (kept for backward compatibility with existing callers).
        """
        # makedirs also creates missing parent folders (e.g. ./tests).
        os.makedirs(pathname, exist_ok=True)

        final_wav_path = f"{pathname}/{self.WAV_FILE_NAME}"
        if not os.path.exists(final_wav_path):
            print("\n\n\n DOWNLOADING AUDIO \n\n\n")
            current_dir = os.getcwd()
            print(current_dir)
            executable_path = os.path.join(current_dir, "exec/yt-dlp_linux")

            # Download the video as an audio file using the bundled yt-dlp binary
            original_download_path = f"{pathname}/audio.wav"
            result = subprocess.run(
                [
                    executable_path,
                    "-x",
                    "--audio-format",
                    "wav",
                    "-o",
                    original_download_path,
                    self.link,
                ]
            )
            if result.returncode != 0:
                print("Failed to download audio. Retrying...")
                return "FAILED"

            # Normalise to 16 kHz mono before storing the final file.
            # AudioSegment setters return a new segment, so chain the results.
            sound = AudioSegment.from_wav(original_download_path)
            sound = sound.set_frame_rate(16000).set_channels(1)
            sound.export(final_wav_path, format="wav")
            os.remove(original_download_path)

        # Load the input .wav file
        audio = AudioSegment.from_wav(final_wav_path)

        # If the file on disk is already small enough, return it unsplit
        if os.path.getsize(final_wav_path) < MAX_FILE_SIZE_BYTES:
            return [final_wav_path]

        # Uncompressed PCM size per second of audio
        bytes_per_second = audio.frame_rate * audio.sample_width * audio.channels
        # Round down so that a chunk's audio data never exceeds the limit
        chunk_length_in_sec = max(1, MAX_FILE_SIZE_BYTES // bytes_per_second)
        chunks = make_chunks(audio, chunk_length_in_sec * 1000)

        # Export all of the individual chunks as wav files
        chunk_names = []
        for i, chunk in enumerate(chunks):
            print(f"exporting chunk {i}")
            output_chunk_path = f"{pathname}/{self.YOUTUBE_VIDEO_ID}_{i}.wav"
            chunk_names.append(output_chunk_path)
            chunk.export(output_chunk_path, format="wav")

        return chunk_names


class VideoTranscription:
    """Performs transcription on a PDF or a link to a youtube video"""

    def __init__(self, datalink) -> None:
        self.datalink = datalink
        self.encoding = tiktoken.encoding_for_model("gpt-3.5-turbo")
        self.model = WhisperModel(WHISPER_MODEL, device="cpu", compute_type="int8")
        openai.api_key = os.environ.get("OPENAI_API_KEY")

    def transcribe(self) -> dict:
        """Returns the transcription of the PDF or youtube video.

        ``datalink`` values starting with "http" are handled as video links;
        anything else is handled as a PDF.
        """
        start_time = time.time()
        if self.datalink.startswith("http"):
            transcript = self.get_text_from_link()
        else:
            transcript = self.get_text_from_pdf()
        end_time = time.time()
        print(f"transcription took {end_time - start_time} seconds")
        return transcript

    def get_text_from_link(self) -> dict:
        """Download the video's audio and transcribe it.

        Returns a dict with the keys ``title``, ``text`` and ``segments``.

        Raises:
            RuntimeError: if the audio could not be downloaded.
        """
        # Get the audio file
        audio_file = DownloadAudio(self.datalink)
        folder_name = f"./tests/{audio_file.YOUTUBE_VIDEO_ID}"

        # Get the names of the stored wav files
        file_names = audio_file.download(folder_name)
        print("FILE NAMES", file_names)
        if file_names == "FAILED":
            # Without this check the characters of "FAILED" would be iterated
            # as if they were file names.
            raise RuntimeError(f"Failed to download audio for {self.datalink}")

        text_transcriptions = [""] * len(file_names)

        def perform_transcription(file_name, i):
            print("transcribing", file_name, " for ", i)
            chunk_segments, _ = self.model.transcribe(file_name, beam_size=5)
            # Each thread writes only to its own slot of the shared list.
            text_transcriptions[i] = _join_segment_text(chunk_segments)

        # One thread per audio chunk: start them all, then wait for them all
        threads = [
            Thread(target=perform_transcription, args=(file_name, i))
            for i, file_name in enumerate(file_names)
        ]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

        final_text_transcription = " ".join(text_transcriptions)

        return {
            "title": audio_file.get_yt_title(),
            "text": final_text_transcription,
            # One segment per sentence of the transcription
            "segments": _build_segments(final_text_transcription, self.encoding),
        }

    def get_text_from_pdf(self) -> dict:
        """Extract the text of the PDF at ``self.datalink``.

        NOTE(review): assumes ``datalink`` is a local filesystem path to the
        PDF -- confirm against callers.

        Returns a dict with the keys ``title`` (the file's base name),
        ``text`` and ``segments``.
        """
        with open(self.datalink, "rb") as pdf_file:
            pages, _ = convert_pdf_to_txt_pages(pdf_file)
        pdf_transcription = "".join(pages)
        return {
            "title": os.path.basename(self.datalink),
            "text": pdf_transcription,
            "segments": _build_segments(pdf_transcription, self.encoding),
        }


class AudioTranscription:
    """Performs transcription on a MP3 or WAV file"""

    def __init__(self, audio_file) -> None:
        # audio_file must expose a ``name`` attribute and be readable by pydub.
        self.file = audio_file
        self.title = self.file.name
        self.folder_name = _folder_for_title(self.title)
        self.encoding = tiktoken.encoding_for_model("gpt-3.5-turbo")
        self.model = WhisperModel(WHISPER_MODEL, device="cpu", compute_type="int8")
        openai.api_key = os.environ.get("OPENAI_API_KEY")

    def get_redacted_name(self):
        """Return the folder name derived from the file's title."""
        return self.folder_name

    def transcribe(self) -> dict:
        """Returns the transcription of the audio file.

        A copy of the input is saved in ``self.folder_name``; MP3 input is
        additionally converted to WAV before transcription.

        Returns a dict with the keys ``title``, ``text`` and ``segments``.

        Raises:
            ValueError: if the file name ends in neither "wav" nor "mp3".
        """
        start_time = time.time()

        os.makedirs(self.folder_name, exist_ok=True)

        lowered_title = self.title.lower()
        if lowered_title.endswith("wav"):
            audio = pydub.AudioSegment.from_wav(self.file)
            file_type = "wav"
        elif lowered_title.endswith("mp3"):
            audio = pydub.AudioSegment.from_mp3(self.file)
            file_type = "mp3"
        else:
            raise ValueError(
                f"Unsupported audio format: {self.title!r} (expected wav or mp3)"
            )

        # Keep a copy of the input in its original format
        save_path = Path(self.folder_name) / self.file.name
        audio.export(save_path, format=file_type)

        final_wav_path = str(save_path)
        if file_type == "mp3":
            # Export the already decoded audio instead of re-reading the copy
            final_wav_path = f"{self.folder_name}/{self.title[:-4]}.wav"
            audio.export(final_wav_path, format="wav")

        chunk_segments, _ = self.model.transcribe(final_wav_path, beam_size=5)
        text_transcriptions = _join_segment_text(chunk_segments)

        final_transcription = {
            "title": self.title,
            "text": text_transcriptions,
            # One segment per sentence of the transcription
            "segments": _build_segments(text_transcriptions, self.encoding),
        }
        end_time = time.time()
        print(f"transcription took {end_time - start_time} seconds")

        return final_transcription


def convert_pdf_to_txt_pages(path):
    """Extract the text of every page of a PDF.

    ``path`` is passed straight to ``PDFPage.get_pages``.
    NOTE(review): despite its name it is presumably an open binary file
    object rather than a path string (PDFTranscription passes its uploaded
    file) -- confirm.

    Returns a tuple ``(texts, nbPages)``: the list of per-page text and the
    number of pages.
    """
    texts = []
    rsrcmgr = PDFResourceManager()
    retstr = StringIO()
    device = TextConverter(rsrcmgr, retstr, laparams=LAParams())
    interpreter = PDFPageInterpreter(rsrcmgr, device)

    # The converter appends every page to the same buffer, so remember how
    # much was already consumed and slice off only the newly added text.
    consumed = 0
    try:
        for page in PDFPage.get_pages(path):
            interpreter.process_page(page)
            accumulated = retstr.getvalue()
            texts.append(accumulated[consumed:])
            consumed = len(accumulated)
    finally:
        device.close()
        retstr.close()
    return texts, len(texts)


class PDFTranscription:
    """Extracts the text of an uploaded PDF file"""

    def __init__(self, pdf_file):
        # pdf_file must expose a ``name`` attribute.
        self.file = pdf_file
        self.title = pdf_file.name
        self.folder_name = _folder_for_title(self.title)
        self.encoding = tiktoken.encoding_for_model("gpt-3.5-turbo")

    def get_redacted_name(self):
        """Return the folder name derived from the file's title."""
        return self.folder_name

    def transcribe(self):
        """Return a dict with the PDF's ``title``, full ``text`` and ``segments``."""
        pages, _ = convert_pdf_to_txt_pages(self.file)
        pdf_transcription = "".join(pages)

        return {
            "title": self.title,
            "text": pdf_transcription,
            # One segment per sentence of the extracted text
            "segments": _build_segments(pdf_transcription, self.encoding),
        }