File size: 4,684 Bytes
5757396
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import os

# Configure TensorFlow via environment variables BEFORE deepface (which
# imports TF) is loaded, so they take effect:
# - disable oneDNN custom ops (avoids the "results may differ" startup notice),
# - suppress TF C++ logging below ERROR level (3 = errors only).
os.environ["TF_ENABLE_ONEDNN_OPTS"] = "0"
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"


import cv2
import numpy as np
import polars as pl
from attrs import define, field
from deepface import DeepFace
from tqdm import tqdm

_SIGNIFICANT_EMOTION_PERIOD_LENGTH_IN_SECONDS: float = 5


class VideoInputException(IOError):
    """Raised when the input video file cannot be opened for reading."""

    pass


@define(slots=True, auto_attribs=True)
class VideoEmotionRecognizer:
    """Per-frame facial emotion analysis of a video file via DeepFace.

    On construction the whole video is decoded and analyzed eagerly
    (see ``__attrs_post_init__``); the results are kept as a long-format
    polars DataFrame and summarized by ``emotions_summary`` /
    ``emotions_timestamps``.
    """

    # Path to the input video file.
    filepath: str
    # Long-format results: one row per (timestamp, emotion, probability).
    _analyzed_frames: pl.DataFrame = field(init=False)

    def __attrs_post_init__(self):
        """Analyze the entire video once, immediately after construction."""
        print("Start processing video...")
        self._analyzed_frames = self._analyze()
        print("Video processed")

    def _analyze(self) -> pl.DataFrame:
        """Decode the video frame by frame and score emotions with DeepFace.

        Returns:
            A long-format DataFrame with columns ``timestamp`` (seconds),
            ``emotion`` (name) and ``probability`` (score) — one row per
            emotion per analyzed frame.

        Raises:
            VideoInputException: if the video cannot be opened.
        """
        cap: cv2.VideoCapture = cv2.VideoCapture(self.filepath)
        if not cap.isOpened():
            raise VideoInputException("Video opening error")

        # Collect timestamps and emotion probabilities for every frame.
        analyzed_frames_data: dict = {"timestamp": [], "emotion": [], "probability": []}
        total_frame_count: int = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Hoisted loop-invariant progress step; max(..., 1) guards against a
        # zero frame count (containers with missing metadata) which would
        # otherwise raise ZeroDivisionError.
        pbar_step: float = 100 / max(total_frame_count, 1)
        try:
            with tqdm(total=100, bar_format="{desc}: {percentage:.3f}% | {elapsed} < {remaining}") as pbar:
                while cap.isOpened():
                    return_flag: bool
                    frame: np.ndarray
                    return_flag, frame = cap.read()
                    if not return_flag:
                        break

                    # `actions` is passed as a list: some DeepFace versions
                    # iterate it directly, so a bare string would be walked
                    # character by character.
                    result = DeepFace.analyze(frame, actions=["emotion"], enforce_detection=False, silent=True)[0]
                    emotion_scores: dict = result["emotion"]
                    # Frame timestamp in seconds, repeated once per emotion row.
                    timestamp_s: float = cap.get(cv2.CAP_PROP_POS_MSEC) / 1000
                    analyzed_frames_data["timestamp"] += [timestamp_s] * len(emotion_scores)
                    analyzed_frames_data["emotion"] += list(map(str, emotion_scores.keys()))
                    analyzed_frames_data["probability"] += list(map(float, emotion_scores.values()))

                    pbar.update(pbar_step)
        finally:
            # Fixed: the capture device was never released (resource leak).
            cap.release()

        return pl.DataFrame(analyzed_frames_data)

    def emotions_summary(self) -> dict:
        """Aggregate emotion probabilities over the whole video.

        Returns:
            ``{emotion: normalized_probability}`` for every non-neutral
            emotion, normalized over ALL emotions (including neutral).
        """
        # Sum probabilities of every emotion across all frames.
        emotions_summary: pl.DataFrame = (
            self._analyzed_frames.groupby("emotion")
            .agg(pl.col("probability").sum())
            .sort("probability", descending=True)
        )

        # Normalize probabilities, then drop the neutral emotion.
        emotions_summary = emotions_summary.with_columns(
            (pl.col("probability") / pl.sum("probability")).alias("probability")
        ).filter(pl.col("emotion") != "neutral")

        # Return emotion probabilities as {emotion: probability}.
        output: dict = dict(
            zip(
                emotions_summary["emotion"].to_list(),
                emotions_summary["probability"].to_list(),
            )
        )

        return output

    def emotions_timestamps(self) -> dict:
        """Find long runs of dominant non-neutral emotions.

        Returns:
            ``{emotion: start_timestamp_seconds}`` for each significant
            period (longer than
            ``_SIGNIFICANT_EMOTION_PERIOD_LENGTH_IN_SECONDS``).

        NOTE(review): if the same emotion has several significant periods,
        later periods overwrite earlier ones in the returned dict — confirm
        this is the intended contract with callers.
        """
        # Keep only the most probable emotion in every frame.
        emotions_timestamps: pl.DataFrame = (
            self._analyzed_frames.sort("probability", descending=True)
            .groupby("timestamp")
            .first()
            .sort(by="timestamp", descending=False)
        )

        # Mark consecutive repetitions of the same emotion with a group id
        # (cumulative sum of "emotion changed vs. previous frame" flags).
        # NOTE(review): groupby/shift_and_fill/cumsum are the legacy polars
        # API — kept as-is to match the pinned polars version.
        emotions_timestamps = emotions_timestamps.with_columns(
            (pl.col("emotion") != pl.col("emotion").shift_and_fill(pl.col("emotion").backward_fill(), periods=1))
            .cumsum()
            .alias("emotion_group")
        )
        # Collapse each run into its start/finish timestamps.
        emotions_timestamps = (
            emotions_timestamps.groupby(["emotion", "emotion_group"])
            .agg(
                pl.col("timestamp").min().alias("emotion_start_timestamp"),
                pl.col("timestamp").max().alias("emotion_finish_timestamp"),
            )
            .drop("emotion_group")
            .sort(by="emotion_start_timestamp", descending=False)
        )

        # Keep only significant (long enough) non-neutral emotion periods.
        emotions_timestamps = (
            emotions_timestamps.with_columns(
                (pl.col("emotion_finish_timestamp") - pl.col("emotion_start_timestamp")).alias("duration")
            )
            .filter(pl.col("emotion") != "neutral")
            .filter(pl.col("duration") > _SIGNIFICANT_EMOTION_PERIOD_LENGTH_IN_SECONDS)
        )

        # Return timestamps as {emotion: start_timestamp}.
        output: dict = dict(
            zip(
                emotions_timestamps["emotion"].to_list(),
                emotions_timestamps["emotion_start_timestamp"].to_list(),
            )
        )

        return output