""" File: app_utils.py Author: Elena Ryumina and Dmitry Ryumin Description: This module contains utility functions for facial expression recognition application. License: MIT License """ import torch import numpy as np import mediapipe as mp import pandas as pd from PIL import Image import cv2 # Importing necessary components for the Gradio app from app.model import ( pth_model_static, pth_model_dynamic, activations, audio_processor, audio_model, device ) from app.utils import ( convert_mp4_to_mp3, pad_wav, pad_wav_zeros, get_box, pth_processing, convert_webm_to_mp4, get_evenly_spaced_frame_indices, get_c_expr_db_pred ) from app.config import DICT_EMO_VIDEO, AV_WEIGHTS, NAME_EMO_AUDIO, DICT_PRED, config_data from app.plot import display_frame_info, plot_images from collections import Counter mp_face_mesh = mp.solutions.face_mesh class EmotionRecognition: def __init__( self, step=2, window=4, sr=16000, save_path="", padding="", ): self.save_path = save_path self.step = step self.window = window self.sr = sr self.padding = padding def predict_emotion(self, path, frame_indices, fps): prob, plt = self.load_audio_features(path, frame_indices, fps) return prob, plt def load_audio_features(self, path, frame_indices, fps): window_a = self.window * self.sr step_a = int(self.step * self.sr) wav, audio_plt = convert_mp4_to_mp3(path, frame_indices, fps, self.sr) probs = [] framess = [] for start_a in range(0, len(wav) + 1, step_a): end_a = min(start_a + window_a, len(wav)) a_fss_chunk = wav[start_a:end_a] if self.padding == "mean" or self.padding == "constant": a_fss = pad_wav_zeros(a_fss_chunk, window_a, mode=self.padding) elif self.padding == "repeat": a_fss = pad_wav(a_fss_chunk, window_a) a_fss = torch.unsqueeze(a_fss, 0) a_fss = audio_processor(a_fss, sampling_rate=self.sr) a_fss = a_fss["input_values"][0] a_fss = torch.from_numpy(a_fss) with torch.no_grad(): prob = audio_model(a_fss.to(device)) prob = prob.cpu().numpy() frames = [ str(i).zfill(6) + ".jpg" for i in range( round(start_a / self.sr * fps), round(end_a / self.sr * fps + 1) ) ] probs.extend([prob] * len(frames)) framess.extend(frames) if len(probs[0]) == 7: emo_ABAW = NAME_EMO_AUDIO[:-1] else: emo_ABAW = NAME_EMO_AUDIO df = pd.DataFrame(np.array(probs), columns=emo_ABAW) df["frames"] = framess return df, audio_plt def preprocess_audio_and_predict( path_video="", save_path="src/pred_results/C-EXPR-DB", frame_indices=[], fps=25, step=0.5, padding="mean", window=4, sr=16000, ): audio_ER = EmotionRecognition( step=step, window=window, sr=sr, save_path=save_path, padding=padding, ) df_pred, audio_plt = audio_ER.predict_emotion(path_video, frame_indices, fps) return df_pred, audio_plt def preprocess_video_and_predict(video): if video: if video.split('.')[-1] == 'webm': video = convert_webm_to_mp4(video) cap = cv2.VideoCapture(video) w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) fps = np.round(cap.get(cv2.CAP_PROP_FPS)) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) frame_indices = get_evenly_spaced_frame_indices(total_frames, 9) df_probs_audio, audio_plt = preprocess_audio_and_predict( path_video=video, frame_indices=frame_indices, fps=fps, step=config_data.AUDIO_STEP, padding="mean", save_path="", window=4, sr=16000, ) lstm_features = [] count_frame = 1 count_face = 0 probs_dynamic = [] probs_static = [] frames = [] last_output = None cur_face = None faces = [] zeros = np.zeros((1, 7)) with torch.no_grad(): with mp_face_mesh.FaceMesh( max_num_faces=1, refine_landmarks=False, 
min_detection_confidence=0.5, min_tracking_confidence=0.5) as face_mesh: while cap.isOpened(): _, frame = cap.read() if frame is None: break frame_copy = frame.copy() frame_copy.flags.writeable = False frame_copy = cv2.cvtColor(frame_copy, cv2.COLOR_BGR2RGB) results = face_mesh.process(frame_copy) frame_copy.flags.writeable = True if results.multi_face_landmarks: for fl in results.multi_face_landmarks: startX, startY, endX, endY = get_box(fl, w, h) cur_face = frame_copy[startY:endY, startX: endX] if count_face%config_data.FRAME_DOWNSAMPLING == 0: cur_face_copy = pth_processing(Image.fromarray(cur_face)) prediction = torch.nn.functional.softmax(pth_model_static(cur_face_copy.to(device)), dim=1) features = torch.nn.functional.relu(activations['features']).detach().cpu().numpy() output_s = prediction.clone() output_s = output_s.detach().cpu().numpy() if len(lstm_features) == 0: lstm_features = [features]*10 else: lstm_features = lstm_features[1:] + [features] lstm_f = torch.from_numpy(np.vstack(lstm_features)) lstm_f = torch.unsqueeze(lstm_f, 0) output_d = pth_model_dynamic(lstm_f.to(device)).detach().cpu().numpy() last_output = output_d if count_face == 0: count_face += 1 else: if last_output is not None: output_d = last_output elif last_output is None: output_d = zeros probs_static.append(output_s[0]) probs_dynamic.append(output_d[0]) frames.append(count_frame) else: lstm_features = [] if last_output is not None: probs_static.append(probs_static[-1]) probs_dynamic.append(probs_dynamic[-1]) frames.append(count_frame) elif last_output is None: probs_static.append(zeros[0]) probs_dynamic.append(zeros[0]) frames.append(count_frame) if cur_face is not None: if count_frame-1 in frame_indices: cur_face = cv2.resize(cur_face, (224,224), interpolation = cv2.INTER_AREA) cur_face = display_frame_info(cur_face, 'Frame: {}'.format(count_frame), box_scale=.3) faces.append(cur_face) count_frame += 1 if count_face != 0: count_face += 1 img_plt = plot_images(faces) df_dynamic = pd.DataFrame( np.array(probs_dynamic), columns=list(DICT_EMO_VIDEO.values()) ) df_static = pd.DataFrame( np.array(probs_static), columns=list(DICT_EMO_VIDEO.values()) ) df, pred_plt = get_c_expr_db_pred( stat_df=df_static, dyn_df=df_dynamic, audio_df=df_probs_audio, name_video='', weights_1=AV_WEIGHTS, frame_indices=frame_indices, ) av_pred = df['Audio-visual fusion'].tolist() states = ['negative', 'neutral', 'positive'] dict_av_pred = Counter(av_pred) count_states = np.zeros(3) for k, v in dict_av_pred.items(): if k in [0]: count_states[1] += v elif k in [4, 6, 8, 18]: count_states[2] += v else: count_states[0] += v state_percent = count_states/np.sum(count_states) # if np.argmax(state_percent) in [0,2]: # text1 = "The audio-visual model predicts that a person mostly experiences {} ({:.2f}%) emotions. ".format(states[np.argmax(state_percent)], np.max(state_percent)*100) # else: text1 = "The audio-visual model predicts that a person is mostly in {} ({:.2f}%) state. ".format(states[np.argmax(state_percent)], np.max(state_percent)*100) top_three = dict_av_pred.most_common(3) top_three_text = "Predictions of the three most probable emotions: " for index, count in top_three: percentage = (count / np.sum(count_states)) * 100 top_three_text += f"{DICT_PRED[index]} ({percentage:.2f}%), " top_three_text = top_three_text.rstrip(", ") + "." df.to_csv(video.split('.')[0] + '.csv', index=False) return img_plt, audio_plt, pred_plt, text1+top_three_text, video, video.split('.')[0] + '.csv' else: return None, None, None, None, None, None
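

# Illustrative usage sketch (not part of the original application code).
# It assumes a hypothetical local file "example.mp4" with a visible face and an
# audio track, and that the `app.*` modules imported above are available; it
# only shows how `preprocess_video_and_predict` is expected to be called and
# what it returns.
if __name__ == "__main__":
    img_plt, audio_plt, pred_plt, text, out_video, csv_path = (
        preprocess_video_and_predict("example.mp4")  # hypothetical input path
    )
    print(text)      # textual summary of the audio-visual prediction
    print(csv_path)  # per-frame predictions are also saved to this CSV file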