import os

import streamlit as st
import pandas as pd
import librosa

import sparknlp
from sparknlp.base import *
from sparknlp.common import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline
from pyspark.ml import Pipeline
from pyspark.sql.types import *
import pyspark.sql.functions as F

st.set_page_config(
    layout="wide",
    initial_sidebar_state="auto"
)

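# Custom CSS for the title and description sections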
st.markdown(""" |
|
<style> |
|
.main-title { |
|
font-size: 36px; |
|
color: #4A90E2; |
|
font-weight: bold; |
|
text-align: center; |
|
} |
|
.section { |
|
background-color: #f9f9f9; |
|
padding: 10px; |
|
border-radius: 10px; |
|
margin-top: 10px; |
|
} |
|
.section p, .section ul { |
|
color: #666666; |
|
} |
|
</style> |
|
""", unsafe_allow_html=True) |
|
|
|
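# Heavy objects (the Spark session and the Spark NLP pipeline) are cached
# across Streamlit reruns with st.cache_resource.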
@st.cache_resource
def init_spark():
    """Initialize Spark NLP."""
    return sparknlp.start()

@st.cache_resource
def create_pipeline(model):
    """Create a Spark NLP pipeline for audio processing."""
    # Assemble the raw audio (array of floats) into the format expected by the annotators
    audio_assembler = AudioAssembler() \
        .setInputCol("audio_content") \
        .setOutputCol("audio_assembler")

    # Pretrained Wav2Vec2 model that transcribes the assembled audio into text
    speech_to_text = Wav2Vec2ForCTC \
        .pretrained(model) \
        .setInputCols("audio_assembler") \
        .setOutputCol("text")

    pipeline = Pipeline(stages=[
        audio_assembler,
        speech_to_text
    ])
    return pipeline

def fit_data(pipeline, fed_data):
    """Fit the data into the pipeline and return the transcription."""
    # Wav2Vec2 models expect 16 kHz mono audio, so resample on load
    data, sampling_rate = librosa.load(fed_data, sr=16000)
    data = data.tolist()
    spark_df = spark.createDataFrame([[data]], ["audio_content"])

    model = pipeline.fit(spark_df)
    lp = LightPipeline(model)
    lp_result = lp.fullAnnotate(data)[0]
    return lp_result

def save_uploadedfile(uploadedfile, path):
    """Save the uploaded file to the specified path."""
    filepath = os.path.join(path, uploadedfile.name)
    with open(filepath, "wb") as f:
        if hasattr(uploadedfile, 'getbuffer'):
            f.write(uploadedfile.getbuffer())
        else:
            f.write(uploadedfile.read())

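# Pretrained Wav2Vec2 ASR models available on the Spark NLP Models Hub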
model_list = [
    "asr_wav2vec2_large_xlsr_53_english_by_jonatasgrosman",
    "asr_wav2vec2_base_100h_13K_steps",
    "asr_wav2vec2_base_100h_ngram",
    "asr_wav2vec2_base_100h_by_facebook",
    "asr_wav2vec2_base_100h_test",
    "asr_wav2vec2_base_960h"
]

model = st.sidebar.selectbox(
    "Choose the pretrained model",
    model_list,
    help="For more info about the models visit: https://sparknlp.org/models"
)

st.markdown('<div class="main-title">Speech Recognition With Wav2Vec2ForCTC</div>', unsafe_allow_html=True)
st.markdown('<div class="section"><p>This demo transcribes audio files into text using the <code>Wav2Vec2ForCTC</code> annotator and pretrained speech recognition models.</p></div>', unsafe_allow_html=True)

st.sidebar.markdown('Reference notebook:')
st.sidebar.markdown("""
    <a href="https://githubtocolab.com/JohnSnowLabs/spark-nlp-workshop/blob/master/open-source-nlp/17.0.Automatic_Speech_Recognition_Wav2Vec2.ipynb">
        <img src="https://colab.research.google.com/assets/colab-badge.svg" style="zoom: 1.3" alt="Open In Colab"/>
    </a>
""", unsafe_allow_html=True)

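# Example audio files bundled with the demo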
AUDIO_FILE_PATH = "inputs"
audio_files = sorted(os.listdir(AUDIO_FILE_PATH))

selected_audio = st.selectbox("Select an audio", audio_files)

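# Users can also upload their own audio file; an upload takes precedence over the selected example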
audio_file_types = ["mp3", "flac", "wav", "aac", "ogg", "aiff", "wma", "m4a", "ape", "dsf", "dff", "midi", "mid", "opus", "amr"]
uploadedfile = st.file_uploader("Try it for yourself!", type=audio_file_types)

if uploadedfile:
    selected_audio = f"{AUDIO_FILE_PATH}/{uploadedfile.name}"
    save_uploadedfile(uploadedfile, AUDIO_FILE_PATH)
elif selected_audio:
    selected_audio = f"{AUDIO_FILE_PATH}/{selected_audio}"

st.subheader("Play Audio") |
|
|
|
with open(selected_audio, 'rb') as audio_file: |
|
audio_bytes = audio_file.read() |
|
st.audio(audio_bytes) |
|
|
|
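# Start Spark, build the pipeline for the chosen model, and transcribe the selected audio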
spark = init_spark()
pipeline = create_pipeline(model)
output = fit_data(pipeline, selected_audio)

st.subheader("Transcription:")
st.markdown(output['text'][0].result.title())