""" File: face_utils.py Author: Elena Ryumina and Dmitry Ryumin Description: This module contains utility functions related to facial landmarks and image processing. License: MIT License """ import numpy as np import pandas as pd import math import subprocess import torchaudio import torch import os from PIL import Image from torchvision import transforms # Importing necessary components for the Gradio app from app.config import NAME_EMO_AUDIO, DICT_CE, config_data from app.plot import plot_compound_expression_prediction, plot_audio def norm_coordinates(normalized_x, normalized_y, image_width, image_height): x_px = min(math.floor(normalized_x * image_width), image_width - 1) y_px = min(math.floor(normalized_y * image_height), image_height - 1) return x_px, y_px def get_box(fl, w, h): idx_to_coors = {} for idx, landmark in enumerate(fl.landmark): landmark_px = norm_coordinates(landmark.x, landmark.y, w, h) if landmark_px: idx_to_coors[idx] = landmark_px x_min = np.min(np.asarray(list(idx_to_coors.values()))[:, 0]) y_min = np.min(np.asarray(list(idx_to_coors.values()))[:, 1]) endX = np.max(np.asarray(list(idx_to_coors.values()))[:, 0]) endY = np.max(np.asarray(list(idx_to_coors.values()))[:, 1]) (startX, startY) = (max(0, x_min), max(0, y_min)) (endX, endY) = (min(w - 1, endX), min(h - 1, endY)) return startX, startY, endX, endY def pth_processing(fp): class PreprocessInput(torch.nn.Module): def init(self): super(PreprocessInput, self).init() def forward(self, x): x = x.to(torch.float32) x = torch.flip(x, dims=(0,)) x[0, :, :] -= 91.4953 x[1, :, :] -= 103.8827 x[2, :, :] -= 131.0912 return x def get_img_torch(img, target_size=(224, 224)): transform = transforms.Compose([transforms.PILToTensor(), PreprocessInput()]) img = img.resize(target_size, Image.Resampling.NEAREST) img = transform(img) img = torch.unsqueeze(img, 0) return img return get_img_torch(fp) def convert_webm_to_mp4(input_file): path_save = input_file.split('.')[0] + ".mp4" if not os.path.exists(path_save): ff_video = "ffmpeg -i {} -c:v copy -c:a aac -strict experimental {}".format( input_file, path_save ) subprocess.call(ff_video, shell=True) return path_save def convert_mp4_to_mp3(path, frame_indices, fps, sampling_rate=16000): path_save = path.split('.')[0] + ".wav" if not os.path.exists(path_save): ff_audio = "ffmpeg -i {} -vn -acodec pcm_s16le -ar 44100 -ac 2 {}".format( path, path_save ) subprocess.call(ff_audio, shell=True) wav, sr = torchaudio.load(path_save) num_frames = wav.numpy().shape[1] time_axis = [i / sr for i in range(num_frames)] plt = plot_audio(time_axis, wav, frame_indices, fps, (12, 2)) if wav.size(0) > 1: wav = wav.mean(dim=0, keepdim=True) if sr != sampling_rate: transform = torchaudio.transforms.Resample(orig_freq=sr, new_freq=sampling_rate) wav = transform(wav) sr = sampling_rate assert sr == sampling_rate return wav.squeeze(0), plt def pad_wav(wav, max_length): current_length = len(wav) if current_length < max_length: repetitions = (max_length + current_length - 1) // current_length wav = torch.cat([wav] * repetitions, dim=0)[:max_length] elif current_length > max_length: wav = wav[:max_length] return wav def pad_wav_zeros(wav, max_length, mode="constant"): if mode == "mean": wav = torch.nn.functional.pad( wav, (0, max(0, max_length - wav.shape[0])), mode="constant", value=torch.mean(wav), ) else: wav = torch.nn.functional.pad( wav, (0, max(0, max_length - wav.shape[0])), mode=mode ) return wav def softmax(matrix): exp_matrix = np.exp(matrix - np.max(matrix, axis=1, keepdims=True)) return exp_matrix / 


def get_compound_expression(pred, com_emo):
    # Sum the probabilities of the two basic emotions that form each compound expression.
    pred = np.asarray(pred)
    prob = np.zeros((len(pred), len(com_emo)))
    for idx, (_, v) in enumerate(com_emo.items()):
        idx_1 = v[0]
        idx_2 = v[1]
        prob[:, idx] = pred[:, idx_1] + pred[:, idx_2]
    return prob


def get_image_location(curr_video, frame):
    frame = int(frame.split(".")[0]) + 1
    frame = str(frame).zfill(5) + ".jpg"
    return f"{curr_video}/{frame}"


def save_txt(column_names, file_names, labels, save_name):
    data_lines = [",".join(column_names)]
    for file_name, label in zip(file_names, labels):
        data_lines.append(f"{file_name},{label}")
    with open(save_name, "w") as file:
        for line in data_lines:
            file.write(line + "\n")


def get_mix_pred(emo_pred, ce_prob):
    pred = []
    for idx, curr_pred in enumerate(emo_pred):
        if np.max(curr_pred) > config_data.CONFIDENCE_BE:
            pred.append(np.argmax(curr_pred))
        else:
            # Offset compound-expression indices by 6 so they do not collide
            # with the basic-emotion indices.
            pred.append(ce_prob[idx] + 6)
    return pred


def get_c_expr_db_pred(
    stat_df: pd.DataFrame,
    dyn_df: pd.DataFrame,
    audio_df: pd.DataFrame,
    name_video: str,
    weights_1: list[float],
    frame_indices: list[int],
) -> tuple[pd.DataFrame, object]:
    """
    Predict compound expressions using audio-visual emotional probabilities,
    optimized weights, and rules.

    Args:
        stat_df (pd.DataFrame): DataFrame containing static visual probabilities.
        dyn_df (pd.DataFrame): DataFrame containing dynamic visual probabilities.
        audio_df (pd.DataFrame): DataFrame containing audio probabilities.
        name_video (str): Name of the video.
        weights_1 (list[float]): List of weights for the Dirichlet-based fusion.
        frame_indices (list[int]): Indices of the frames to mark on the plot.

    Returns:
        tuple[pd.DataFrame, object]: DataFrame with per-model predictions and
        the prediction plot.
    """
    stat_df["image_location"] = [
        f"{name_video}/{str(f + 1).zfill(5)}.jpg" for f in stat_df.index
    ]
    dyn_df["image_location"] = [
        f"{name_video}/{str(f + 1).zfill(5)}.jpg" for f in dyn_df.index
    ]

    image_location = dyn_df.image_location.tolist()
    stat_df = stat_df[stat_df.image_location.isin(image_location)][
        NAME_EMO_AUDIO[:-1]
    ].values
    dyn_df = softmax(
        dyn_df[dyn_df.image_location.isin(image_location)][NAME_EMO_AUDIO[:-1]].values
    )

    audio_df = audio_df.groupby(["frames"]).mean().reset_index()
    audio_df = audio_df.rename(columns={"frames": "image_location"})
    audio_df["image_location"] = [
        get_image_location(name_video, i) for i in audio_df.image_location
    ]
    audio_df = softmax(
        audio_df[audio_df.image_location.isin(image_location)][
            NAME_EMO_AUDIO[:-1]
        ].values
    )

    # If the audio predictions are shorter than the video, repeat the last one.
    if len(image_location) > len(audio_df):
        last_pred_audio = audio_df[-1]
        audio_df = np.vstack(
            (audio_df, [last_pred_audio] * (len(image_location) - len(audio_df)))
        )

    predictions = [stat_df, dyn_df, audio_df]
    num_predictions = len(predictions)

    # Weighted (or uniform) fusion of the three modalities.
    if weights_1:
        final_predictions = predictions[0] * weights_1[0]
        for i in range(1, num_predictions):
            final_predictions += predictions[i] * weights_1[i]
    else:
        final_predictions = np.sum(predictions, axis=0) / num_predictions

    av_prob = np.argmax(get_compound_expression(final_predictions, DICT_CE), axis=1)
    vs_prob = get_compound_expression(predictions[0], DICT_CE)
    vd_prob = get_compound_expression(predictions[1], DICT_CE)
    a_prob = get_compound_expression(predictions[2], DICT_CE)

    av_pred = get_mix_pred(final_predictions, av_prob)
    vs_pred = get_mix_pred(predictions[0], np.argmax(vs_prob, axis=1))
    vd_pred = get_mix_pred(predictions[1], np.argmax(vd_prob, axis=1))
    a_pred = get_mix_pred(predictions[2], np.argmax(a_prob, axis=1))

    dict_pred_final = {
        "Audio-visual fusion": av_pred,
        "Static visual model": vs_pred,
        "Dynamic visual model": vd_pred,
        "Audio model": a_pred,
    }
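    # dict_pred_final maps each model name to one label per frame: labels below 6
    # come straight from the basic-emotion argmax, labels of 6 and above are
    # compound-expression indices shifted by get_mix_pred.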
    plt = plot_compound_expression_prediction(
        dict_preds=dict_pred_final,
        save_path=None,
        frame_indices=frame_indices,
        title="Basic emotion and compound expression predictions",
    )

    df = pd.DataFrame(dict_pred_final)

    return df, plt


def get_evenly_spaced_frame_indices(total_frames, num_frames=10):
    if total_frames <= num_frames:
        return list(range(total_frames))
    step = total_frames / num_frames
    return [int(np.round(i * step)) for i in range(num_frames)]
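

# Minimal usage sketch (not part of the original pipeline): the file name and
# frame count below are hypothetical and assume ffmpeg plus the app.* modules
# are available.
if __name__ == "__main__":
    mp4_path = convert_webm_to_mp4("example.webm")  # hypothetical input file
    frame_indices = get_evenly_spaced_frame_indices(total_frames=300, num_frames=10)
    wav, audio_plot = convert_mp4_to_mp3(mp4_path, frame_indices, fps=25)
    wav = pad_wav(wav, max_length=4 * 16000)  # pad/trim to 4 seconds at 16 kHz
    print(wav.shape)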