import os
from typing import List, Tuple, Union

import cv2
import numpy as np
import torch
from PIL import Image

from blazeface import BlazeFace


class FaceExtractor:
    """Wrapper for face extraction workflow."""

    def __init__(self, video_read_fn=None, facedet: BlazeFace = None):
        """Creates a new FaceExtractor.

        Arguments:
            video_read_fn: a function that takes in a path to a video file
                and returns a tuple consisting of a NumPy array with shape
                (num_frames, H, W, 3) and a list of frame indices, or None
                in case of an error
            facedet: the face detector object
        """
        self.video_read_fn = video_read_fn
        self.facedet = facedet

    def process_image(self, path: str = None, img: Union[Image.Image, np.ndarray] = None) -> dict:
        """Processes a single image.

        :param path: path to the image file
        :param img: the image itself, as a PIL Image or a NumPy array
        :return: a dictionary with the frame size, the face crops, the keypoint
            crops, the detections (in frame coordinates) and the confidence scores
        """
        if img is not None and path is not None:
            raise ValueError('Only one argument between path and img can be specified')
        if img is None and path is None:
            raise ValueError('At least one argument between path and img must be specified')

        target_size = self.facedet.input_size

        if img is None:
            img = np.asarray(Image.open(str(path)))
        else:
            img = np.asarray(img)

        # Split the frame into several tiles. Resize the tiles to 128x128.
        tiles, resize_info = self._tile_frames(np.expand_dims(img, 0), target_size)
        # tiles has shape (num_tiles, target_size, target_size, 3)
        # resize_info is a list of four elements [scale_w, scale_h, offset_x, offset_y]

        # Run the face detector. The result is a list of PyTorch tensors,
        # one for each tile in the batch.
        detections = self.facedet.predict_on_batch(tiles, apply_nms=False)

        # Convert the detections from 128x128 back to the original frame size.
        detections = self._resize_detections(detections, target_size, resize_info)

        # Because we have several tiles for each frame, combine the predictions
        # from these tiles. The result is a list of PyTorch tensors, but now one
        # for each frame (rather than each tile).
        num_frames = 1
        frame_size = (img.shape[1], img.shape[0])
        detections = self._untile_detections(num_frames, frame_size, detections)

        # The same face may have been detected in multiple tiles, so filter out
        # overlapping detections. This is done separately for each frame.
        detections = self.facedet.nms(detections)

        # Crop the faces out of the original frame.
        frameref_detections = self._add_margin_to_detections(detections[0], frame_size, 0.2)
        faces = self._crop_faces(img, frameref_detections)
        kpts = self._crop_kpts(img, detections[0], 0.3)

        # Add additional information about the frame and detections.
        scores = list(detections[0][:, 16].cpu().numpy())
        frame_dict = {"frame_w": frame_size[0],
                      "frame_h": frame_size[1],
                      "faces": faces,
                      "kpts": kpts,
                      "detections": frameref_detections.cpu().numpy(),
                      "scores": scores,
                      }

        # Sort faces by descending confidence
        frame_dict = self._soft_faces_by_descending_score(frame_dict)

        return frame_dict

    def _soft_faces_by_descending_score(self, frame_dict: dict) -> dict:
        if len(frame_dict['scores']) > 1:
            sort_idxs = np.argsort(frame_dict['scores'])[::-1]
            new_faces = [frame_dict['faces'][i] for i in sort_idxs]
            new_kpts = [frame_dict['kpts'][i] for i in sort_idxs]
            new_detections = frame_dict['detections'][sort_idxs]
            new_scores = [frame_dict['scores'][i] for i in sort_idxs]
            frame_dict['faces'] = new_faces
            frame_dict['kpts'] = new_kpts
            frame_dict['detections'] = new_detections
            frame_dict['scores'] = new_scores
        return frame_dict

    def process_videos(self, input_dir, filenames, video_idxs) -> List[dict]:
        """For the specified selection of videos, grabs one or more frames
        from each video, runs the face detector, and tries to find the faces
        in each frame.

        The frames are split into tiles, and the tiles from the different videos
        are concatenated into a single batch. This means the face detector gets
        a batch of size len(video_idxs) * num_frames * num_tiles (usually 3).

        Arguments:
            input_dir: base folder where the video files are stored
            filenames: list of all video files in the input_dir
            video_idxs: one or more indices from the filenames list; these
                are the videos we'll actually process

        Returns a list of dictionaries, one for each frame read from each video.
        This dictionary contains:
            - video_idx: the video this frame was taken from
            - frame_idx: the index of the frame in the video
            - frame_w, frame_h: original dimensions of the frame
            - frame: the full frame as a NumPy array
            - faces: a list containing zero or more NumPy arrays with a face crop
            - kpts: a list with the keypoint crops for each face
            - detections: the face detections, in frame coordinates
            - scores: a list with the confidence score for each face crop

        If reading a video failed for some reason, it will not appear in the
        output array. Note that there's no guarantee a given video will actually
        have num_frames results (as soon as a reading problem is encountered for
        a video, we continue with the next video).
        """
        target_size = self.facedet.input_size

        videos_read = []
        frames_read = []
        frames = []
        tiles = []
        resize_info = []

        for video_idx in video_idxs:
            # Read the full-size frames from this video.
            filename = filenames[video_idx]
            video_path = os.path.join(input_dir, filename)
            result = self.video_read_fn(video_path)

            # Error? Then skip this video.
            if result is None:
                continue

            videos_read.append(video_idx)

            # Keep track of the original frames (need them later).
            my_frames, my_idxs = result
            frames.append(my_frames)
            frames_read.append(my_idxs)

            # Split the frames into several tiles. Resize the tiles to 128x128.
            my_tiles, my_resize_info = self._tile_frames(my_frames, target_size)
            tiles.append(my_tiles)
            resize_info.append(my_resize_info)

        if len(tiles) == 0:
            return []

        # Put all the tiles for all the frames from all the videos into
        # a single batch.
        batch = np.concatenate(tiles)

        # Run the face detector. The result is a list of PyTorch tensors,
        # one for each image in the batch.
        all_detections = self.facedet.predict_on_batch(batch, apply_nms=False)

        result = []
        offs = 0
        for v in range(len(tiles)):
            # Not all videos may have the same number of tiles, so find which
            # detections go with which video.
            num_tiles = tiles[v].shape[0]
            detections = all_detections[offs:offs + num_tiles]
            offs += num_tiles

            # Convert the detections from 128x128 back to the original frame size.
            detections = self._resize_detections(detections, target_size, resize_info[v])

            # Because we have several tiles for each frame, combine the predictions
            # from these tiles. The result is a list of PyTorch tensors, but now one
            # for each frame (rather than each tile).
            num_frames = frames[v].shape[0]
            frame_size = (frames[v].shape[2], frames[v].shape[1])
            detections = self._untile_detections(num_frames, frame_size, detections)

            # The same face may have been detected in multiple tiles, so filter out
            # overlapping detections. This is done separately for each frame.
            detections = self.facedet.nms(detections)

            for i in range(len(detections)):
                # Crop the faces out of the original frame.
                frameref_detections = self._add_margin_to_detections(detections[i], frame_size, 0.2)
                faces = self._crop_faces(frames[v][i], frameref_detections)
                kpts = self._crop_kpts(frames[v][i], detections[i], 0.3)

                # Add additional information about the frame and detections.
                scores = list(detections[i][:, 16].cpu().numpy())
                frame_dict = {"video_idx": videos_read[v],
                              "frame_idx": frames_read[v][i],
                              "frame_w": frame_size[0],
                              "frame_h": frame_size[1],
                              "frame": frames[v][i],
                              "faces": faces,
                              "kpts": kpts,
                              "detections": frameref_detections.cpu().numpy(),
                              "scores": scores,
                              }

                # Sort faces by descending confidence
                frame_dict = self._soft_faces_by_descending_score(frame_dict)

                result.append(frame_dict)

        return result

    def process_video(self, video_path):
        """Convenience method for doing face extraction on a single video."""
        input_dir = os.path.dirname(video_path)
        filenames = [os.path.basename(video_path)]
        return self.process_videos(input_dir, filenames, [0])

    def _tile_frames(self, frames: np.ndarray, target_size: Tuple[int, int]) -> Tuple[np.ndarray, List[float]]:
        """Splits each frame into several smaller, partially overlapping tiles
        and resizes each tile to target_size.

        After a bunch of experimentation, I found that for a 1920x1080 video,
        BlazeFace works better on three 1080x1080 windows. These overlap by 420
        pixels. (Two windows also work but it's best to have a clean center crop
        in there as well.)

        I also tried 6 windows of size 720x720 (horizontally: 720|360, 360|720;
        vertically: 720|1200, 480|720|480, 1200|720) but that gives many false
        positives when a window has no face in it.

        For a video in portrait orientation (1080x1920), we only take a single
        crop of the top-most 1080 pixels. If we split up the video vertically,
        then we might get false positives again.

        (NOTE: Not all videos are necessarily 1080p but the code can handle this.)

        Arguments:
            frames: NumPy array of shape (num_frames, height, width, 3)
            target_size: (width, height)

        Returns:
            - a new (num_frames * N, target_size[1], target_size[0], 3) array
              where N is the number of tiles used.
            - a list [scale_w, scale_h, offset_x, offset_y] that describes how
              to map the resized and cropped tiles back to the original image
              coordinates. This is needed for scaling up the face detections
              from the smaller image to the original image, so we can take the
              face crops in the original coordinate space.
        """
        num_frames, H, W, _ = frames.shape
        num_h, num_v, split_size, x_step, y_step = self.get_tiles_params(H, W)

        splits = np.zeros((num_frames * num_v * num_h, target_size[1], target_size[0], 3), dtype=np.uint8)
        i = 0
        for f in range(num_frames):
            y = 0
            for v in range(num_v):
                x = 0
                for h in range(num_h):
                    crop = frames[f, y:y + split_size, x:x + split_size, :]
                    splits[i] = cv2.resize(crop, target_size, interpolation=cv2.INTER_AREA)
                    x += x_step
                    i += 1
                y += y_step

        resize_info = [split_size / target_size[0], split_size / target_size[1], 0, 0]
        return splits, resize_info

    def get_tiles_params(self, H, W):
        """Computes the tiling grid for a frame of height H and width W: the
        number of horizontal and vertical tiles, the tile size, and the step
        between consecutive tiles along each axis."""
        split_size = min(H, W, 720)
        x_step = (W - split_size) // 2
        y_step = (H - split_size) // 2
        num_v = (H - split_size) // y_step + 1 if y_step > 0 else 1
        num_h = (W - split_size) // x_step + 1 if x_step > 0 else 1
        return num_h, num_v, split_size, x_step, y_step
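
    # Worked example of the tiling parameters above (values derived from the
    # formulas in get_tiles_params, shown here only for illustration): for a
    # 1920x1080 landscape frame, split_size = min(1080, 1920, 720) = 720,
    # x_step = (1920 - 720) // 2 = 600 and y_step = (1080 - 720) // 2 = 180,
    # so num_h = num_v = 3 and _tile_frames produces nine overlapping 720x720
    # tiles per frame, each resized to the detector's 128x128 input size.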

    def _resize_detections(self, detections, target_size, resize_info):
        """Converts a list of face detections back to the original
        coordinate system.

        Arguments:
            detections: a list containing PyTorch tensors of shape (num_faces, 17)
            target_size: (width, height)
            resize_info: [scale_w, scale_h, offset_x, offset_y]
        """
        projected = []
        target_w, target_h = target_size
        scale_w, scale_h, offset_x, offset_y = resize_info

        for i in range(len(detections)):
            detection = detections[i].clone()

            # ymin, xmin, ymax, xmax
            for k in range(2):
                detection[:, k * 2] = (detection[:, k * 2] * target_h - offset_y) * scale_h
                detection[:, k * 2 + 1] = (detection[:, k * 2 + 1] * target_w - offset_x) * scale_w

            # keypoints are x,y
            for k in range(2, 8):
                detection[:, k * 2] = (detection[:, k * 2] * target_w - offset_x) * scale_w
                detection[:, k * 2 + 1] = (detection[:, k * 2 + 1] * target_h - offset_y) * scale_h

            projected.append(detection)

        return projected
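
    # Numerical sketch of the mapping above (assuming, as the multiplication by
    # target_w/target_h implies, that the detector outputs coordinates
    # normalized to [0, 1]): with a 128x128 tile cut from a 720x720 crop,
    # scale_h = 720 / 128 = 5.625, so a normalized ymin of 0.25 becomes
    # 0.25 * 128 * 5.625 = 180 pixels in the original crop's coordinates.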

    def _untile_detections(self, num_frames: int, frame_size: Tuple[int, int],
                           detections: List[torch.Tensor]) -> List[torch.Tensor]:
        """With N tiles per frame, there also are N times as many detections.
        This function groups together the detections for a given frame; it is
        the complement to _tile_frames().
        """
        combined_detections = []

        W, H = frame_size
        num_h, num_v, split_size, x_step, y_step = self.get_tiles_params(H, W)

        i = 0
        for f in range(num_frames):
            detections_for_frame = []
            y = 0
            for v in range(num_v):
                x = 0
                for h in range(num_h):
                    # Adjust the coordinates based on the split positions.
                    detection = detections[i].clone()
                    if detection.shape[0] > 0:
                        for k in range(2):
                            detection[:, k * 2] += y
                            detection[:, k * 2 + 1] += x
                        for k in range(2, 8):
                            detection[:, k * 2] += x
                            detection[:, k * 2 + 1] += y

                    detections_for_frame.append(detection)
                    x += x_step
                    i += 1
                y += y_step

            combined_detections.append(torch.cat(detections_for_frame))

        return combined_detections

    def _add_margin_to_detections(self, detections: torch.Tensor, frame_size: Tuple[int, int],
                                  margin: float = 0.2) -> torch.Tensor:
        """Expands the face bounding box.

        NOTE: The face detections often do not include the forehead, which
        is why we use twice the margin for ymin.

        Arguments:
            detections: a PyTorch tensor of shape (num_detections, 17)
            frame_size: maximum (width, height)
            margin: a percentage of the bounding box's height

        Returns a PyTorch tensor of shape (num_detections, 17).
        """
        offset = torch.round(margin * (detections[:, 2] - detections[:, 0]))
        detections = detections.clone()
        detections[:, 0] = torch.clamp(detections[:, 0] - offset * 2, min=0)  # ymin
        detections[:, 1] = torch.clamp(detections[:, 1] - offset, min=0)  # xmin
        detections[:, 2] = torch.clamp(detections[:, 2] + offset, max=frame_size[1])  # ymax
        detections[:, 3] = torch.clamp(detections[:, 3] + offset, max=frame_size[0])  # xmax
        return detections
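
    # Margin arithmetic sketch (illustrative values only): for a detection with
    # ymin=100, xmin=200, ymax=300, xmax=400 and margin=0.2, the box height is
    # 200, so offset = round(0.2 * 200) = 40; the expanded box becomes
    # ymin=100-80=20, xmin=200-40=160, ymax=300+40=340, xmax=400+40=440,
    # with each coordinate then clamped to the frame boundaries.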

    def _crop_faces(self, frame: np.ndarray, detections: torch.Tensor) -> List[np.ndarray]:
        """Copies the face region(s) from the given frame into a set
        of new NumPy arrays.

        Arguments:
            frame: a NumPy array of shape (H, W, 3)
            detections: a PyTorch tensor of shape (num_detections, 17)

        Returns a list of NumPy arrays, one for each face crop. If there
        are no faces detected for this frame, returns an empty list.
        """
        faces = []
        for i in range(len(detections)):
            ymin, xmin, ymax, xmax = detections[i, :4].cpu().numpy().astype(np.int64)
            face = frame[ymin:ymax, xmin:xmax, :]
            faces.append(face)
        return faces

    def _crop_kpts(self, frame: np.ndarray, detections: torch.Tensor, face_fraction: float):
        """Copies the keypoint region(s) from the given frame into a set
        of new NumPy arrays.

        Arguments:
            frame: a NumPy array of shape (H, W, 3)
            detections: a PyTorch tensor of shape (num_detections, 17)
            face_fraction: float between 0 and 1 indicating the size of the
                extracted keypoint crops relative to the whole face

        Returns a list with one entry per detected face; each entry is a list
        of six NumPy arrays, one per facial keypoint. If there are no faces
        detected for this frame, returns an empty list.
        """
        faces = []
        for i in range(len(detections)):
            kpts = []
            size = int(face_fraction * min(detections[i, 2] - detections[i, 0], detections[i, 3] - detections[i, 1]))
            kpts_coords = detections[i, 4:16].cpu().numpy().astype(np.int64)
            for kpidx in range(6):
                kpx, kpy = kpts_coords[kpidx * 2:kpidx * 2 + 2]
                kpt = frame[kpy - size // 2:kpy - size // 2 + size, kpx - size // 2:kpx - size // 2 + size]
                kpts.append(kpt)
            faces.append(kpts)
        return faces

    def remove_large_crops(self, crops, pct=0.1):
        """Removes faces from the results if they take up more than X%
        of the video. Such a face is likely a false positive.

        This is an optional postprocessing step. Modifies the original
        data structure.

        Arguments:
            crops: a list of dictionaries with face crop data
            pct: maximum portion of the frame a crop may take up
        """
        for i in range(len(crops)):
            frame_data = crops[i]
            video_area = frame_data["frame_w"] * frame_data["frame_h"]
            faces = frame_data["faces"]
            scores = frame_data["scores"]
            new_faces = []
            new_scores = []
            for j in range(len(faces)):
                face = faces[j]
                face_H, face_W, _ = face.shape
                face_area = face_H * face_W
                if face_area / video_area < pct:
                    new_faces.append(face)
                    new_scores.append(scores[j])
            frame_data["faces"] = new_faces
            frame_data["scores"] = new_scores

    def keep_only_best_face(self, crops):
        """For each frame, only keeps the face with the highest confidence.

        This gets rid of false positives, but obviously is problematic for
        videos with two people!

        This is an optional postprocessing step. Modifies the original
        data structure.
        """
        for i in range(len(crops)):
            frame_data = crops[i]
            if len(frame_data["faces"]) > 0:
                frame_data["faces"] = frame_data["faces"][:1]
                frame_data["scores"] = frame_data["scores"][:1]

    # TODO: def filter_likely_false_positives(self, crops):
    #   if only some frames have more than 1 face, it's likely a false positive
    #   if most frames have more than 1 face, it's probably two people
    #   so find the % of frames with > 1 face; if > 0.X, keep the two best faces

    # TODO: def filter_by_score(self, crops, min_score) to remove any
    #   crops with a confidence score lower than min_score

    # TODO: def sort_by_histogram(self, crops) for videos with 2 people.
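

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the class above). It assumes the BlazeFace
# weights and anchors from the BlazeFace-PyTorch reference implementation
# ("blazeface.pth", "anchors.npy") plus a test image "sample.jpg" and a test
# video "sample.mp4" are available locally; adjust these paths for your setup.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    device = "cuda" if torch.cuda.is_available() else "cpu"

    # Load the face detector (file paths are placeholders).
    facedet = BlazeFace().to(device)
    facedet.load_weights("blazeface.pth")
    facedet.load_anchors("anchors.npy")

    # A hypothetical frame reader built on OpenCV, matching the signature that
    # __init__ documents: returns (frames array, frame indices), or None on error.
    def read_first_frames(video_path, num_frames=4):
        cap = cv2.VideoCapture(video_path)
        frames, idxs = [], []
        for idx in range(num_frames):
            ok, frame = cap.read()
            if not ok:
                break
            frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            idxs.append(idx)
        cap.release()
        if len(frames) == 0:
            return None
        return np.stack(frames), idxs

    extractor = FaceExtractor(video_read_fn=read_first_frames, facedet=facedet)

    # Single-image extraction: faces come back sorted by descending confidence.
    image_result = extractor.process_image(path="sample.jpg")
    if len(image_result["faces"]) > 0:
        print("Best face score:", image_result["scores"][0],
              "crop shape:", image_result["faces"][0].shape)

    # Video extraction: one dictionary per frame that was read successfully.
    video_results = extractor.process_video("sample.mp4")
    extractor.keep_only_best_face(video_results)
    print("Frames with detections:",
          sum(1 for r in video_results if len(r["faces"]) > 0))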