import gradio as gr
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import librosa
import time
from datetime import datetime
import pandas as pd

HOME_DIR = ""
local_config_path = 'config.json'
local_preprocessor_config_path = 'preprocessor_config.json'
local_weights_path = 'pytorch_model.bin'
local_training_args_path = 'training_args.bin'

import torch
import torch.nn.functional as F
from tqdm import tqdm
# Mapping from class index to emotion label
id2label = {
    0: "angry",
    1: "disgust",
    2: "fear",
    3: "happy",
    4: "neutral",
    5: "sad",
    6: "surprise"
}
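# Note: the probability table built in recognize_emotion() below re-declares this
# label list by hand; deriving it once from id2label (sketch below, not wired in)
# would keep the two in sync:
# ordered_labels = [id2label[i] for i in range(len(id2label))]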
def predict(model, feature_extractor, data, max_length, id2label):
    # Extract features from the raw waveform
    print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), ": Extracting features...")
    inputs = feature_extractor(data, sampling_rate=16000, max_length=max_length,
                               return_tensors='pt', padding=True, truncation=True)
    torch_inputs = inputs['input_values']

    print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), ": Predicting...")
    # Forward pass (no gradients needed at inference time); the model returns logits directly
    with torch.no_grad():
        logits = model(input_values=torch_inputs)

    # Apply softmax to get class probabilities
    probabilities = F.softmax(logits, dim=-1)
    # Get the predicted class index and map it to its label
    predicted_class_idx = torch.argmax(probabilities, dim=-1).item()
    predicted_label = id2label[predicted_class_idx]
    return predicted_label, probabilities
from transformers import Wav2Vec2Config, Wav2Vec2Model
import torch.nn as nn
from huggingface_hub import PyTorchModelHubMixin

config = Wav2Vec2Config.from_pretrained(local_config_path)
class Wav2Vec2ForSpeechClassification(nn.Module, PyTorchModelHubMixin):
    def __init__(self, config):
        super(Wav2Vec2ForSpeechClassification, self).__init__()
        self.wav2vec2 = Wav2Vec2Model(config)
        # Classification head on top of the wav2vec2 encoder output
        self.classifier = nn.ModuleDict({
            'dense': nn.Linear(config.hidden_size, config.hidden_size),
            'activation': nn.ReLU(),
            'dropout': nn.Dropout(config.final_dropout),
            'out_proj': nn.Linear(config.hidden_size, config.num_labels)
        })

    def forward(self, input_values):
        outputs = self.wav2vec2(input_values)
        hidden_states = outputs.last_hidden_state
        # Classify using the first frame of the last hidden state
        x = self.classifier['dense'](hidden_states[:, 0, :])
        x = self.classifier['activation'](x)
        x = self.classifier['dropout'](x)
        logits = self.classifier['out_proj'](x)
        return logits
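# Quick shape sanity check (sketch, commented out so the Space does not run it on
# startup): one second of random audio at 16 kHz should produce one logit per
# emotion class, i.e. a tensor of shape (1, config.num_labels).
# _check_model = Wav2Vec2ForSpeechClassification(config=config)
# print(_check_model(torch.randn(1, 16000)).shape)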
import json
from transformers import Wav2Vec2FeatureExtractor

# Load the preprocessor configuration from the local file
with open(local_preprocessor_config_path, 'r') as file:
    preprocessor_config = json.load(file)

# Initialize the feature extractor using the loaded configuration
feature_extractor = Wav2Vec2FeatureExtractor(
    do_normalize=preprocessor_config["do_normalize"],
    feature_extractor_type=preprocessor_config["feature_extractor_type"],
    feature_size=preprocessor_config["feature_size"],
    padding_side=preprocessor_config["padding_side"],
    padding_value=preprocessor_config["padding_value"],
    processor_class=preprocessor_config["processor_class"],
    return_attention_mask=preprocessor_config["return_attention_mask"],
    sampling_rate=preprocessor_config["sampling_rate"]
)
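# Alternative (sketch, assuming a recent transformers version): since
# preprocessor_config.json is a standard feature-extractor config file, it can
# typically be loaded in a single call instead of field by field:
# feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained(local_preprocessor_config_path)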
# Download the fine-tuned model weights from the Hugging Face Hub
from huggingface_hub import hf_hub_download

model_path = hf_hub_download(
    repo_id="kvilla/wav2vec-english-speech-emotion-recognition-finetuned",
    filename="model_finetuned.pth"
)

# Load the fine-tuned state dict from the downloaded file
saved_model = torch.load(model_path, map_location=torch.device('cpu'))

# Create the model with the loaded configuration
model = Wav2Vec2ForSpeechClassification(config=config)

# Load the state dictionary and switch to inference mode
model.load_state_dict(saved_model)
print("Model initialized successfully.")
model.eval()
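# Offline usage sketch (commented out): running the classifier on a local WAV file.
# 'sample.wav' is a hypothetical file name, not part of this Space.
# wav, _ = librosa.load("sample.wav", sr=16000)
# label, probs = predict(model, feature_extractor, wav, 48000, id2label)
# print(label, probs)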
def recognize_emotion(audio):
    # Gradio delivers the recording as a (sample_rate, numpy array) tuple
    sample_rate, audio_data = audio

    # Ensure the audio data is in floating-point format
    if not np.issubdtype(audio_data.dtype, np.floating):
        audio_data = audio_data.astype(np.float32)

    # Resample with librosa if the recording is not already at 16 kHz
    if sample_rate != 16000:
        print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), ": Resampling audio...")
        audio_data = librosa.resample(audio_data, orig_sr=sample_rate, target_sr=16000)

    # Limit the input to 3 seconds of audio (48000 samples at 16 kHz)
    emotion, probabilities = predict(model, feature_extractor, audio_data, 48000, id2label)
    print(probabilities)

    probs = probabilities.detach().numpy().flatten().tolist()
    print(probs)
    # Convert probabilities to percentages
    percentages = [round(prob * 100, 2) for prob in probs]
    print(percentages)

    # Class labels in the same order as the model's output logits
    labels = ["angry", "disgust", "fear", "happy", "neutral", "sad", "surprise"]
    print(labels)

    # Build the probability table, highest probability first
    df = pd.DataFrame({"Emotion": labels, "Probability (%)": percentages})
    df = df.sort_values(by="Probability (%)", ascending=False)
    print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), df)

    return emotion, get_emotion_image(emotion), df
def get_emotion_image(emotion):
    # Map each emotion label to its illustration image
    emotion_to_image = {
        "angry": "angry.jpeg",
        "disgust": "disgust.jpeg",
        "fear": "fear.jpeg",
        "happy": "happy.jpeg",
        "neutral": "neutral.jpeg",
        "sad": "sad.jpeg",
        "surprise": "surprise.jpeg"
    }
    # Fall back to a default image if the emotion is not found
    image_path = emotion_to_image.get(emotion, "default.jpg")
    # Load and return the image
    return Image.open(image_path)
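# Assumption: the *.jpeg files above (and default.jpg) live in the root of the
# Space next to app.py; Image.open() raises FileNotFoundError if any is missing.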
demo = gr.Blocks(theme=gr.themes.Soft())

with demo:
    audio_input = gr.Audio(type="numpy",
                           sources=["microphone"],
                           show_label=True,
                           streaming=True)
    text_output = gr.Textbox(label="Recognized Emotion")
    output_df = gr.DataFrame(label="Emotion Probabilities")
    image_output = gr.Image(label="Emotion Image", scale=1, interactive=False)
    # Log table, initialized empty with the expected columns
    df_logs = gr.DataFrame(value=pd.DataFrame(columns=['Timestamp', 'Emotion']),
                           label="Output Logs",
                           headers=['Timestamp', 'Emotion'])
    def process_audio(audio, emotion, image, state, df_probs, df_logs):
        current_time = time.time()
        # Only run a new prediction every 10 seconds of streamed audio
        if state is None or (current_time - state >= 10):
            state = current_time
            emotion, image, df_probs = recognize_emotion(audio)
            # Append the new prediction to the log table
            timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            new_row = {'Timestamp': timestamp, 'Emotion': emotion}
            df_logs = pd.concat([df_logs, pd.DataFrame([new_row])], ignore_index=True)
            print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), "Predicted emotion: ", emotion)
            return emotion, image, state, df_probs, df_logs
        else:
            print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), "Not yet time")
            return emotion, image, state, df_probs, df_logs
    # Automatically run process_audio on each streamed audio chunk
    state = gr.State(None)
    audio_input.stream(fn=process_audio,
                       inputs=[audio_input, text_output, image_output, state, output_df, df_logs],
                       outputs=[text_output, image_output, state, output_df, df_logs])

demo.launch(share=True)