from __future__ import unicode_literals import youtube_dl from pydub import AudioSegment from pyannote.audio import Pipeline import re import whisper import os import ffmpeg import subprocess import gradio as gr import traceback import json pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization", use_auth_token="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") from pydub.effects import speedup import moviepy.editor as mp __FILES = set() SPEAKER_DICT = {} SPEAKERS = [] def GetSpeaker(sp): speaker = sp if sp not in list(SPEAKER_DICT.keys): if len(SPEAKERS): SPEAKER_DICT[sp] = SPEAKERS.pop(0) speaker = SPEAKER_DICT[sp] else: speaker = SPEAKER_DICT[sp] return speaker def GenerateSpeakerDict(sp): SPEAKERS = [speaker.strip() for speaker in sp.split(',')] def CreateFile(filename): __FILES.add(filename) return filename def RemoveFile(filename): if (os.path.isfile(filename)): os.remove(filename) def RemoveAllFiles(): for file in __FILES: if (os.path.isfile(file)): os.remove(file) def Transcribe(NumberOfSpeakers, SpeakerNames="", audio="temp_audio.wav"): def millisec(timeStr): spl = timeStr.split(":") s = (int)((int(spl[0]) * 60 * 60 + int(spl[1]) * 60 + float(spl[2]) )* 1000) return s def preprocess(audio): t1 = 0 * 1000 t2 = 20 * 60 * 1000 newAudio = AudioSegment.from_wav(audio) a = newAudio[t1:t2] spacermilli = 2000 spacer = AudioSegment.silent(duration=spacermilli) newAudio = spacer.append(a, crossfade=0) newAudio.export(audio, format="wav") return spacermilli, spacer def diarization(audio): as_audio = AudioSegment.from_wav(audio) DEMO_FILE = {'uri': 'blabal', 'audio': audio} dz = pipeline(DEMO_FILE) with open(CreateFile(f"diarization_{audio}.txt"), "w") as text_file: text_file.write(str(dz)) dz = open(CreateFile(f"diarization_{audio}.txt")).read().splitlines() dzList = [] for l in dz: start, end = tuple(re.findall('[0-9]+:[0-9]+:[0-9]+\.[0-9]+', string=l)) start = millisec(start) end = millisec(end) lex = GetSpeaker(re.findall('(SPEAKER_[0-9][0-9])', string=l)[0]) dzList.append([start, end, lex]) sounds = spacer segments = [] dz = open(CreateFile(f"diarization_{audio}.txt")).read().splitlines() for l in dz: start, end = tuple(re.findall('[0-9]+:[0-9]+:[0-9]+\.[0-9]+', string=l)) start = millisec(start) end = millisec(end) segments.append(len(sounds)) sounds = sounds.append(as_audio[start:end], crossfade=0) sounds = sounds.append(spacer, crossfade=0) sounds.export(CreateFile(f"dz_{audio}.wav"), format="wav") return f"dz_{audio}.wav", dzList, segments def transcribe(dz_audio): model = whisper.load_model("base") result = model.transcribe(dz_audio) # for _ in result['segments']: # print(_['start'], _['end'], _['text']) captions = [[((caption["start"]*1000)), ((caption["end"]*1000)), caption["text"]] for caption in result['segments']] conversation = [] for i in range(len(segments)): idx = 0 for idx in range(len(captions)): if captions[idx][0] >= (segments[i] - spacermilli): break; while (idx < (len(captions))) and ((i == len(segments) - 1) or (captions[idx][1] < segments[i+1])): c = captions[idx] start = dzList[i][0] + (c[0] -segments[i]) if start < 0: start = 0 idx += 1 if not len(conversation): conversation.append([dzList[i][2], c[2]]) elif conversation[-1][0] == dzList[i][2]: conversation[-1][1] += c[2] else: conversation.append([dzList[i][2], c[2]]) #print(f"[{dzList[i][2]}] {c[2]}") return conversation, ("".join([f"{speaker} --> {text}\n" for speaker, text in conversation])) GenerateSpeakerDict(SpeakerNames) spacermilli, spacer = preprocess(audio) dz_audio, dzList, segments = diarization(audio) conversation, t_text = transcribe(dz_audio) RemoveAllFiles() return (t_text, ({ "data": [{"speaker": speaker, "text": text} for speaker, text in conversation]})) def AudioTranscribe(NumberOfSpeakers=None, SpeakerNames="", audio="", retries=5): if retries: subprocess.call(['ffmpeg', '-i', audio,'temp_audio.wav']) # try: # subprocess.call(['ffmpeg', '-i', audio,'temp_audio.wav']) # except Exception as ex: # traceback.print_exc() # return AudioTranscribe(NumberOfSpeakers, SpeakerNames, audio, retries-1) if not (os.path.isfile("temp_audio.wav")): return AudioTranscribe(NumberOfSpeakers, SpeakerNames, audio, retries-1) return Transcribe(NumberOfSpeakers, SpeakerNames) else: raise gr.Error("There is some issue ith Audio Transcriber. Please try again later!") def VideoTranscribe(NumberOfSpeakers=None, SpeakerNames="", video="", retries=5): if retries: # command = f"ffmpeg -i {video} -ab 160k -ac 2 -ar 44100 -vn temp_audio.wav" # subprocess.call(command, shell=True) clip = mp.VideoFileClip(video) clip.audio.write_audiofile("temp_audio.wav") # try: # command = f"ffmpeg -i {video} -ab 160k -ac 2 -ar 44100 -vn temp_audio.wav" # subprocess.call(command, shell=True) # except Exception as ex: # traceback.print_exc() # return VideoTranscribe(NumberOfSpeakers, SpeakerNames, video, retries-1) if not (os.path.isfile("temp_audio.wav")): return VideoTranscribe(NumberOfSpeakers, SpeakerNames, video, retries-1) return Transcribe(NumberOfSpeakers, SpeakerNames) else: raise gr.Error("There is some issue ith Video Transcriber. Please try again later!") return Transcribe(NumberOfSpeakers, SpeakerNames) def YoutubeTranscribe(NumberOfSpeakers=None, SpeakerNames="", URL="", retries = 5): if retries: if "youtu" not in URL.lower(): raise gr.Error(f"{URL} is not a valid youtube URL.") else: RemoveFile("temp_audio.wav") ydl_opts = { 'format': 'bestaudio/best', 'outtmpl': 'temp_audio.%(ext)s', 'postprocessors': [{ 'key': 'FFmpegExtractAudio', 'preferredcodec': 'wav', }], } try: with youtube_dl.YoutubeDL(ydl_opts) as ydl: ydl.download([URL]) except: return YoutubeTranscribe(NumberOfSpeakers, SpeakerNames, URL, retries-1) stream = ffmpeg.input('temp_audio.m4a') stream = ffmpeg.output(stream, 'temp_audio.wav') RemoveFile("temp_audio.m4a") return Transcribe(NumberOfSpeakers, SpeakerNames) else: raise gr.Error(f"Unable to get video from {URL}") ut = gr.Interface( fn=YoutubeTranscribe, inputs=[gr.Number(label="Number of Speakers", placeholder="2"), gr.Textbox(label="Name of the Speakers (ordered by the time they speak and separated by comma)", placeholder="If Speaker 1 is first to speak followed by Speaker 2 then -> Speaker 1, Speaker 2"), gr.Textbox(label="Youtube Link", placeholder="https://www.youtube.com/watch?v=GECcjrYHH8w"),], outputs=[gr.Textbox(label="Transcribed Text", lines=15), gr.JSON(label="Transcribed JSON")] ) vt = gr.Interface( fn=VideoTranscribe, inputs=[gr.Number(label="Number of Speakers", placeholder="2"), gr.Textbox(label="Name of the Speakers (ordered by the time they speak and separated by comma)", placeholder="If Speaker 1 is first to speak followed by Speaker 2 then -> Speaker 1, Speaker 2"), 'video'], outputs=[gr.Textbox(label="Transcribed Text", lines=15), gr.JSON(label="Transcribed JSON")] ) at = gr.Interface( fn=AudioTranscribe, inputs=[gr.Number(label="Number of Speakers", placeholder="2"), gr.Textbox(label="Name of the Speakers (ordered by the time they speak and separated by comma)", placeholder="If Speaker 1 is first to speak followed by Speaker 2 then -> Speaker 1, Speaker 2"), 'audio'], outputs=[gr.Textbox(label="Transcribed Text", lines=15), gr.JSON(label="Transcribed JSON")] ) demo = gr.TabbedInterface([ut, vt, at], ["Youtube URL", "Video", "Audio"]) demo.launch()