Spaces:
Runtime error
Runtime error
File size: 4,907 Bytes
ec24258 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
# from __future__ import absolute_import, division
from collections import defaultdict
import numpy as np
from src.evaluation import AngleMAE
class DTW:
    """Dynamic Time Warping correspondence between two keypoint sequences.

    Aligns the frames of a reference video with the frames of a test video.
    The local cost between two frames is a weighted sum of a mean absolute
    error (MAE) term and an angle MAE term.

    Parameters:
        cost_weightage : dictionary containing weightage of MAE and AngleMAE
            to compute cost for DTW, under the keys ``'mae'`` and
            ``'angle_mae'``. A missing key or a falsy weight disables that
            term. When omitted, only the angle MAE term is used (weight 1).
        mae_calculator : optional object exposing
            ``mean_absolute_error(x, y)``; used for the ``'mae'`` term. When
            omitted, a plain element-wise mean absolute error is used.
    """

    def __init__(self, cost_weightage=None, mae_calculator=None):
        self.anglemae_calculator = AngleMAE()
        # ``cost`` reads ``self.MAE``; it was never assigned before, so any
        # non-zero 'mae' weight failed with AttributeError.
        self.MAE = mae_calculator
        if cost_weightage is None:
            # Subscripting the ``None`` default used to raise TypeError on
            # the first call to ``cost``.
            cost_weightage = {'angle_mae': 1.0, 'mae': 0.0}
        self.cost_weightage = cost_weightage

    def cost(self, x, y):
        """computes cost for a set of keypoints through MAE or AngleMAE

        Args:
            x: A numpy array representing the keypoints of a reference frame
            y: A numpy array representing the keypoints of a test frame

        Returns:
            The weighted sum of the enabled terms (angle MAE and/or MAE)
            between reference and test pose; 0 when no term is enabled
        """
        weights = self.cost_weightage or {}
        total = 0

        angle_weight = weights.get('angle_mae')
        if angle_weight:
            angle_error = self.anglemae_calculator.mean_absolute_error(x, y)
            total += angle_error * angle_weight

        mae_weight = weights.get('mae')
        if mae_weight:
            total += self._mean_absolute_error(x, y) * mae_weight

        return total

    def _mean_absolute_error(self, x, y):
        """MAE between two frames via ``self.MAE`` or a numpy fallback."""
        if self.MAE is not None:
            return self.MAE.mean_absolute_error(x, y)
        difference = np.asarray(x, dtype='float') - np.asarray(y, dtype='float')
        return float(np.mean(np.abs(difference)))

    def find_correspondence(self, x, y):
        """applies Dynamic Time Warping algorithm to find correspondence
        between reference video and test video

        Args:
            x: A numpy array representing the keypoints of a reference video
            y: A numpy array representing the keypoints of a test video

        Returns:
            A tuple containing ref_frame_indices, test_frame_indices and costs
            where
            ref_frame_indices: A list of indices for reference video frames
            test_frame_indices : A list of indices for test video frames
            costs : A list of cost between reference and test keypoint
            All three lists are empty when either sequence is empty.
        """
        x = np.asanyarray(x, dtype='float')
        y = np.asanyarray(y, dtype='float')
        len_x, len_y = len(x), len(y)
        if len_x == 0 or len_y == 0:
            # Nothing to align; previously a one-sided empty input crashed
            # with an IndexError during backtracking.
            return ([], [], [])

        # dtw_mapping[i, j] = (cumulative cost, previous i, previous j) for
        # the best warping path ending at x[i - 1], y[j - 1]. Cells that were
        # never filled (the border) count as infinitely expensive.
        dtw_mapping = defaultdict(lambda: (float('inf'),))
        dtw_mapping[0, 0] = (0, 0, 0)
        # similarity_score[i, j] = local cost between x[i - 1] and y[j - 1].
        similarity_score = {}

        for i in range(1, len_x + 1):
            for j in range(1, len_y + 1):
                dt = self.cost(x[i - 1], y[j - 1])
                # Candidate order matters: on ties ``min`` keeps the first.
                dtw_mapping[i, j] = min(
                    (dtw_mapping[i - 1, j][0] + dt, i - 1, j),
                    (dtw_mapping[i, j - 1][0] + dt, i, j - 1),
                    (dtw_mapping[i - 1, j - 1][0] + dt, i - 1, j - 1),
                    key=lambda a: a[0]
                )
                similarity_score[i, j] = dt

        # Walk back from the last cell to the origin.
        path = []
        i, j = len_x, len_y
        while not (i == j == 0):
            cell = dtw_mapping[i, j]
            prev_i, prev_j = cell[1], cell[2]
            # Cell (i, j) describes the frame pair (i - 1, j - 1), so its
            # costs are read from (i, j) itself. Reading them from
            # (i - 1, j - 1) reported the previous step's costs instead.
            path.append((i - 1, j - 1, cell[0], similarity_score[i, j]))
            i, j = prev_i, prev_j
        path.reverse()

        ref_frame_idx, test_frame_idx, _, costs = DTW.get_ref_test_mapping(path)
        return (ref_frame_idx, test_frame_idx, costs)

    @staticmethod
    def get_ref_test_mapping(paths):
        """reduces a DTW warping path to one test frame per reference frame

        When a reference frame is matched with several test frames, the test
        frame with the lowest cost is kept; on ties the one that appears
        first in ``paths`` wins.

        Args:
            paths : list of lists which consists of [i, j, ps, c] where i and
                j are index of x and y time series respectively which have
                the correspondence, ps is cumulative cost and c is cost
                between these two instances

        Returns:
            A tuple containing ref_frame_indices, test_frame_indices,
            path_score and costs where
            ref_frame_indices: A list of indices for reference video frames,
                in order of first appearance in ``paths``
            test_frame_indices : A list of indices for test video frames
            path_score : A list of path score calculated by DTW between
                reference and test keypoints
            costs : A list of cost between reference and test keypoint
        """
        # Group the candidate test frames of every reference frame, keeping
        # the order in which reference frames first appear.
        candidates = defaultdict(list)
        for ref_idx, test_idx, score, step_cost in np.array(paths):
            candidates[ref_idx].append((test_idx, score, step_cost))

        ref_frames = []
        test_frames = []
        path_score = []
        costs = []
        for ref_idx, options in candidates.items():
            test_idx, score, step_cost = min(
                options, key=lambda option: option[2])
            ref_frames.append(int(ref_idx))
            test_frames.append(int(test_idx))
            path_score.append(score)
            costs.append(step_cost)
        return ref_frames, test_frames, path_score, costs
|