import random
from pathlib import Path
from typing import *

import cv2
import numpy as np
import tensorflow as tf

# Fraction of each class's videos assigned to the training split.
SPLIT_RATIO = 0.7
BATCH_SIZE = 8
# Number of frames sampled from every video.
NUM_FRAMES = 8


def main():
    """Build the batched training dataset from the videos under ``assets/dataset``."""
    data_dir = Path('assets/dataset')
    # Each element is (frames, label): frames has 3 channels in its last axis,
    # the label is a scalar class id.
    output_signature = (
        tf.TensorSpec(shape=(None, None, None, 3), dtype=tf.float32),
        tf.TensorSpec(shape=(), dtype=tf.int16),
    )
    train_ds = tf.data.Dataset.from_generator(
        frame_generator(data_dir, NUM_FRAMES, 'training'),
        output_signature=output_signature,
    )
    train_ds = train_ds.batch(BATCH_SIZE)


def format_frames(frame, output_size):
    """
    Pad and resize an image from a video.

    Args:
      frame: Image that needs to be resized and padded.
      output_size: Pixel size (height, width) of the output frame image.

    Return:
      Formatted float32 frame, padded to the specified output size.
    """
    frame = tf.image.convert_image_dtype(frame, tf.float32)
    frame = tf.image.resize_with_pad(frame, *output_size)
    return frame


def frames_from_video_file(video_path, n_frames, output_size=(224, 224), frame_step=15):
    """
    Sample evenly spaced frames from a video file.

    A random start position is chosen so that the whole sampled span fits in
    the video when the video is long enough; otherwise sampling starts at
    frame 0 and frames that cannot be read are replaced with zeros.

    Args:
      video_path: File path to the video.
      n_frames: Number of frames to be created per video file.
      output_size: Pixel size (height, width) of the output frame image.
      frame_step: Number of source frames to advance between sampled frames.

    Return:
      A NumPy array of frames in the shape of
      (n_frames, height, width, channels), with channels reordered from
      OpenCV's BGR to RGB.

    Raises:
      ValueError: If the first frame cannot be read from the video.
    """
    result = []
    src = cv2.VideoCapture(str(video_path))
    try:
        # OpenCV reports the frame count as a float; random.randint needs ints.
        video_length = int(src.get(cv2.CAP_PROP_FRAME_COUNT))
        need_length = 1 + (n_frames - 1) * frame_step

        if need_length > video_length:
            start = 0
        else:
            # randint includes both endpoints, so max_start itself is the last
            # start position for which every sampled frame is inside the video.
            max_start = video_length - need_length
            start = random.randint(0, max_start)

        src.set(cv2.CAP_PROP_POS_FRAMES, start)
        # ok is a boolean indicating whether the read was successful,
        # frame is the image itself.
        ok, frame = src.read()
        if not ok:
            raise ValueError('read video not success')
        result.append(format_frames(frame, output_size))

        for _ in range(n_frames - 1):
            # Skip ahead frame_step frames; only the last one read is kept.
            for _ in range(frame_step):
                ok, frame = src.read()
            if ok:
                result.append(format_frames(frame, output_size))
            else:
                # Ran past the end of the video: pad with a black frame.
                result.append(np.zeros_like(result[0]))
    finally:
        # Always free the capture handle, including on the ValueError path.
        src.release()

    # Reverse the channel axis (BGR -> RGB).
    result = np.array(result)[..., [2, 1, 0]]
    return result


def frame_generator(data_dir: Path, n_frames: int, split: Literal['training', 'validation']):
    """
    Create a generator function yielding (frames, label) pairs for one split.

    Every sub-directory of ``data_dir`` is treated as one class; its name is
    mapped to an integer id by alphabetical order. The first ``SPLIT_RATIO``
    share of each class's (sorted) files forms the training split and the rest
    the validation split.

    Args:
      data_dir: Directory containing one sub-directory per class.
      n_frames: Number of frames to sample from each video.
      split: Which split to iterate, 'training' or 'validation'.

    Return:
      A zero-argument callable that returns a generator of
      (video_frames, label) tuples in a freshly shuffled order on every call.
      An unknown ``split`` raises KeyError when the generator is iterated.
    """
    # Non-directory entries (e.g. stray metadata files) are not classes.
    class_dirs = sorted(
        (entry for entry in data_dir.iterdir() if entry.is_dir()),
        key=lambda entry: entry.name,
    )
    class_ids_for_name = {
        class_dir.name: i for i, class_dir in enumerate(class_dirs)
    }

    data = {'training': {}, 'validation': {}}
    for class_dir in class_dirs:
        # iterdir() yields entries in arbitrary order; sort so that the
        # training and validation splits are stable and never overlap,
        # even across separate calls to frame_generator.
        video_paths = sorted(class_dir.iterdir())
        cut = int(len(video_paths) * SPLIT_RATIO)
        data['training'][class_dir.name] = video_paths[:cut]
        data['validation'][class_dir.name] = video_paths[cut:]

    def generator():
        pairs = [
            (path, name)
            for name, paths in data[split].items()
            for path in paths
        ]
        random.shuffle(pairs)

        for path, name in pairs:
            video_frames = frames_from_video_file(path, n_frames)
            label = class_ids_for_name[name]  # Encode labels
            yield video_frames, label

    return generator