|
import numpy as np |
|
from .utils import get_keypoint_weight |
|
|
|
|
|
class DTWForKeypoints: |
|
def __init__(self, keypoints1, keypoints2): |
|
self.keypoints1 = keypoints1 |
|
self.keypoints2 = keypoints2 |
|
|
|
def get_dtw_path(self): |
|
|
|
norm_kp1 = self.normalize_keypoints(self.keypoints1) |
|
norm_kp2 = self.normalize_keypoints(self.keypoints2) |
|
|
|
kp_weight = get_keypoint_weight() |
|
oks, oks_unnorm = self.object_keypoint_similarity(norm_kp1, |
|
norm_kp2, keypoint_weights=kp_weight) |
|
print(f"OKS max {oks.max():.2f} min {oks.min():.2f}") |
|
|
|
|
|
cost_matrix = 1 - oks |
|
dtw_dist, dtw_path = self.dynamic_time_warp(cost_matrix) |
|
|
|
return dtw_path, oks, oks_unnorm |
|
|
|
def normalize_keypoints(self, keypoints): |
|
centroid = keypoints.mean(axis=1)[:, None] |
|
max_distance = np.max(np.sqrt(np.sum((keypoints - centroid) ** 2, axis=2)), |
|
axis=1) + 1e-6 |
|
|
|
normalized_keypoints = (keypoints - centroid) / max_distance[:, None, None] |
|
return normalized_keypoints |
|
|
|
def keypoints_areas(self, keypoints): |
|
min_coords = np.min(keypoints, axis=1) |
|
max_coords = np.max(keypoints, axis=1) |
|
areas = np.prod(max_coords - min_coords, axis=1) |
|
return areas |
|
|
|
def object_keypoint_similarity(self, keypoints1, |
|
keypoints2, |
|
scale_constant=0.2, |
|
keypoint_weights=None): |
|
""" Calculate the Object Keypoint Similarity (OKS) for multiple objects, |
|
and add weight to each keypoint. Here we choose to normalize the points |
|
using centroid and max distance instead of bounding box area. |
|
""" |
|
|
|
sq_diff = np.sum((keypoints1[:, None] - keypoints2) ** 2, axis=-1) |
|
|
|
oks = np.exp(-sq_diff / (2 * scale_constant ** 2)) |
|
oks_unnorm = oks.copy() |
|
|
|
if keypoint_weights is not None: |
|
oks = oks * keypoint_weights |
|
oks = np.sum(oks, axis=-1) |
|
else: |
|
oks = np.mean(oks, axis=-1) |
|
|
|
return oks, oks_unnorm |
|
|
|
def dynamic_time_warp(self, cost_matrix, R=1000): |
|
"""Compute the Dynamic Time Warping distance and path between two time series. |
|
If the time series is too long, it will use the Sakoe-Chiba Band constraint, |
|
so time complexity is bounded at O(MR). |
|
""" |
|
|
|
M = len(self.keypoints1) |
|
N = len(self.keypoints2) |
|
|
|
|
|
D = np.full((M, N), np.inf) |
|
|
|
|
|
D[0, 0] = cost_matrix[0, 0] |
|
for i in range(1, M): |
|
D[i, 0] = D[i - 1, 0] + cost_matrix[i, 0] |
|
|
|
for j in range(1, N): |
|
D[0, j] = D[0, j - 1] + cost_matrix[0, j] |
|
|
|
|
|
|
|
for i in range(1, M): |
|
for j in range(max(1, i - R), min(N, i + R + 1)): |
|
cost = cost_matrix[i, j] |
|
D[i, j] = cost + min(D[i - 1, j], D[i, j - 1], D[i - 1, j - 1]) |
|
|
|
|
|
path = [(M - 1, N - 1)] |
|
i, j = M - 1, N - 1 |
|
while i > 0 or j > 0: |
|
min_idx = np.argmin([D[i - 1, j], D[i, j - 1], D[i - 1, j - 1]]) |
|
if min_idx == 0: |
|
i -= 1 |
|
elif min_idx == 1: |
|
j -= 1 |
|
else: |
|
i -= 1 |
|
j -= 1 |
|
path.append((i, j)) |
|
path.reverse() |
|
|
|
return D[-1, -1], path |
|
|
|
if __name__ == '__main__': |
|
|
|
from mmengine.fileio import load |
|
|
|
keypoints1, kp1_scores = load('tennis1.pkl') |
|
keypoints2, kp2_scores = load('tennis3.pkl') |
|
|
|
|
|
dtw = DTWForKeypoints(keypoints1, keypoints2) |
|
path = dtw.get_dtw_path() |
|
print(path) |