import json
import platform
import subprocess
from pathlib import Path

import cv2
import numpy as np
from loguru import logger
from tqdm import tqdm

import face_detection
from nota_wav2lip.util import FFMPEG_LOGGING_MODE

detector = face_detection.FaceAlignment(face_detection.LandmarksType._2D, flip_input=False, device='cpu')
PADDING = [0, 10, 0, 0]


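def get_smoothened_boxes(boxes, T):
    """Smooth boxes in place with a forward-looking moving average of window T."""
    for i in range(len(boxes)):
        # Near the end of the sequence, reuse the last T boxes as the window.
        window = boxes[len(boxes) - T:] if i + T > len(boxes) else boxes[i:i + T]
        boxes[i] = np.mean(window, axis=0)
    return boxes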


def face_detect(images, pads, no_smooth=False, batch_size=1):

    predictions = []
    images_array = [cv2.imread(str(image)) for image in images]
    for i in tqdm(range(0, len(images_array), batch_size)):
        predictions.extend(detector.get_detections_for_batch(np.array(images_array[i:i + batch_size])))

    results = []
    pady1, pady2, padx1, padx2 = pads
    for rect, image_array in zip(predictions, images_array):
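        if rect is None:
            # Save the frame where no face was detected so it can be inspected.
            # cv2.imwrite returns False without raising if the directory is missing.
            Path('temp').mkdir(exist_ok=True)
            cv2.imwrite('temp/faulty_frame.jpg', image_array)
            raise ValueError('Face not detected! Ensure the video contains a face in all the frames.')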

        y1 = max(0, rect[1] - pady1)
        y2 = min(image_array.shape[0], rect[3] + pady2)
        x1 = max(0, rect[0] - padx1)
        x2 = min(image_array.shape[1], rect[2] + padx2)
        results.append([x1, y1, x2, y2])

    boxes = np.array(results)
    bbox_format = "(y1, y2, x1, x2)"
    if not no_smooth:
        boxes = get_smoothened_boxes(boxes, T=5)
    outputs = {
        'bbox': {str(image_path): tuple(map(int, (y1, y2, x1, x2))) for image_path, (x1, y1, x2, y2) in zip(images, boxes)},
        'format': bbox_format
    }
    return outputs


def save_video_frame(video_path, output_dir=None):
    video_path = Path(video_path)
    # Accept str or Path for output_dir; default to a directory named after the video.
    output_dir = Path(output_dir) if output_dir is not None else video_path.with_suffix('')
    output_dir.mkdir(exist_ok=True)
    return subprocess.call(
        f"ffmpeg {FFMPEG_LOGGING_MODE['ERROR']} -y -i {video_path} -r 25 -f image2 {output_dir}/%05d.jpg",
        shell=platform.system() != 'Windows'
    )
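

# --- Illustrative sketch, not part of the original module ---
# The f-string commands in this file are split on whitespace, so a video path
# containing spaces would likely break them. One possible alternative is to pass
# ffmpeg its arguments as a list. `build_frame_extract_cmd` is a hypothetical
# helper name, and `logging_flags` stands in for whatever string
# FFMPEG_LOGGING_MODE['ERROR'] holds (its value is not shown here).
import shlex
from pathlib import Path


def build_frame_extract_cmd(video_path, output_dir, logging_flags='', fps=25):
    """Build the frame-extraction command as an argument list (no shell needed)."""
    return [
        'ffmpeg', *shlex.split(logging_flags),
        '-y', '-i', str(video_path),
        '-r', str(fps), '-f', 'image2',
        str(Path(output_dir) / '%05d.jpg'),
    ]

# Possible usage: subprocess.call(build_frame_extract_cmd(video_path, output_dir))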


def save_audio_file(video_path, output_path=None):
    video_path = Path(video_path)
    output_path = output_path if output_path is not None else video_path.with_suffix('.wav')
    return subprocess.call(
        f"ffmpeg {FFMPEG_LOGGING_MODE['ERROR']} -y -i {video_path} -vn -acodec pcm_s16le -ar 16000 -ac 1 {output_path}",
        shell=platform.system() != 'Windows'
    )


def save_bbox_file(video_path, bbox_dict, output_path=None):
    video_path = Path(video_path)
    output_path = output_path if output_path is not None else video_path.with_suffix('.json')

    with open(output_path, 'w') as f:
        json.dump(bbox_dict, f, indent=4)


def get_preprocessed_data(video_path: Path):
    video_path = Path(video_path)

    image_sequence_dir = video_path.with_suffix('')
    audio_path = video_path.with_suffix('.wav')
    face_bbox_json_path = video_path.with_suffix('.json')

    logger.info(f"Save 25 FPS video frames as image files ... will be saved at {image_sequence_dir}")
    save_video_frame(video_path=video_path, output_dir=image_sequence_dir)

    logger.info(f"Save the audio as wav file ... will be saved at {audio_path}")
    save_audio_file(video_path=video_path, output_path=audio_path)  # bonus

    # Load images, extract bboxes, and save the coords (to use directly as array indices)
    logger.info(f"Extract face boxes and save the coords in JSON format ... will be saved at {face_bbox_json_path}")
    results = face_detect(sorted(image_sequence_dir.glob("*.jpg")), pads=PADDING)
    save_bbox_file(video_path, results, output_path=face_bbox_json_path)
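

# --- Illustrative sketch, not part of the original module ---
# The JSON written by save_bbox_file stores each box as (y1, y2, x1, x2) so the
# values can be used directly as NumPy slice bounds. The two helpers below are
# hypothetical (the names `load_bbox_file` and `crop_face` are assumptions) and
# only show one way a consumer might read the file back and crop a frame.
import json

import numpy as np


def load_bbox_file(bbox_path):
    """Return {image_path: (y1, y2, x1, x2)} from a file written by save_bbox_file."""
    with open(bbox_path) as f:
        data = json.load(f)
    return {path: tuple(box) for path, box in data['bbox'].items()}


def crop_face(frame, bbox):
    """Crop an H x W (x C) array using a (y1, y2, x1, x2) box."""
    y1, y2, x1, x2 = bbox
    return frame[y1:y2, x1:x2]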