# (Extraction artifact: a Hugging Face Spaces status banner — "Spaces: Sleeping" — was captured here; it is not part of the program.)
import os

# Silence TensorFlow before DeepFace (which imports TF) is loaded.
os.environ["TF_ENABLE_ONEDNN_OPTS"] = "0"
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"

import cv2
import numpy as np
import polars as pl
from attrs import define, field
from deepface import DeepFace
from tqdm import tqdm
# Minimum duration (in seconds) a dominant emotion run must last to be
# reported by VideoEmotionRecognizer.emotions_timestamps().
# Value made a true float to match the annotation.
_SIGNIFICANT_EMOTION_PERIOD_LENGTH_IN_SECONDS: float = 5.0
class VideoInputException(IOError):
    """Raised when the input video file cannot be opened for reading."""
@define  # was missing: the class relies on attrs (`field(init=False)`, `__attrs_post_init__`)
class VideoEmotionRecognizer:
    """Frame-by-frame emotion analysis of a video file via DeepFace.

    The whole video is analyzed eagerly on construction; results are kept
    in a long-format polars DataFrame and summarized by the public methods.

    Attributes:
        filepath: path to the input video file.
    """

    filepath: str
    # Long-format table: one row per (timestamp, emotion, probability).
    _analyzed_frames: pl.DataFrame = field(init=False)

    def __attrs_post_init__(self):
        """Run the (potentially long) analysis as soon as the object is built."""
        print("Start processing video...")
        self._analyzed_frames = self._analyze()
        print("Video processed")

    def _analyze(self) -> pl.DataFrame:
        """Decode every frame and score its emotions with DeepFace.

        Returns:
            A DataFrame with columns ``timestamp`` (seconds), ``emotion``
            (name) and ``probability`` (DeepFace score for that emotion).

        Raises:
            VideoInputException: if the video file cannot be opened.
        """
        cap: cv2.VideoCapture = cv2.VideoCapture(self.filepath)
        if not cap.isOpened():
            raise VideoInputException("Video opening error")
        analyzed_frames_data: dict = {"timestamp": [], "emotion": [], "probability": []}
        # Guard against a zero/unreported frame count so the progress-bar
        # arithmetic below never divides by zero.
        total_frame_count: int = max(int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), 1)
        try:
            with tqdm(total=100, bar_format="{desc}: {percentage:.3f}% | {elapsed} < {remaining}") as pbar:
                while cap.isOpened():
                    return_flag: bool
                    frame: np.ndarray
                    return_flag, frame = cap.read()
                    if not return_flag:
                        break
                    # DeepFace iterates `actions`; pass a list so a bare string
                    # is never iterated character by character.
                    result = DeepFace.analyze(frame, actions=["emotion"], enforce_detection=False, silent=True)[0]
                    timestamp_in_seconds = cap.get(cv2.CAP_PROP_POS_MSEC) / 1000
                    analyzed_frames_data["timestamp"] += [timestamp_in_seconds] * len(result["emotion"])
                    analyzed_frames_data["emotion"] += list(map(str, result["emotion"].keys()))
                    analyzed_frames_data["probability"] += list(map(float, result["emotion"].values()))
                    pbar.update(100 / total_frame_count)
        finally:
            # Release the capture handle even if DeepFace raises mid-video
            # (was previously never released).
            cap.release()
        return pl.DataFrame(analyzed_frames_data)

    def emotions_summary(self) -> dict:
        """Aggregate per-frame scores into overall emotion shares.

        Returns:
            dict mapping emotion name -> its normalized share of the total
            probability mass (normalization includes "neutral", which is then
            dropped from the result), sorted most-probable first.
        """
        # Sum probabilities of every emotion across all frames.
        emotions_summary: pl.DataFrame = (
            self._analyzed_frames.groupby("emotion")
            .agg(pl.col("probability").sum())
            .sort("probability", descending=True)
        )
        # Normalize, then keep only non-neutral emotions.
        emotions_summary = emotions_summary.with_columns(
            (pl.col("probability") / pl.sum("probability")).alias("probability")
        ).filter(pl.col("emotion") != "neutral")
        output: dict = dict(
            zip(
                emotions_summary["emotion"].to_list(),
                emotions_summary["probability"].to_list(),
            )
        )
        return output

    def emotions_timestamps(self) -> dict:
        """Find significant periods of sustained non-neutral emotions.

        Returns:
            dict mapping emotion name -> start timestamp (seconds) of a run
            longer than _SIGNIFICANT_EMOTION_PERIOD_LENGTH_IN_SECONDS during
            which that emotion was the most probable one.
        """
        # Keep only the most probable emotion in every frame.
        emotions_timestamps: pl.DataFrame = (
            self._analyzed_frames.sort("probability", descending=True)
            .groupby("timestamp")
            .first()
            .sort(by="timestamp", descending=False)
        )
        # Number consecutive runs of the same emotion: flag rows where the
        # emotion differs from the previous row (the first row is filled from
        # itself via backward_fill, so it never starts a spurious run) and
        # cumulative-sum the flags into run ids.
        emotions_timestamps = emotions_timestamps.with_columns(
            (pl.col("emotion") != pl.col("emotion").shift_and_fill(pl.col("emotion").backward_fill(), periods=1))
            .cumsum()
            .alias("emotion_group")
        )
        # Collapse each run into its start/finish timestamps.
        emotions_timestamps = (
            emotions_timestamps.groupby(["emotion", "emotion_group"])
            .agg(
                pl.col("timestamp").min().alias("emotion_start_timestamp"),
                pl.col("timestamp").max().alias("emotion_finish_timestamp"),
            )
            .drop("emotion_group")
            .sort(by="emotion_start_timestamp", descending=False)
        )
        # Keep only sufficiently long non-neutral runs.
        emotions_timestamps = (
            emotions_timestamps.with_columns(
                (pl.col("emotion_finish_timestamp") - pl.col("emotion_start_timestamp")).alias("duration")
            )
            .filter(pl.col("emotion") != "neutral")
            .filter(pl.col("duration") > _SIGNIFICANT_EMOTION_PERIOD_LENGTH_IN_SECONDS)
        )
        # NOTE(review): if one emotion has several qualifying runs, dict keys
        # collapse and only the last run's start survives — confirm intended.
        output: dict = dict(
            zip(
                emotions_timestamps["emotion"].to_list(),
                emotions_timestamps["emotion_start_timestamp"].to_list(),
            )
        )
        return output