Spaces:
Runtime error
Runtime error
# !pip install youtube-dl | |
from __future__ import unicode_literals | |
import youtube_dl | |
from pydub import AudioSegment | |
from pyannote.audio import Pipeline | |
import re | |
import webvtt | |
import whisper | |
import os | |
from pydub.utils import which | |
import ffmpeg | |
import webvtt | |
import pprint | |
from urllib.error import HTTPError | |
import subprocess | |
import gradio as gr | |
import traceback | |
# Speaker-diarization pipeline shared by every transcription request.
#
# The Hugging Face access token is read from the environment (for example a
# Space secret) instead of being committed to the source, where anyone able to
# read the file could reuse it. HF_TOKEN is consulted first, then
# HUGGINGFACE_TOKEN; when neither is set, None is passed.
# NOTE(review): the token that used to be hard-coded on this line must be
# treated as leaked and revoked in the Hugging Face account settings.
pipeline = Pipeline.from_pretrained(
    "pyannote/speaker-diarization",
    use_auth_token=os.environ.get("HF_TOKEN") or os.environ.get("HUGGINGFACE_TOKEN"),
)
def _millisec(time_str):
    """Convert an ``H:MM:SS.fff`` timestamp string to whole milliseconds."""
    hours, minutes, seconds = time_str.split(":")
    return int((int(hours) * 60 * 60 + int(minutes) * 60 + float(seconds)) * 1000)


def _preprocess(audio, max_ms=20 * 60 * 1000, spacer_ms=2000):
    """Trim *audio* to its first ``max_ms`` and prepend ``spacer_ms`` of silence.

    The WAV file at *audio* is overwritten in place with the result.

    Returns:
        ``(spacer_ms, spacer)`` where ``spacer`` is the silent segment that
        was prepended.
    """
    clip = AudioSegment.from_wav(audio)[0:max_ms]
    spacer = AudioSegment.silent(duration=spacer_ms)
    spacer.append(clip, crossfade=0).export(audio, format="wav")
    return spacer_ms, spacer


def _diarize(audio, spacer):
    """Run speaker diarization on *audio* and build a speech-only track.

    The textual form of the diarization result is written to
    ``diarization_{audio}.txt``. Each of its lines is expected to hold exactly
    two ``H:MM:SS.fff`` timestamps and a ``SPEAKER_NN`` label; a line with a
    different number of timestamps raises ``ValueError`` on unpacking.

    Returns:
        ``(dz_audio, dz_list, segments)``:
        ``dz_audio`` is the path of the exported track in which every
        diarized turn is followed by *spacer*; ``dz_list`` holds
        ``[start_ms, end_ms, speaker]`` per turn; ``segments`` holds the
        offset (ms) inside the exported track at which each turn begins.
    """
    source = AudioSegment.from_wav(audio)
    dz_text = str(pipeline({'uri': 'blabal', 'audio': audio}))

    # Keep the on-disk copy of the diarization output, but close the handle
    # deterministically and parse the in-memory text only once.
    with open(f"diarization_{audio}.txt", "w") as text_file:
        text_file.write(dz_text)

    dz_list = []
    segments = []
    sounds = spacer
    for line in dz_text.splitlines():
        start, end = (
            _millisec(stamp)
            for stamp in re.findall(r'[0-9]+:[0-9]+:[0-9]+\.[0-9]+', string=line)
        )
        speaker = re.findall(r'(SPEAKER_[0-9][0-9])', string=line)[0]
        dz_list.append([start, end, speaker])

        # Record where this turn starts in the concatenated track before
        # appending it, then separate it from the next turn with silence.
        segments.append(len(sounds))
        sounds = sounds.append(source[start:end], crossfade=0)
        sounds = sounds.append(spacer, crossfade=0)

    dz_audio = f"dz_{audio}.wav"
    sounds.export(dz_audio, format="wav")
    return dz_audio, dz_list, segments


def _transcribe_segments(dz_audio, dz_list, segments, spacer_ms):
    """Transcribe *dz_audio* with Whisper and attribute captions to speakers.

    Consecutive captions attributed to the same speaker are merged into one
    conversation entry.

    Returns:
        One ``"{speaker} --> {text}\\n"`` line per conversation entry, joined
        into a single string.
    """
    model = whisper.load_model("base")
    result = model.transcribe(dz_audio)
    # Whisper reports seconds; everything else here works in milliseconds.
    captions = [
        [caption["start"] * 1000, caption["end"] * 1000, caption["text"]]
        for caption in result['segments']
    ]

    conversation = []
    last = len(segments) - 1
    for i, seg_start in enumerate(segments):
        # Find the first caption that starts no earlier than one spacer
        # before this turn. When no caption qualifies, idx is left at the
        # last caption index (not len(captions)).
        idx = 0
        for idx in range(len(captions)):
            if captions[idx][0] >= (seg_start - spacer_ms):
                break

        # Consume captions that end before the next turn begins; the final
        # turn takes every remaining caption.
        while idx < len(captions) and (i == last or captions[idx][1] < segments[i + 1]):
            text = captions[idx][2]
            idx += 1
            speaker = dz_list[i][2]
            if conversation and conversation[-1][0] == speaker:
                conversation[-1][1] += text
            else:
                conversation.append([speaker, text])

    return "".join(f"{speaker} --> {text}\n" for speaker, text in conversation)


def Transcribe(audio="temp_audio.wav"):
    """Produce a speaker-labelled transcript of a WAV file.

    The file is trimmed to its first 20 minutes (and overwritten in place),
    diarized, re-assembled into a speech-only track, and transcribed.

    Args:
        audio: Path of the WAV file to process.

    Returns:
        The transcript as ``"SPEAKER_NN --> text"`` lines.

    Intermediate files are removed when the function exits, whether it
    succeeds or raises.
    """
    try:
        spacermilli, spacer = _preprocess(audio)
        dz_audio, dz_list, segments = _diarize(audio, spacer)
        return _transcribe_segments(dz_audio, dz_list, segments, spacermilli)
    finally:
        # Best-effort cleanup. The diarized track is removed under the name
        # it is actually exported with (f"dz_{audio}.wav"); the previous
        # hard-coded "dz_temp_audio.wav" never matched that name.
        for path in ("temp_audio.wav", f"dz_{audio}.wav", f"diarization_{audio}.txt"):
            try:
                os.remove(path)
            except OSError:
                pass
# subprocess.call(['ffmpeg', '-i', 'audio.mp3', | |
# 'audio.wav']) | |
def AudioTranscribe(audio, retries=5):
    """Convert an audio file to ``temp_audio.wav`` and transcribe it.

    Args:
        audio: Filesystem path of the audio file to convert.
        retries: Maximum number of ffmpeg conversion attempts.

    Returns:
        The transcript returned by ``Transcribe()``.

    Raises:
        gr.Error: If *audio* is not a path, or if no attempt produced
            ``temp_audio.wav``.
    """
    if not isinstance(audio, (str, os.PathLike)):
        # Fail fast instead of launching ffmpeg `retries` times with an
        # argument it cannot accept.
        raise gr.Error("Audio input must be a file path.")

    for _ in range(retries):
        try:
            # '-y' lets ffmpeg overwrite a temp_audio.wav left behind by an
            # earlier run instead of waiting for confirmation.
            exit_code = subprocess.call(['ffmpeg', '-y', '-i', audio, 'temp_audio.wav'])
        except Exception:
            traceback.print_exc()
            continue
        if exit_code == 0 and os.path.exists("temp_audio.wav"):
            return Transcribe()

    raise gr.Error("There is some issue with Audio Transcriber. Please try again later!")
def VideoTranscribe(video):
    """Extract the audio track of a video into ``temp_audio.wav`` and transcribe it.

    Args:
        video: Filesystem path of the video file.

    Returns:
        The transcript returned by ``Transcribe()``.

    Raises:
        gr.Error: If no video was supplied or ffmpeg could not extract audio.
    """
    if not video:
        raise gr.Error("No video was provided.")

    # Pass the arguments as a list (no shell) so that a path containing
    # spaces or shell metacharacters is handed to ffmpeg verbatim and cannot
    # be interpreted as extra commands. '-y' overwrites a stale output file.
    command = [
        'ffmpeg', '-y',
        '-i', video,
        '-ab', '160k',
        '-ac', '2',
        '-ar', '44100',
        '-vn',
        'temp_audio.wav',
    ]
    exit_code = subprocess.call(command)
    if exit_code != 0 or not os.path.exists("temp_audio.wav"):
        raise gr.Error("Unable to extract audio from the provided video.")
    return Transcribe()
def YoutubeTranscribe(URL, retries=5):
    """Download the audio of a YouTube video as WAV and transcribe it.

    Args:
        URL: Address of the video; it must contain ``"youtu"``.
        retries: Maximum number of download attempts.

    Returns:
        The transcript returned by ``Transcribe()``.

    Raises:
        gr.Error: If *URL* does not look like a YouTube address, or if no
            download attempt succeeded.
    """
    if "youtu" not in URL.lower():
        raise gr.Error(f"{URL} is not a valid youtube URL.")

    for _ in range(retries):
        # Start every attempt without a stale output file.
        try:
            os.remove("temp_audio.wav")
        except OSError:
            pass

        # The FFmpegExtractAudio post-processor is configured for WAV;
        # presumably it writes temp_audio.wav given the output template.
        ydl_opts = {
            'format': 'bestaudio/best',
            'outtmpl': 'temp_audio.%(ext)s',
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'wav',
            }],
        }
        try:
            with youtube_dl.YoutubeDL(ydl_opts) as ydl:
                ydl.download([URL])
        except Exception:
            # Log the failure instead of swallowing it silently, then retry.
            traceback.print_exc()
            continue

        # The ffmpeg.input(...)/ffmpeg.output(...) graph that used to be
        # built here was never run, so it had no effect and is dropped.
        try:
            os.remove("temp_audio.m4a")
        except OSError:
            pass

        if not os.path.exists("temp_audio.wav"):
            continue
        return Transcribe()

    raise gr.Error(f"Unable to get video from {URL}")
# with gr.Blocks() as i: | |
# with gr.Row(): | |
# with gr.Column(): | |
# with gr.Row(): | |
# video = gr.Video() | |
# audio = gr.Audio() | |
# text = gr.Textbox(label="Youtube Link", placeholder="https://www.youtube.com/watch?v=GECcjrYHH8w") | |
# btn = gr.Button("Run") | |
# with gr.Row(): | |
# output = gr.Textbox(label="Transcribed Text", lines=15) | |
# if not video and not text: | |
# raise gr.Error("Either input url or video (not both)") | |
# else: | |
# btn.click(fn=YoutubeTranscribe, inputs=text, outputs=output) | |
# i.launch() | |
# One tab per input kind; each shows the transcript in a 15-line textbox.
ut = gr.Interface(
    fn=YoutubeTranscribe,
    inputs=gr.Textbox(label="Youtube Link", placeholder="https://www.youtube.com/watch?v=GECcjrYHH8w"),
    outputs=gr.Textbox(label="Transcribed Text", lines=15),
)
vt = gr.Interface(
    fn=VideoTranscribe,
    inputs='video',
    outputs=gr.Textbox(label="Transcribed Text", lines=15),
)
at = gr.Interface(
    fn=AudioTranscribe,
    # AudioTranscribe hands its argument to ffmpeg as an input path, so the
    # component must deliver a file path rather than the default
    # (sample_rate, numpy array) pair.
    inputs=gr.Audio(type="filepath"),
    outputs=gr.Textbox(label="Transcribed Text", lines=15),
)
demo = gr.TabbedInterface([ut, vt, at], ["Youtube URL", "Video", "Audio"])
demo.launch()
# YoutubeTranscribe('https://www.youtube.com/watch?v=GECcjrYHH8w') |