Spaces:
Build error
Build error
import cv2 as cv | |
import numpy as np | |
pyramid_source = [] | |
pyramid_target = [] | |
patch_source_pyramid = [] | |
JT_source_pyramid = [] | |
Hinv_source_pyramid = [] | |
windows = [] | |
class Window: | |
def __init__(self, center_x, center_y, window_size): | |
self.center_x = center_x | |
self.center_y = center_y | |
self.window_size = window_size | |
# The displacement vectors. | |
# Important for simulating the internal calculation vector: g and d. Also they record the final results. | |
self.Dx = 0 | |
self.Dy = 0 | |
self.map_x = None | |
self.map_y = None | |
self.generate_map() | |
def generate_map(self): | |
epsilon = 0.001 | |
start_x = self.center_x - self.window_size//2 | |
start_y = self.center_y - self.window_size//2 | |
# print(start_x, start_y) | |
# When window_size is odd, we must use this form to enforce the size of map to be window_size. | |
crop_x = np.arange(start_x, start_x+self.window_size - epsilon, 1.0).astype(np.float32).reshape(1, self.window_size) | |
self.map_x = np.repeat(crop_x, self.window_size, axis=0) | |
crop_y = np.arange(start_y, start_y + self.window_size - epsilon, 1.0).astype(np.float32).reshape(self.window_size, 1) | |
self.map_y = np.repeat(crop_y, self.window_size, axis=1) | |
def pyrDown(self): | |
""" | |
When down-sample the original patch, the corresponding point position should be /2. | |
However the maps' coordinate should not be dimply /2, therefore the maps need regenerate. | |
""" | |
self.center_x = self.center_x / 2 | |
self.center_y = self.center_y / 2 | |
self.Dx = self.Dx / 2 | |
self.Dy = self.Dy / 2 | |
self.generate_map() | |
def pyrUp(self): | |
""" | |
When calculating the pyramidal LK and moving to the next (bigger) pyramid, the patch size will be doubled. | |
Thus the corresponding point position should be *2. | |
Here we should consider the displacement vector (Dx, Dy), to simulate the equation: g_(L-1) = 2*(g_L + d_L) | |
(d_L calculated in this level iteration and g_L is inherited from the former level iteration, both stored in | |
displacement vector) | |
""" | |
self.center_x = self.center_x * 2 | |
self.center_y = self.center_y * 2 | |
self.Dx = self.Dx * 2 | |
self.Dy = self.Dy * 2 | |
self.generate_map() | |
def move(self, delta_x, delta_y): | |
self.Dx += delta_x | |
self.Dy += delta_y | |
def crop(self, img): | |
# Notice!!: map_column calculated from x, while map_row calculated from y. | |
# Which contradict to the matrix index. | |
patch = cv.remap(img, self.map_x + self.Dx, | |
self.map_y + self.Dy, cv.INTER_LINEAR) | |
return patch | |
def generate_weight(patch_size): | |
""" | |
Generate the weight matrix | |
:param patch_size: (Int) The patch_size | |
:return: The weight map (patch_size * patch_size * 1). | |
""" | |
center = [patch_size // 2, patch_size // 2] | |
sigma_x = sigma_y = patch_size // 2 | |
maps = np.fromfunction(lambda x, y: ((x - center[0])/sigma_x) ** 2 + | |
((y - center[1])/sigma_y) ** 2, | |
(patch_size, patch_size), | |
dtype=int) | |
return np.expand_dims(np.exp(maps/-2.0), -1) | |
def craft_pyramid(image, level, pyramid_container): | |
pyramid_container.clear() | |
pyramid_container.append(image) | |
for i in range(level - 1): | |
image = cv.pyrDown(image) | |
pyramid_container.append(image) | |
def lk_track(face_source, face_target, landmarks_source, window_size, pyramid_level): | |
# Create the image pyramid for both source and target. | |
craft_pyramid(face_source, pyramid_level, pyramid_source) | |
craft_pyramid(face_target, pyramid_level, pyramid_target) | |
# Generate the weight map | |
weight_map = generate_weight(window_size) | |
# Create windows for cropping patches. | |
windows.clear() | |
for landmark in landmarks_source: | |
x, y = landmark | |
# windows.append(Window(x, y, patch_size, face_source.shape[0], face_source.shape[0])) | |
windows.append(Window(x, y, window_size)) | |
# Initialize the patches of both the source. | |
# Notice that here both using the same window, i.e., d = 0. | |
# Afterwards, patch_target will be changed while patch_source will fixed. | |
patch_source_pyramid.clear() | |
JT_source_pyramid.clear() | |
Hinv_source_pyramid.clear() | |
for level in range(pyramid_level): | |
patch_source = [] | |
for window in windows: | |
patch_source.append(window.crop(pyramid_source[level])) | |
if level < pyramid_level - 1: | |
window.pyrDown() | |
# Calculate the Jacobian and Hessen matrix of patch_source | |
JT_source = [] | |
Hinv_source = [] | |
for patch in patch_source: | |
""" | |
# cv.Sobel(_, _, x, y, ...), x indicating the horizontal, | |
# while it's in fact the y axis, for the y is the column. | |
# horizontal means increase at column. | |
""" | |
gradient_x = cv.Sobel(patch, cv.CV_64F, 1, 0, ksize=3) | |
gradient_y = cv.Sobel(patch, cv.CV_64F, 0, 1, ksize=3) | |
gradient_x_w = gradient_x * weight_map | |
gradient_y_w = gradient_y * weight_map | |
J_x = np.reshape(gradient_x, (-1, 1)) | |
J_y = np.reshape(gradient_y, (-1, 1)) | |
J_x_w = np.reshape(gradient_x_w, (-1, 1)) | |
J_y_w = np.reshape(gradient_y_w, (-1, 1)) | |
J = np.concatenate((J_x, J_y), axis=1) | |
J_w = np.concatenate((J_x_w, J_y_w), axis=1) | |
JT_w = np.transpose(J_w) | |
H = np.matmul(JT_w, J) | |
Hinv = np.linalg.inv(H) | |
# Noticed that we only collect the weighted JT here. | |
JT_source.append(JT_w) | |
Hinv_source.append(Hinv) | |
# Collect all the pre-processed data in each level. | |
patch_source_pyramid.append(patch_source) | |
JT_source_pyramid.append(JT_source) | |
Hinv_source_pyramid.append(Hinv_source) | |
# | |
# """ | |
# Sequential Execution | |
# """ | |
max_iter_step = 15 | |
for level in range(pyramid_level-1, -1, -1): | |
epsilon_der1 = 1.0 + level | |
for patch_s, window, JT, Hinv in zip(patch_source_pyramid[level], windows, JT_source_pyramid[level], Hinv_source_pyramid[level]): | |
count = 1 | |
while True: | |
# Patch of target. which will move in each iteration. | |
patch_t = window.crop(pyramid_target[level]) | |
# Calculate the residual | |
r = patch_t - patch_s | |
r = np.reshape(r, (-1, 1)) | |
der1 = np.matmul(JT, r) | |
der1_norm = np.linalg.norm(der1) | |
delta = - np.matmul(Hinv, der1) | |
if der1_norm < epsilon_der1 or count > max_iter_step: | |
if level != 0: | |
# When reach the final level, stop the up-sample. | |
window.pyrUp() | |
break | |
else: | |
window.move(delta[0][0], delta[1][0]) | |
count += 1 | |
predictions = [] | |
for window in windows: # type: Window | |
predictions.append([window.center_x + window.Dx, window.center_y + window.Dy]) | |
return np.array(predictions) | |
def track_bidirectional(faces, locations): | |
patch_size = 15 | |
frames_num = len(faces) | |
pyramid_level = 4 | |
forward_pts = [locations[0].copy()] | |
for i in range(1, frames_num): | |
feature_old = faces[i-1] / 255.0 | |
feature_new = faces[i] / 255.0 | |
location_old = forward_pts[i - 1] | |
forward_pt = lk_track(feature_old, feature_new, location_old, patch_size, pyramid_level) | |
forward_pts.append(forward_pt) | |
feedback_pts = [None] * (frames_num - 1) + [forward_pts[-1].copy()] | |
for i in range(frames_num - 2, -1, -1): | |
feature_old = faces[i+1] / 255.0 | |
feature_new = faces[i] / 255.0 | |
location_old = feedback_pts[i - 1] | |
feedback_pt = lk_track(feature_old, feature_new, location_old, patch_size, pyramid_level) | |
feedback_pts[i] = feedback_pt | |
return forward_pts, feedback_pts |