salmanmapkar's picture
Update app.py
5da7484
raw
history blame
6.83 kB
from __future__ import unicode_literals
import youtube_dl
from pydub import AudioSegment
from pyannote.audio import Pipeline
import re
import whisper
import os
import ffmpeg
import subprocess
import gradio as gr
import traceback
import json
# Speaker-diarization pipeline, loaded once when the module is imported and
# used by Transcribe().
# NOTE(review): an access token committed to source control should be treated
# as leaked -- rotate it and provide the replacement through the HF_TOKEN
# environment variable. The literal is kept only as a fallback so deployments
# that do not set the variable keep behaving as before.
pipeline = Pipeline.from_pretrained(
    "pyannote/speaker-diarization",
    use_auth_token=os.environ.get("HF_TOKEN") or "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",
)
# Every file name handed out by CreateFile(), so RemoveAllFiles() can delete
# them in bulk.
__FILES = set()


def CreateFile(filename):
    """Register *filename* for later cleanup and return it unchanged.

    Nothing is created on disk: the name is only remembered in the
    module-level ``__FILES`` set, which lets the call be wrapped directly
    around a path expression, e.g. ``open(CreateFile(path), "w")``.
    """
    __FILES.add(filename)
    return filename
def RemoveFile(filename):
    """Delete *filename* from disk; a file that is already absent is ignored.

    Other OS-level failures (for example insufficient permissions, or the
    path being a directory) still propagate to the caller.
    """
    try:
        os.remove(filename)
    except FileNotFoundError:
        pass
def RemoveAllFiles():
    """Delete from disk every file name registered in ``__FILES``.

    Names whose file no longer exists are skipped. The registry itself is
    left untouched, so calling this repeatedly is harmless.
    """
    for file in __FILES:
        try:
            os.remove(file)
        except FileNotFoundError:
            pass
def Transcribe(audio="temp_audio.wav"):
    """Diarize and transcribe a WAV file into a speaker-labelled conversation.

    Steps:
      1. Trim the audio to its first 20 minutes and prepend 2 s of silence
         (the file at *audio* is overwritten in place).
      2. Run the module-level pyannote ``pipeline`` to obtain speaker turns.
      3. Re-assemble the turns into one WAV with a silent spacer between
         them, remembering where each turn starts in the new audio.
      4. Transcribe that WAV with Whisper's "base" model and attribute each
         caption to a speaker by its position relative to the turn starts.

    Args:
        audio: Path of the WAV file to process. It is overwritten by step 1.

    Returns:
        A ``(text, conversation_json)`` tuple. ``text`` contains one
        ``"<speaker> --> <utterance>"`` line per conversation turn and
        ``conversation_json`` is the JSON encoding of the same turns as a
        list of ``[speaker, utterance]`` pairs.
    """
    # Intermediate WAV, placed next to the input so that an *audio* value
    # containing directories still yields a valid path. For the default
    # input this is "dz_temp_audio.wav.wav".
    dz_audio_path = os.path.join(
        os.path.dirname(audio), f"dz_{os.path.basename(audio)}.wav"
    )
    timestamp_re = re.compile(r'[0-9]+:[0-9]+:[0-9]+\.[0-9]+')
    speaker_re = re.compile(r'(SPEAKER_[0-9][0-9])')

    def millisec(timeStr):
        """Convert an ``H:M:S.fff`` timestamp into integer milliseconds."""
        hours, minutes, seconds = timeStr.split(":")
        return int((int(hours) * 60 * 60 + int(minutes) * 60 + float(seconds)) * 1000)

    def preprocess(audio):
        """Trim, prepend silence, overwrite *audio*; return (spacer length in ms, spacer)."""
        t1 = 0 * 1000
        t2 = 20 * 60 * 1000
        clip = AudioSegment.from_wav(audio)[t1:t2]
        spacermilli = 2000
        spacer = AudioSegment.silent(duration=spacermilli)
        spacer.append(clip, crossfade=0).export(audio, format="wav")
        return spacermilli, spacer

    def diarization(audio):
        """Split *audio* into speaker turns and write them out spacer-separated.

        Returns the path of the re-assembled WAV, the list of
        ``[start_ms, end_ms, speaker]`` turns, and for each turn the offset
        (ms) at which it starts inside the re-assembled WAV.
        """
        as_audio = AudioSegment.from_wav(audio)
        DEMO_FILE = {'uri': 'blabal', 'audio': audio}
        # The pipeline result is parsed from its text form: each line is
        # expected to carry two timestamps and one SPEAKER_nn label.
        dz_lines = str(pipeline(DEMO_FILE)).splitlines()
        dzList = []
        segments = []
        sounds = spacer
        for line in dz_lines:
            if not line.strip():
                continue
            start, end = (millisec(stamp) for stamp in timestamp_re.findall(line))
            speaker = speaker_re.findall(line)[0]
            dzList.append([start, end, speaker])
            # Offset of this turn within the audio being assembled.
            segments.append(len(sounds))
            sounds = sounds.append(as_audio[start:end], crossfade=0)
            sounds = sounds.append(spacer, crossfade=0)
        sounds.export(CreateFile(dz_audio_path), format="wav")
        return dz_audio_path, dzList, segments

    def transcribe(dz_audio):
        """Run Whisper on *dz_audio* and group its captions by speaker turn."""
        model = whisper.load_model("base")
        result = model.transcribe(dz_audio)
        # Caption boundaries converted from seconds to milliseconds.
        captions = [
            [caption["start"] * 1000, caption["end"] * 1000, caption["text"]]
            for caption in result['segments']
        ]
        conversation = []
        last_turn = len(segments) - 1
        for i, turn_offset in enumerate(segments):
            speaker = dzList[i][2]
            # Find the first caption starting no earlier than one spacer
            # length before this turn.
            # NOTE(review): when no caption satisfies the test, idx is left
            # at the last caption rather than past the end, so that caption
            # may be attributed to this turn as well -- confirm whether that
            # is intended.
            idx = 0
            for idx in range(len(captions)):
                if captions[idx][0] >= (turn_offset - spacermilli):
                    break
            # Consume captions that end before the next turn begins (or all
            # remaining captions when this is the final turn).
            while idx < len(captions) and (
                i == last_turn or captions[idx][1] < segments[i + 1]
            ):
                text = captions[idx][2]
                idx += 1
                # Consecutive captions by the same speaker form one turn.
                if conversation and conversation[-1][0] == speaker:
                    conversation[-1][1] += text
                else:
                    conversation.append([speaker, text])
        return conversation, ("".join([f"{speaker} --> {text}\n" for speaker, text in conversation]))

    try:
        spacermilli, spacer = preprocess(audio)
        dz_audio, dzList, segments = diarization(audio)
        conversation, t_text = transcribe(dz_audio)
    finally:
        # Clean up on failure too, so a stale working file is never picked up
        # by a later request. Only the default working file is deleted; a
        # caller-supplied *audio* path is left in place.
        for leftover in ("temp_audio.wav", dz_audio_path):
            try:
                os.remove(leftover)
            except OSError:
                pass
    return t_text, json.dumps(conversation)
def AudioTranscribe(audio, retries=5):
    """Convert an audio file to ``temp_audio.wav`` with ffmpeg and transcribe it.

    Args:
        audio: Path of the input audio file, passed to ``ffmpeg -i``.
        retries: Maximum number of conversion attempts.

    Returns:
        The result of ``Transcribe()`` on the converted file.

    Raises:
        gr.Error: If no attempt produced ``temp_audio.wav``.
    """
    for _ in range(retries or 0):
        # Drop any leftover from an earlier run so that the existence check
        # below can only succeed on a file produced by this attempt.
        try:
            os.remove("temp_audio.wav")
        except FileNotFoundError:
            pass
        try:
            status = subprocess.call(['ffmpeg', '-i', audio, 'temp_audio.wav'])
        except Exception:
            traceback.print_exc()
            continue
        # subprocess.call does not raise on a failing command, so the exit
        # status is checked explicitly.
        if status == 0 and os.path.exists("temp_audio.wav"):
            # Transcription errors are not retried; they propagate as-is.
            return Transcribe()
    raise gr.Error("There is some issue with Audio Transcriber. Please try again later!")
def VideoTranscribe(video, retries=5):
    """Extract a video's audio track to ``temp_audio.wav`` and transcribe it.

    Args:
        video: Path of the input video file, passed to ``ffmpeg -i``.
        retries: Maximum number of extraction attempts.

    Returns:
        The result of ``Transcribe()`` on the extracted audio.

    Raises:
        gr.Error: If no attempt produced ``temp_audio.wav``.
    """
    for _ in range(retries or 0):
        # Drop any leftover from an earlier run so that the existence check
        # below can only succeed on a file produced by this attempt.
        try:
            os.remove("temp_audio.wav")
        except FileNotFoundError:
            pass
        try:
            # Argument list instead of a shell string: the path is passed to
            # ffmpeg verbatim, so spaces or shell metacharacters in it are
            # neither split nor interpreted.
            status = subprocess.call([
                'ffmpeg', '-i', video,
                '-ab', '160k', '-ac', '2', '-ar', '44100', '-vn',
                'temp_audio.wav',
            ])
        except Exception:
            traceback.print_exc()
            continue
        # subprocess.call does not raise on a failing command, so the exit
        # status is checked explicitly.
        if status == 0 and os.path.exists("temp_audio.wav"):
            # Transcription errors are not retried; they propagate as-is.
            return Transcribe()
    raise gr.Error("There is some issue with Video Transcriber. Please try again later!")
def YoutubeTranscribe(URL, retries = 5):
    """Download a YouTube video's audio as ``temp_audio.wav`` and transcribe it.

    Args:
        URL: Address of the video; must contain "youtu" (case-insensitive).
        retries: Maximum number of download attempts.

    Returns:
        The result of ``Transcribe()`` on the downloaded audio.

    Raises:
        gr.Error: If *URL* does not look like a YouTube address, or if no
            attempt produced ``temp_audio.wav``.
    """
    if "youtu" not in URL.lower():
        raise gr.Error(f"{URL} is not a valid youtube URL.")

    def discard(path):
        """Delete *path* if present (kept local so this function is self-contained)."""
        try:
            os.remove(path)
        except FileNotFoundError:
            pass

    # The FFmpegExtractAudio post-processor is asked for WAV output, and the
    # output template names the result temp_audio.<ext>.
    ydl_opts = {
        'format': 'bestaudio/best',
        'outtmpl': 'temp_audio.%(ext)s',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'wav',
        }],
    }
    for _ in range(retries or 0):
        # Make sure a WAV found afterwards comes from this attempt.
        discard("temp_audio.wav")
        try:
            with youtube_dl.YoutubeDL(ydl_opts) as ydl:
                ydl.download([URL])
        except Exception:
            traceback.print_exc()
            continue
        # Remove a leftover intermediate download, if one was kept.
        discard("temp_audio.m4a")
        if os.path.exists("temp_audio.wav"):
            # Transcription errors are not retried; they propagate as-is.
            return Transcribe()
    raise gr.Error(f"Unable to get video from {URL}")
# One single-function interface per input source; each shows the transcript
# in its own text box.
ut = gr.Interface(
    fn=YoutubeTranscribe,
    inputs=gr.Textbox(label="Youtube Link", placeholder="https://www.youtube.com/watch?v=GECcjrYHH8w"),
    outputs=gr.Textbox(label="Transcribed Text", lines=15),
)
vt = gr.Interface(
    fn=VideoTranscribe,
    inputs='video',
    outputs=gr.Textbox(label="Transcribed Text", lines=15),
)
at = gr.Interface(
    fn=AudioTranscribe,
    # AudioTranscribe hands its argument to ffmpeg as an input path, so the
    # component must deliver a file path; the bare 'audio' shortcut uses the
    # component's default type, which delivers decoded sample data instead.
    inputs=gr.Audio(type="filepath"),
    outputs=gr.Textbox(label="Transcribed Text", lines=15),
)

# Combine the three interfaces into one tabbed app and start serving it.
demo = gr.TabbedInterface([ut, vt, at], ["Youtube URL", "Video", "Audio"])
demo.launch()