# NOTE(review): the lines "Spaces:" / "Sleeping" above this point were
# platform/extraction residue, not Python source — kept only as this comment
# so the file stays syntactically valid.
import random | |
from typing import * | |
import numpy as np | |
import tensorflow as tf | |
import cv2 | |
from pathlib import Path | |
# Fraction of each class's videos assigned to the training split;
# the remainder goes to validation.
SPLIT_RATIO = 0.7
# Number of videos per batch fed to the model.
BATCH_SIZE = 8
# Number of frames sampled from each video clip.
NUM_FRAMES = 8
def main():
    """Build the batched training dataset from videos under assets/dataset."""
    dataset_root = Path('assets/dataset')
    # Element spec: (frames tensor [n_frames, H, W, 3] float32, scalar int16 label).
    signature = (
        tf.TensorSpec(shape=(None, None, None, 3), dtype=tf.float32),
        tf.TensorSpec(shape=(), dtype=tf.int16),
    )
    train_ds = tf.data.Dataset.from_generator(
        frame_generator(dataset_root, NUM_FRAMES, 'training'),
        output_signature=signature,
    )
    train_ds = train_ds.batch(BATCH_SIZE)
def format_frames(frame, output_size):
    """Convert a raw video frame to float32 and letterbox it to *output_size*.

    Args:
        frame: Image array as read from the video (H, W, C).
        output_size: (height, width) of the returned frame.

    Returns:
        A float32 tensor resized with aspect-ratio-preserving padding.
    """
    as_float = tf.image.convert_image_dtype(frame, tf.float32)
    return tf.image.resize_with_pad(as_float, *output_size)
def frames_from_video_file(video_path, n_frames, output_size=(224, 224), frame_step=15):
    """
    Sample n_frames frames from a video file at a stride of frame_step.

    Args:
        video_path: File path to the video.
        n_frames: Number of frames to be created per video file.
        output_size: Pixel size (height, width) of each output frame.
        frame_step: Stride in source frames between sampled frames.

    Returns:
        A NumPy array of RGB frames with shape (n_frames, height, width, 3).

    Raises:
        ValueError: If the first frame cannot be read from the video.
    """
    result = []
    src = cv2.VideoCapture(str(video_path))
    try:
        # CAP_PROP_FRAME_COUNT is reported as a float; random.randint below
        # requires integer bounds, so cast once here.
        video_length = int(src.get(cv2.CAP_PROP_FRAME_COUNT))
        need_length = 1 + (n_frames - 1) * frame_step
        if need_length > video_length:
            start = 0
        else:
            # random.randint is inclusive on both ends, so the upper bound is
            # max_start itself; the previous `max_start + 1` could pick a start
            # that overruns the clip and makes the trailing reads fail.
            max_start = video_length - need_length
            start = random.randint(0, max_start)
        src.set(cv2.CAP_PROP_POS_FRAMES, start)
        # ret is a boolean indicating whether read was successful,
        # frame is the image itself.
        ok, frame = src.read()
        if not ok:
            raise ValueError('read video not success')
        result.append(format_frames(frame, output_size))
        for _ in range(n_frames - 1):
            # Advance frame_step frames, keeping only the last one read.
            for _ in range(frame_step):
                ok, frame = src.read()
            if ok:
                result.append(format_frames(frame, output_size))
            else:
                # Pad with black frames when the clip runs out early.
                result.append(np.zeros_like(result[0]))
    finally:
        # Release the capture handle even when the first read fails,
        # so the early raise above no longer leaks it.
        src.release()
    # OpenCV decodes frames as BGR; reorder the channel axis to RGB.
    return np.array(result)[..., [2, 1, 0]]
def frame_generator(data_dir: Path, n_frames: int, split: Literal['training', 'validation']):
    """
    Build a zero-argument generator function yielding (frames, label) pairs.

    Args:
        data_dir: Root directory containing one sub-directory per class.
        n_frames: Number of frames sampled from each video.
        split: Which portion of each class's videos to serve.

    Returns:
        A callable suitable for tf.data.Dataset.from_generator; each yielded
        item is (frames array, integer class id).
    """
    class_names = sorted(x.name for x in data_dir.iterdir())
    class_ids_for_name = {name: i for i, name in enumerate(class_names)}

    # Partition each class's videos once, from a single sorted listing.
    # Previously the two splits each called iterdir() independently; its order
    # is filesystem-dependent, so the partition was nondeterministic and the
    # training/validation sets could be cut from different orderings.
    data = {'training': {}, 'validation': {}}
    for class_dir in data_dir.iterdir():
        paths = sorted(class_dir.iterdir())
        cut = int(len(paths) * SPLIT_RATIO)
        data['training'][class_dir.name] = paths[:cut]
        data['validation'][class_dir.name] = paths[cut:]

    def generator():
        pairs = [
            (path, name)
            for name, paths in data[split].items()
            for path in paths
        ]
        # Shuffle so classes are interleaved rather than served in blocks.
        random.shuffle(pairs)
        for path, name in pairs:
            video_frames = frames_from_video_file(path, n_frames)
            label = class_ids_for_name[name]  # Encode labels
            yield video_frames, label

    return generator