from tqdm import tqdm
import numpy as np
import dlib
from collections import OrderedDict
import cv2

detector = dlib.get_frontal_face_detector()
predictor = dlib.shape_predictor("shape_predictor_68_face_landmarks.dat")
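# NOTE: "shape_predictor_68_face_landmarks.dat" is not bundled with dlib; it
# must be downloaded separately (e.g. from the dlib model zoo) and placed
# alongside this script.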

FACIAL_LANDMARKS_68_IDXS = OrderedDict([
    ("mouth", (48, 68)),
    ("inner_mouth", (60, 68)),
    ("right_eyebrow", (17, 22)),
    ("left_eyebrow", (22, 27)),
    ("right_eye", (36, 42)),
    ("left_eye", (42, 48)),
    ("nose", (27, 36)),
    ("jaw", (0, 17))
])
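# Each entry above is a half-open index interval [start, end) into the
# standard dlib 68-point landmark array.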

def shape_to_face(shape, width, height, scale=1.2):
    """
    Recalculate the face bounding box based on the coarse landmark locations (shape).
    :param
        shape: landmark locations
        width, height: size of the full frame, used to clip the bounding box
        scale: scale factor used to enlarge the bounding box
    :return:
        face_new: new bounding box of the face (1*4 list [x1, y1, x2, y2])
        face_size: the face box is square (width = height = face_size) (int)
    """
    x_min, y_min = np.min(shape, axis=0)
    x_max, y_max = np.max(shape, axis=0)

    x_center = (x_min + x_max) // 2
    y_center = (y_min + y_max) // 2

    face_size = int(max(x_max - x_min, y_max - y_min) * scale)
    # Enforce face_size to be even, so that after cropping the crop size
    # stays even and equal to the face_size parameter.
    face_size = face_size // 2 * 2

    x1 = max(x_center - face_size // 2, 0)
    y1 = max(y_center - face_size // 2, 0)
    # Clip the box to the frame boundaries.
    face_size = min(width - x1, face_size)
    face_size = min(height - y1, face_size)

    x2 = x1 + face_size
    y2 = y1 + face_size

    face_new = [int(x1), int(y1), int(x2), int(y2)]
    return face_new, face_size
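# A minimal usage sketch (illustrative only; `shape` is assumed to be a
# (68, 2) landmark array detected inside a 1280x720 frame):
#   face_box, size = shape_to_face(shape, width=1280, height=720, scale=1.2)
#   crop = frame[face_box[1]:face_box[3], face_box[0]:face_box[2]]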

def predict_single_frame(frame):
    """
    :param frame: a full frame of the video
    :return:
        face_num: 1 if a face was detected, otherwise 0
        shape: landmark locations, (68, 2) array
    """
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    faces = detector(gray, 0)
    if len(faces) < 1:
        return 0, None
    # Only the first detected face is used.
    face = faces[0]
    landmarks = predictor(frame, face)
    face_landmark_list = [(p.x, p.y) for p in landmarks.parts()]
    shape = np.array(face_landmark_list)
    return 1, shape
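# Example sketch of detecting landmarks on a single image (the path below is
# hypothetical):
#   frame = cv2.imread("example.jpg")
#   face_num, shape = predict_single_frame(frame)
#   if face_num:
#       for (x, y) in shape:
#           cv2.circle(frame, (int(x), int(y)), 2, (0, 255, 0), -1)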

def landmark_align(shape):
    desiredLeftEye = (0.35, 0.25)
    desiredFaceWidth = 2
    desiredFaceHeight = 2
    (lStart, lEnd) = FACIAL_LANDMARKS_68_IDXS["left_eye"]
    (rStart, rEnd) = FACIAL_LANDMARKS_68_IDXS["right_eye"]

    leftEyePts = shape[lStart:lEnd]
    rightEyePts = shape[rStart:rEnd]

    # compute the center of mass for each eye
    leftEyeCenter = leftEyePts.mean(axis=0)  # .astype("int")
    rightEyeCenter = rightEyePts.mean(axis=0)  # .astype("int")

    # compute the angle between the eye centroids
    dY = rightEyeCenter[1] - leftEyeCenter[1]
    dX = rightEyeCenter[0] - leftEyeCenter[0]
    angle = np.degrees(np.arctan2(dY, dX))  # - 180

    # compute the desired right eye x-coordinate based on the
    # desired x-coordinate of the left eye
    desiredRightEyeX = 1.0 - desiredLeftEye[0]

    # determine the scale of the new resulting image by taking the ratio of
    # the distance between the eyes in the *current* image to the distance
    # between the eyes in the *desired* image
    dist = np.sqrt((dX ** 2) + (dY ** 2))
    desiredDist = (desiredRightEyeX - desiredLeftEye[0])
    desiredDist *= desiredFaceWidth
    scale = desiredDist / dist

    # compute the center (x, y)-coordinates (i.e., the midpoint)
    # between the two eyes in the input image
    eyesCenter = ((leftEyeCenter[0] + rightEyeCenter[0]) // 2,
                  (leftEyeCenter[1] + rightEyeCenter[1]) // 2)

    # grab the rotation matrix for rotating and scaling the face
    M = cv2.getRotationMatrix2D(eyesCenter, angle, scale)

    # update the translation component of the matrix
    tX = 0  # desiredFaceWidth * 0.5
    tY = desiredFaceHeight * desiredLeftEye[1]
    M[0, 2] += (tX - eyesCenter[0])
    M[1, 2] += (tY - eyesCenter[1])

    # apply the affine transform to the landmarks (homogeneous coordinates)
    n, d = shape.shape
    temp = np.zeros((n, d + 1), dtype="int")
    temp[:, 0:2] = shape
    temp[:, 2] = 1
    aligned_landmarks = np.matmul(M, temp.T)
    return aligned_landmarks.T  # .astype("int")
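# Illustrative use: landmark_align maps raw pixel landmarks into a canonical
# eye-centred coordinate frame, so that landmark sequences from different
# videos become directly comparable, e.g.:
#   _, shape = predict_single_frame(frame)
#   aligned = landmark_align(shape)   # (68, 2) array in the aligned space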

def check_and_merge(location, forward, feedback, P_predict, status_fw=None, status_fb=None):
    num_pts = 68
    check = [True] * num_pts
    target = location[1]
    forward_predict = forward[1]

    # Feedback check for robustness: a point whose backward-tracked position
    # drifts from its origin is considered unreliable.
    forward_base = forward[0]  # Also equal to location[0]
    feedback_predict = feedback[0]
    feedback_diff = feedback_predict - forward_base
    feedback_dist = np.linalg.norm(feedback_diff, axis=1, keepdims=True)

    # For Kalman filtering
    detect_diff = location[1] - location[0]
    detect_dist = np.linalg.norm(detect_diff, axis=1, keepdims=True)
    predict_diff = forward[1] - forward[0]
    predict_dist = np.linalg.norm(predict_diff, axis=1, keepdims=True)
    predict_dist[np.where(predict_dist == 0)] = 1  # Avoid division by zero
    P_detect = (detect_dist / predict_dist).reshape(num_pts)

    for ipt in range(num_pts):
        if feedback_dist[ipt] > 2:  # Threshold of 2 (float pixel coordinates)
            check[ipt] = False
    if status_fw is not None and np.sum(status_fw) != num_pts:
        for ipt in range(num_pts):
            if status_fw[ipt][0] == 0:
                check[ipt] = False
    if status_fb is not None and np.sum(status_fb) != num_pts:
        for ipt in range(num_pts):
            if status_fb[ipt][0] == 0:
                check[ipt] = False

    location_merge = target.copy()
    # Merge the results: use a Kalman filter to combine the tracked
    # (calculated) result with the detected result.
    Q = 0.3  # Process variance
    for ipt in range(num_pts):
        if check[ipt]:
            # Kalman parameters
            P_predict[ipt] += Q
            K = P_predict[ipt] / (P_predict[ipt] + P_detect[ipt])
            location_merge[ipt] = forward_predict[ipt] + K * (target[ipt] - forward_predict[ipt])
            # Update P_predict with the current K
            P_predict[ipt] = (1 - K) * P_predict[ipt]
    return location_merge, check, P_predict
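# The per-point merge above is the standard 1-D Kalman update, with the
# detection as measurement z and the tracked point as prediction x^-:
#   P^- = P + Q                  (predict the error covariance)
#   K   = P^- / (P^- + R)        (gain; here R is approximated by P_detect)
#   x   = x^- + K * (z - x^-)    (fuse tracker and detector)
#   P   = (1 - K) * P^-          (update the error covariance)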

def detect_frames_track(frames):
    frames_num = len(frames)
    assert frames_num != 0
    frame_height, frame_width = frames[0].shape[:2]

    """
    Pre-processing:
    detect the original landmarks, normalize each face to a fixed width,
    and record the corresponding landmark locations and scale parameters.
    """
    face_size_normalized = 400
    faces = []
    locations = []
    shapes_origin = []
    shapes_para = []  # Used to recover the shape in the whole frame. ([x1, y1, scale_shape])
    face_size = 0
    skipped = 0

    """
    Detect the face in each single frame with dlib (CPU).
    """
    # ----------------------------------------------------------------------------#
    print("Detecting:")
    for i in tqdm(range(frames_num)):
        frame = frames[i]
        face_num, shape = predict_single_frame(frame)
        if face_num == 0:
            if len(shapes_origin) == 0:
                skipped += 1
                continue
            # Fall back to the last successfully detected shape.
            shape = shapes_origin[i - 1 - skipped]
        face, face_size = shape_to_face(shape, frame_width, frame_height, 1.2)
        faceFrame = frame[face[1]:face[3], face[0]:face[2]]
        # Choose interpolation depending on whether we up- or down-sample.
        if face_size < face_size_normalized:
            inter_para = cv2.INTER_CUBIC
        else:
            inter_para = cv2.INTER_AREA
        face_norm = cv2.resize(faceFrame, (face_size_normalized, face_size_normalized), interpolation=inter_para)
        scale_shape = face_size_normalized / face_size
        shape_norm = np.rint((shape - np.array([face[0], face[1]])) * scale_shape).astype(int)
        faces.append(face_norm)
        shapes_para.append([face[0], face[1], scale_shape])
        shapes_origin.append(shape)
        locations.append(shape_norm)
| """ | |
| Calibration module. | |
| """ | |
| segment_length = 2 | |
| locations_sum = len(locations) | |
| if locations_sum == 0: | |
| return [] | |
| locations_track = [locations[0]] | |
| num_pts = 68 | |
| P_predict = np.array([0] * num_pts).reshape(num_pts).astype(float) | |
| print("Tracking") | |
| for i in tqdm(range(locations_sum - 1)): | |
| faces_seg = faces[i:i + segment_length] | |
| locations_seg = locations[i:i + segment_length] | |
| # ----------------------------------------------------------------------# | |
| """ | |
| Numpy Version (DEPRECATED) | |
| """ | |
| # locations_track_start = [locations_track[i]] | |
| # forward_pts, feedback_pts = track_bidirectional(faces_seg, locations_track_start) | |
| # | |
| # forward_pts = np.rint(forward_pts).astype(int) | |
| # feedback_pts = np.rint(feedback_pts).astype(int) | |
| # merge_pt, check, P_predict = check_and_merge(locations_seg, forward_pts, feedback_pts, P_predict) | |
| # ----------------------------------------------------------------------# | |
| """ | |
| OpenCV Version | |
| """ | |
| lk_params = dict(winSize=(15, 15), | |
| maxLevel=3, | |
| criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)) | |
| # Use the tracked current location as input. Also use the next frame's predicted location for | |
| # auxiliary initialization. | |
| start_pt = locations_track[i].astype(np.float32) | |
| target_pt = locations_seg[1].astype(np.float32) | |
| forward_pt, status_fw, err_fw = cv2.calcOpticalFlowPyrLK(faces_seg[0], faces_seg[1], | |
| start_pt, target_pt, **lk_params, | |
| flags=cv2.OPTFLOW_USE_INITIAL_FLOW) | |
| feedback_pt, status_fb, err_fb = cv2.calcOpticalFlowPyrLK(faces_seg[1], faces_seg[0], | |
| forward_pt, start_pt, **lk_params, | |
| flags=cv2.OPTFLOW_USE_INITIAL_FLOW) | |
| forward_pts = [locations_track[i].copy(), forward_pt] | |
| feedback_pts = [feedback_pt, forward_pt.copy()] | |
| forward_pts = np.rint(forward_pts).astype(int) | |
| feedback_pts = np.rint(feedback_pts).astype(int) | |
| merge_pt, check, P_predict = check_and_merge(locations_seg, forward_pts, feedback_pts, P_predict, status_fw, | |
| status_fb) | |
| # ----------------------------------------------------------------------# | |
| locations_track.append(merge_pt) | |
| """ | |
| If us visualization, write the results to the visualize output folder. | |
| """ | |
| if locations_sum != frames_num: | |
| print("INFO: Landmarks detection failed in some frames. Therefore we disable the " | |
| "visualization for this video. It will be optimized in future version.") | |
| aligned_landmarks = [] | |
| for i in locations_track: | |
| shape = landmark_align(i) | |
| shape = shape.ravel() | |
| shape = shape.tolist() | |
| aligned_landmarks.append(shape) | |
| return aligned_landmarks | |
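
# A minimal driver sketch, assuming frames are read from a local video file
# (the path below is hypothetical):
if __name__ == "__main__":
    cap = cv2.VideoCapture("input.mp4")
    video_frames = []
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        video_frames.append(frame)
    cap.release()

    aligned = detect_frames_track(video_frames)
    print("Extracted aligned landmark vectors for", len(aligned), "frames")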