import os
from typing import List, Optional, Tuple, Union
import cv2
import numpy as np
import torch
from PIL import Image
from blazeface import BlazeFace
class FaceExtractor:
"""Wrapper for face extraction workflow."""
def __init__(self, video_read_fn=None, facedet: Optional[BlazeFace] = None):
"""Creates a new FaceExtractor.
Arguments:
video_read_fn: a function that takes in a path to a video file
and returns a tuple consisting of a NumPy array with shape
(num_frames, H, W, 3) and a list of frame indices, or None
in case of an error
facedet: the face detector object
"""
self.video_read_fn = video_read_fn
self.facedet = facedet
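# NOTE: an illustrative video_read_fn matching the contract described above
# (a frames array plus the list of frame indices, or None on error) is
# sketched as example_video_read_fn at the bottom of this module.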
def process_image(self, path: str = None, img: Union[Image.Image, np.ndarray] = None) -> dict:
"""
Process a single image.
:param path: path to the image file on disk
:param img: image as a PIL Image or NumPy array (alternative to path)
:return: a dict with the frame size, face crops, keypoint crops, detections and confidence scores
"""
if img is not None and path is not None:
raise ValueError('Only one argument between path and img can be specified')
if img is None and path is None:
raise ValueError('At least one argument between path and img must be specified')
target_size = self.facedet.input_size
if img is None:
img = np.asarray(Image.open(str(path)))
else:
img = np.asarray(img)
# Split the frames into several tiles. Resize the tiles to 128x128.
tiles, resize_info = self._tile_frames(np.expand_dims(img, 0), target_size)
# tiles has shape (num_tiles, target_size, target_size, 3)
# resize_info is a list of four elements [scale_w, scale_h, offset_x, offset_y]
# Run the face detector. The result is a list of PyTorch tensors,
# one for each tile in the batch.
detections = self.facedet.predict_on_batch(tiles, apply_nms=False)
# Convert the detections from 128x128 back to the original frame size.
detections = self._resize_detections(detections, target_size, resize_info)
# Because we have several tiles for each frame, combine the predictions
# from these tiles. The result is a list of PyTorch tensors, but now one
# for each frame (rather than each tile).
num_frames = 1
frame_size = (img.shape[1], img.shape[0])
detections = self._untile_detections(num_frames, frame_size, detections)
# The same face may have been detected in multiple tiles, so filter out
# overlapping detections. This is done separately for each frame.
detections = self.facedet.nms(detections)
# Crop the faces out of the original frame.
frameref_detections = self._add_margin_to_detections(detections[0], frame_size, 0.2)
faces = self._crop_faces(img, frameref_detections)
kpts = self._crop_kpts(img, detections[0], 0.3)
# Add additional information about the frame and detections.
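# Each detection row has 17 values: [ymin, xmin, ymax, xmax], six (x, y)
# keypoint pairs, and the confidence score in column 16.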
scores = list(detections[0][:, 16].cpu().numpy())
frame_dict = {"frame_w": frame_size[0],
"frame_h": frame_size[1],
"faces": faces,
"kpts": kpts,
"detections": frameref_detections.cpu().numpy(),
"scores": scores,
}
# Sort faces by descending confidence
frame_dict = self._sort_faces_by_descending_score(frame_dict)
return frame_dict
def _sort_faces_by_descending_score(self, frame_dict: dict) -> dict:
"""Sorts the faces, keypoints, detections and scores in frame_dict by descending confidence score."""
if len(frame_dict['scores']) > 1:
sort_idxs = np.argsort(frame_dict['scores'])[::-1]
new_faces = [frame_dict['faces'][i] for i in sort_idxs]
new_kpts = [frame_dict['kpts'][i] for i in sort_idxs]
new_detections = frame_dict['detections'][sort_idxs]
new_scores = [frame_dict['scores'][i] for i in sort_idxs]
frame_dict['faces'] = new_faces
frame_dict['kpts'] = new_kpts
frame_dict['detections'] = new_detections
frame_dict['scores'] = new_scores
return frame_dict
def process_videos(self, input_dir, filenames, video_idxs) -> List[dict]:
"""For the specified selection of videos, grabs one or more frames
from each video, runs the face detector, and tries to find the faces
in each frame.
The frames are split into tiles, and the tiles from the different videos
are concatenated into a single batch. This means the face detector gets
a batch of size len(video_idxs) * num_frames * num_tiles (usually 3).
Arguments:
input_dir: base folder where the video files are stored
filenames: list of all video files in the input_dir
video_idxs: one or more indices from the filenames list; these
are the videos we'll actually process
Returns a list of dictionaries, one for each frame read from each video.
This dictionary contains:
- video_idx: the video this frame was taken from
- frame_idx: the index of the frame in the video
- frame_w, frame_h: original dimensions of the frame
- faces: a list containing zero or more NumPy arrays with a face crop
- scores: a list with the confidence score for each face crop
If reading a video failed for some reason, it will not appear in the
output array. Note that there's no guarantee a given video will actually
have num_frames results (as soon as a reading problem is encountered for
a video, we continue with the next video).
"""
target_size = self.facedet.input_size
videos_read = []
frames_read = []
frames = []
tiles = []
resize_info = []
for video_idx in video_idxs:
# Read the full-size frames from this video.
filename = filenames[video_idx]
video_path = os.path.join(input_dir, filename)
result = self.video_read_fn(video_path)
# Error? Then skip this video.
if result is None: continue
videos_read.append(video_idx)
# Keep track of the original frames (need them later).
my_frames, my_idxs = result
frames.append(my_frames)
frames_read.append(my_idxs)
# Split the frames into several tiles. Resize the tiles to 128x128.
my_tiles, my_resize_info = self._tile_frames(my_frames, target_size)
tiles.append(my_tiles)
resize_info.append(my_resize_info)
if len(tiles) == 0:
return []
# Put all the tiles for all the frames from all the videos into
# a single batch.
batch = np.concatenate(tiles)
# Run the face detector. The result is a list of PyTorch tensors,
# one for each image in the batch.
all_detections = self.facedet.predict_on_batch(batch, apply_nms=False)
result = []
offs = 0
for v in range(len(tiles)):
# Not all videos may have the same number of tiles, so find which
# detections go with which video.
num_tiles = tiles[v].shape[0]
detections = all_detections[offs:offs + num_tiles]
offs += num_tiles
# Convert the detections from 128x128 back to the original frame size.
detections = self._resize_detections(detections, target_size, resize_info[v])
# Because we have several tiles for each frame, combine the predictions
# from these tiles. The result is a list of PyTorch tensors, but now one
# for each frame (rather than each tile).
num_frames = frames[v].shape[0]
frame_size = (frames[v].shape[2], frames[v].shape[1])
detections = self._untile_detections(num_frames, frame_size, detections)
# The same face may have been detected in multiple tiles, so filter out
# overlapping detections. This is done separately for each frame.
detections = self.facedet.nms(detections)
for i in range(len(detections)):
# Crop the faces out of the original frame.
frameref_detections = self._add_margin_to_detections(detections[i], frame_size, 0.2)
faces = self._crop_faces(frames[v][i], frameref_detections)
kpts = self._crop_kpts(frames[v][i], detections[i], 0.3)
# Add additional information about the frame and detections.
scores = list(detections[i][:, 16].cpu().numpy())
frame_dict = {"video_idx": videos_read[v],
"frame_idx": frames_read[v][i],
"frame_w": frame_size[0],
"frame_h": frame_size[1],
"frame": frames[v][i],
"faces": faces,
"kpts": kpts,
"detections": frameref_detections.cpu().numpy(),
"scores": scores,
}
# Sort faces by descending confidence
frame_dict = self._sort_faces_by_descending_score(frame_dict)
result.append(frame_dict)
return result
def process_video(self, video_path):
"""Convenience method for doing face extraction on a single video."""
input_dir = os.path.dirname(video_path)
filenames = [os.path.basename(video_path)]
return self.process_videos(input_dir, filenames, [0])
def _tile_frames(self, frames: np.ndarray, target_size: Tuple[int, int]) -> Tuple[np.ndarray, List[float]]:
"""Splits each frame into several smaller, partially overlapping tiles
and resizes each tile to target_size.
After a bunch of experimentation, I found that for a 1920x1080 video,
BlazeFace works better on three 1080x1080 windows. These overlap by 420
pixels. (Two windows also work but it's best to have a clean center crop
in there as well.)
I also tried 6 windows of size 720x720 (horizontally: 720|360, 360|720;
vertically: 720|1200, 480|720|480, 1200|720) but that gives many false
positives when a window has no face in it.
For a video in portrait orientation (1080x1920), we only take a single
crop of the top-most 1080 pixels. If we split up the video vertically,
then we might get false positives again.
(NOTE: Not all videos are necessarily 1080p but the code can handle this.)
Arguments:
frames: NumPy array of shape (num_frames, height, width, 3)
target_size: (width, height)
Returns:
- a new (num_frames * N, target_size[1], target_size[0], 3) array
where N is the number of tiles used.
- a list [scale_w, scale_h, offset_x, offset_y] that describes how
to map the resized and cropped tiles back to the original image
coordinates. This is needed for scaling up the face detections
from the smaller image to the original image, so we can take the
face crops in the original coordinate space.
"""
num_frames, H, W, _ = frames.shape
num_h, num_v, split_size, x_step, y_step = self.get_tiles_params(H, W)
splits = np.zeros((num_frames * num_v * num_h, target_size[1], target_size[0], 3), dtype=np.uint8)
i = 0
for f in range(num_frames):
y = 0
for v in range(num_v):
x = 0
for h in range(num_h):
crop = frames[f, y:y + split_size, x:x + split_size, :]
splits[i] = cv2.resize(crop, target_size, interpolation=cv2.INTER_AREA)
x += x_step
i += 1
y += y_step
resize_info = [split_size / target_size[0], split_size / target_size[1], 0, 0]
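# e.g. with split_size=720 and a 128x128 detector input, resize_info is [5.625, 5.625, 0, 0]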
return splits, resize_info
def get_tiles_params(self, H, W):
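"""Computes the tiling grid for a frame of height H and width W.
Returns (num_h, num_v, split_size, x_step, y_step): the number of tiles per
row and per column, the square tile side in pixels, and the horizontal and
vertical strides between tile origins.
For example, a 1920x1080 landscape frame gives split_size=720, x_step=600,
y_step=180, so num_h=3 and num_v=3 (nine tiles per frame).
"""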
split_size = min(H, W, 720)
x_step = (W - split_size) // 2
y_step = (H - split_size) // 2
num_v = (H - split_size) // y_step + 1 if y_step > 0 else 1
num_h = (W - split_size) // x_step + 1 if x_step > 0 else 1
return num_h, num_v, split_size, x_step, y_step
def _resize_detections(self, detections, target_size, resize_info):
"""Converts a list of face detections back to the original
coordinate system.
Arguments:
detections: a list containing PyTorch tensors of shape (num_faces, 17)
target_size: (width, height)
resize_info: [scale_w, scale_h, offset_x, offset_y]
"""
projected = []
target_w, target_h = target_size
scale_w, scale_h, offset_x, offset_y = resize_info
for i in range(len(detections)):
detection = detections[i].clone()
# ymin, xmin, ymax, xmax
for k in range(2):
detection[:, k * 2] = (detection[:, k * 2] * target_h - offset_y) * scale_h
detection[:, k * 2 + 1] = (detection[:, k * 2 + 1] * target_w - offset_x) * scale_w
# keypoints are x,y
for k in range(2, 8):
detection[:, k * 2] = (detection[:, k * 2] * target_w - offset_x) * scale_w
detection[:, k * 2 + 1] = (detection[:, k * 2 + 1] * target_h - offset_y) * scale_h
projected.append(detection)
return projected
def _untile_detections(self, num_frames: int, frame_size: Tuple[int, int], detections: List[torch.Tensor]) -> List[
torch.Tensor]:
"""With N tiles per frame, there also are N times as many detections.
This function groups together the detections for a given frame; it is
the complement to tile_frames().
"""
combined_detections = []
W, H = frame_size
num_h, num_v, split_size, x_step, y_step = self.get_tiles_params(H, W)
i = 0
for f in range(num_frames):
detections_for_frame = []
y = 0
for v in range(num_v):
x = 0
for h in range(num_h):
# Adjust the coordinates based on the split positions.
detection = detections[i].clone()
if detection.shape[0] > 0:
for k in range(2):
detection[:, k * 2] += y
detection[:, k * 2 + 1] += x
for k in range(2, 8):
detection[:, k * 2] += x
detection[:, k * 2 + 1] += y
detections_for_frame.append(detection)
x += x_step
i += 1
y += y_step
combined_detections.append(torch.cat(detections_for_frame))
return combined_detections
def _add_margin_to_detections(self, detections: torch.Tensor, frame_size: Tuple[int, int],
margin: float = 0.2) -> torch.Tensor:
"""Expands the face bounding box.
NOTE: The face detections often do not include the forehead, which
is why we use twice the margin for ymin.
Arguments:
detections: a PyTorch tensor of shape (num_detections, 17)
frame_size: maximum (width, height)
margin: a percentage of the bounding box's height
Returns a PyTorch tensor of shape (num_detections, 17).
"""
offset = torch.round(margin * (detections[:, 2] - detections[:, 0]))
detections = detections.clone()
detections[:, 0] = torch.clamp(detections[:, 0] - offset * 2, min=0) # ymin
detections[:, 1] = torch.clamp(detections[:, 1] - offset, min=0) # xmin
detections[:, 2] = torch.clamp(detections[:, 2] + offset, max=frame_size[1]) # ymax
detections[:, 3] = torch.clamp(detections[:, 3] + offset, max=frame_size[0]) # xmax
return detections
def _crop_faces(self, frame: np.ndarray, detections: torch.Tensor) -> List[np.ndarray]:
"""Copies the face region(s) from the given frame into a set
of new NumPy arrays.
Arguments:
frame: a NumPy array of shape (H, W, 3)
detections: a PyTorch tensor of shape (num_detections, 17)
Returns a list of NumPy arrays, one for each face crop. If there
are no faces detected for this frame, returns an empty list.
"""
faces = []
for i in range(len(detections)):
ymin, xmin, ymax, xmax = detections[i, :4].cpu().numpy().astype(np.int64)
face = frame[ymin:ymax, xmin:xmax, :]
faces.append(face)
return faces
def _crop_kpts(self, frame: np.ndarray, detections: torch.Tensor, face_fraction: float):
"""Copies the parts region(s) from the given frame into a set
of new NumPy arrays.
Arguments:
frame: a NumPy array of shape (H, W, 3)
detections: a PyTorch tensor of shape (num_detections, 17)
face_fraction: float between 0 and 1, the size of each extracted part relative to the whole face
Returns a list with one entry per detected face; each entry is a list of
six NumPy arrays, one per keypoint. If there are no faces detected for
this frame, returns an empty list.
"""
faces = []
for i in range(len(detections)):
kpts = []
size = int(face_fraction * min(detections[i, 2] - detections[i, 0], detections[i, 3] - detections[i, 1]))
kpts_coords = detections[i, 4:16].cpu().numpy().astype(np.int64)
for kpidx in range(6):
kpx, kpy = kpts_coords[kpidx * 2:kpidx * 2 + 2]
kpt = frame[kpy - size // 2:kpy - size // 2 + size, kpx - size // 2:kpx - size // 2 + size, :]
kpts.append(kpt)
faces.append(kpts)
return faces
def remove_large_crops(self, crops, pct=0.1):
"""Removes faces from the results if they take up more than X%
of the video. Such a face is likely a false positive.
This is an optional postprocessing step. Modifies the original
data structure.
Arguments:
crops: a list of dictionaries with face crop data
pct: maximum portion of the frame a crop may take up
"""
for i in range(len(crops)):
frame_data = crops[i]
video_area = frame_data["frame_w"] * frame_data["frame_h"]
faces = frame_data["faces"]
scores = frame_data["scores"]
new_faces = []
new_scores = []
for j in range(len(faces)):
face = faces[j]
face_H, face_W, _ = face.shape
face_area = face_H * face_W
if face_area / video_area < pct:
new_faces.append(face)
new_scores.append(scores[j])
frame_data["faces"] = new_faces
frame_data["scores"] = new_scores
def keep_only_best_face(self, crops):
"""For each frame, only keeps the face with the highest confidence.
This gets rid of false positives, but obviously is problematic for
videos with two people!
This is an optional postprocessing step. Modifies the original
data structure.
"""
for i in range(len(crops)):
frame_data = crops[i]
if len(frame_data["faces"]) > 0:
frame_data["faces"] = frame_data["faces"][:1]
frame_data["scores"] = frame_data["scores"][:1]
# TODO: def filter_likely_false_positives(self, crops):
# if only some frames have more than 1 face, it's likely a false positive
# if most frames have more than 1 face, it's probably two people
# so find the % of frames with > 1 face; if > 0.X, keep the two best faces
# TODO: def filter_by_score(self, crops, min_score) to remove any
# crops with a confidence score lower than min_score
# TODO: def sort_by_histogram(self, crops) for videos with 2 people.
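# --- Illustrative usage sketch ---------------------------------------------
# A minimal sketch, assuming the bundled BlazeFace class exposes the usual
# load_weights()/load_anchors() helpers and that "blazeface.pth",
# "anchors.npy", "example.jpg" and "example.mp4" exist locally; all of these
# names are placeholders, adjust them to your setup.


def example_video_read_fn(video_path, num_frames=16):
    """Hypothetical reader matching the video_read_fn contract documented in
    FaceExtractor.__init__: returns (frames, frame_idxs), or None on failure."""
    capture = cv2.VideoCapture(video_path)
    total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
    if total <= 0:
        return None
    idxs = np.linspace(0, total - 1, num_frames, dtype=int)
    frames, kept_idxs = [], []
    for idx in idxs:
        capture.set(cv2.CAP_PROP_POS_FRAMES, int(idx))
        ok, frame = capture.read()
        if not ok:
            break
        frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))  # OpenCV reads BGR
        kept_idxs.append(int(idx))
    capture.release()
    if not frames:
        return None
    return np.stack(frames), kept_idxs


if __name__ == "__main__":
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    facedet = BlazeFace().to(device)
    facedet.load_weights("blazeface.pth")  # assumed checkpoint path
    facedet.load_anchors("anchors.npy")    # assumed anchors path
    extractor = FaceExtractor(video_read_fn=example_video_read_fn, facedet=facedet)

    # Single image: returns a dict with faces, kpts, detections and scores.
    image_result = extractor.process_image(path="example.jpg")
    print("faces found:", len(image_result["faces"]), "scores:", image_result["scores"])

    # Single video: returns one dict per frame that was read.
    for frame_info in extractor.process_video("example.mp4"):
        print(frame_info["frame_idx"], len(frame_info["faces"]))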