# EdControl / emrec_model.py
# Author: Danil Islamov
import os

# NOTE: these TensorFlow environment flags must be set BEFORE DeepFace is
# imported below (DeepFace pulls in TensorFlow at import time); setting them
# later would have no effect.
os.environ["TF_ENABLE_ONEDNN_OPTS"] = "0"  # disable oneDNN custom ops (reproducible numerics)
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"  # silence TF C++ logging below ERROR

import cv2
import numpy as np
import polars as pl
from attrs import define, field
from deepface import DeepFace
from tqdm import tqdm

# Minimum duration (in seconds) a consecutive run of one emotion must last
# to be reported by VideoEmotionRecognizer.emotions_timestamps().
_SIGNIFICANT_EMOTION_PERIOD_LENGTH_IN_SECONDS: float = 5
class VideoInputException(IOError):
    """Raised when OpenCV cannot open the input video file."""

    pass
@define(slots=True, auto_attribs=True)
class VideoEmotionRecognizer:
    """Per-frame facial emotion recognition over a video file.

    On construction the whole video at ``filepath`` is decoded frame by
    frame and analyzed with DeepFace; the resulting per-frame emotion
    probabilities are cached in a long-format polars DataFrame that the
    query methods below aggregate.
    """

    # Path to the video file to analyze.
    filepath: str
    # Long-format cache: one row per (timestamp, emotion, probability).
    _analyzed_frames: pl.DataFrame = field(init=False)

    def __attrs_post_init__(self):
        """Eagerly analyze the video right after attrs initialization."""
        print("Start processing video...")
        self._analyzed_frames = self._analyze()
        print("Video processed")

    def _analyze(self) -> pl.DataFrame:
        """Decode the video and collect DeepFace emotion scores per frame.

        Returns:
            A DataFrame with columns ``timestamp`` (seconds from start of
            video), ``emotion`` (label) and ``probability`` (score as
            reported by DeepFace).

        Raises:
            VideoInputException: if OpenCV cannot open ``self.filepath``.
        """
        # open video file
        cap: cv2.VideoCapture = cv2.VideoCapture(self.filepath)
        if not cap.isOpened():  # fixed: was `== False`
            raise VideoInputException("Video opening error")
        # collect timestamps and emotion probabilities for every frame
        analyzed_frames_data: dict = {"timestamp": [], "emotion": [], "probability": []}
        # Guard against CAP_PROP_FRAME_COUNT reporting 0 (some codecs do),
        # which previously caused a ZeroDivisionError in the progress math.
        total_frame_count: int = max(int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), 1)
        # hoisted loop invariant: per-frame progress-bar increment
        pbar_update_value: float = 100 / total_frame_count
        try:
            with tqdm(total=100, bar_format="{desc}: {percentage:.3f}% | {elapsed} < {remaining}") as pbar:
                while cap.isOpened():
                    return_flag: bool
                    frame: np.ndarray
                    return_flag, frame = cap.read()
                    if not return_flag:
                        break
                    # actions is documented as a list/tuple (a bare string only
                    # worked via substring matching in some DeepFace versions);
                    # enforce_detection=False keeps analysis going on faceless frames
                    result = DeepFace.analyze(frame, actions=["emotion"], enforce_detection=False, silent=True)[0]
                    emotion_scores: dict = result["emotion"]
                    timestamp: float = cap.get(cv2.CAP_PROP_POS_MSEC) / 1000
                    analyzed_frames_data["timestamp"] += [timestamp] * len(emotion_scores)
                    analyzed_frames_data["emotion"] += list(map(str, emotion_scores.keys()))
                    analyzed_frames_data["probability"] += list(map(float, emotion_scores.values()))
                    pbar.update(pbar_update_value)
        finally:
            # fixed: the capture was never released (resource leak)
            cap.release()
        return pl.DataFrame(analyzed_frames_data)

    def emotions_summary(self) -> dict:
        """Aggregate emotion probabilities over the whole video.

        Returns:
            Dict ``{emotion: normalized_probability}`` with ``"neutral"``
            removed. Probabilities are normalized over ALL emotions
            (including neutral) before the filter, so values here do not
            sum to 1.
        """
        # sum probabilities of every emotion by frames
        # NOTE(review): `groupby` targets an older polars API (renamed to
        # `group_by` in later releases) — kept as-is for compatibility.
        emotions_summary: pl.DataFrame = (
            self._analyzed_frames.groupby("emotion")
            .agg(pl.col("probability").sum())
            .sort("probability", descending=True)
        )
        # normalize probabilities and keep only negative emotions
        emotions_summary = emotions_summary.with_columns(
            (pl.col("probability") / pl.sum("probability")).alias("probability")
        ).filter(pl.col("emotion") != "neutral")
        # return emotion probabilities in form of dict {emotion: probability}
        output: dict = dict(
            zip(
                emotions_summary["emotion"].to_list(),
                emotions_summary["probability"].to_list(),
            )
        )
        return output

    def emotions_timestamps(self) -> dict:
        """Find long runs of non-neutral emotions and their start times.

        Returns:
            Dict ``{emotion: start_timestamp_seconds}`` for every run of a
            single dominant emotion longer than
            ``_SIGNIFICANT_EMOTION_PERIOD_LENGTH_IN_SECONDS``.

            NOTE(review): keying by emotion means that if the same emotion
            has several significant periods, only the last one survives in
            the dict — confirm this is the intended contract before relying
            on it.
        """
        # keep only most probable emotion in every frame
        emotions_timestamps: pl.DataFrame = (
            self._analyzed_frames.sort("probability", descending=True)
            .groupby("timestamp")
            .first()
            .sort(by="timestamp", descending=False)
        )
        # get duration of every consecutive emotion repetition: compare each
        # emotion with its predecessor (first row is filled with itself so it
        # never starts a spurious new group) and cumsum the change flags into
        # a group id. `shift_and_fill` is an older polars API — kept as-is.
        emotions_timestamps = emotions_timestamps.with_columns(
            (pl.col("emotion") != pl.col("emotion").shift_and_fill(pl.col("emotion").backward_fill(), periods=1))
            .cumsum()
            .alias("emotion_group")
        )
        emotions_timestamps = (
            emotions_timestamps.groupby(["emotion", "emotion_group"])
            .agg(
                pl.col("timestamp").min().alias("emotion_start_timestamp"),
                pl.col("timestamp").max().alias("emotion_finish_timestamp"),
            )
            .drop("emotion_group")
            .sort(by="emotion_start_timestamp", descending=False)
        )
        # keep only significant negative emotions periods
        emotions_timestamps = (
            emotions_timestamps.with_columns(
                (pl.col("emotion_finish_timestamp") - pl.col("emotion_start_timestamp")).alias("duration")
            )
            .filter(pl.col("emotion") != "neutral")
            .filter(pl.col("duration") > _SIGNIFICANT_EMOTION_PERIOD_LENGTH_IN_SECONDS)
        )
        # return timestamps of significant negative emotions periods in form of dict {emotion: start_timestamp}
        output: dict = dict(
            zip(
                emotions_timestamps["emotion"].to_list(),
                emotions_timestamps["emotion_start_timestamp"].to_list(),
            )
        )
        return output