"""Gradio app that produces a speaker-labelled transcript of an audio source.

The source may be a YouTube URL, an uploaded video, or an uploaded audio file.
Every entry point converts its input to ``temp_audio.wav`` and hands it to
:func:`Transcribe`, which diarizes the audio with pyannote and transcribes it
with Whisper.
"""
from __future__ import unicode_literals

import os
import re
import subprocess
import traceback

import youtube_dl
from pydub import AudioSegment
from pyannote.audio import Pipeline
import whisper
import ffmpeg  # kept: file-level import from the original script
import gradio as gr

# Working file shared by all entry points.
TEMP_WAV = "temp_audio.wav"
# Only the first 20 minutes of the input are processed.
MAX_AUDIO_MS = 20 * 60 * 1000
# Silence inserted before the audio and between diarized segments.
SPACER_MS = 2000

# Timestamps (H:M:S.mmm) and speaker labels as they appear in the textual dump
# of the diarization result.
_TIMESTAMP_RE = re.compile(r'[0-9]+:[0-9]+:[0-9]+\.[0-9]+')
_SPEAKER_RE = re.compile(r'(SPEAKER_[0-9][0-9])')

# SECURITY: this access token was committed in source. Rotate it and provide
# the replacement through the HF_TOKEN environment variable; the literal is
# kept only as a fallback so existing deployments keep working.
HF_AUTH_TOKEN = os.environ.get("HF_TOKEN", "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")

pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization",
                                    use_auth_token=HF_AUTH_TOKEN)


def _remove_if_exists(path):
    """Delete *path*, ignoring the error raised when it cannot be removed."""
    try:
        os.remove(path)
    except OSError:
        pass


def _sidecar_path(prefix, audio, suffix):
    """Return ``<dir>/<prefix><basename(audio)><suffix>`` next to *audio*."""
    directory, name = os.path.split(audio)
    return os.path.join(directory, f"{prefix}{name}{suffix}")


def _millisec(time_str):
    """Convert an ``H:M:S.fff`` timestamp string to integer milliseconds."""
    hours, minutes, seconds = time_str.split(":")
    return int((int(hours) * 60 * 60 + int(minutes) * 60 + float(seconds)) * 1000)


def _preprocess(audio):
    """Trim *audio* to ``MAX_AUDIO_MS`` and prepend a silent spacer, in place.

    Returns:
        ``(spacer_length_ms, spacer_segment)``.
    """
    clip = AudioSegment.from_wav(audio)[0:MAX_AUDIO_MS]
    spacer = AudioSegment.silent(duration=SPACER_MS)
    spacer.append(clip, crossfade=0).export(audio, format="wav")
    return SPACER_MS, spacer


def _diarize(audio, spacer, dump_path, dz_audio_path):
    """Run speaker diarization and build a spacer-separated audio file.

    The textual dump of the diarization result is written to *dump_path*.
    Each dumped line yields one ``[start_ms, end_ms, speaker]`` entry, and the
    matching slice of *audio* is appended to the output followed by *spacer*.

    Returns:
        ``(dz_audio_path, dz_list, segments)`` where ``segments[i]`` is the
        offset in milliseconds of slice ``i`` inside the exported audio.
    """
    source = AudioSegment.from_wav(audio)
    dump = str(pipeline({'uri': 'blabal', 'audio': audio}))
    with open(dump_path, "w") as text_file:
        text_file.write(dump)

    dz_list = []
    segments = []
    sounds = spacer
    for line in dump.splitlines():
        start, end = (_millisec(stamp) for stamp in _TIMESTAMP_RE.findall(line))
        speaker = _SPEAKER_RE.findall(line)[0]
        dz_list.append([start, end, speaker])
        segments.append(len(sounds))
        sounds = sounds.append(source[start:end], crossfade=0)
        sounds = sounds.append(spacer, crossfade=0)

    sounds.export(dz_audio_path, format="wav")
    return dz_audio_path, dz_list, segments


def _build_conversation(captions, segments, dz_list, spacermilli):
    """Assign captions to diarized segments and merge consecutive speakers.

    Args:
        captions: ``[start_ms, end_ms, text]`` entries in playback order.
        segments: start offset (ms) of each diarized slice in the audio that
            was transcribed.
        dz_list: ``[start_ms, end_ms, speaker]`` per diarized slice.
        spacermilli: spacer length in ms, used as tolerance when locating the
            first caption of a segment.

    Returns:
        One ``"<speaker> --> <text>\\n"`` line per speaker turn, concatenated.
    """
    conversation = []
    last_segment = len(segments) - 1
    for i, segment_start in enumerate(segments):
        # Locate the first caption starting at or after this segment.
        # NOTE(review): when no caption qualifies, idx is left on the LAST
        # caption rather than past the end (original behaviour, preserved).
        idx = 0
        for idx, caption in enumerate(captions):
            if caption[0] >= segment_start - spacermilli:
                break

        speaker = dz_list[i][2]
        # Consume captions that end before the next segment begins; the final
        # segment takes everything that is left.
        while idx < len(captions) and (
                i == last_segment or captions[idx][1] < segments[i + 1]):
            text = captions[idx][2]
            idx += 1
            if conversation and conversation[-1][0] == speaker:
                conversation[-1][1] += text
            else:
                conversation.append([speaker, text])

    return "".join([f"{speaker} --> {text}\n" for speaker, text in conversation])


def _transcribe(dz_audio, segments, dz_list, spacermilli):
    """Transcribe *dz_audio* with the Whisper "base" model and label speakers."""
    model = whisper.load_model("base")
    result = model.transcribe(dz_audio)
    # Whisper reports seconds; the segment offsets are in milliseconds.
    captions = [[caption["start"] * 1000, caption["end"] * 1000, caption["text"]]
                for caption in result['segments']]
    return _build_conversation(captions, segments, dz_list, spacermilli)


def Transcribe(audio="temp_audio.wav"):
    """Return a speaker-labelled transcript of the WAV file *audio*.

    The file is trimmed and padded in place, diarized, and transcribed.
    Intermediate files and ``temp_audio.wav`` are removed afterwards, whether
    or not processing succeeded.

    Args:
        audio: path of the WAV file to process.

    Returns:
        The transcript as text, one ``"SPEAKER_xx --> text"`` line per turn.
    """
    dump_path = _sidecar_path("diarization_", audio, ".txt")
    dz_audio_path = _sidecar_path("dz_", audio, ".wav")
    try:
        spacermilli, spacer = _preprocess(audio)
        dz_audio, dz_list, segments = _diarize(audio, spacer, dump_path, dz_audio_path)
        return _transcribe(dz_audio, segments, dz_list, spacermilli)
    finally:
        # Remove the files actually created above, not hard-coded guesses.
        for path in (TEMP_WAV, dz_audio_path, dump_path):
            _remove_if_exists(path)


def AudioTranscribe(audio, retries=5):
    """Convert the audio file *audio* to WAV with ffmpeg and transcribe it.

    Args:
        audio: path of the input audio file.
        retries: maximum number of conversion attempts.

    Raises:
        gr.Error: if no input was given or no attempt produced the WAV file.
    """
    if not audio:
        raise gr.Error("No audio file was provided.")

    for _ in range(retries):
        # Drop any stale output so a failed conversion is not mistaken for a
        # successful one.
        _remove_if_exists(TEMP_WAV)
        try:
            subprocess.call(['ffmpeg', '-y', '-i', audio, TEMP_WAV])
        except Exception:
            traceback.print_exc()
            continue
        if os.path.exists(TEMP_WAV):
            return Transcribe()

    raise gr.Error("There is some issue with Audio Transcriber. Please try again later!")


def VideoTranscribe(video):
    """Extract the audio track of *video* with ffmpeg and transcribe it.

    Raises:
        gr.Error: if no video was given.
    """
    if not video:
        raise gr.Error("No video file was provided.")

    _remove_if_exists(TEMP_WAV)
    # Argument list instead of a shell string: the path comes from an upload
    # and must not be interpreted by a shell.
    subprocess.call(['ffmpeg', '-y', '-i', video, '-ab', '160k', '-ac', '2',
                     '-ar', '44100', '-vn', TEMP_WAV])
    return Transcribe()


def YoutubeTranscribe(URL, retries=5):
    """Download the audio of a YouTube video as WAV and transcribe it.

    Args:
        URL: link to the video; must contain ``"youtu"``.
        retries: maximum number of download attempts.

    Raises:
        gr.Error: if *URL* does not look like a YouTube link, or if every
            download attempt failed.
    """
    if retries > 0 and "youtu" not in URL.lower():
        raise gr.Error(f"{URL} is not a valid youtube URL.")

    ydl_opts = {
        'format': 'bestaudio/best',
        'outtmpl': 'temp_audio.%(ext)s',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'wav',
        }],
    }
    for _ in range(retries):
        _remove_if_exists(TEMP_WAV)
        try:
            with youtube_dl.YoutubeDL(ydl_opts) as ydl:
                ydl.download([URL])
        except Exception:
            traceback.print_exc()
            continue
        _remove_if_exists("temp_audio.m4a")
        return Transcribe()

    raise gr.Error(f"Unable to get video from {URL}")


ut = gr.Interface(
    fn=YoutubeTranscribe,
    inputs=gr.Textbox(label="Youtube Link",
                      placeholder="https://www.youtube.com/watch?v=GECcjrYHH8w"),
    outputs=gr.Textbox(label="Transcribed Text", lines=15)
)

vt = gr.Interface(
    fn=VideoTranscribe,
    inputs='video',
    outputs=gr.Textbox(label="Transcribed Text", lines=15)
)

at = gr.Interface(
    fn=AudioTranscribe,
    # ffmpeg needs a file path, not the (sample_rate, array) pair that the
    # default Audio component delivers.
    inputs=gr.Audio(type="filepath"),
    outputs=gr.Textbox(label="Transcribed Text", lines=15)
)

demo = gr.TabbedInterface([ut, vt, at], ["Youtube URL", "Video", "Audio"])
demo.launch()