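"""Face-detection preprocessing utilities.

Detects a face in every frame of each .mp4 under ./assets/videos, smooths the
per-frame bounding boxes, and caches the square crop coordinates as compressed
.npz files under ./assets/coords (see preprocess() and load_from_npz()).
"""
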
import face_detection
import numpy as np
import cv2
from tqdm import tqdm
import torch
import glob
import os
from natsort import natsorted

device = "cuda" if torch.cuda.is_available() else "cpu"


def get_squre_coords(coords, image, size=None, last_size=None):
    """Turn a (y1, y2, x1, x2) box into a square of side ~(w + h) / 2 around its center.

    If last_size is given, the side length is eased towards the new value by one
    fifth of the difference per call, so the crop size does not jump between frames.
    (`image` is accepted for call-site symmetry but is not used here.)
    """
    y1, y2, x1, x2 = coords
    w, h = x2 - x1, y2 - y1
    center = (x1 + w // 2, y1 + h // 2)
    if size is None:
        size = (w + h) // 2
    if last_size is not None:
        size = (w + h) // 2
        size = (size - last_size) // 5 + last_size
    x1, y1 = center[0] - size // 2, center[1] - size // 2
    x2, y2 = x1 + size, y1 + size
    return size, [y1, y2, x1, x2]


def get_smoothened_boxes(boxes, T):
    """Temporally smooth per-frame boxes with a moving average over a window of T frames."""
    for i in range(len(boxes)):
        if i + T > len(boxes):
            window = boxes[len(boxes) - T :]
        else:
            window = boxes[i : i + T]
        boxes[i] = np.mean(window, axis=0)
    return boxes


def face_detect(images, pads):
    """Detect a face in every frame and return temporally smoothed square crop boxes.

    Boxes are returned as float [x1, y1, x2, y2] rows, one per input frame.
    """
    detector = face_detection.FaceAlignment(face_detection.LandmarksType._2D, flip_input=False, device=device)

    batch_size = 32 if device == "cuda" else 4
    print("face detect batch size:", batch_size)
    while 1:
        predictions = []
        try:
            for i in tqdm(range(0, len(images), batch_size)):
                predictions.extend(detector.get_detections_for_batch(np.array(images[i : i + batch_size])))
        except RuntimeError:
            if batch_size == 1:
                raise RuntimeError("Image too big to run face detection on GPU. Please use the --resize_factor argument")
            batch_size //= 2
            print("Recovering from OOM error; New batch size: {}".format(batch_size))
            continue
        break

    results = []
    pady1, pady2, padx1, padx2 = pads
    for rect, image in zip(predictions, images):
        if rect is None:
            os.makedirs(".temp", exist_ok=True)  # cv2.imwrite fails silently if the directory is missing
            cv2.imwrite(".temp/faulty_frame.jpg", image)  # inspect this frame where the face was not detected
            raise ValueError("Face not detected! Ensure the video contains a face in all the frames.")

        y1 = max(0, rect[1] - pady1)
        y2 = min(image.shape[0], rect[3] + pady2)
        x1 = max(0, rect[0] - padx1)
        x2 = min(image.shape[1], rect[2] + padx2)
        # expand the padded detection by half of its height/width on every side
        # y_gap, x_gap = ((y2 - y1) * 2) // 3, ((x2 - x1) * 2) // 3
        y_gap, x_gap = (y2 - y1) // 2, (x2 - x1) // 2
        coords_ = [y1 - y_gap, y2 + y_gap, x1 - x_gap, x2 + x_gap]

        _, coords = get_squre_coords(coords_, image)

        y1, y2, x1, x2 = coords
        y1 = max(0, y1)
        y2 = min(image.shape[0], y2)
        x1 = max(0, x1)
        x2 = min(image.shape[1], x2)

        results.append([x1, y1, x2, y2])

    print("Number of frames cropped: {}".format(len(results)))
    print("First coords: {}".format(results[0]))
    boxes = np.array(results)
    boxes = get_smoothened_boxes(boxes, T=25)
    # results = [[image[y1:y2, x1:x2], (y1, y2, x1, x2)] for image, (x1, y1, x2, y2) in zip(images, boxes)]

    del detector
    return boxes


def add_black(imgs):
    """Pad every frame with a 100 px black bar on top and a 20 px black bar at the bottom."""
    for i in range(len(imgs)):
        imgs[i] = cv2.vconcat([np.zeros((100, imgs[i].shape[1], 3), dtype=np.uint8), imgs[i], np.zeros((20, imgs[i].shape[1], 3), dtype=np.uint8)])

    return imgs


def preprocess(video_dir="./assets/videos", save_dir="./assets/coords"):
    all_videos = natsorted(glob.glob(os.path.join(video_dir, "*.mp4")))
    for video_path in all_videos:
        video_stream = cv2.VideoCapture(video_path)

        # print('Reading video frames...')
        full_frames = []
        while 1:
            still_reading, frame = video_stream.read()
            if not still_reading:
                video_stream.release()
                break
            full_frames.append(frame)
        print("Number of frames available for inference: " + str(len(full_frames)))
        full_frames = add_black(full_frames)
        # print('Face detection running...')
        coords = face_detect(full_frames, pads=(0, 0, 0, 0))
        np.savez_compressed(os.path.join(save_dir, os.path.basename(video_path).split(".")[0]), coords=coords)


def load_from_npz(video_name, save_dir="./assets/coords"):
    npz = np.load(os.path.join(save_dir, video_name + ".npz"))
    return npz["coords"]


if __name__ == "__main__":
    preprocess()