Spaces:
Build error
Build error
| import cv2 as cv | |
| import numpy as np | |
| pyramid_source = [] | |
| pyramid_target = [] | |
| patch_source_pyramid = [] | |
| JT_source_pyramid = [] | |
| Hinv_source_pyramid = [] | |
| windows = [] | |
| class Window: | |
| def __init__(self, center_x, center_y, window_size): | |
| self.center_x = center_x | |
| self.center_y = center_y | |
| self.window_size = window_size | |
| # The displacement vectors. | |
| # Important for simulating the internal calculation vector: g and d. Also they record the final results. | |
| self.Dx = 0 | |
| self.Dy = 0 | |
| self.map_x = None | |
| self.map_y = None | |
| self.generate_map() | |
| def generate_map(self): | |
| epsilon = 0.001 | |
| start_x = self.center_x - self.window_size//2 | |
| start_y = self.center_y - self.window_size//2 | |
| # print(start_x, start_y) | |
| # When window_size is odd, we must use this form to enforce the size of map to be window_size. | |
| crop_x = np.arange(start_x, start_x+self.window_size - epsilon, 1.0).astype(np.float32).reshape(1, self.window_size) | |
| self.map_x = np.repeat(crop_x, self.window_size, axis=0) | |
| crop_y = np.arange(start_y, start_y + self.window_size - epsilon, 1.0).astype(np.float32).reshape(self.window_size, 1) | |
| self.map_y = np.repeat(crop_y, self.window_size, axis=1) | |
| def pyrDown(self): | |
| """ | |
| When down-sample the original patch, the corresponding point position should be /2. | |
| However the maps' coordinate should not be dimply /2, therefore the maps need regenerate. | |
| """ | |
| self.center_x = self.center_x / 2 | |
| self.center_y = self.center_y / 2 | |
| self.Dx = self.Dx / 2 | |
| self.Dy = self.Dy / 2 | |
| self.generate_map() | |
| def pyrUp(self): | |
| """ | |
| When calculating the pyramidal LK and moving to the next (bigger) pyramid, the patch size will be doubled. | |
| Thus the corresponding point position should be *2. | |
| Here we should consider the displacement vector (Dx, Dy), to simulate the equation: g_(L-1) = 2*(g_L + d_L) | |
| (d_L calculated in this level iteration and g_L is inherited from the former level iteration, both stored in | |
| displacement vector) | |
| """ | |
| self.center_x = self.center_x * 2 | |
| self.center_y = self.center_y * 2 | |
| self.Dx = self.Dx * 2 | |
| self.Dy = self.Dy * 2 | |
| self.generate_map() | |
| def move(self, delta_x, delta_y): | |
| self.Dx += delta_x | |
| self.Dy += delta_y | |
| def crop(self, img): | |
| # Notice!!: map_column calculated from x, while map_row calculated from y. | |
| # Which contradict to the matrix index. | |
| patch = cv.remap(img, self.map_x + self.Dx, | |
| self.map_y + self.Dy, cv.INTER_LINEAR) | |
| return patch | |
| def generate_weight(patch_size): | |
| """ | |
| Generate the weight matrix | |
| :param patch_size: (Int) The patch_size | |
| :return: The weight map (patch_size * patch_size * 1). | |
| """ | |
| center = [patch_size // 2, patch_size // 2] | |
| sigma_x = sigma_y = patch_size // 2 | |
| maps = np.fromfunction(lambda x, y: ((x - center[0])/sigma_x) ** 2 + | |
| ((y - center[1])/sigma_y) ** 2, | |
| (patch_size, patch_size), | |
| dtype=int) | |
| return np.expand_dims(np.exp(maps/-2.0), -1) | |
| def craft_pyramid(image, level, pyramid_container): | |
| pyramid_container.clear() | |
| pyramid_container.append(image) | |
| for i in range(level - 1): | |
| image = cv.pyrDown(image) | |
| pyramid_container.append(image) | |
| def lk_track(face_source, face_target, landmarks_source, window_size, pyramid_level): | |
| # Create the image pyramid for both source and target. | |
| craft_pyramid(face_source, pyramid_level, pyramid_source) | |
| craft_pyramid(face_target, pyramid_level, pyramid_target) | |
| # Generate the weight map | |
| weight_map = generate_weight(window_size) | |
| # Create windows for cropping patches. | |
| windows.clear() | |
| for landmark in landmarks_source: | |
| x, y = landmark | |
| # windows.append(Window(x, y, patch_size, face_source.shape[0], face_source.shape[0])) | |
| windows.append(Window(x, y, window_size)) | |
| # Initialize the patches of both the source. | |
| # Notice that here both using the same window, i.e., d = 0. | |
| # Afterwards, patch_target will be changed while patch_source will fixed. | |
| patch_source_pyramid.clear() | |
| JT_source_pyramid.clear() | |
| Hinv_source_pyramid.clear() | |
| for level in range(pyramid_level): | |
| patch_source = [] | |
| for window in windows: | |
| patch_source.append(window.crop(pyramid_source[level])) | |
| if level < pyramid_level - 1: | |
| window.pyrDown() | |
| # Calculate the Jacobian and Hessen matrix of patch_source | |
| JT_source = [] | |
| Hinv_source = [] | |
| for patch in patch_source: | |
| """ | |
| # cv.Sobel(_, _, x, y, ...), x indicating the horizontal, | |
| # while it's in fact the y axis, for the y is the column. | |
| # horizontal means increase at column. | |
| """ | |
| gradient_x = cv.Sobel(patch, cv.CV_64F, 1, 0, ksize=3) | |
| gradient_y = cv.Sobel(patch, cv.CV_64F, 0, 1, ksize=3) | |
| gradient_x_w = gradient_x * weight_map | |
| gradient_y_w = gradient_y * weight_map | |
| J_x = np.reshape(gradient_x, (-1, 1)) | |
| J_y = np.reshape(gradient_y, (-1, 1)) | |
| J_x_w = np.reshape(gradient_x_w, (-1, 1)) | |
| J_y_w = np.reshape(gradient_y_w, (-1, 1)) | |
| J = np.concatenate((J_x, J_y), axis=1) | |
| J_w = np.concatenate((J_x_w, J_y_w), axis=1) | |
| JT_w = np.transpose(J_w) | |
| H = np.matmul(JT_w, J) | |
| Hinv = np.linalg.inv(H) | |
| # Noticed that we only collect the weighted JT here. | |
| JT_source.append(JT_w) | |
| Hinv_source.append(Hinv) | |
| # Collect all the pre-processed data in each level. | |
| patch_source_pyramid.append(patch_source) | |
| JT_source_pyramid.append(JT_source) | |
| Hinv_source_pyramid.append(Hinv_source) | |
| # | |
| # """ | |
| # Sequential Execution | |
| # """ | |
| max_iter_step = 15 | |
| for level in range(pyramid_level-1, -1, -1): | |
| epsilon_der1 = 1.0 + level | |
| for patch_s, window, JT, Hinv in zip(patch_source_pyramid[level], windows, JT_source_pyramid[level], Hinv_source_pyramid[level]): | |
| count = 1 | |
| while True: | |
| # Patch of target. which will move in each iteration. | |
| patch_t = window.crop(pyramid_target[level]) | |
| # Calculate the residual | |
| r = patch_t - patch_s | |
| r = np.reshape(r, (-1, 1)) | |
| der1 = np.matmul(JT, r) | |
| der1_norm = np.linalg.norm(der1) | |
| delta = - np.matmul(Hinv, der1) | |
| if der1_norm < epsilon_der1 or count > max_iter_step: | |
| if level != 0: | |
| # When reach the final level, stop the up-sample. | |
| window.pyrUp() | |
| break | |
| else: | |
| window.move(delta[0][0], delta[1][0]) | |
| count += 1 | |
| predictions = [] | |
| for window in windows: # type: Window | |
| predictions.append([window.center_x + window.Dx, window.center_y + window.Dy]) | |
| return np.array(predictions) | |
| def track_bidirectional(faces, locations): | |
| patch_size = 15 | |
| frames_num = len(faces) | |
| pyramid_level = 4 | |
| forward_pts = [locations[0].copy()] | |
| for i in range(1, frames_num): | |
| feature_old = faces[i-1] / 255.0 | |
| feature_new = faces[i] / 255.0 | |
| location_old = forward_pts[i - 1] | |
| forward_pt = lk_track(feature_old, feature_new, location_old, patch_size, pyramid_level) | |
| forward_pts.append(forward_pt) | |
| feedback_pts = [None] * (frames_num - 1) + [forward_pts[-1].copy()] | |
| for i in range(frames_num - 2, -1, -1): | |
| feature_old = faces[i+1] / 255.0 | |
| feature_new = faces[i] / 255.0 | |
| location_old = feedback_pts[i - 1] | |
| feedback_pt = lk_track(feature_old, feature_new, location_old, patch_size, pyramid_level) | |
| feedback_pts[i] = feedback_pt | |
| return forward_pts, feedback_pts |