# Importing required libraries
import numpy as np
import librosa
import time
import soundfile as sf
import torch
import torch.nn as nn
from PIL import Image
import torch.nn.functional as F
import streamlit as st
import tempfile
import noisereduce as nr
import whisper
from transformers import (
    HubertForSequenceClassification,
    Wav2Vec2FeatureExtractor,
    AutoModel,
    AutoTokenizer,
    AutoModelForCausalLM
)
from streamlit.components.v1 import html

# Mapping the Hubert model's output labels to GPT prompt prefixes
emo2promptMapping = {
    'Angry': 'ANGRY',
    'Calm': 'CALM',
    'Disgust': 'DISGUSTED',
    'Fearful': 'FEARFUL',
    'Happy': 'HAPPY',
    'Sad': 'SAD',
    'Surprised': 'SURPRISED'
}

# Use the GPU (CUDA) if available, otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Load the speech-to-text model
speech_model = whisper.load_model("base")

# Define label-related info
num_labels = 7
label_mapping = ['angry', 'calm', 'disgust', 'fearful', 'happy', 'sad', 'surprised']

# URL of the model weights on the Hugging Face model hub
model_weights_path = "https://huggingface.co/netgvarun2005/MultiModalBertHubert/resolve/main/MultiModal_model_state_dict.pth"

# Model name initialization
model_id = "facebook/hubert-base-ls960"
bert_model_name = "bert-base-uncased"

def open_page(url):
    """
    Invoke JavaScript code to open an external URL in a new browser tab.

    Parameters:
        url (str): The external URL to redirect to.

    Returns:
        None
    """
    open_script = """
        <script type="text/javascript">
            window.open('%s', '_blank').focus();
        </script>
    """ % (url)
    html(open_script)

def config():
    """
    Configure the Streamlit application settings and styles.

    This function sets the page configuration, including the title and icon, adds custom CSS styles
    for specific elements, and defines a custom style for the application title.

    Parameters:
        None

    Returns:
        None
    """
    # Load the page icon using PIL
    im = Image.open('./config/icon.png')

    # Set the page configuration with the title and icon
    st.set_page_config(page_title="Virtual Therapist", page_icon=im)

    # Add custom CSS styles
    st.markdown("""
        <style>
        .mobile-screen {
            border: 2px solid black;
            display: flex;
            flex-direction: column;
            align-items: center;
            justify-content: center; /* Align content in the middle */
            height: 20vh;            /* Reduce the height of the box */
            padding: 20px;           /* Reduce padding */
            border-radius: 10px;
        }
        </style>
    """, unsafe_allow_html=True)

    # Define a custom style for the title
    title_style = """
        <style>
        h1 {
            font-family: 'Comic Sans MS', cursive, sans-serif;
            color: blue;
            font-size: 22px;
        }
        </style>
    """

    # Display the title with the custom style
    st.markdown(title_style, unsafe_allow_html=True)
    st.markdown("# WELCOME! HOW ARE YOU FEELING? PLEASE RECORD AN AUDIO!", unsafe_allow_html=True)
    st.markdown("# BASED ON YOUR EMOTIONAL STATE, I WILL SUGGEST SOME TIPS!", unsafe_allow_html=True)

class MultimodalModel(nn.Module):
    """
    Custom PyTorch model that takes both audio features and text token ids as input,
    mean-pools the last hidden states of the Hubert and BERT encoders, and classifies
    the concatenated result with a linear head.
    """
    def __init__(self, bert_model_name, num_labels):
        super().__init__()
        self.hubert = HubertForSequenceClassification.from_pretrained(
            "netgvarun2005/HubertStandaloneEmoDetector", num_labels=num_labels
        ).hubert
        self.bert = AutoModel.from_pretrained(bert_model_name)
        self.classifier = nn.Linear(self.hubert.config.hidden_size + self.bert.config.hidden_size, num_labels)

    def forward(self, input_values, text):
        hubert_output = self.hubert(input_values).last_hidden_state
        bert_output = self.bert(text).last_hidden_state

        # Apply mean pooling along the sequence dimension
        hubert_output = hubert_output.mean(dim=1)
        bert_output = bert_output.mean(dim=1)

        # Concatenate the pooled embeddings and classify
        concat_output = torch.cat((hubert_output, bert_output), dim=-1)
        logits = self.classifier(concat_output)
        return logits
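
# A minimal shape sketch of the fusion above (illustrative only: the dummy tensors
# and sizes are assumptions, not real app inputs):
#
#   model = MultimodalModel(bert_model_name, num_labels)
#   dummy_audio = torch.randn(1, 16000)             # 1 s of 16 kHz audio -> (batch, samples)
#   dummy_text = torch.randint(0, 30522, (1, 12))   # 12 BERT token ids   -> (batch, seq_len)
#   logits = model(dummy_audio, dummy_text)         # -> shape (1, num_labels)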

def speechtoText(wavfile):
    """
    Convert speech from a WAV audio file to text using a pre-trained Whisper ASR model.

    This function takes a WAV audio file as input and uses a pre-trained Whisper ASR model
    to transcribe the speech into text.

    Parameters:
        wavfile (str): The file path to the input WAV audio file.

    Returns:
        str: The transcribed text from the speech in the audio file.
    """
    return speech_model.transcribe(wavfile)['text']

def resampleaudio(wavfile):
    """
    Resample an audio file to a target sample rate and save it back to the same file.

    This function loads an audio file in WAV format, resamples it to the specified target sample rate,
    and then saves the resampled audio back to the same file, overwriting the original content.

    Parameters:
        wavfile (str): The file path to the input WAV audio file.

    Returns:
        str: The file path to the resampled WAV audio file.
    """
    audio, sr = librosa.load(wavfile, sr=None)

    # Set the desired target sample rate
    target_sample_rate = 16000

    # Resample the audio to the target sample rate
    resampled_audio = librosa.resample(audio, orig_sr=sr, target_sr=target_sample_rate)

    # Overwrite the original file with the resampled audio
    sf.write(wavfile, resampled_audio, target_sample_rate)
    return wavfile

def noiseReduction(wavfile):
    """
    Apply noise reduction to an audio file and save the denoised audio back to the same file.

    This function loads an audio file in WAV format, performs noise reduction using the specified parameters,
    and then saves the denoised audio back to the same file, overwriting the original content.

    Parameters:
        wavfile (str): The file path to the input WAV audio file.

    Returns:
        str: The file path to the denoised WAV audio file.
    """
    audio, sr = librosa.load(wavfile, sr=None)

    # Set parameters for noise reduction
    n_fft = 2048      # FFT window size
    hop_length = 512  # Hop length for STFT

    # Perform noise reduction
    reduced_noise = nr.reduce_noise(y=audio, sr=sr, n_fft=n_fft, hop_length=hop_length)

    # Overwrite the original file with the denoised audio
    sf.write(wavfile, reduced_noise, sr)
    return wavfile

def removeSilence(wavfile):
    """
    Remove silence from an audio file and save the trimmed audio back to the same file.

    This function loads an audio file in WAV format, identifies and removes silence based on a specified threshold,
    and then saves the trimmed audio back to the same file, overwriting the original content.

    Parameters:
        wavfile (str): The file path to the input WAV audio file.

    Returns:
        str: The file path to the audio file with silence removed.
    """
    # Load the audio file
    audio, sr = librosa.load(wavfile, sr=None)

    # Split the audio into non-silent intervals (silence threshold: 40 dB below peak)
    clips = librosa.effects.split(audio, top_db=40)

    # Combine the non-silent audio clips
    non_silent_audio = []
    for start, end in clips:
        non_silent_audio.extend(audio[start:end])

    # Overwrite the original file with the audio without silence
    sf.write(wavfile, non_silent_audio, sr)
    return wavfile

def preprocessWavFile(wavfile):
    """
    Perform a series of audio preprocessing steps on a WAV file.

    This function takes an input WAV audio file, applies a series of preprocessing steps,
    including resampling, noise reduction, and silence removal, and returns the path to the
    preprocessed audio file.

    Parameters:
        wavfile (str): The file path to the input WAV audio file.

    Returns:
        str: The file path to the preprocessed WAV audio file.
    """
    resampledwavfile = resampleaudio(wavfile)
    denoised_file = noiseReduction(resampledwavfile)
    return removeSilence(denoised_file)
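
# A hedged usage sketch of the preprocessing chain (the file path is hypothetical):
#
#   clean_path = preprocessWavFile("/tmp/recording.wav")
#   audio, sr = librosa.load(clean_path, sr=None)   # sr is 16000 after resampling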

def load_model():
    """
    Load and configure the models and tokenizers used by the application.

    This function builds the multi-modal model, loads its weights from the Hugging Face Hub,
    initializes tokenizers for the multi-modal model and the GPT language model, and returns
    these components for use in the application.

    Returns:
        tuple: A tuple containing the following components:
            - multiModel (MultimodalModel): The multi-modal model.
            - tokenizer (AutoTokenizer): Tokenizer for the multi-modal model.
            - model_gpt (AutoModelForCausalLM): Language model for text generation.
            - tokenizer_gpt (AutoTokenizer): Tokenizer for the language model.
    """
    # Build the model and load its weights directly from the Hugging Face Hub
    multiModel = MultimodalModel(bert_model_name, num_labels)
    multiModel.load_state_dict(
        torch.hub.load_state_dict_from_url(model_weights_path, map_location=device),
        strict=False
    )
    tokenizer = AutoTokenizer.from_pretrained("netgvarun2005/MultiModalBertHubertTokenizer")

    # GenAI model and tokenizer
    tokenizer_gpt = AutoTokenizer.from_pretrained(
        "netgvarun2005/GPTTherapistDeepSpeedTokenizer",
        pad_token='<|pad|>',
        bos_token='<|startoftext|>',
        eos_token='<|endoftext|>'
    )
    model_gpt = AutoModelForCausalLM.from_pretrained("netgvarun2005/GPTTherapistDeepSpeedModel")

    return multiModel, tokenizer, model_gpt, tokenizer_gpt
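
# A hedged note: Streamlit reruns the whole script on every interaction, so callers
# may want to cache this loader. A minimal sketch using Streamlit's st.cache_resource
# (the wrapper name is hypothetical):
#
#   @st.cache_resource
#   def cached_load_model():
#       return load_model()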

def predict(audio_array, multiModal_model, key, tokenizer, text):
    """
    Perform multimodal prediction using an audio feature array and text input.

    This function takes an audio sample array and text as input, tokenizes the text,
    extracts audio features, and uses the multi-modal model to predict a class label
    from the combined audio and text inputs.

    Parameters:
        audio_array (numpy.ndarray): A numpy array containing the audio samples.
        multiModal_model: The multi-modal model for prediction.
        key: A key for identifying the model (e.g., model_id).
        tokenizer: Tokenizer for processing the text input.
        text (str): The input text for prediction.

    Returns:
        str: The predicted class label.
    """
    # Tokenize the input text
    input_text = tokenizer(text, return_tensors="pt", truncation=True, padding=True)

    # Extract audio features using a feature extractor
    feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained(model_id)
    input_audio = feature_extractor(
        raw_speech=audio_array,
        sampling_rate=16000,
        padding=True,
        return_tensors="pt"
    )

    # Make predictions with the multi-modal model
    logits = multiModal_model(input_audio["input_values"], input_text["input_ids"])

    # Calculate class probabilities
    probabilities = F.softmax(logits, dim=1)
    _, predicted = torch.max(probabilities, 1)

    class_prob = [round(value, 2) for value in probabilities.tolist()[0]]
    maxVal = np.argmax(class_prob)

    # Handle inference issues (empty label) before returning the prediction
    if label_mapping[predicted] == "":
        st.write("Inference impossible, a problem occurred with your audio or your parameters, we apologize :(")

    return (label_mapping[maxVal]).capitalize()
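
# A hedged usage sketch (the path, text, and variable names are hypothetical; the
# model and tokenizer come from load_model()):
#
#   audio_array, _ = librosa.load("/tmp/clean.wav", sr=None)
#   emotion = predict(audio_array, multiModel, 2, tokenizer, "I feel great today")
#   # emotion is one of label_mapping, capitalized, e.g. 'Happy'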

def GenerateText(emo, gpt_tokenizer, gpt_model, t_val, t_k, t_p):
    """
    Generate text based on a given emotion using a GPT-2 style model.

    This function takes an emotion as input, generates text based on the emotion prompt,
    and displays multiple generated text samples.

    Parameters:
        emo (str): The emotion for which text should be generated.
        gpt_tokenizer: Tokenizer for processing the model input.
        gpt_model: The causal language model for text generation.
        t_val (float): Sampling temperature.
        t_k (int): top_k sampling parameter.
        t_p (float): top_p (nucleus) sampling parameter.

    Returns:
        None
    """
    # Create a prompt based on the input emotion
    prompt = f'<startoftext>{emo2promptMapping[emo]}:'

    # Tokenize the prompt and convert it to input tensors
    generated = gpt_tokenizer(prompt, return_tensors="pt").input_ids

    # Move the input tensor and GPT model to the configured device (e.g., GPU)
    generated = generated.to(device)
    gpt_model.to(device)

    # Generate multiple text samples based on the prompt
    sample_outputs = gpt_model.generate(
        generated,
        do_sample=True,
        top_k=t_k,
        max_length=30,
        top_p=t_p,
        temperature=t_val,
        num_return_sequences=10
    )

    # Decode each sample, keeping only the text after the 'EMOTION:' prefix,
    # and deduplicate the results
    outputs = set(
        gpt_tokenizer.decode(sample_output, skip_special_tokens=True).split(':')[-1]
        for sample_output in sample_outputs
    )

    # Display the generated text samples with a delay for readability
    for i, sample_output in enumerate(outputs):
        st.write(f"<span style='font-size: 18px; font-family: Arial, sans-serif; font-weight: bold;'>{i+1}: {sample_output}</span>", unsafe_allow_html=True)
        time.sleep(0.5)
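
# A hedged usage sketch (these sampling values mirror the 'More Creative' button below):
#
#   GenerateText('Happy', tokenizer_gpt, model_gpt, t_val=0.9, t_k=50, t_p=0.8)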

def process_file(ser_model, tokenizer, gpt_model, gpt_tokenizer):
    """
    Process and analyze an uploaded WAV file, generating transcriptions and helpful tips.

    This function allows users to upload a WAV audio file, processes the file to obtain transcriptions,
    predicts the user's emotional state, and displays helpful tips based on the predicted emotion.

    Parameters:
        ser_model: The emotion analysis model for predicting emotions.
        tokenizer: Tokenizer for processing text inputs.
        gpt_model: The GPT model for generating text.
        gpt_tokenizer: Tokenizer for processing GPT model inputs.

    Returns:
        None
    """
    emo = ""
    button_label1 = "Show Helpful Tips (More Creative)"
    button_label2 = "Show Helpful Tips (More Balanced)"

    uploaded_file = st.file_uploader("Upload your file! It should be .wav", type=["wav"])

    if uploaded_file is not None:
        # Read the content of the uploaded file
        audio_content = uploaded_file.read()

        # Display the audio file
        st.audio(audio_content, format="audio/wav")

        # Save the audio content to a temporary file
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_file:
            temp_filename = temp_file.name
            temp_file.write(audio_content)

        try:
            audio_array, sr = librosa.load(preprocessWavFile(temp_filename), sr=None)
            with st.spinner("Generating transcriptions in the side pane. Please wait..."):
                transcription = speechtoText(temp_filename)
                emo = predict(audio_array, ser_model, 2, tokenizer, transcription)

            # Display the transcription in a textbox
            st.sidebar.text_area("Transcription", transcription, height=25)
        except Exception:
            st.write("Inference impossible, a problem occurred with your audio or your parameters, we apologize :(")
            return  # guard added: without a prediction, the emotion lookup below would raise a KeyError

        txt = f"You seem to be <b>{(emo2promptMapping[emo]).capitalize()}!</b>\n Click on 'Show Helpful Tips' button to proceed further."
        st.markdown(f"<div class='mobile-screen' style='font-size: 24px;'>{txt} </div>", unsafe_allow_html=True)

        # Create two columns for the buttons
        col1, col2 = st.columns([1, 1])

        # Store the value of emo in the session state
        st.session_state.emo = emo

        if col1.button(button_label1):
            with st.spinner("Generating tips (it may take up to 2-3 mins). Please wait..."):
                # Retrieve the emotion from the session state
                emo = st.session_state.emo
                # More creative sampling settings
                temp = 0.9
                top_k = 50
                top_p = 0.8
                GenerateText(emo, gpt_tokenizer, gpt_model, temp, top_k, top_p)

        if col2.button(button_label2):
            with st.spinner("Generating tips (it may take up to 2-3 mins). Please wait..."):
                # Retrieve the emotion from the session state
                emo = st.session_state.emo
                # More balanced sampling settings
                temp = 0.2
                top_k = 90
                top_p = 0.95
                GenerateText(emo, gpt_tokenizer, gpt_model, temp, top_k, top_p)

def main():
    """
    Main function for running the Streamlit-based multi-modal text generation application.

    This function configures the Streamlit application, loads the necessary models and tokenizers,
    and allows users to process audio files to generate transcriptions and helpful tips.

    Returns:
        None
    """
    config()

    if st.sidebar.button("**Open External Audio Recorder!**"):
        open_page("https://voice-recorder-online.com/")

    # Load the models and tokenizers
    ser_model, tokenizer, gpt_model, gpt_tokenizer = load_model()

    # Process and analyze uploaded audio files
    process_file(ser_model, tokenizer, gpt_model, gpt_tokenizer)


if __name__ == '__main__':
    main()