| """ | |
| File: app_utils.py | |
| Author: Elena Ryumina and Dmitry Ryumin | |
| Description: This module contains utility functions for facial expression recognition application. | |
| License: MIT License | |
| """ | |
import torch
import numpy as np
import mediapipe as mp
import pandas as pd
from PIL import Image
import cv2

# Importing necessary components for the Gradio app
from app.model import (
    pth_model_static,
    pth_model_dynamic,
    activations,
    audio_processor,
    audio_model,
    device
)
from app.utils import (
    convert_mp4_to_mp3,
    pad_wav,
    pad_wav_zeros,
    get_box,
    pth_processing,
    convert_webm_to_mp4,
    get_evenly_spaced_frame_indices,
    get_c_expr_db_pred
)
from app.config import DICT_EMO_VIDEO, AV_WEIGHTS, NAME_EMO_AUDIO, DICT_PRED, config_data
from app.plot import display_frame_info, plot_images
from collections import Counter

mp_face_mesh = mp.solutions.face_mesh

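# EmotionRecognition predicts emotion probabilities from a video's soundtrack
# using a sliding window: the waveform is split into `window`-second chunks
# taken every `step` seconds, and each chunk's predictions are mapped back to
# the video frames it covers.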
class EmotionRecognition:
    def __init__(
        self,
        step=2,
        window=4,
        sr=16000,
        save_path="",
        padding="",
    ):
        self.save_path = save_path
        self.step = step
        self.window = window
        self.sr = sr
        self.padding = padding

    def predict_emotion(self, path, frame_indices, fps):
        prob, plt = self.load_audio_features(path, frame_indices, fps)
        return prob, plt
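    # Slide a `window`-second window over the extracted waveform in `step`-second
    # hops, pad chunks shorter than the window, run the audio model on each chunk,
    # and replicate each chunk's probabilities across the frame names it spans.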
    def load_audio_features(self, path, frame_indices, fps):
        window_a = self.window * self.sr
        step_a = int(self.step * self.sr)
        wav, audio_plt = convert_mp4_to_mp3(path, frame_indices, fps, self.sr)
        probs = []
        framess = []
        for start_a in range(0, len(wav) + 1, step_a):
            end_a = min(start_a + window_a, len(wav))
            a_fss_chunk = wav[start_a:end_a]
            if self.padding == "mean" or self.padding == "constant":
                a_fss = pad_wav_zeros(a_fss_chunk, window_a, mode=self.padding)
            elif self.padding == "repeat":
                a_fss = pad_wav(a_fss_chunk, window_a)
            else:
                # Guard against unsupported padding modes, which would otherwise
                # leave `a_fss` undefined and raise a NameError below.
                raise ValueError(f"Unsupported padding mode: {self.padding}")
            a_fss = torch.unsqueeze(a_fss, 0)
            a_fss = audio_processor(a_fss, sampling_rate=self.sr)
            a_fss = a_fss["input_values"][0]
            a_fss = torch.from_numpy(a_fss)
            with torch.no_grad():
                prob = audio_model(a_fss.to(device))
                prob = prob.cpu().numpy()
            frames = [
                str(i).zfill(6) + ".jpg"
                for i in range(
                    round(start_a / self.sr * fps), round(end_a / self.sr * fps + 1)
                )
            ]
            probs.extend([prob] * len(frames))
            framess.extend(frames)
        if len(probs[0]) == 7:
            emo_ABAW = NAME_EMO_AUDIO[:-1]
        else:
            emo_ABAW = NAME_EMO_AUDIO
        df = pd.DataFrame(np.array(probs), columns=emo_ABAW)
        df["frames"] = framess
        return df, audio_plt

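# Thin wrapper that builds an EmotionRecognition instance with the given
# windowing parameters and returns its per-frame audio probabilities together
# with the waveform plot.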
def preprocess_audio_and_predict(
    path_video="",
    save_path="src/pred_results/C-EXPR-DB",
    frame_indices=[],
    fps=25,
    step=0.5,
    padding="mean",
    window=4,
    sr=16000,
):
    audio_ER = EmotionRecognition(
        step=step,
        window=window,
        sr=sr,
        save_path=save_path,
        padding=padding,
    )
    df_pred, audio_plt = audio_ER.predict_emotion(path_video, frame_indices, fps)
    return df_pred, audio_plt

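# Main entry point used by the Gradio app: decodes the video, detects a face
# in each frame with MediaPipe Face Mesh, runs the static and dynamic
# (temporal) visual models, fuses them with the audio predictions, and returns
# the face collage, audio plot, prediction plot, a text summary, the video
# path, and the path to a CSV with per-frame predictions.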
def preprocess_video_and_predict(video):
    if video:
        if video.split('.')[-1] == 'webm':
            video = convert_webm_to_mp4(video)

        cap = cv2.VideoCapture(video)
        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = np.round(cap.get(cv2.CAP_PROP_FPS))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        frame_indices = get_evenly_spaced_frame_indices(total_frames, 9)

        df_probs_audio, audio_plt = preprocess_audio_and_predict(
            path_video=video,
            frame_indices=frame_indices,
            fps=fps,
            step=config_data.AUDIO_STEP,
            padding="mean",
            save_path="",
            window=4,
            sr=16000,
        )

        lstm_features = []
        count_frame = 1
        count_face = 0
        probs_dynamic = []
        probs_static = []
        frames = []
        last_output = None
        cur_face = None
        faces = []
        zeros = np.zeros((1, 7))
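        # For each frame: crop the detected face, run the static model on every
        # config_data.FRAME_DOWNSAMPLING-th face, keep a rolling buffer of the
        # last 10 feature vectors for the dynamic (temporal) model, and fall
        # back to the last prediction (or zeros) when no face is found.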
        with torch.no_grad():
            with mp_face_mesh.FaceMesh(
                max_num_faces=1,
                refine_landmarks=False,
                min_detection_confidence=0.5,
                min_tracking_confidence=0.5) as face_mesh:
                while cap.isOpened():
                    _, frame = cap.read()
                    if frame is None:
                        break
                    frame_copy = frame.copy()
                    frame_copy.flags.writeable = False
                    frame_copy = cv2.cvtColor(frame_copy, cv2.COLOR_BGR2RGB)
                    results = face_mesh.process(frame_copy)
                    frame_copy.flags.writeable = True
                    if results.multi_face_landmarks:
                        for fl in results.multi_face_landmarks:
                            startX, startY, endX, endY = get_box(fl, w, h)
                            cur_face = frame_copy[startY:endY, startX:endX]
                            if count_face % config_data.FRAME_DOWNSAMPLING == 0:
                                cur_face_copy = pth_processing(Image.fromarray(cur_face))
                                prediction = torch.nn.functional.softmax(
                                    pth_model_static(cur_face_copy.to(device)), dim=1
                                )
                                features = torch.nn.functional.relu(
                                    activations['features']
                                ).detach().cpu().numpy()
                                output_s = prediction.clone()
                                output_s = output_s.detach().cpu().numpy()
                                # Rolling buffer of the last 10 feature vectors for the dynamic model
                                if len(lstm_features) == 0:
                                    lstm_features = [features] * 10
                                else:
                                    lstm_features = lstm_features[1:] + [features]
                                lstm_f = torch.from_numpy(np.vstack(lstm_features))
                                lstm_f = torch.unsqueeze(lstm_f, 0)
                                output_d = pth_model_dynamic(lstm_f.to(device)).detach().cpu().numpy()
                                last_output = output_d
                                if count_face == 0:
                                    count_face += 1
                            else:
                                # Skipped (downsampled) face: reuse the last dynamic prediction or zeros
                                if last_output is not None:
                                    output_d = last_output
                                else:
                                    output_d = zeros
                            probs_static.append(output_s[0])
                            probs_dynamic.append(output_d[0])
                            frames.append(count_frame)
                    else:
                        # No face detected: reset the temporal buffer and repeat the last prediction
                        lstm_features = []
                        if last_output is not None:
                            probs_static.append(probs_static[-1])
                            probs_dynamic.append(probs_dynamic[-1])
                            frames.append(count_frame)
                        else:
                            probs_static.append(zeros[0])
                            probs_dynamic.append(zeros[0])
                            frames.append(count_frame)
                    if cur_face is not None:
                        if count_frame - 1 in frame_indices:
                            cur_face = cv2.resize(cur_face, (224, 224), interpolation=cv2.INTER_AREA)
                            cur_face = display_frame_info(cur_face, 'Frame: {}'.format(count_frame), box_scale=.3)
                            faces.append(cur_face)
                    count_frame += 1
                    if count_face != 0:
                        count_face += 1
                img_plt = plot_images(faces)

        df_dynamic = pd.DataFrame(
            np.array(probs_dynamic), columns=list(DICT_EMO_VIDEO.values())
        )
        df_static = pd.DataFrame(
            np.array(probs_static), columns=list(DICT_EMO_VIDEO.values())
        )
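        # Fuse the static, dynamic, and audio probability streams using the
        # audio-visual weights (AV_WEIGHTS); the result includes a per-frame
        # 'Audio-visual fusion' prediction.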
        df, pred_plt = get_c_expr_db_pred(
            stat_df=df_static,
            dyn_df=df_dynamic,
            audio_df=df_probs_audio,
            name_video='',
            weights_1=AV_WEIGHTS,
            frame_indices=frame_indices,
        )
        av_pred = df['Audio-visual fusion'].tolist()
        states = ['negative', 'neutral', 'positive']
        dict_av_pred = Counter(av_pred)
        count_states = np.zeros(3)
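        # Map fused class indices to coarse states: index 0 counts as neutral,
        # indices 4, 6, 8 and 18 as positive, everything else as negative.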
        for k, v in dict_av_pred.items():
            if k in [0]:
                count_states[1] += v
            elif k in [4, 6, 8, 18]:
                count_states[2] += v
            else:
                count_states[0] += v
        state_percent = count_states / np.sum(count_states)

        # if np.argmax(state_percent) in [0, 2]:
        #     text1 = "The audio-visual model predicts that a person mostly experiences {} ({:.2f}%) emotions. ".format(states[np.argmax(state_percent)], np.max(state_percent) * 100)
        # else:
        text1 = "The audio-visual model predicts that a person is mostly in a {} ({:.2f}%) state. ".format(
            states[np.argmax(state_percent)], np.max(state_percent) * 100
        )
        top_three = dict_av_pred.most_common(3)
        top_three_text = "Predictions of the three most probable emotions: "
        for index, count in top_three:
            percentage = (count / np.sum(count_states)) * 100
            top_three_text += f"{DICT_PRED[index]} ({percentage:.2f}%), "
        top_three_text = top_three_text.rstrip(", ") + "."

        df.to_csv(video.split('.')[0] + '.csv', index=False)

        return img_plt, audio_plt, pred_plt, text1 + top_three_text, video, video.split('.')[0] + '.csv'
    else:
        return None, None, None, None, None, None
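

# Minimal usage sketch (not part of the Gradio app itself). The video path
# below is a hypothetical example; in the running Space, `video` is supplied
# by the Gradio interface.
if __name__ == "__main__":
    example_video = "example.mp4"  # hypothetical local file
    img_plt, audio_plt, pred_plt, summary, video_path, csv_path = preprocess_video_and_predict(example_video)
    if summary is not None:
        print(summary)
        print(f"Per-frame predictions saved to: {csv_path}")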