# Automated transcription web app (Streamlit): Whisper transcription
# combined with simple_diarizer speaker segmentation.
import base64
import subprocess
import tempfile

import pandas as pd
import streamlit as st
import whisper
from simple_diarizer.diarizer import Diarizer
def create_download_link(val, filename, label):
    '''Hack to have a stable download link in Streamlit.

    val      -- bytes payload to serve for download.
    filename -- name the browser gives the downloaded file
                (was previously ignored; now wired into the link).
    label    -- visible link text.

    Returns an HTML anchor tag with the payload embedded as a
    base64 data URI, suitable for st.markdown(..., unsafe_allow_html=True).
    '''
    b64 = base64.b64encode(val)
    return (f'<a href="data:application/octet-stream;base64,{b64.decode()}" '
            f'download="{filename}">{label}</a>')
def segment(nu_speakers):
    '''Diarize the mono audio at the module-level ``temp_file`` path.

    Uses simple_diarizer with the (default) speechbrain ECAPA-TDNN
    embeddings and spectral clustering, then renumbers the speaker
    labels in order of first appearance so the first voice heard is
    always speaker 1.

    nu_speakers -- expected number of distinct speakers.
    Returns a DataFrame of segments with a 'speaker' column added.
    '''
    diarizer = Diarizer(embed_model='ecapa', cluster_method='sc')
    raw_segments = diarizer.diarize(temp_file, num_speakers=nu_speakers)
    seg_df = pd.DataFrame(raw_segments)
    # Map each raw label to its rank of first appearance (1-based).
    first_seen = seg_df['label'].drop_duplicates().reset_index()['label']
    renumber = {label: rank + 1 for rank, label in first_seen.items()}
    seg_df['speaker'] = seg_df['label'].replace(renumber)
    return seg_df
def monotize(uploaded):
    '''Convert the uploaded file to 16 kHz mono 16-bit PCM wav.

    uploaded -- path of the source audio/video file.
    Writes the result to the module-level ``temp_file`` path.
    '''
    # Pass the command as an argument list with shell=False: robust
    # against spaces/metacharacters in file names and not vulnerable to
    # shell injection (the original used shell=True on an f-string).
    # run() waits for completion, matching the old Popen(...).wait();
    # the return code is deliberately not checked, as before.
    subprocess.run(['ffmpeg', '-y', '-i', uploaded,
                    '-acodec', 'pcm_s16le', '-ar', '16000', '-ac', '1',
                    temp_file])
def audio_to_df(uploaded):
    '''Transcribe the mono audio at ``temp_file`` with Whisper.

    ``model_size`` and ``task`` are module-level settings chosen in the
    Streamlit form. The ``uploaded`` argument is currently unused (the
    conversion step happens in transcribe() before this is called).

    Returns a DataFrame of Whisper's segments (start/end times, text, id).
    '''
    whisper_model = whisper.load_model(model_size)
    transcription = whisper_model.transcribe(temp_file,
                                             without_timestamps=False,
                                             task=task)
    return pd.DataFrame(transcription['segments'])
def add_preface(row):
    '''Prefix a transcript row's text with its speaker label.

    row -- mapping with 'text' and 'speaker' keys (a DataFrame row).
    Returns the text, newlines stripped, as "Speaker N: ...".
    '''
    cleaned = row['text'].replace('\n', '')
    return f"Speaker {row['speaker']}: {cleaned}"
def transcribe(uploaded, nu_speakers):
    '''Transcribe, diarize and display the uploaded recording.

    Converts the upload to mono audio, transcribes it with Whisper,
    segments the speakers with simple_diarizer, aligns the two streams
    by start time, and writes the speaker-labelled transcript to the
    Streamlit page.

    uploaded    -- path of the raw uploaded file on disk.
    nu_speakers -- expected number of speakers (passed to segment()).

    Returns a dict with:
      'text' -- list of "Speaker N: ..." strings, one per speech turn.
      'df'   -- DataFrame with speaker/start/end/text columns per line.
    '''
    # Convert file to mono wav at the module-level temp_file path.
    with st.spinner(text="Converting file..."):
        monotize('temp_audio')
    # Make the audio playable in the UI. (The original also opened and
    # read the file into unused bytes, leaking the handle — removed.)
    st.audio(temp_file, format='audio/wav')
    # Transcribe the file.
    with st.spinner(text=f"Transcribing using {model_size} model..."):
        tdf = audio_to_df(uploaded)
    # Segment the file by speaker.
    with st.spinner(text="Segmenting..."):
        sdf = segment(nu_speakers)
    # For each speaker turn, attribute the transcript line whose start
    # time is nearest to the turn's start.
    for row in sdf[['start', 'speaker']].to_dict(orient='records'):
        turn_start = row['start']
        # Renamed from `input`/`id` to avoid shadowing builtins.
        nearest_id = tdf.iloc[(tdf['start'] - turn_start).abs().argsort()[:1]]['id'].values[0]
        tdf.loc[tdf['id'] == nearest_id, 'speaker'] = row['speaker']
    # Propagate speaker labels to the unattributed lines in between.
    # ffill()/bfill() replace fillna(method=...), deprecated in pandas 2
    # and removed in pandas 3.
    tdf['speaker'] = tdf['speaker'].ffill().bfill()
    # Number consecutive runs of the same speaker (speech turns).
    tdf['n1'] = tdf['speaker'] != tdf['speaker'].shift(1)
    tdf['speach'] = tdf['n1'].cumsum()
    # Collapse the dataframe by speech turn.
    binned_df = tdf.groupby(['speach', 'speaker'])['text'].apply('\n'.join).reset_index()
    binned_df['speaker'] = binned_df['speaker'].astype(int)
    binned_df['output'] = binned_df.apply(add_preface, axis=1)
    # Display the transcript and prepare for export.
    lines = []
    for line in binned_df['output'].values:
        st.write(line)
        lines.append(line)
    tdf['speaker'] = tdf['speaker'].astype(int)
    tdf_cols = ['speaker', 'start', 'end', 'text']
    return {'text': lines, 'df': tdf[tdf_cols]}
# Introductory copy rendered as markdown above the input form.
descript = ("This web app creates transcripts using OpenAI's [Whisper](https://github.com/openai/whisper) to transcribe "
"audio files combined with [Chau](https://github.com/cvqluu)'s [Simple Diarizer](https://github.com/cvqluu/simple_diarizer) "
"to partition the text by speaker.\n"
"* You can upload an audio or video file of up to 200MBs.\n"
"* Creating the transcript takes some time. "
"The process takes approximately 20% of the length of the audio file using the base Whisper model.\n "
"* The transcription process handles a variety of languages, and can also translate the audio to English. The tiny model is not good at translating. \n"
"* Speaker segmentation seems to work best with the base model. The small model produces better transcripts, but something seems off with the timecodes, degrading the speaker attribution. \n"
"* After uploading the file, be sure to select the number of speakers." )
st.title("Automated Transcription")
st.markdown(descript)
# Input form: file upload, speaker count, Whisper model choice, and a
# translate-to-English toggle. Values are read when the form is submitted.
form = st.form(key='my_form')
uploaded = form.file_uploader("Choose a file")
nu_speakers = form.slider('Number of speakers in recording:', min_value=1, max_value=8, value=2, step=1)
models = form.selectbox(
'Which Whisper model?',
('Tiny (fast)', 'Base (good)', 'Small (great but slow)', 'Medium (greater but slower)'), index=1)
translate = form.checkbox('Translate to English?')
# True only on the rerun triggered by the submit button.
submit = form.form_submit_button("Transcribe!")
if submit:
    # Map the UI label to the Whisper model name. (Replaces an if/elif
    # chain; a dict lookup also fails loudly on an unknown label instead
    # of leaving model_size undefined.)
    model_size = {'Tiny (fast)': 'tiny',
                  'Base (good)': 'base',
                  'Small (great but slow)': 'small',
                  'Medium (greater but slower)': 'medium'}[models]
    task = 'translate' if translate else 'transcribe'
    # Temporary directory holding the mono wav produced by monotize().
    # These module-level names are read by the helper functions above.
    tmp_dir = tempfile.TemporaryDirectory()
    temp_file = tmp_dir.name + '/mono.wav'
    # Persist the upload so ffmpeg can read it from disk.
    with open('temp_audio', 'wb') as outfile:
        outfile.write(uploaded.getvalue())
    # Transcribe/translate and segment.
    transcript = transcribe('temp_audio', nu_speakers)
    # Prepare text file for export.
    text = '\n'.join(transcript['text']).encode('utf-8')
    download_url = create_download_link(text, 'transcript.txt', 'Download transcript as plain text.')
    st.markdown(download_url, unsafe_allow_html=True)
    # Prepare CSV file for export.
    csv = transcript['df'].to_csv(float_format='%.2f', index=False).encode('utf-8')
    download_url = create_download_link(csv, 'transcript.csv', 'Download transcript as CSV (with time codes)')
    st.markdown(download_url, unsafe_allow_html=True)
    tmp_dir.cleanup()