File size: 3,451 Bytes
5f3320a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9dd1eaa
 
 
5f3320a
 
 
 
9dd1eaa
 
5f3320a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import random
from typing import *
import numpy as np
import tensorflow as tf
import cv2
from pathlib import Path

# Fraction of each class's files that frame_generator assigns to the
# 'training' split; the remaining files form the 'validation' split.
SPLIT_RATIO = 0.7
# Batch size applied to the training dataset in main().
BATCH_SIZE = 8
# Number of frames sampled per video; main() passes it to frame_generator
# as n_frames.
NUM_FRAMES = 8

def main():
    """
      Build the batched training dataset from the videos under assets/dataset.

      Each element produced by the generator is a (frames, label) pair: a
      float32 tensor whose last dimension is 3 and a scalar int16 label.
      The resulting dataset is only bound to a local name; nothing is returned.
    """
    dataset_root = Path('assets/dataset')

    # Element signature of what the generator yields: (video frames, class id).
    frames_spec = tf.TensorSpec(shape=(None, None, None, 3), dtype=tf.float32)
    label_spec = tf.TensorSpec(shape=(), dtype=tf.int16)

    training_generator = frame_generator(dataset_root, NUM_FRAMES, 'training')
    train_ds = tf.data.Dataset.from_generator(
        training_generator,
        output_signature=(frames_spec, label_spec),
    ).batch(BATCH_SIZE)


def format_frames(frame, output_size):
    """
      Convert a video frame to float32 and resize it with padding.

      Args:
        frame: Image taken from a video.
        output_size: Target size; unpacked into the positional size arguments
          of tf.image.resize_with_pad.

      Return:
        The float32 frame, resized and padded to output_size.
    """
    as_float = tf.image.convert_image_dtype(frame, tf.float32)
    return tf.image.resize_with_pad(as_float, *output_size)


def frames_from_video_file(video_path, n_frames, output_size=(224, 224), frame_step=15):
    """
      Sample evenly spaced frames from a single video file.

      A random start position is chosen so that, when the video is long
      enough, all n_frames samples fall inside the video. When the video is
      too short, sampling starts at frame 0 and frames that cannot be read
      are replaced by all-zero frames.

      Args:
        video_path: File path to the video.
        n_frames: Number of frames to be created per video file.
        output_size: Pixel size of the output frame image.
        frame_step: Number of source frames to advance between two sampled
          frames.

      Return:
        An NumPy array of frames in the shape of (n_frames, height, width, channels).

      Raises:
        ValueError: If the first frame cannot be read from the video.
    """
    result = []
    src = cv2.VideoCapture(str(video_path))
    try:
        # OpenCV reports the frame count as a float; random.randint requires
        # integer bounds, so convert once here.
        video_length = int(src.get(cv2.CAP_PROP_FRAME_COUNT))

        # Number of source frames spanned from the first to the last sample.
        need_length = 1 + (n_frames - 1) * frame_step

        if need_length > video_length:
            start = 0
        else:
            max_start = video_length - need_length
            # randint includes both endpoints, so max_start itself is the
            # last start position that still leaves room for every sample.
            start = random.randint(0, max_start)

        src.set(cv2.CAP_PROP_POS_FRAMES, start)
        # ok is a boolean indicating whether the read was successful, frame
        # is the image itself.
        ok, frame = src.read()
        if not ok:
            raise ValueError('read video not success')
        result.append(format_frames(frame, output_size))

        for _ in range(n_frames - 1):
            # Advance frame_step frames and keep only the last one read.
            for _skip in range(frame_step):
                ok, frame = src.read()
            if ok:
                result.append(format_frames(frame, output_size))
            else:
                # Ran past the end of the video: pad with a blank frame.
                result.append(np.zeros_like(result[0]))
    finally:
        # Always free the capture handle, including when the first read fails.
        src.release()

    # Reverse the channel order (OpenCV decodes frames as BGR).
    result = np.array(result)[..., [2, 1, 0]]

    return result

def frame_generator(data_dir: Path, n_frames: int, split: Literal['training', 'validation']):
    """
      Create a generator function yielding (frames, label) pairs for a split.

      Every sub-directory of data_dir is treated as one class; its name is the
      class name and its entries are the video files. Class ids are assigned
      by enumerating the class names in sorted order. For each class the
      first SPLIT_RATIO fraction of its (sorted) files belongs to 'training'
      and the rest to 'validation'.

      Args:
        data_dir: Directory containing one sub-directory per class.
        n_frames: Number of frames to sample from each video.
        split: Which part of the data to iterate over.

      Return:
        A zero-argument callable; each call returns a fresh generator that
        visits the split's videos in random order and yields
        (video_frames, class_id).
    """
    # Non-directory entries (stray files) cannot hold videos; skip them
    # instead of failing when trying to list their contents.
    class_dirs = sorted(
        (entry for entry in data_dir.iterdir() if entry.is_dir()),
        key=lambda entry: entry.name,
    )
    class_ids_for_name = {
        class_dir.name: i
        for i, class_dir in enumerate(class_dirs)
    }

    data = {'training': {}, 'validation': {}}
    for class_dir in class_dirs:
        # iterdir() yields entries in arbitrary order. List each class once
        # and sort, so both splits are cut from the same sequence and are
        # disjoint and reproducible across calls.
        paths = sorted(class_dir.iterdir())
        cut = int(len(paths) * SPLIT_RATIO)
        data['training'][class_dir.name] = paths[:cut]
        data['validation'][class_dir.name] = paths[cut:]

    def generator():
        pairs = [
            (path, name)
            for name, paths in data[split].items()
            for path in paths
        ]
        # Reshuffled on every call, so each pass sees a new order.
        random.shuffle(pairs)
        for path, name in pairs:
            video_frames = frames_from_video_file(path, n_frames)
            label = class_ids_for_name[name]  # Encode labels
            yield video_frames, label
    return generator