aero-recognize / playgrounds /load_video.py
chiyoi's picture
working
9dd1eaa
raw
history blame
No virus
3.45 kB
import random
from typing import *
import numpy as np
import tensorflow as tf
import cv2
from pathlib import Path
SPLIT_RATIO = 0.7
BATCH_SIZE = 8
NUM_FRAMES = 8
def main():
data_dir = Path('assets/dataset')
output_signature = (
tf.TensorSpec(shape = (None, None, None, 3), dtype = tf.float32),
tf.TensorSpec(shape = (), dtype = tf.int16),
)
train_ds = tf.data.Dataset.from_generator(frame_generator(data_dir, NUM_FRAMES, 'training'), output_signature=output_signature)
train_ds = train_ds.batch(BATCH_SIZE)
def format_frames(frame, output_size):
"""
Pad and resize an image from a video.
Args:
frame: Image that needs to resized and padded.
output_size: Pixel size of the output frame image.
Return:
Formatted frame with padding of specified output size.
"""
frame = tf.image.convert_image_dtype(frame, tf.float32)
frame = tf.image.resize_with_pad(frame, *output_size)
return frame
def frames_from_video_file(video_path, n_frames, output_size=(224, 224), frame_step=15):
"""
Creates frames from each video file present for each category.
Args:
video_path: File path to the video.
n_frames: Number of frames to be created per video file.
output_size: Pixel size of the output frame image.
Return:
An NumPy array of frames in the shape of (n_frames, height, width, channels).
"""
# Read each video frame by frame
result = []
src = cv2.VideoCapture(str(video_path))
video_length = src.get(cv2.CAP_PROP_FRAME_COUNT)
need_length = 1 + (n_frames - 1) * frame_step
if need_length > video_length:
start = 0
else:
max_start = video_length - need_length
start = random.randint(0, max_start + 1)
src.set(cv2.CAP_PROP_POS_FRAMES, start)
# ret is a boolean indicating whether read was successful, frame is the image itself
ok, frame = src.read()
if not ok:
raise ValueError('read video not success')
result.append(format_frames(frame, output_size))
for _ in range(n_frames - 1):
for _ in range(frame_step):
ok, frame = src.read()
if ok:
frame = format_frames(frame, output_size)
result.append(frame)
else:
result.append(np.zeros_like(result[0]))
src.release()
result = np.array(result)[..., [2, 1, 0]]
return result
def frame_generator(data_dir: Path, n_frames: int, split: Literal['training', 'validation']):
class_names = sorted([x.name for x in data_dir.iterdir()])
class_ids_for_name = {
name: i
for i, name in enumerate(class_names)
}
data = {
'training':{
a.name: (lambda ps: ps[:int(len(ps) * SPLIT_RATIO)])([x for x in a.iterdir()])
for a in data_dir.iterdir()
},
'validation': {
a.name: (lambda ps: ps[int(len(ps) * SPLIT_RATIO):])([x for x in a.iterdir()])
for a in data_dir.iterdir()
},
}
def generator():
pairs = [
(path, name)
for name, paths in data[split].items()
for path in paths
]
random.shuffle(pairs)
for path, name in pairs:
video_frames = frames_from_video_file(path, n_frames)
label = class_ids_for_name[name] # Encode labels
yield video_frames, label
return generator