Spaces:
Paused
Paused
| # Copyright (c) 2021, InterDigital R&D France. All rights reserved. | |
| # | |
| # This source code is made available under the license found in the | |
| # LICENSE.txt in the root directory of this source tree. | |
| import cv2 | |
| import glob | |
| import numpy as np | |
| import os | |
| import face_alignment | |
| import torch | |
| from PIL import Image, ImageFilter | |
| from scipy import ndimage | |
| from scipy.ndimage import gaussian_filter1d | |
| from skimage import io | |
| from torchvision import transforms, utils | |
def pil_to_cv2(pil_image):
    """Return `pil_image` as a NumPy array with its channel axis reversed.

    The image is converted with ``np.array`` and the third axis is flipped
    (e.g. RGB becomes BGR, the channel order OpenCV works with).  The result
    is a fresh copy, so it shares no memory with the input.
    """
    pixels = np.array(pil_image)
    channel_reversed = pixels[:, :, ::-1]
    return channel_reversed.copy()
def cv2_to_pil(open_cv_image):
    """Build a PIL image from an OpenCV-style array by reversing its channel axis.

    The third axis of `open_cv_image` is flipped (e.g. BGR becomes RGB) and
    copied before being handed to ``Image.fromarray``.
    """
    channel_reversed = open_cv_image[:, :, ::-1].copy()
    return Image.fromarray(channel_reversed)
def put_text(img, text):
    """Draw `text` onto `img` with ``cv2.putText`` and return the result.

    The caption uses the Hershey Simplex font at scale 1.5 in colour
    (255, 255, 0), with the bottom-left corner of the text at pixel (10, 50).
    """
    anchor = (10, 50)          # bottom-left corner of the rendered text
    scale = 1.5
    colour = (255, 255, 0)
    # Seventh positional argument of cv2.putText, i.e. the stroke thickness.
    stroke = 2
    return cv2.putText(img, text, anchor, cv2.FONT_HERSHEY_SIMPLEX, scale, colour, stroke)
# Compare frames from several directories side by side
def compare_frames(save_dir, origin_dir, target_dir, strs='Original,Projected,Manipulated', dim=None):
    """Write captioned side-by-side comparison images for matching frames.

    For every file matching ``frame*`` in `origin_dir`, the same-named image
    is read from `origin_dir` and from each directory in `target_dir`.  Each
    image is captioned with `put_text`, the images are concatenated
    horizontally and the result is written to `save_dir` under the same name.

    Args:
        save_dir: Output directory; created if it does not exist.
        origin_dir: Directory whose ``frame*`` files define what is compared.
        target_dir: One directory path, or a list of directory paths.
        strs: Comma-separated captions, one per directory, ordered as
            origin first and then the targets.
        dim: Unused; kept so existing callers keep working.

    Raises:
        IndexError: If `strs` holds fewer captions than there are directories.

    A frame that cannot be read from one of the directories is skipped with a
    warning (``cv2.imread`` signals an unreadable file by returning ``None``,
    not by raising).
    """
    import warnings

    os.makedirs(save_dir, exist_ok=True)

    if not isinstance(target_dir, list):
        target_dir = [target_dir]
    dir_paths = [origin_dir] + list(target_dir)
    captions = strs.split(',')

    for name in sorted(glob.glob1(origin_dir, 'frame*')):
        panels = []
        for idx, dir_path in enumerate(dir_paths):
            frame_path = os.path.join(dir_path, name)
            frame = cv2.imread(frame_path)
            if frame is None:
                warnings.warn("compare_frames: cannot read '%s'; skipping this frame" % frame_path)
                panels = None
                break
            panels.append(put_text(frame, captions[idx]))

        if panels is not None:
            cv2.imwrite(os.path.join(save_dir, name), np.concatenate(panels, axis=1))
# Save frames into video
def create_video(image_folder, fps=24, video_format='.mp4', resize_ratio=1):
    """Encode the ``frame*`` images of `image_folder` into a video file.

    Frames are taken in sorted file-name order and each is scaled by
    `resize_ratio` before being written.  The video size is taken from the
    first (scaled) frame.

    The output path is ``os.path.dirname(image_folder) + video_format``, so
    `image_folder` should end with a path separator for the video to be named
    after the folder itself (``'a/b/'`` -> ``'a/b.mp4'``).

    Args:
        image_folder: Directory containing the frames.
        fps: Frame rate of the output video.
        video_format: ``'.mp4'`` (``mp4v`` codec) or ``'.avi'`` (``XVID`` codec).
        resize_ratio: Scale factor applied to both axes of every frame.

    Raises:
        ValueError: If `video_format` is not one of the supported extensions.

    If the folder contains no ``frame*`` files, nothing is written.
    """
    codec_by_format = {'.mp4': 'mp4v', '.avi': 'XVID'}
    if video_format not in codec_by_format:
        raise ValueError("Unsupported video_format %r; expected one of %s"
                         % (video_format, sorted(codec_by_format)))

    img_list = sorted(glob.glob1(image_folder, 'frame*'))
    if not img_list:
        return

    def _load_scaled(image_name):
        frame = cv2.imread(os.path.join(image_folder, image_name))
        return cv2.resize(frame, (0, 0), fx=resize_ratio, fy=resize_ratio)

    video_name = os.path.dirname(image_folder) + video_format
    height, width = _load_scaled(img_list[0]).shape[:2]
    fourcc = cv2.VideoWriter_fourcc(*codec_by_format[video_format])

    video = cv2.VideoWriter(video_name, fourcc, fps, (width, height))
    try:
        for image_name in img_list:
            video.write(_load_scaled(image_name))
    finally:
        # Release the writer so the container is finalised and the handle closed.
        video.release()
# Split video into frames
def video_to_frames(video_path, frame_path, img_format='.jpg', count_num=1000, resize=False):
    """Dump the leading frames of a video as ``frame0000``, ``frame0001``, ... images.

    Args:
        video_path: Path of the video to read.
        frame_path: Output directory; created if it does not exist.
        img_format: File extension (including the dot) of the written images.
        count_num: Stop once this many frames have been written.  The limit is
            checked after each write, so one frame is written even when
            `count_num` is zero or negative (provided the video has a frame).
        resize: If true, every frame is scaled to half size on both axes.
    """
    os.makedirs(frame_path, exist_ok=True)

    vidcap = cv2.VideoCapture(video_path)
    try:
        success, image = vidcap.read()
        count = 0
        while success:
            if resize:
                image = cv2.resize(image, (0, 0), fx=0.5, fy=0.5)
            out_name = 'frame%04d' % count + img_format
            cv2.imwrite(os.path.join(frame_path, out_name), image)
            success, image = vidcap.read()
            count += 1
            if count >= count_num:
                break
    finally:
        # Always free the capture handle, even if reading or writing fails.
        vidcap.release()
# Align faces
def align_frames(img_dir, save_dir, output_size=1024, transform_size=1024, optical_flow=True, gaussian=True, filter_size=3):
    """Detect, temporally smooth and crop the face in every ``frame*`` image.

    Processing happens in three passes over the sorted frame list:

    1. Detect 68 landmarks per frame (first detected face only).  With
       `optical_flow`, every landmark is blended with its Lucas-Kanade track
       from the previous frame.  With `gaussian`, each landmark coordinate is
       then smoothed over time with sigma `filter_size`.
    2. Draw the landmarks on each frame (saved to the directory
       ``os.path.dirname(img_dir) + '_landmark/'``) and derive an oriented
       crop quad from the eye and mouth landmarks.  With `gaussian`, quads and
       quad sizes are smoothed over time with sigma ``2 * filter_size``.
    3. Crop, pad (reflect), warp the quad to a `transform_size` square, resize
       to `output_size` and save under the original file name in `save_dir`.

    Finally `create_video` is called on the landmark directory and the crop
    statistics are saved to ``stat_dict.npy`` in `save_dir`.

    Args:
        img_dir: Directory with the input frames.
        save_dir: Output directory for aligned frames and ``stat_dict.npy``.
        output_size: Side length of the saved aligned images.
        transform_size: Side length of the intermediate warped image.
        optical_flow: Enable the optical-flow landmark stabilisation.
        gaussian: Enable temporal Gaussian smoothing of landmarks and crops.
        filter_size: Sigma of the landmark filter (crops use twice this).

    Raises:
        FileNotFoundError: If `img_dir` contains no ``frame*`` files.
        ValueError: If no face is detected in one of the frames.
    """
    os.makedirs(save_dir, exist_ok=True)

    # load face landmark detector
    fa = face_alignment.FaceAlignment(face_alignment.LandmarksType._2D, flip_input=False, device='cuda')

    # list images in the directory
    img_list = sorted(glob.glob1(img_dir, 'frame*'))
    if not img_list:
        raise FileNotFoundError("No 'frame*' images found in '%s'" % img_dir)

    # save align statistics
    stat_dict = {'quad': [], 'qsize': [], 'coord': [], 'crop': []}

    # ---- Pass 1: landmark detection (+ optional optical-flow stabilisation)
    lms = []
    prev_img = None
    prev_lm = None
    for idx, img_name in enumerate(img_list):
        img = io.imread(os.path.join(img_dir, img_name))

        preds = fa.get_landmarks(img)
        if preds is None or len(preds) == 0:
            raise ValueError("No face detected in frame '%s'" % img_name)
        lm = [(preds[0][kk][0], preds[0][kk][1]) for kk in range(68)]

        if optical_flow:
            if idx > 0:
                # The search window is a quarter of the eye distance.
                eye_left = np.mean(lm[36:42], axis=0)
                eye_right = np.mean(lm[42:48], axis=0)
                eye_to_eye = eye_right - eye_left
                s = int(np.hypot(*eye_to_eye) / 4)
                lk_params = dict(winSize=(s, s), maxLevel=5,
                                 criteria=(cv2.TERM_CRITERIA_COUNT | cv2.TERM_CRITERIA_EPS, 10, 0.03))

                points_arr = np.array(lm, np.float32)
                points_prevarr = np.array(prev_lm, np.float32)
                points_arr, _status, _err = cv2.calcOpticalFlowPyrLK(
                    prev_img, img, points_prevarr, points_arr, **lk_params)
                points = np.array(points_arr, np.float32).tolist()

                sigma = 100
                for k in range(len(lm)):
                    # d: how far the detection moved since the previous frame.
                    # Small motion -> alpha near 1 -> trust the tracked point;
                    # large motion -> alpha near 0 -> trust the new detection.
                    d = cv2.norm(np.array(prev_lm[k]) - np.array(lm[k]))
                    alpha = np.exp(-d * d / sigma)
                    lm[k] = (1 - alpha) * np.array(lm[k]) + alpha * np.array(points[k])
            prev_img = img
            prev_lm = lm

        lms.append(lm)

    # Apply gaussian filter on landmarks (along the time axis)
    if gaussian:
        lm_filtered = gaussian_filter1d(np.array(lms), filter_size, axis=0)
        lms = lm_filtered.tolist()

    # ---- Pass 2: save landmark visualisations and compute crop quads
    landmark_out_dir = os.path.dirname(img_dir) + '_landmark/'
    os.makedirs(landmark_out_dir, exist_ok=True)

    for img_name, lm in zip(img_list, lms):
        img = io.imread(os.path.join(img_dir, img_name))

        img_lm = img.copy()
        for kk in range(68):
            img_lm = cv2.circle(img_lm, (int(lm[kk][0]), int(lm[kk][1])),
                                radius=3, color=(255, 0, 255), thickness=-1)
        # Save landmark images (channel axis reversed for cv2.imwrite)
        cv2.imwrite(os.path.join(landmark_out_dir, img_name), img_lm[:, :, ::-1])

        # Parse landmarks.
        lm_eye_left = lm[36:42]     # left-clockwise
        lm_eye_right = lm[42:48]    # left-clockwise
        lm_mouth_outer = lm[48:60]  # left-clockwise

        # Calculate auxiliary vectors.
        eye_left = np.mean([lm_eye_left[0], lm_eye_left[3]], axis=0)
        eye_right = np.mean([lm_eye_right[0], lm_eye_right[3]], axis=0)
        eye_avg = (eye_left + eye_right) * 0.5
        eye_to_eye = eye_right - eye_left
        mouth_left = np.array(lm_mouth_outer[0])
        mouth_right = np.array(lm_mouth_outer[6])
        mouth_avg = (mouth_left + mouth_right) * 0.5
        eye_to_mouth = mouth_avg - eye_avg

        # Choose oriented crop rectangle.
        x = eye_to_eye - np.flipud(eye_to_mouth) * [-1, 1]
        x /= np.hypot(*x)
        x *= max(np.hypot(*eye_to_eye) * 2.0, np.hypot(*eye_to_mouth) * 1.8)
        y = np.flipud(x) * [-1, 1]
        c = eye_avg + eye_to_mouth * 0.1
        quad = np.stack([c - x - y, c - x + y, c + x + y, c + x - y])
        qsize = np.hypot(*x) * 2

        stat_dict['coord'].append(quad)
        stat_dict['qsize'].append(qsize)

    # Apply gaussian filter on crops
    if gaussian:
        quads = gaussian_filter1d(np.array(stat_dict['coord']), 2 * filter_size, axis=0)
        stat_dict['coord'] = quads.tolist()
        qsizes = gaussian_filter1d(np.array(stat_dict['qsize']), 2 * filter_size, axis=0)
        stat_dict['qsize'] = qsizes.tolist()

    # ---- Pass 3: crop, pad, warp and save every frame
    for idx, img_name in enumerate(img_list):
        img = Image.open(os.path.join(img_dir, img_name))

        qsize = stat_dict['qsize'][idx]
        # Work on a copy so the stored 'coord' entry stays in frame coordinates.
        quad = np.array(stat_dict['coord'][idx])

        # Crop.
        border = max(int(np.rint(qsize * 0.1)), 3)
        crop = (int(np.floor(min(quad[:, 0]))), int(np.floor(min(quad[:, 1]))),
                int(np.ceil(max(quad[:, 0]))), int(np.ceil(max(quad[:, 1]))))
        crop = (max(crop[0] - border, 0), max(crop[1] - border, 0),
                min(crop[2] + border, img.size[0]), min(crop[3] + border, img.size[1]))
        if crop[2] - crop[0] < img.size[0] or crop[3] - crop[1] < img.size[1]:
            img = img.crop(crop)
            quad -= crop[0:2]

        # 'crop' is in frame coordinates, 'quad' in crop coordinates.
        stat_dict['crop'].append(crop)
        stat_dict['quad'].append((quad + 0.5).flatten())

        # Pad.
        pad = (int(np.floor(min(quad[:, 0]))), int(np.floor(min(quad[:, 1]))),
               int(np.ceil(max(quad[:, 0]))), int(np.ceil(max(quad[:, 1]))))
        pad = (max(-pad[0] + border, 0), max(-pad[1] + border, 0),
               max(pad[2] - img.size[0] + border, 0), max(pad[3] - img.size[1] + border, 0))
        if max(pad) > border - 4:
            pad = np.maximum(pad, int(np.rint(qsize * 0.3)))
            img = np.pad(np.float32(img), ((pad[1], pad[3]), (pad[0], pad[2]), (0, 0)), 'reflect')
            img = Image.fromarray(np.uint8(np.clip(np.rint(img), 0, 255)), 'RGB')
            quad += pad[:2]

        # Transform.
        img = img.transform((transform_size, transform_size), Image.QUAD,
                            (quad + 0.5).flatten(), Image.BILINEAR)

        # resizing
        img_pil = img.resize((output_size, output_size), Image.LANCZOS)
        img_pil.save(os.path.join(save_dir, img_name))

    create_video(landmark_out_dir)
    np.save(os.path.join(save_dir, 'stat_dict.npy'), stat_dict)
def find_coeffs(pa, pb):
    """Solve for the 8 coefficients of the perspective transform taking `pa` to `pb`.

    The returned vector ``(a, b, c, d, e, f, g, h)`` satisfies, for every
    corresponding pair ``(x1, y1)`` in `pa` and ``(x2, y2)`` in `pb`::

        x2 = (a*x1 + b*y1 + c) / (g*x1 + h*y1 + 1)
        y2 = (d*x1 + e*y1 + f) / (g*x1 + h*y1 + 1)

    Args:
        pa: Four source points as ``(x, y)`` pairs.
        pb: Four destination points as ``(x, y)`` pairs.

    Returns:
        A float NumPy array of shape ``(8,)``.

    Raises:
        numpy.linalg.LinAlgError: If the points do not determine a transform
            (e.g. three of them are collinear).
    """
    rows = []
    for p1, p2 in zip(pa, pb):
        rows.append([p1[0], p1[1], 1, 0, 0, 0, -p2[0] * p1[0], -p2[0] * p1[1]])
        rows.append([0, 0, 0, p1[0], p1[1], 1, -p2[1] * p1[0], -p2[1] * p1[1]])

    # The builtin float replaces the np.float alias, which recent NumPy no longer provides.
    A = np.array(rows, dtype=float)
    B = np.array(pb, dtype=float).reshape(8)

    # Four point pairs give a square 8x8 system, so solve it directly instead
    # of forming the normal equations and inverting them.
    return np.linalg.solve(A, B)
# Reproject aligned frames to the original video
def video_reproject(orig_dir_path, recon_dir_path, save_dir_path, state_dir_path, mask_dir_path, seamless=False):
    """Warp aligned/edited face crops back into their original frames.

    For the i-th ``frame*`` image in `recon_dir_path` (sorted by name) the
    i-th original frame and the i-th entries of the saved alignment statistics
    are used.  The aligned image is warped back into the stored crop
    rectangle, pasted onto the original frame and blended with a mask that is
    warped the same way.

    Args:
        orig_dir_path: Directory with the original frames.
        recon_dir_path: Directory with the aligned (reconstructed) frames.
        save_dir_path: Output directory; created if it does not exist.
        state_dir_path: Directory containing ``stat_dict.npy`` with the
            ``'quad'`` and ``'crop'`` lists.
        mask_dir_path: Directory with one mask per frame, named like the
            original frames.
        seamless: If false, blend with a Gaussian-blurred mask; if true, use
            ``cv2.seamlessClone``.

    Results are written to `save_dir_path` under the reconstructed frames'
    file names.
    """
    os.makedirs(save_dir_path, exist_ok=True)

    img_list_0 = sorted(glob.glob1(orig_dir_path, 'frame*'))
    img_list_2 = sorted(glob.glob1(recon_dir_path, 'frame*'))

    # NOTE(security): allow_pickle=True unpickles arbitrary Python objects;
    # only load statistics files that come from a trusted source.
    stat_dict = np.load(os.path.join(state_dir_path, 'stat_dict.npy'), allow_pickle=True).item()

    for idx, recon_name in enumerate(img_list_2):
        orig_name = img_list_0[idx]
        img_0 = Image.open(os.path.join(orig_dir_path, orig_name))
        img_2 = Image.open(os.path.join(recon_dir_path, recon_name))

        quad_f = stat_dict['quad'][idx]  # flattened quad corners, crop coordinates
        quad_0 = stat_dict['crop'][idx]  # crop box in frame coordinates
        quad_pts = [(quad_f[0], quad_f[1]), (quad_f[2], quad_f[3]),
                    (quad_f[4], quad_f[5]), (quad_f[6], quad_f[7])]
        crop_size = (quad_0[2] - quad_0[0], quad_0[3] - quad_0[1])
        paste_at = (int(quad_0[0]), int(quad_0[1]))

        # Map the quad corners onto the corners of the aligned image.  Using
        # the image's own size gives the same result as the usual 1024x1024
        # case and keeps other aligned resolutions correct.
        w2, h2 = img_2.size
        coeffs = find_coeffs(quad_pts, [(0, 0), (0, h2), (w2, h2), (w2, 0)])
        img_2 = img_2.transform(crop_size, Image.PERSPECTIVE, coeffs, Image.BICUBIC)
        output = img_0.copy()
        output.paste(img_2, paste_at)

        # Warp the mask with coefficients derived from its own size.
        crop_mask = Image.open(os.path.join(mask_dir_path, orig_name))
        wm, hm = crop_mask.size
        mask_coeffs = find_coeffs(quad_pts, [(0, 0), (0, hm), (wm, hm), (wm, 0)])
        crop_mask = crop_mask.transform(crop_size, Image.PERSPECTIVE, mask_coeffs, Image.BICUBIC)

        orig_pixels = np.array(img_0)
        mask_pil = Image.fromarray(np.zeros(orig_pixels.shape, orig_pixels.dtype))
        mask_pil.paste(crop_mask, paste_at)

        save_path = os.path.join(save_dir_path, recon_name)

        # Apply mask
        if not seamless:
            soft = mask_pil.filter(ImageFilter.GaussianBlur(radius=10)).convert('L')
            weight = np.array(soft)[:, :, np.newaxis] / 255.
            blended = orig_pixels * (1 - weight) + np.array(output) * weight
            Image.fromarray(blended.astype(np.uint8)).save(save_path)
        else:
            mask = pil_to_cv2(mask_pil)
            src = pil_to_cv2(output)
            dst = pil_to_cv2(img_0)
            # clone
            br = cv2.boundingRect(cv2.split(mask)[0])  # bounding rect (x,y,width,height)
            center = (br[0] + br[2] // 2, br[1] + br[3] // 2)
            cloned = cv2.seamlessClone(src, dst, mask, center, cv2.NORMAL_CLONE)
            cv2.imwrite(save_path, cloned)
# Align faces
def align_image(img_dir, save_dir, output_size=1024, transform_size=1024, format='*.png'):
    """Crop and align the face in every image of `img_dir` that matches `format`.

    For each image (in sorted file-name order) 68 landmarks are detected for
    the first face found, an oriented crop quad is derived from the eye and
    mouth landmarks, and the image is cropped, edge-padded, warped to a
    `transform_size` square, resized to `output_size` and saved under the same
    file name in `save_dir`.  The crop statistics of all processed images are
    saved to ``stat_dict.npy`` in `save_dir`.

    Images in which no face is detected are skipped: nothing is saved for them
    and they get no entry in the statistics, so the statistics lists follow
    the order of the processed images only.

    Args:
        img_dir: Directory with the input images.
        save_dir: Output directory; created if it does not exist.
        output_size: Side length of the saved aligned images.
        transform_size: Side length of the intermediate warped image.
        format: Glob pattern selecting the files in `img_dir`.
    """
    os.makedirs(save_dir, exist_ok=True)

    # load face landmark detector
    fa = face_alignment.FaceAlignment(face_alignment.LandmarksType._2D, flip_input=False, device='cuda')

    # list images in the directory
    img_list = sorted(glob.glob1(img_dir, format))

    # save align statistics
    stat_dict = {'quad': [], 'qsize': [], 'coord': [], 'crop': []}

    for img_name in img_list:
        img = Image.open(os.path.join(img_dir, img_name)).convert('RGB')

        preds = fa.get_landmarks(np.array(img))
        if preds is None or len(preds) == 0:
            # No face found: skip this image instead of failing on preds[0].
            continue
        lm = [(preds[0][kk][0], preds[0][kk][1]) for kk in range(68)]

        # Parse landmarks. Code extracted from ffhq-dataset
        lm_eye_left = lm[36:42]     # left-clockwise
        lm_eye_right = lm[42:48]    # left-clockwise
        lm_mouth_outer = lm[48:60]  # left-clockwise

        # Calculate auxiliary vectors.
        eye_left = np.mean([lm_eye_left[0], lm_eye_left[3]], axis=0)
        eye_right = np.mean([lm_eye_right[0], lm_eye_right[3]], axis=0)
        eye_avg = (eye_left + eye_right) * 0.5
        eye_to_eye = eye_right - eye_left
        mouth_left = np.array(lm_mouth_outer[0])
        mouth_right = np.array(lm_mouth_outer[6])
        mouth_avg = (mouth_left + mouth_right) * 0.5
        eye_to_mouth = mouth_avg - eye_avg

        # Choose oriented crop rectangle (scaled by the eye distance only).
        x = eye_to_eye - np.flipud(eye_to_mouth) * [-1, 1]
        x /= np.hypot(*x)
        x *= np.hypot(*eye_to_eye) * 2.0
        y = np.flipud(x) * [-1, 1]
        c = eye_avg + eye_to_mouth * 0.1
        quad = np.stack([c - x - y, c - x + y, c + x + y, c + x - y])
        qsize = np.hypot(*x) * 2

        stat_dict['coord'].append(quad)
        stat_dict['qsize'].append(qsize)
        # Continue on a copy so the stored 'coord' entry stays in image
        # coordinates while `quad` is shifted by the crop and pad below.
        quad = quad.copy()

        # Crop.
        border = max(int(np.rint(qsize * 0.1)), 3)
        crop = (int(np.floor(min(quad[:, 0]))), int(np.floor(min(quad[:, 1]))),
                int(np.ceil(max(quad[:, 0]))), int(np.ceil(max(quad[:, 1]))))
        crop = (max(crop[0] - border, 0), max(crop[1] - border, 0),
                min(crop[2] + border, img.size[0]), min(crop[3] + border, img.size[1]))
        if crop[2] - crop[0] < img.size[0] or crop[3] - crop[1] < img.size[1]:
            img = img.crop(crop)
            quad -= crop[0:2]

        # 'crop' is in image coordinates, 'quad' in crop coordinates.
        stat_dict['crop'].append(crop)
        stat_dict['quad'].append((quad + 0.5).flatten())

        # Pad.
        pad = (int(np.floor(min(quad[:, 0]))), int(np.floor(min(quad[:, 1]))),
               int(np.ceil(max(quad[:, 0]))), int(np.ceil(max(quad[:, 1]))))
        pad = (max(-pad[0] + border, 0), max(-pad[1] + border, 0),
               max(pad[2] - img.size[0] + border, 0), max(pad[3] - img.size[1] + border, 0))
        if max(pad) > border - 4:
            pad = np.maximum(pad, int(np.rint(qsize * 0.3)))
            img = np.pad(np.float32(img), ((pad[1], pad[3]), (pad[0], pad[2]), (0, 0)), 'edge')
            img = Image.fromarray(np.uint8(np.clip(np.rint(img), 0, 255)), 'RGB')
            quad += pad[:2]

        # Transform.
        img = img.transform((transform_size, transform_size), Image.QUAD,
                            (quad + 0.5).flatten(), Image.BILINEAR)

        # resizing
        img_pil = img.resize((output_size, output_size), Image.LANCZOS)
        img_pil.save(os.path.join(save_dir, img_name))

    np.save(os.path.join(save_dir, 'stat_dict.npy'), stat_dict)
# Preprocessing pipeline used by generate_mask on each image before it is
# passed to the parsing network: convert the image to a tensor, then normalise
# each of the three channels with the mean / std below.
# NOTE(review): these values look like the common ImageNet channel statistics;
# confirm they match what the parsing network was trained with.
img_to_tensor = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
def generate_mask(img_dir, save_dir, parsing_net, labels=(1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13), output_size=(1024, 1024), device=torch.device('cuda')):
    """Write a binary mask for every ``frame*`` image using a parsing network.

    Each image is converted to RGB, resized to 512x512, preprocessed with
    `img_to_tensor` and passed through `parsing_net`.  The per-pixel class is
    the argmax over the first axis of the network's first output; pixels whose
    class is in `labels` become 255 and all others 0.  The mask is resized to
    `output_size` and saved in `save_dir` under the input file name.

    Args:
        img_dir: Directory with the input frames.
        save_dir: Output directory; created if it does not exist.
        parsing_net: Callable network; its first output is used.
            NOTE(review): presumably shaped (1, classes, H, W) - confirm.
        labels: Class indices that belong to the mask.
        output_size: ``(width, height)`` of the saved masks.
        device: Device the input tensor is moved to.
    """
    os.makedirs(save_dir, exist_ok=True)
    selected_labels = list(labels)

    for img_name in sorted(glob.glob1(img_dir, 'frame*')):
        img_path = os.path.join(img_dir, img_name)
        # Force three channels so the 3-channel normalisation always applies.
        img = Image.open(img_path).convert('RGB').resize((512, 512), Image.LANCZOS)
        x_1 = img_to_tensor(img).unsqueeze(0).to(device)

        # Inference only: avoid building an autograd graph for every frame.
        with torch.no_grad():
            out_1 = parsing_net(x_1)
        parsing = out_1[0].squeeze(0).detach().cpu().numpy().argmax(0)

        mask = np.where(np.isin(parsing, selected_labels), 255, 0).astype(np.uint8)
        mask_pil = Image.fromarray(mask).resize(output_size, Image.LANCZOS)
        mask_pil.save(os.path.join(save_dir, img_name))