from __future__ import unicode_literals import youtube_dl from pydub import AudioSegment from pyannote.audio import Pipeline import re import whisper import os import ffmpeg import subprocess import gradio as gr import traceback import json pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization", use_auth_token="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") __FILES = set() def CreateFile(filename): __FILES.add(filename) return filename def RemoveFile(filename): if (os.path.exist(filename)): os.remove(filename) def RemoveAllFiles(): for file in __FILES: if (os.path.exist(file)): os.remove(file) def Transcribe(audio="temp_audio.wav"): def millisec(timeStr): spl = timeStr.split(":") s = (int)((int(spl[0]) * 60 * 60 + int(spl[1]) * 60 + float(spl[2]) )* 1000) return s def preprocess(audio): t1 = 0 * 1000 t2 = 20 * 60 * 1000 newAudio = AudioSegment.from_wav(audio) a = newAudio[t1:t2] spacermilli = 2000 spacer = AudioSegment.silent(duration=spacermilli) newAudio = spacer.append(a, crossfade=0) newAudio.export(audio, format="wav") return spacermilli, spacer def diarization(audio): as_audio = AudioSegment.from_wav(audio) DEMO_FILE = {'uri': 'blabal', 'audio': audio} dz = pipeline(DEMO_FILE) with open(CreateFile(f"diarization_{audio}.txt"), "w") as text_file: text_file.write(str(dz)) dz = open(CreateFile(f"diarization_{audio}.txt")).read().splitlines() dzList = [] for l in dz: start, end = tuple(re.findall('[0-9]+:[0-9]+:[0-9]+\.[0-9]+', string=l)) start = millisec(start) end = millisec(end) lex = re.findall('(SPEAKER_[0-9][0-9])', string=l)[0] dzList.append([start, end, lex]) sounds = spacer segments = [] dz = open(CreateFile(f"diarization_{audio}.txt")).read().splitlines() for l in dz: start, end = tuple(re.findall('[0-9]+:[0-9]+:[0-9]+\.[0-9]+', string=l)) start = millisec(start) end = millisec(end) segments.append(len(sounds)) sounds = sounds.append(as_audio[start:end], crossfade=0) sounds = sounds.append(spacer, crossfade=0) sounds.export(CreateFile(f"dz_{audio}.wav"), format="wav") return f"dz_{audio}.wav", dzList, segments def transcribe(dz_audio): model = whisper.load_model("base") result = model.transcribe(dz_audio) # for _ in result['segments']: # print(_['start'], _['end'], _['text']) captions = [[((caption["start"]*1000)), ((caption["end"]*1000)), caption["text"]] for caption in result['segments']] conversation = [] for i in range(len(segments)): idx = 0 for idx in range(len(captions)): if captions[idx][0] >= (segments[i] - spacermilli): break; while (idx < (len(captions))) and ((i == len(segments) - 1) or (captions[idx][1] < segments[i+1])): c = captions[idx] start = dzList[i][0] + (c[0] -segments[i]) if start < 0: start = 0 idx += 1 if not len(conversation): conversation.append([dzList[i][2], c[2]]) elif conversation[-1][0] == dzList[i][2]: conversation[-1][1] += c[2] else: conversation.append([dzList[i][2], c[2]]) #print(f"[{dzList[i][2]}] {c[2]}") return conversation, ("".join([f"{speaker} --> {text}\n" for speaker, text in conversation])) spacermilli, spacer = preprocess(audio) dz_audio, dzList, segments = diarization(audio) conversation, t_text = transcribe(dz_audio) try: os.remove("temp_audio.wav") except OSError: pass try: os.remove("dz_temp_audio.wav") except OSError: pass try: os.remove(f"diarization_{audio}.txt") except OSError: pass return t_text, json.dumps(conversation) def AudioTranscribe(audio, retries=5): if retries: try: subprocess.call(['ffmpeg', '-i', audio,'temp_audio.wav']) except Exception as ex: traceback.print_exc() return AudioTranscribe(audio, retries-1) if not (os.path.exist("temp_audio.wav")): return AudioTranscribe(audio, retries-1) return Transcribe() else: raise gr.Error("There is some issue ith Audio Transcriber. Please try again later!") def VideoTranscribe(video, retries=5): if retries: try: command = f"ffmpeg -i {video} -ab 160k -ac 2 -ar 44100 -vn temp_audio.wav" subprocess.call(command, shell=True) except Exception as ex: traceback.print_exc() return VideoTranscribe(video, retries-1) if not (os.path.exist("temp_audio.wav")): return VideoTranscribe(video, retries-1) return Transcribe() else: raise gr.Error("There is some issue ith Video Transcriber. Please try again later!") return Transcribe() def YoutubeTranscribe(URL, retries = 5): if retries: if "youtu" not in URL.lower(): raise gr.Error(f"{URL} is not a valid youtube URL.") else: RemoveFile("temp_audio.wav") ydl_opts = { 'format': 'bestaudio/best', 'outtmpl': 'temp_audio.%(ext)s', 'postprocessors': [{ 'key': 'FFmpegExtractAudio', 'preferredcodec': 'wav', }], } try: with youtube_dl.YoutubeDL(ydl_opts) as ydl: ydl.download([URL]) except: return YoutubeTranscribe(URL, retries-1) stream = ffmpeg.input('temp_audio.m4a') stream = ffmpeg.output(stream, 'temp_audio.wav') RemoveFile("temp_audio.m4a") return Transcribe() else: raise gr.Error(f"Unable to get video from {URL}") ut = gr.Interface( fn=YoutubeTranscribe, inputs=gr.Textbox(label="Youtube Link", placeholder="https://www.youtube.com/watch?v=GECcjrYHH8w"), outputs=gr.Textbox(label="Transcribed Text", lines=15) ) vt = gr.Interface( fn=VideoTranscribe, inputs='video', outputs=gr.Textbox(label="Transcribed Text", lines=15) ) at = gr.Interface( fn=AudioTranscribe, inputs='audio', outputs=gr.Textbox(label="Transcribed Text", lines=15) ) demo = gr.TabbedInterface([ut, vt, at], ["Youtube URL", "Video", "Audio"]) demo.launch()