Face-forgery-detection / landmark_utils.py
asdasdasdasd's picture
Upload landmark_utils.py
c794a89
raw
history blame
11.5 kB
from tqdm import tqdm
import numpy as np
import dlib
from collections import OrderedDict
import cv2
detector = dlib.get_frontal_face_detector()
predictor = dlib.shape_predictor("shape_predictor_68_face_landmarks.dat")
FACIAL_LANDMARKS_68_IDXS = OrderedDict([
("mouth", (48, 68)),
("inner_mouth", (60, 68)),
("right_eyebrow", (17, 22)),
("left_eyebrow", (22, 27)),
("right_eye", (36, 42)),
("left_eye", (42, 48)),
("nose", (27, 36)),
("jaw", (0, 17))
])
def shape_to_face(shape, width, height, scale=1.2):
"""
Recalculate the face bounding box based on coarse landmark location(shape)
:param
shape: landmark locations
scale: the scale parameter of face, to enlarge the bounding box
:return:
face_new: new bounding box of face (1*4 list [x1, y1, x2, y2])
# face_center: the center coordinate of face (1*2 list [x_c, y_c])
face_size: the face is rectangular( width = height = size)(int)
"""
x_min, y_min = np.min(shape, axis=0)
x_max, y_max = np.max(shape, axis=0)
x_center = (x_min + x_max) // 2
y_center = (y_min + y_max) // 2
face_size = int(max(x_max - x_min, y_max - y_min) * scale)
# Enforce it to be even
# Thus the real whole bounding box size will be an odd
# But after cropping the face size will become even and
# keep same to the face_size parameter.
face_size = face_size // 2 * 2
x1 = max(x_center - face_size // 2, 0)
y1 = max(y_center - face_size // 2, 0)
face_size = min(width - x1, face_size)
face_size = min(height - y1, face_size)
x2 = x1 + face_size
y2 = y1 + face_size
face_new = [int(x1), int(y1), int(x2), int(y2)]
return face_new, face_size
def predict_single_frame(frame):
"""
:param frame: A full frame of video
:return:
face_num: the number of face (just to verify if successfully detect a face)
shape: landmark locations
"""
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
faces = detector(gray, 0)
if len(faces) < 1:
return 0, None
face = faces[0]
landmarks = predictor(frame, face)
face_landmark_list = [(p.x, p.y) for p in landmarks.parts()]
shape = np.array(face_landmark_list)
return 1, shape
def landmark_align(shape):
desiredLeftEye = (0.35, 0.25)
desiredFaceWidth = 2
desiredFaceHeight = 2
(lStart, lEnd) = FACIAL_LANDMARKS_68_IDXS["left_eye"]
(rStart, rEnd) = FACIAL_LANDMARKS_68_IDXS["right_eye"]
leftEyePts = shape[lStart:lEnd]
rightEyePts = shape[rStart:rEnd]
# compute the center of mass for each eye
leftEyeCenter = leftEyePts.mean(axis=0) # .astype("int")
rightEyeCenter = rightEyePts.mean(axis=0) # .astype("int")
# compute the angle between the eye centroids
dY = rightEyeCenter[1] - leftEyeCenter[1]
dX = rightEyeCenter[0] - leftEyeCenter[0]
angle = np.degrees(np.arctan2(dY, dX)) # - 180
# compute the desired right eye x-coordinate based on the
# desired x-coordinate of the left eye
desiredRightEyeX = 1.0 - desiredLeftEye[0]
# determine the scale of the new resulting image by taking
# the ratio of the distance between eyes in the *current*
# image to the ratio of distance between eyes in the
# *desired* image
dist = np.sqrt((dX ** 2) + (dY ** 2))
desiredDist = (desiredRightEyeX - desiredLeftEye[0])
desiredDist *= desiredFaceWidth
scale = desiredDist / dist
# compute center (x, y)-coordinates (i.e., the median point)
# between the two eyes in the input image
eyesCenter = ((leftEyeCenter[0] + rightEyeCenter[0]) // 2,
(leftEyeCenter[1] + rightEyeCenter[1]) // 2)
# grab the rotation matrix for rotating and scaling the face
M = cv2.getRotationMatrix2D(eyesCenter, angle, scale)
# update the translation component of the matrix
tX = 0 # desiredFaceWidth * 0.5
tY = desiredFaceHeight * desiredLeftEye[1]
M[0, 2] += (tX - eyesCenter[0])
M[1, 2] += (tY - eyesCenter[1])
n, d = shape.shape
temp = np.zeros((n, d + 1), dtype="int")
temp[:, 0:2] = shape
temp[:, 2] = 1
aligned_landmarks = np.matmul(M, temp.T)
return aligned_landmarks.T # .astype("int"))
def check_and_merge(location, forward, feedback, P_predict, status_fw=None, status_fb=None):
num_pts = 68
check = [True] * num_pts
target = location[1]
forward_predict = forward[1]
# To ensure the robustness through feedback-check
forward_base = forward[0] # Also equal to location[0]
feedback_predict = feedback[0]
feedback_diff = feedback_predict - forward_base
feedback_dist = np.linalg.norm(feedback_diff, axis=1, keepdims=True)
# For Kalman Filtering
detect_diff = location[1] - location[0]
detect_dist = np.linalg.norm(detect_diff, axis=1, keepdims=True)
predict_diff = forward[1] - forward[0]
predict_dist = np.linalg.norm(predict_diff, axis=1, keepdims=True)
predict_dist[np.where(predict_dist == 0)] = 1 # Avoid nan
P_detect = (detect_dist / predict_dist).reshape(num_pts)
for ipt in range(num_pts):
if feedback_dist[ipt] > 2: # When use float
check[ipt] = False
if status_fw is not None and np.sum(status_fw) != num_pts:
for ipt in range(num_pts):
if status_fw[ipt][0] == 0:
check[ipt] = False
if status_fw is not None and np.sum(status_fb) != num_pts:
for ipt in range(num_pts):
if status_fb[ipt][0] == 0:
check[ipt] = False
location_merge = target.copy()
# Merge the results:
"""
Use Kalman Filter to combine the calculate result and detect result.
"""
Q = 0.3 # Process variance
for ipt in range(num_pts):
if check[ipt]:
# Kalman parameter
P_predict[ipt] += Q
K = P_predict[ipt] / (P_predict[ipt] + P_detect[ipt])
location_merge[ipt] = forward_predict[ipt] + K * (target[ipt] - forward_predict[ipt])
# Update the P_predict by the current K
P_predict[ipt] = (1 - K) * P_predict[ipt]
return location_merge, check, P_predict
def detect_frames_track(frames, fps, video):
frames_num = len(frames)
assert frames_num != 0
frame_height, frame_width = frames[0].shape[:2]
"""
Pre-process:
To detect the original results,
and normalize each face to a certain width,
also its corresponding landmarks locations and
scale parameter.
"""
face_size_normalized = 400
faces = []
locations = []
shapes_origin = []
shapes_para = [] # Use to recover the shape in whole frame. ([x1, y1, scale_shape])
face_size = 0
skipped = 0
"""
Use single frame to detect face on Dlib (CPU)
"""
# ----------------------------------------------------------------------------#
print("Detecting:")
for i in tqdm(range(frames_num)):
frame = frames[i]
face_num, shape = predict_single_frame(frame)
if face_num == 0:
if len(shapes_origin) == 0:
skipped += 1
# print("Skipped", skipped, "Frame_num", frames_num)
continue
shape = shapes_origin[i - 1 - skipped]
face, face_size = shape_to_face(shape, frame_width, frame_height, 1.2)
faceFrame = frame[face[1]: face[3],
face[0]:face[2]]
if face_size < face_size_normalized:
inter_para = cv2.INTER_CUBIC
else:
inter_para = cv2.INTER_AREA
face_norm = cv2.resize(faceFrame, (face_size_normalized, face_size_normalized), interpolation=inter_para)
scale_shape = face_size_normalized / face_size
shape_norm = np.rint((shape - np.array([face[0], face[1]])) * scale_shape).astype(int)
faces.append(face_norm)
shapes_para.append([face[0], face[1], scale_shape])
shapes_origin.append(shape)
locations.append(shape_norm)
"""
Calibration module.
"""
segment_length = 2
locations_sum = len(locations)
if locations_sum == 0:
return []
locations_track = [locations[0]]
num_pts = 68
P_predict = np.array([0] * num_pts).reshape(num_pts).astype(float)
print("Tracking")
for i in tqdm(range(locations_sum - 1)):
faces_seg = faces[i:i + segment_length]
locations_seg = locations[i:i + segment_length]
# ----------------------------------------------------------------------#
"""
Numpy Version (DEPRECATED)
"""
# locations_track_start = [locations_track[i]]
# forward_pts, feedback_pts = track_bidirectional(faces_seg, locations_track_start)
#
# forward_pts = np.rint(forward_pts).astype(int)
# feedback_pts = np.rint(feedback_pts).astype(int)
# merge_pt, check, P_predict = check_and_merge(locations_seg, forward_pts, feedback_pts, P_predict)
# ----------------------------------------------------------------------#
"""
OpenCV Version
"""
lk_params = dict(winSize=(15, 15),
maxLevel=3,
criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))
# Use the tracked current location as input. Also use the next frame's predicted location for
# auxiliary initialization.
start_pt = locations_track[i].astype(np.float32)
target_pt = locations_seg[1].astype(np.float32)
forward_pt, status_fw, err_fw = cv2.calcOpticalFlowPyrLK(faces_seg[0], faces_seg[1],
start_pt, target_pt, **lk_params,
flags=cv2.OPTFLOW_USE_INITIAL_FLOW)
feedback_pt, status_fb, err_fb = cv2.calcOpticalFlowPyrLK(faces_seg[1], faces_seg[0],
forward_pt, start_pt, **lk_params,
flags=cv2.OPTFLOW_USE_INITIAL_FLOW)
forward_pts = [locations_track[i].copy(), forward_pt]
feedback_pts = [feedback_pt, forward_pt.copy()]
forward_pts = np.rint(forward_pts).astype(int)
feedback_pts = np.rint(feedback_pts).astype(int)
merge_pt, check, P_predict = check_and_merge(locations_seg, forward_pts, feedback_pts, P_predict, status_fw,
status_fb)
# ----------------------------------------------------------------------#
locations_track.append(merge_pt)
"""
If us visualization, write the results to the visualize output folder.
"""
if locations_sum != frames_num:
print("INFO: Landmarks detection failed in some frames. Therefore we disable the "
"visualization for this video. It will be optimized in future version.")
aligned_landmarks = []
for i in locations_track:
shape = landmark_align(i)
shape = shape.ravel()
shape = shape.tolist()
aligned_landmarks.append(shape)
return aligned_landmarks