# from __future__ import absolute_import, division
from collections import defaultdict

import numpy as np

# NOTE: it is assumed that src.evaluation provides an MAE calculator
# alongside AngleMAE, since cost() below relies on both.
from src.evaluation import MAE, AngleMAE


class DTW:
    """Class for correspondence between two sequence of keypoints detected
    from videos

      Parameters:
        cost_weightage : dictionary containing weightage of MAE and AngleMAE
        to compute cost for DTW

    """

    def __init__(self, cost_weightage=None):
        self.anglemae_calculator = AngleMAE()
        self.mae_calculator = MAE()

        # Expected keys: 'mae' and 'angle_mae', each mapping to its weight.
        self.cost_weightage = cost_weightage

    def cost(self, x, y):
        """Computes the cost for a pair of keypoint sets using MAE and/or
        AngleMAE, weighted according to cost_weightage.

        Args:
          x: A numpy array representing the keypoints of a reference frame
          y: A numpy array representing the keypoints of a test frame

        Returns:
          A float value representing the weighted MAE / Angle MAE score
          between the reference and test pose
        """
        cost = 0
        if self.cost_weightage['angle_mae']:
            mae_angle = self.anglemae_calculator.mean_absolute_error(x, y) * \
                        self.cost_weightage['angle_mae']
            cost += mae_angle

        if self.cost_weightage['mae']:
            mae = self.mae_calculator.mean_absolute_error(x, y) * \
                  self.cost_weightage['mae']
            cost += mae

        return cost

    def find_correspondence(self, x, y):
        """Applies the Dynamic Time Warping algorithm to find the
        correspondence between a reference video and a test video.

        Args:
          x: A numpy array representing the keypoints of a reference video
          y: A numpy array representing the keypoints of a test video

        Returns:
          A tuple containing ref_frame_indices, test_frame_indices and costs
          where
            ref_frame_indices: A list of indices for reference video frames
            test_frame_indices: A list of indices for test video frames
            costs: A list of costs between corresponding reference and test
            keypoints
        """

        x = np.asanyarray(x, dtype='float')
        y = np.asanyarray(y, dtype='float')
        len_x, len_y = len(x), len(y)

        # dtw_mapping[i, j] -> (cumulative cost, predecessor i, predecessor j)
        dtw_mapping = defaultdict(lambda: (float('inf'),))
        # similarity_score[i, j] -> pairwise cost between x[i - 1] and y[j - 1]
        similarity_score = defaultdict(lambda: float('inf'))

        dtw_mapping[0, 0] = (0, 0, 0)

        similarity_score[0, 0] = 0

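        # Fill the DP table: cell (i, j) stores the cheapest cumulative cost
        # of aligning the first i reference frames with the first j test
        # frames, taking the best of the three standard DTW predecessors
        # (i - 1, j), (i, j - 1) and (i - 1, j - 1).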
        for i in range(1, len_x + 1):
            for j in range(1, len_y + 1):
                dt = self.cost(x[i - 1], y[j - 1])

                dtw_mapping[i, j] = min(
                    (dtw_mapping[i - 1, j][0] + dt, i - 1, j),
                    (dtw_mapping[i, j - 1][0] + dt, i, j - 1),
                    (dtw_mapping[i - 1, j - 1][0] + dt, i - 1, j - 1),
                    key=lambda a: a[0]
                    )
                similarity_score[i, j] = dt

        # Backtrack from the final cell to (0, 0) to recover the warping
        # path. Cell (i, j) corresponds to frames x[i - 1] and y[j - 1], so
        # its cumulative and pairwise costs are read at index (i, j).
        path = []
        i, j = len_x, len_y
        while not (i == j == 0):
            path.append(
                (i - 1, j - 1, dtw_mapping[i, j][0], similarity_score[i, j])
                )
            i, j = dtw_mapping[i, j][1], dtw_mapping[i, j][2]
        path.reverse()

        ref_frame_idx, test_frame_idx, _, costs = DTW.get_ref_test_mapping(path)
        return (ref_frame_idx, test_frame_idx, costs)

    @staticmethod
    def get_ref_test_mapping(paths):
        """applies Dynamic Time Warping algorithm to find correspondence
        between reference video and test video

        Args:
          paths :  list of lists which consists of [i, j, ps, c] where i and
          j are index of x and y time series respectively which have the
          corespondence, ps is cummulative cost and c is cost between these
          two instances

        Returns:
          A tuple containing ref_frame_indices, test_frame_indices,path_score
          and costs where
            ref_frame_indices: A list of indices for reference video frames
            test_frame_indices : A list of indices for test video frames
            path_score : A list of path score calculated by DTW between
            reference and test keypoints
            costs : A list of cost between reference and test keypoint
        """

        path = np.array(paths)
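        # Group the warping-path entries by reference frame index; a single
        # reference frame may be matched to several test frames.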
        ref_2_test = {}
        for i in range(path.shape[0]):
            ref_2_test_val = ref_2_test.get(path[i][0], [])
            ref_2_test_val.append([path[i][1], path[i][2], path[i][3]])
            ref_2_test[path[i][0]] = ref_2_test_val
        ref_frames = []
        test_frames = []
        path_score = []
        costs = []

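        # For every reference frame, keep the matched test frame with the
        # lowest pairwise cost.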
        for ref_frame, test_frame_list in ref_2_test.items():
            ref_frames.append(int(ref_frame))
            test_frame_list.sort(key=lambda x: x[2])
            test_frames.append(int(test_frame_list[0][0]))
            path_score.append(test_frame_list[0][1])
            costs.append(test_frame_list[0][2])

        return ref_frames, test_frames, path_score, costs
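

# Minimal usage sketch, assuming that MAE().mean_absolute_error and
# AngleMAE().mean_absolute_error accept two per-frame keypoint arrays of
# shape (num_keypoints, 2) and return a float; the random arrays below are
# hypothetical stand-ins for keypoints detected from real videos.
if __name__ == '__main__':
    rng = np.random.default_rng(0)
    # 30 reference frames and 40 test frames, 17 keypoints with (x, y) each.
    reference_keypoints = rng.random((30, 17, 2))
    test_keypoints = rng.random((40, 17, 2))

    dtw = DTW(cost_weightage={'mae': 0.5, 'angle_mae': 0.5})
    ref_idx, test_idx, costs = dtw.find_correspondence(reference_keypoints,
                                                       test_keypoints)
    for r, t, c in zip(ref_idx, test_idx, costs):
        print('reference frame %d -> test frame %d (cost %.4f)' % (r, t, c))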