| """ | |
| Extract faces | |
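
Reads a videos DataFrame, extracts per-frame face crops with BlazeFace,
saves a per-video checkpoint, and collates everything into a single faces DataFrame.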

Video Face Manipulation Detection Through Ensemble of CNNs
Image and Sound Processing Lab - Politecnico di Milano

Nicolò Bonettini
Edoardo Daniele Cannas
Sara Mandelli
Luca Bondi
Paolo Bestagini
"""
import argparse
import sys
import traceback
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from pathlib import Path
from typing import List, Optional, Tuple, Union

import numpy as np
import pandas as pd
import torch
import torch.cuda
from PIL import Image
from tqdm import tqdm

import blazeface
from blazeface import BlazeFace, VideoReader, FaceExtractor
from isplutils.utils import adapt_bb


def parse_args(argv):
    parser = argparse.ArgumentParser()
    parser.add_argument('--source', type=Path, help='Videos root directory', required=True)
    parser.add_argument('--videodf', type=Path, help='Path to read the videos DataFrame', required=True)
    parser.add_argument('--facesfolder', type=Path, help='Faces output root directory', required=True)
    parser.add_argument('--facesdf', type=Path, help='Path to save the output DataFrame of faces', required=True)
    parser.add_argument('--checkpoint', type=Path, help='Path to save the temporary per-video outputs', required=True)
    parser.add_argument('--fpv', type=int, default=32, help='Frames per video')
    parser.add_argument('--device', type=torch.device,
                        default=torch.device('cuda:0' if torch.cuda.is_available() else 'cpu'),
                        help='Device to use for face extraction')
    parser.add_argument('--collateonly', help='Only perform collation of pre-existing results', action='store_true')
    parser.add_argument('--noindex', help='Do not rebuild the index', action='store_false')
    parser.add_argument('--batch', type=int, help='Batch size', default=16)
    parser.add_argument('--threads', type=int, help='Number of threads', default=8)
    parser.add_argument('--offset', type=int, help='Offset to start extraction', default=0)
    parser.add_argument('--num', type=int, help='Number of videos to process', default=0)
    parser.add_argument('--lazycheck', action='store_true', help='Lazy check of existing video indexes')
    parser.add_argument('--deepcheck', action='store_true', help='Try to open every image')

    return parser.parse_args(argv)
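

# Example invocation (a sketch with hypothetical paths, assuming this file is saved as extract_faces.py):
#   python extract_faces.py --source /path/to/videos --videodf data/videos_df.pkl \
#       --facesfolder /path/to/faces --facesdf data/faces_df.pkl --checkpoint /path/to/checkpoints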


def main(argv):
    args = parse_args(argv)

    ## Parameters parsing
    device: torch.device = args.device
    source_dir: Path = args.source
    facedestination_dir: Path = args.facesfolder
    frames_per_video: int = args.fpv
    videodataset_path: Path = args.videodf
    facesdataset_path: Path = args.facesdf
    collateonly: bool = args.collateonly
    batch_size: int = args.batch
    threads: int = args.threads
    offset: int = args.offset
    num: int = args.num
    lazycheck: bool = args.lazycheck
    deepcheck: bool = args.deepcheck
    checkpoint_folder: Path = args.checkpoint
    index_enable: bool = args.noindex  # True unless --noindex is passed (action='store_false')

    ## Parameters
    face_size = 512

    print('Loading video DataFrame')
    df_videos = pd.read_pickle(videodataset_path)

    if num > 0:
        df_videos_process = df_videos.iloc[offset:offset + num]
    else:
        df_videos_process = df_videos.iloc[offset:]

    if not collateonly:

        ## Blazeface loading
        print('Loading face extractor')
        facedet = BlazeFace().to(device)
        facedet.load_weights("blazeface/blazeface.pth")
        facedet.load_anchors("blazeface/anchors.npy")
        videoreader = VideoReader(verbose=False)
        video_read_fn = lambda x: videoreader.read_frames(x, num_frames=frames_per_video)
        face_extractor = FaceExtractor(video_read_fn, facedet)

        ## Face extraction
        with ThreadPoolExecutor(threads) as p:
            for batch_idx0 in tqdm(np.arange(start=0, stop=len(df_videos_process), step=batch_size),
                                   desc='Extracting faces'):
                tosave_list = list(p.map(partial(process_video,
                                                 source_dir=source_dir,
                                                 facedestination_dir=facedestination_dir,
                                                 checkpoint_folder=checkpoint_folder,
                                                 face_size=face_size,
                                                 face_extractor=face_extractor,
                                                 lazycheck=lazycheck,
                                                 deepcheck=deepcheck,
                                                 ),
                                         df_videos_process.iloc[batch_idx0:batch_idx0 + batch_size].iterrows()))
                # Each element is (faces DataFrame, checkpoint path, list of (face crop, destination path)) or None
                for tosave in tosave_list:
                    if tosave is not None:
                        if len(tosave[2]):
                            list(p.map(save_jpg, tosave[2]))
                        tosave[1].parent.mkdir(parents=True, exist_ok=True)
                        tosave[0].to_pickle(str(tosave[1]))
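
    # Collate the per-video checkpoints produced above into a single faces DataFrame
    # and annotate the videos DataFrame with the number of faces found per video.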
    if index_enable:
        # Collect checkpoints
        df_videos['nfaces'] = np.zeros(len(df_videos), np.uint8)
        faces_dataset = []
        for idx, record in tqdm(df_videos.iterrows(), total=len(df_videos), desc='Collecting faces results'):
            # Checkpoint
            video_face_checkpoint_path = checkpoint_folder.joinpath(record['path']).with_suffix('.faces.pkl')
            if video_face_checkpoint_path.exists():
                try:
                    df_video_faces = pd.read_pickle(str(video_face_checkpoint_path))
                    # Fix same attribute issue
                    df_video_faces = df_video_faces.rename(columns={'subject': 'videosubject'}, errors='ignore')
                    nfaces = len(
                        np.unique(df_video_faces.index.map(lambda x: int(x.split('_subj')[1].split('.jpg')[0]))))
                    df_videos.loc[idx, 'nfaces'] = nfaces
                    faces_dataset.append(df_video_faces)
                except Exception as e:
                    print('Error while reading: {}'.format(video_face_checkpoint_path))
                    print(e)
                    video_face_checkpoint_path.unlink()

        if len(faces_dataset) == 0:
            raise ValueError(f'No checkpoint found from face extraction. '
                             f'Is the source path {source_dir} correct for the videos in your dataframe?')

        # Save videos with updated faces
        print('Saving videos DataFrame to {}'.format(videodataset_path))
        df_videos.to_pickle(str(videodataset_path))
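
        # Derive the output filename for the faces DataFrame; when only a slice of the
        # videos was processed (offset / num), encode that range in the filename.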
        if offset > 0:
            if num > 0:
                if facesdataset_path.is_dir():
                    facesdataset_path = facesdataset_path.joinpath(
                        'faces_df_from_video_{}_to_video_{}.pkl'.format(offset, num + offset))
                else:
                    facesdataset_path = facesdataset_path.parent.joinpath(
                        str(facesdataset_path.parts[-1]).split('.')[0] +
                        '_from_video_{}_to_video_{}.pkl'.format(offset, num + offset))
            else:
                if facesdataset_path.is_dir():
                    facesdataset_path = facesdataset_path.joinpath('faces_df_from_video_{}.pkl'.format(offset))
                else:
                    facesdataset_path = facesdataset_path.parent.joinpath(
                        str(facesdataset_path.parts[-1]).split('.')[0] + '_from_video_{}.pkl'.format(offset))
        elif num > 0:
            if facesdataset_path.is_dir():
                facesdataset_path = facesdataset_path.joinpath(
                    'faces_df_from_video_{}_to_video_{}.pkl'.format(0, num))
            else:
                facesdataset_path = facesdataset_path.parent.joinpath(
                    str(facesdataset_path.parts[-1]).split('.')[0] + '_from_video_{}_to_video_{}.pkl'.format(0, num))
        else:
            if facesdataset_path.is_dir():
                facesdataset_path = facesdataset_path.joinpath('faces_df.pkl')  # just a check if the path is a dir

        # Create the output directory if it doesn't exist
        facesdataset_path.parent.mkdir(parents=True, exist_ok=True)

        print('Saving faces DataFrame to {}'.format(facesdataset_path))
        df_faces = pd.concat(faces_dataset, axis=0)
        df_faces['video'] = df_faces['video'].astype('category')
        for key in ['kp1x', 'kp1y', 'kp2x', 'kp2y', 'kp3x',
                    'kp3y', 'kp4x', 'kp4y', 'kp5x', 'kp5y', 'kp6x', 'kp6y', 'left',
                    'top', 'right', 'bottom', ]:
            df_faces[key] = df_faces[key].astype(np.int16)
        df_faces['videosubject'] = df_faces['videosubject'].astype(np.int8)

        # Remove duplicate face paths, if any (keep the first occurrence)
        df_faces = df_faces.loc[~df_faces.index.duplicated(keep='first')]

        fields_to_preserve_from_video = [i for i in
                                         ['folder', 'subject', 'scene', 'cluster', 'nfaces', 'test'] if
                                         i in df_videos]
        df_faces = pd.merge(df_faces, df_videos[fields_to_preserve_from_video], left_on='video',
                            right_index=True)
        df_faces.to_pickle(str(facesdataset_path))

    print('Completed!')


def save_jpg(args: Tuple[Image.Image, Union[Path, str]]):
    image, path = args
    # High-quality JPEG with 4:4:4 chroma subsampling to preserve face detail
    image.save(path, quality=95, subsampling='4:4:4')


def process_video(item: Tuple[pd.Index, pd.Series],
                  source_dir: Path,
                  facedestination_dir: Path,
                  checkpoint_folder: Path,
                  face_size: int,
                  face_extractor: FaceExtractor,
                  lazycheck: bool = False,
                  deepcheck: bool = False,
                  ) -> Optional[Tuple[pd.DataFrame, Path, List[Tuple[Image.Image, Path]]]]:
    # Unpack the DataFrame index and record for this video
    idx, record = item

    # Per-video checkpoint path
    video_faces_checkpoint_path = checkpoint_folder.joinpath(record['path']).with_suffix('.faces.pkl')

    # Unless lazycheck is set, verify that an existing checkpoint still matches the faces on disk;
    # if anything is missing or unreadable, delete the checkpoint so the video is processed again.
    if not lazycheck:
        if video_faces_checkpoint_path.exists():
            try:
                df_video_faces = pd.read_pickle(str(video_faces_checkpoint_path))
                for _, r in df_video_faces.iterrows():
                    face_path = facedestination_dir.joinpath(r.name)
                    assert (face_path.exists())
                    if deepcheck:
                        img = Image.open(face_path)
                        img_arr = np.asarray(img)
                        assert (img_arr.ndim == 3)
                        assert (np.prod(img_arr.shape) > 0)
            except Exception as e:
                print('Error while checking: {}'.format(video_faces_checkpoint_path))
                print(e)
                video_faces_checkpoint_path.unlink()

    if not (video_faces_checkpoint_path.exists()):

        try:

            video_face_dict_list = []

            # Load faces
            current_video_path = source_dir.joinpath(record['path'])
            if not current_video_path.exists():
                raise FileNotFoundError(f'Unable to find {current_video_path}. '
                                        f'Are you sure that {source_dir} is the correct source directory '
                                        f'for the video you indexed in the dataframe?')

            frames = face_extractor.process_video(current_video_path)
            if len(frames) == 0:
                return

            # Keep a single (best) face per frame and assign it to subject 0
            face_extractor.keep_only_best_face(frames)
            for frame_idx, frame in enumerate(frames):
                frames[frame_idx]['subjects'] = [0] * len(frames[frame_idx]['detections'])

            # Extract and save faces, bounding boxes, keypoints
            images_to_save: List[Tuple[Image.Image, Path]] = []
            for frame_idx, frame in enumerate(frames):
                if len(frames[frame_idx]['detections']):
                    fullframe = Image.fromarray(frames[frame_idx]['frame'])
                    # Preserve the only found face even if not a good one, otherwise preserve only clusters > -1
                    subjects = np.unique(frames[frame_idx]['subjects'])
                    if len(subjects) > 1:
                        subjects = np.asarray([s for s in subjects if s > -1])
                    for face_idx, _ in enumerate(frame['faces']):
                        subj_id = frames[frame_idx]['subjects'][face_idx]
                        if subj_id in subjects:  # Exclude outliers if other faces detected
                            face_path = facedestination_dir.joinpath(record['path'], 'fr{:03d}_subj{:1d}.jpg'.format(
                                frames[frame_idx]['frame_idx'], subj_id))
                            face_dict = {'facepath': str(face_path.relative_to(facedestination_dir)), 'video': idx,
                                         'label': record['label'], 'videosubject': subj_id,
                                         'original': record['original']}
                            # Add attributes for FF++
                            if 'class' in record.keys():
                                face_dict.update({'class': record['class']})
                            if 'source' in record.keys():
                                face_dict.update({'source': record['source']})
                            if 'quality' in record.keys():
                                face_dict.update({'quality': record['quality']})
                            for field_idx, key in enumerate(blazeface.BlazeFace.detection_keys):
                                face_dict[key] = frames[frame_idx]['detections'][face_idx][field_idx]
                            cropping_bb = adapt_bb(frame_height=fullframe.height,
                                                   frame_width=fullframe.width,
                                                   bb_height=face_size,
                                                   bb_width=face_size,
                                                   left=face_dict['xmin'],
                                                   top=face_dict['ymin'],
                                                   right=face_dict['xmax'],
                                                   bottom=face_dict['ymax'])
                            face = fullframe.crop(cropping_bb)

                            # Shift keypoints and bounding box into the cropped face's coordinate frame
                            for key in blazeface.BlazeFace.detection_keys:
                                if (key[0] == 'k' and key[-1] == 'x') or (key[0] == 'x'):
                                    face_dict[key] -= cropping_bb[0]
                                elif (key[0] == 'k' and key[-1] == 'y') or (key[0] == 'y'):
                                    face_dict[key] -= cropping_bb[1]
                            face_dict['left'] = face_dict.pop('xmin')
                            face_dict['top'] = face_dict.pop('ymin')
                            face_dict['right'] = face_dict.pop('xmax')
                            face_dict['bottom'] = face_dict.pop('ymax')

                            face_path.parent.mkdir(parents=True, exist_ok=True)
                            images_to_save.append((face, face_path))

                            video_face_dict_list.append(face_dict)

            if len(video_face_dict_list) > 0:
                df_video_faces = pd.DataFrame(video_face_dict_list)
                df_video_faces.index = df_video_faces['facepath']
                del df_video_faces['facepath']

                # Type conversions
                for key in ['kp1x', 'kp1y', 'kp2x', 'kp2y', 'kp3x', 'kp3y',
                            'kp4x', 'kp4y', 'kp5x', 'kp5y', 'kp6x', 'kp6y', 'left', 'top',
                            'right', 'bottom']:
                    df_video_faces[key] = df_video_faces[key].astype(np.int16)
                df_video_faces['conf'] = df_video_faces['conf'].astype(np.float32)
                df_video_faces['video'] = df_video_faces['video'].astype('category')

                video_faces_checkpoint_path.parent.mkdir(parents=True, exist_ok=True)
            else:
                print('No faces extracted for video {}'.format(record['path']))
                df_video_faces = pd.DataFrame()

            return df_video_faces, video_faces_checkpoint_path, images_to_save

        except Exception as e:
            print('Error while processing: {}'.format(record['path']))
            print("-" * 60)
            traceback.print_exc(file=sys.stdout, limit=5)
            print("-" * 60)
            return


if __name__ == '__main__':
    main(sys.argv[1:])
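
# The collated output can be inspected afterwards, e.g. (hypothetical path, for illustration only):
#   import pandas as pd
#   df_faces = pd.read_pickle('data/faces_df.pkl')
#   print(df_faces[['video', 'label', 'left', 'top', 'right', 'bottom']].head())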