import torch import gradio as gr import yt_dlp as youtube_dl import numpy as np from datasets import Dataset, Audio from scipy.io import wavfile from transformers import pipeline from transformers.pipelines.audio_utils import ffmpeg_read import tempfile import os import time import demucs.api MODEL_NAME = "openai/whisper-large-v3" # "patrickvonplaten/wav2vec2-large-960h-lv60-self-4-gram" # DEMUCS_MODEL_NAME = "htdemucs_ft" BATCH_SIZE = 8 FILE_LIMIT_MB = 1000 YT_LENGTH_LIMIT_S = 3600 # limit to 1 hour YouTube files device = 0 if torch.cuda.is_available() else "cpu" pipe = pipeline( task="automatic-speech-recognition", model=MODEL_NAME, chunk_length_s=30, device=device, ) separator = demucs.api.Separator(model = DEMUCS_MODEL_NAME, ) def separate_vocal(path): origin, separated = separator.separate_audio_file(path) demucs.api.save_audio(separated["vocals"], path, samplerate=separator.samplerate) return path def transcribe(inputs_path, task, use_demucs, dataset_name, oauth_token: gr.OAuthToken | None, progress=gr.Progress()): if inputs_path is None: raise gr.Error("No audio file submitted! Please upload or record an audio file before submitting your request.") if dataset_name is None: raise gr.Error("No dataset name submitted! Please submit a dataset name. Should be in the format : / or /. Also accepts , which will default to the namespace of the logged-in user.") if oauth_token is None: gr.Warning("Make sure to click and login before using this demo.") return [["transcripts will appear here"]], "" total_step = 4 current_step = 0 current_step += 1 progress((current_step, total_step), desc="Transcribe using Whisper.") sampling_rate, inputs = wavfile.read(inputs_path) out = pipe(inputs_path, batch_size=BATCH_SIZE, generate_kwargs={"task": task}, return_timestamps=True) text = out["text"] current_step += 1 progress((current_step, total_step), desc="Merge chunks.") chunks = naive_postprocess_whisper_chunks(out["chunks"], inputs, sampling_rate) current_step += 1 progress((current_step, total_step), desc="Create dataset.") transcripts = [] audios = [] with tempfile.TemporaryDirectory() as tmpdirname: for i,chunk in enumerate(progress.tqdm(chunks, desc="Creating dataset (and clean audio if asked for)")): # TODO: make sure 1D or 2D? arr = chunk["audio"] path = os.path.join(tmpdirname, f"{i}.wav") wavfile.write(path, sampling_rate, arr) if use_demucs == "separate-audio": # use demucs tp separate vocals print(f"Separating vocals #{i}") path = separate_vocal(path) audios.append(path) transcripts.append(chunk["text"]) dataset = Dataset.from_dict({"audio": audios, "text": transcripts}).cast_column("audio", Audio()) current_step += 1 progress((current_step, total_step), desc="Push dataset.") dataset.push_to_hub(dataset_name, token=oauth_token.token if oauth_token else oauth_token) return [[transcript] for transcript in transcripts], text def _return_yt_html_embed(yt_url): video_id = yt_url.split("?v=")[-1] HTML_str = ( f'
' "
" ) return HTML_str def download_yt_audio(yt_url, filename): info_loader = youtube_dl.YoutubeDL() try: info = info_loader.extract_info(yt_url, download=False) except youtube_dl.utils.DownloadError as err: raise gr.Error(str(err)) file_length = info["duration_string"] file_h_m_s = file_length.split(":") file_h_m_s = [int(sub_length) for sub_length in file_h_m_s] if len(file_h_m_s) == 1: file_h_m_s.insert(0, 0) if len(file_h_m_s) == 2: file_h_m_s.insert(0, 0) file_length_s = file_h_m_s[0] * 3600 + file_h_m_s[1] * 60 + file_h_m_s[2] if file_length_s > YT_LENGTH_LIMIT_S: yt_length_limit_hms = time.strftime("%HH:%MM:%SS", time.gmtime(YT_LENGTH_LIMIT_S)) file_length_hms = time.strftime("%HH:%MM:%SS", time.gmtime(file_length_s)) raise gr.Error(f"Maximum YouTube length is {yt_length_limit_hms}, got {file_length_hms} YouTube video.") ydl_opts = {"outtmpl": filename, "format": "worstvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best"} with youtube_dl.YoutubeDL(ydl_opts) as ydl: try: ydl.download([yt_url]) except youtube_dl.utils.ExtractorError as err: raise gr.Error(str(err)) def yt_transcribe(yt_url, task, use_demucs, dataset_name, oauth_token: gr.OAuthToken | None, max_filesize=75.0, dataset_sampling_rate = 24000, progress=gr.Progress()): if yt_url is None: raise gr.Error("No youtube link submitted! Please put a working link.") if dataset_name is None: raise gr.Error("No dataset name submitted! Please submit a dataset name. Should be in the format : / or /. Also accepts , which will default to the namespace of the logged-in user.") total_step = 5 current_step = 0 html_embed_str = _return_yt_html_embed(yt_url) if oauth_token is None: gr.Warning("Make sure to click and login before using this demo.") return html_embed_str, [["transcripts will appear here"]], "" current_step += 1 progress((current_step, total_step), desc="Load video.") with tempfile.TemporaryDirectory() as tmpdirname: filepath = os.path.join(tmpdirname, "video.mp4") download_yt_audio(yt_url, filepath) with open(filepath, "rb") as f: inputs_path = f.read() inputs = ffmpeg_read(inputs_path, pipe.feature_extractor.sampling_rate) inputs = {"array": inputs, "sampling_rate": pipe.feature_extractor.sampling_rate} current_step += 1 progress((current_step, total_step), desc="Transcribe using Whisper.") out = pipe(inputs, batch_size=BATCH_SIZE, generate_kwargs={"task": task}, return_timestamps=True) text = out["text"] inputs = ffmpeg_read(inputs_path, dataset_sampling_rate) current_step += 1 progress((current_step, total_step), desc="Merge chunks.") chunks = naive_postprocess_whisper_chunks(out["chunks"], inputs, dataset_sampling_rate) current_step += 1 progress((current_step, total_step), desc="Create dataset.") transcripts = [] audios = [] with tempfile.TemporaryDirectory() as tmpdirname: for i,chunk in enumerate(progress.tqdm(chunks, desc="Creating dataset (and clean audio if asked for).")): # TODO: make sure 1D or 2D? arr = chunk["audio"] path = os.path.join(tmpdirname, f"{i}.wav") wavfile.write(path, dataset_sampling_rate, arr) if use_demucs == "separate-audio": # use demucs tp separate vocals print(f"Separating vocals #{i}") path = separate_vocal(path) audios.append(path) transcripts.append(chunk["text"]) dataset = Dataset.from_dict({"audio": audios, "text": transcripts}).cast_column("audio", Audio()) current_step += 1 progress((current_step, total_step), desc="Push dataset.") dataset.push_to_hub(dataset_name, token=oauth_token.token if oauth_token else oauth_token) return html_embed_str, [[transcript] for transcript in transcripts], text def naive_postprocess_whisper_chunks(chunks, audio_array, sampling_rate, stop_chars = ".!:;?", min_duration = 5): # merge chunks as long as merged audio duration is lower than min_duration and that a stop character is not met # return list of dictionnaries (text, audio) # min duration is in seconds min_duration = int(min_duration * sampling_rate) new_chunks = [] while chunks: current_chunk = chunks.pop(0) begin, end = current_chunk["timestamp"] begin, end = int(begin*sampling_rate), int(end*sampling_rate) current_dur = end-begin text = current_chunk["text"] chunk_to_concat = [audio_array[begin:end]] while chunks and (text[-1] not in stop_chars or (current_dur