Spaces:
Build error
Build error
File size: 8,196 Bytes
858279b |
|
import cv2 as cv
import numpy as np
pyramid_source = []
pyramid_target = []
patch_source_pyramid = []
JT_source_pyramid = []
Hinv_source_pyramid = []
windows = []
class Window:
def __init__(self, center_x, center_y, window_size):
self.center_x = center_x
self.center_y = center_y
self.window_size = window_size
# The displacement vectors.
# Important for simulating the internal calculation vector: g and d. Also they record the final results.
self.Dx = 0
self.Dy = 0
self.map_x = None
self.map_y = None
self.generate_map()
def generate_map(self):
epsilon = 0.001
start_x = self.center_x - self.window_size//2
start_y = self.center_y - self.window_size//2
# print(start_x, start_y)
# When window_size is odd, we must use this form to enforce the size of map to be window_size.
crop_x = np.arange(start_x, start_x+self.window_size - epsilon, 1.0).astype(np.float32).reshape(1, self.window_size)
self.map_x = np.repeat(crop_x, self.window_size, axis=0)
crop_y = np.arange(start_y, start_y + self.window_size - epsilon, 1.0).astype(np.float32).reshape(self.window_size, 1)
self.map_y = np.repeat(crop_y, self.window_size, axis=1)
def pyrDown(self):
"""
When down-sample the original patch, the corresponding point position should be /2.
However the maps' coordinate should not be dimply /2, therefore the maps need regenerate.
"""
self.center_x = self.center_x / 2
self.center_y = self.center_y / 2
self.Dx = self.Dx / 2
self.Dy = self.Dy / 2
self.generate_map()
def pyrUp(self):
"""
When calculating the pyramidal LK and moving to the next (bigger) pyramid, the patch size will be doubled.
Thus the corresponding point position should be *2.
Here we should consider the displacement vector (Dx, Dy), to simulate the equation: g_(L-1) = 2*(g_L + d_L)
(d_L calculated in this level iteration and g_L is inherited from the former level iteration, both stored in
displacement vector)
"""
self.center_x = self.center_x * 2
self.center_y = self.center_y * 2
self.Dx = self.Dx * 2
self.Dy = self.Dy * 2
self.generate_map()
def move(self, delta_x, delta_y):
self.Dx += delta_x
self.Dy += delta_y
def crop(self, img):
# Notice!!: map_column calculated from x, while map_row calculated from y.
# Which contradict to the matrix index.
patch = cv.remap(img, self.map_x + self.Dx,
self.map_y + self.Dy, cv.INTER_LINEAR)
return patch
def generate_weight(patch_size):
"""
Generate the weight matrix
:param patch_size: (Int) The patch_size
:return: The weight map (patch_size * patch_size * 1).
"""
center = [patch_size // 2, patch_size // 2]
sigma_x = sigma_y = patch_size // 2
maps = np.fromfunction(lambda x, y: ((x - center[0])/sigma_x) ** 2 +
((y - center[1])/sigma_y) ** 2,
(patch_size, patch_size),
dtype=int)
return np.expand_dims(np.exp(maps/-2.0), -1)
def craft_pyramid(image, level, pyramid_container):
pyramid_container.clear()
pyramid_container.append(image)
for i in range(level - 1):
image = cv.pyrDown(image)
pyramid_container.append(image)
def lk_track(face_source, face_target, landmarks_source, window_size, pyramid_level):
# Create the image pyramid for both source and target.
craft_pyramid(face_source, pyramid_level, pyramid_source)
craft_pyramid(face_target, pyramid_level, pyramid_target)
# Generate the weight map
weight_map = generate_weight(window_size)
# Create windows for cropping patches.
windows.clear()
for landmark in landmarks_source:
x, y = landmark
# windows.append(Window(x, y, patch_size, face_source.shape[0], face_source.shape[0]))
windows.append(Window(x, y, window_size))
# Initialize the patches of both the source.
# Notice that here both using the same window, i.e., d = 0.
# Afterwards, patch_target will be changed while patch_source will fixed.
patch_source_pyramid.clear()
JT_source_pyramid.clear()
Hinv_source_pyramid.clear()
for level in range(pyramid_level):
patch_source = []
for window in windows:
patch_source.append(window.crop(pyramid_source[level]))
if level < pyramid_level - 1:
window.pyrDown()
# Calculate the Jacobian and Hessen matrix of patch_source
JT_source = []
Hinv_source = []
for patch in patch_source:
"""
# cv.Sobel(_, _, x, y, ...), x indicating the horizontal,
# while it's in fact the y axis, for the y is the column.
# horizontal means increase at column.
"""
gradient_x = cv.Sobel(patch, cv.CV_64F, 1, 0, ksize=3)
gradient_y = cv.Sobel(patch, cv.CV_64F, 0, 1, ksize=3)
gradient_x_w = gradient_x * weight_map
gradient_y_w = gradient_y * weight_map
J_x = np.reshape(gradient_x, (-1, 1))
J_y = np.reshape(gradient_y, (-1, 1))
J_x_w = np.reshape(gradient_x_w, (-1, 1))
J_y_w = np.reshape(gradient_y_w, (-1, 1))
J = np.concatenate((J_x, J_y), axis=1)
J_w = np.concatenate((J_x_w, J_y_w), axis=1)
JT_w = np.transpose(J_w)
H = np.matmul(JT_w, J)
Hinv = np.linalg.inv(H)
# Noticed that we only collect the weighted JT here.
JT_source.append(JT_w)
Hinv_source.append(Hinv)
# Collect all the pre-processed data in each level.
patch_source_pyramid.append(patch_source)
JT_source_pyramid.append(JT_source)
Hinv_source_pyramid.append(Hinv_source)
#
# """
# Sequential Execution
# """
max_iter_step = 15
for level in range(pyramid_level-1, -1, -1):
epsilon_der1 = 1.0 + level
for patch_s, window, JT, Hinv in zip(patch_source_pyramid[level], windows, JT_source_pyramid[level], Hinv_source_pyramid[level]):
count = 1
while True:
# Patch of target. which will move in each iteration.
patch_t = window.crop(pyramid_target[level])
# Calculate the residual
r = patch_t - patch_s
r = np.reshape(r, (-1, 1))
der1 = np.matmul(JT, r)
der1_norm = np.linalg.norm(der1)
delta = - np.matmul(Hinv, der1)
if der1_norm < epsilon_der1 or count > max_iter_step:
if level != 0:
# When reach the final level, stop the up-sample.
window.pyrUp()
break
else:
window.move(delta[0][0], delta[1][0])
count += 1
predictions = []
for window in windows: # type: Window
predictions.append([window.center_x + window.Dx, window.center_y + window.Dy])
return np.array(predictions)
def track_bidirectional(faces, locations):
patch_size = 15
frames_num = len(faces)
pyramid_level = 4
forward_pts = [locations[0].copy()]
for i in range(1, frames_num):
feature_old = faces[i-1] / 255.0
feature_new = faces[i] / 255.0
location_old = forward_pts[i - 1]
forward_pt = lk_track(feature_old, feature_new, location_old, patch_size, pyramid_level)
forward_pts.append(forward_pt)
feedback_pts = [None] * (frames_num - 1) + [forward_pts[-1].copy()]
for i in range(frames_num - 2, -1, -1):
feature_old = faces[i+1] / 255.0
feature_new = faces[i] / 255.0
location_old = feedback_pts[i - 1]
feedback_pt = lk_track(feature_old, feature_new, location_old, patch_size, pyramid_level)
feedback_pts[i] = feedback_pt
return forward_pts, feedback_pts |