PoseSync-Video-Matching-Tool / src /correspondence.py
RishitJavia's picture
Add code.
ec24258
# from __future__ import absolute_import, division
from collections import defaultdict
import numpy as np
from src.evaluation import AngleMAE
class DTW:
"""Class for correspondence between two sequence of keypoints detected
from videos
Parameters:
cost_weightage : dictionary containing weightage of MAE and AngleMAE
to compute cost for DTW
"""
def __init__(self, cost_weightage=None):
self.anglemae_calculator = AngleMAE()
self.cost_weightage = cost_weightage
def cost(self, x, y):
"""computes cost for a set of keypoints through MAE or AngleMAE
Args:
x: A numpy array representing the keypoints of a reference frame
y: A numpy array representing the keypoints of a test frame
Returns:
A float value representing the mae/angle mae score between
reference and test pose
"""
cost = 0
if self.cost_weightage['angle_mae']:
mae_angle = self.anglemae_calculator.mean_absolute_error(x, y) * \
self.cost_weightage['angle_mae']
cost += mae_angle
if self.cost_weightage['mae']:
mae = self.MAE.mean_absolute_error(x, y) * self.cost_weightage[
'mae']
cost += mae
return cost
def find_correspondence(self, x, y):
"""applies Dynamic Time Warping algorithm to find correspondence
between reference video and test video
Args:
x: A numpy array representing the keypoints of a reference video
y: A numpy array representing the keypoints of a test video
Returns:
A tuple containing ref_frame_indices, test_frame_indices and costs
where
ref_frame_indices: A list of indices for reference video frames
test_frame_indices : A list of indices for test video frames
costs : A list of cost between reference and test keypoint
"""
x = np.asanyarray(x, dtype='float')
y = np.asanyarray(y, dtype='float')
len_x, len_y = len(x), len(y)
dtw_mapping = defaultdict(lambda: (float('inf'),))
similarity_score = defaultdict(lambda: float('inf'), )
dtw_mapping[0, 0] = (0, 0, 0)
similarity_score[0, 0] = 0
for i in range(1, len_x + 1):
for j in range(1, len_y + 1):
dt = self.cost(x[i - 1], y[j - 1])
dtw_mapping[i, j] = min(
(dtw_mapping[i - 1, j][0] + dt, i - 1, j),
(dtw_mapping[i, j - 1][0] + dt, i, j - 1),
(dtw_mapping[i - 1, j - 1][0] + dt, i - 1, j - 1),
key=lambda a: a[0]
)
similarity_score[i, j] = dt
path = []
i, j = len_x, len_y
while not (i == j == 0):
path.append(
(i - 1, j - 1, dtw_mapping[i - 1, j - 1][0],
similarity_score[i - 1, j - 1])
)
i, j = dtw_mapping[i, j][1], dtw_mapping[i, j][2]
path.reverse()
ref_frame_idx, test_frame_idx, _, costs = DTW.get_ref_test_mapping(path)
return (ref_frame_idx, test_frame_idx, costs)
@staticmethod
def get_ref_test_mapping(paths):
"""applies Dynamic Time Warping algorithm to find correspondence
between reference video and test video
Args:
paths : list of lists which consists of [i, j, ps, c] where i and
j are index of x and y time series respectively which have the
corespondence, ps is cummulative cost and c is cost between these
two instances
Returns:
A tuple containing ref_frame_indices, test_frame_indices,path_score
and costs where
ref_frame_indices: A list of indices for reference video frames
test_frame_indices : A list of indices for test video frames
path_score : A list of path score calculated by DTW between
reference and test keypoints
costs : A list of cost between reference and test keypoint
"""
path = np.array(paths)
ref_2_test = {}
for i in range(path.shape[0]):
ref_2_test_val = ref_2_test.get(path[i][0], [])
ref_2_test_val.append([path[i][1], path[i][2], path[i][3]])
ref_2_test[path[i][0]] = ref_2_test_val
ref_frames = []
test_frames = []
path_score = []
costs = []
for ref_frame, test_frame_list in ref_2_test.items():
ref_frames.append(int(ref_frame))
test_frame_list.sort(key=lambda x: x[2])
test_frames.append(int(test_frame_list[0][0]))
path_score.append(test_frame_list[0][1])
costs.append(test_frame_list[0][2])
return ref_frames, test_frames, path_score, costs