import cv2
import numpy as np
import argparse
import time
|
class FrameVis: |
	"""
	Reads a video file and outputs an image comprised of n resized frames, spread evenly throughout the file.
	"""

	default_frame_height = None  # auto, calculated from the video's size
	default_frame_width = None  # auto, calculated from the video's size
	default_concat_size = 1  # size of each frame along the concatenation axis, in pixels
	default_direction = "horizontal"

	def visualize(self, source, nframes, height=default_frame_height, width=default_frame_width,
			direction=default_direction, trim=False, quiet=True):
""" |
|
Reads a video file and outputs an image comprised of n resized frames, spread evenly throughout the file. |
|
|
|
Parameters: |
|
source (str): filepath to source video file |
|
nframes (int): number of frames to process from the video |
|
height (int): height of each frame, in pixels |
|
width (int): width of each frame, in pixels |
|
direction (str): direction to concatenate frames ("horizontal" or "vertical") |
|
quiet (bool): suppress console messages |
|
|
|
Returns: |
|
visualization image as numpy array |
|
""" |
|
|
|
		video = cv2.VideoCapture(source)  # open the video file
		if not video.isOpened():
			raise FileNotFoundError("Source Video Not Found")

		if not quiet:
			print("")  # create space from the command prompt

		# calculate the keyframe interval
		video_total_frames = video.get(cv2.CAP_PROP_FRAME_COUNT)  # retrieve the total frame count from the metadata
		if not isinstance(nframes, int) or nframes < 1:
			raise ValueError("Number of frames must be a positive integer")
		elif nframes > video_total_frames:
			raise ValueError("Requested frame count larger than total available ({})".format(int(video_total_frames)))
		keyframe_interval = video_total_frames / nframes  # number of frames between captures

		# grab the first frame, for dimension calculations
		success, image = video.read()
		if not success:
			raise IOError("Cannot read from video file")

		# check for matting (letterboxing / pillarboxing), if trimming is enabled
		matte_type = 0
		if trim:
			if not quiet:
				print("Trimming enabled, checking matting... ", end="", flush=True)

			success, cropping_bounds = MatteTrimmer.determine_video_bounds(source, 10, 3)

			if success:  # only evaluate the matting if the bounds are valid
				crop_width = cropping_bounds[1][0] - cropping_bounds[0][0] + 1  # cropped frame width, in pixels
				crop_height = cropping_bounds[1][1] - cropping_bounds[0][1] + 1  # cropped frame height, in pixels

				if crop_height != image.shape[0]:  # letterboxing
					matte_type += 1
				if crop_width != image.shape[1]:  # pillarboxing
					matte_type += 2

			if not quiet:
				if matte_type == 0:
					print("no matting detected")
				elif matte_type == 1:
					print("letterboxing detected, cropping {} px from the top and bottom".format(int((image.shape[0] - crop_height) / 2)))
				elif matte_type == 2:
					print("pillarboxing detected, trimming {} px from the sides".format(int((image.shape[1] - crop_width) / 2)))
				elif matte_type == 3:
					print("multiple matting detected - cropping ({}, {}) to ({}, {})".format(image.shape[1], image.shape[0], crop_width, crop_height))

		# calculate the height of each frame
		if height is None:  # auto-calculate
			if direction == "horizontal":  # non-concatenated axis, use the video size
				if matte_type & 1 == 1:  # letterboxing detected
					height = crop_height
				else:
					height = image.shape[0]
			else:  # concatenated axis, use the default size
				height = FrameVis.default_concat_size
		elif not isinstance(height, int) or height < 1:
			raise ValueError("Frame height must be a positive integer")

		# calculate the width of each frame
		if width is None:  # auto-calculate
			if direction == "vertical":  # non-concatenated axis, use the video size
				if matte_type & 2 == 2:  # pillarboxing detected
					width = crop_width
				else:
					width = image.shape[1]
			else:  # concatenated axis, use the default size
				width = FrameVis.default_concat_size
		elif not isinstance(width, int) or width < 1:
			raise ValueError("Frame width must be a positive integer")

		# assign the concatenation function and calculate the output size
		if direction == "horizontal":
			concatenate = cv2.hconcat
			output_width = width * nframes
			output_height = height
		elif direction == "vertical":
			concatenate = cv2.vconcat
			output_width = width
			output_height = height * nframes
		else:
			raise ValueError("Invalid direction specified")

		if not quiet:
			aspect_ratio = output_width / output_height
			print("Visualizing \"{}\" - {} by {} ({:.2f}), from {} frames (every {:.2f} seconds)"
				.format(source, output_width, output_height, aspect_ratio, nframes, FrameVis.interval_from_nframes(source, nframes)))

		# set up for the frame processing loop
		next_keyframe = keyframe_interval / 2  # frame number of the next capture, starting half an interval in
		finished_frames = 0  # number of frames processed so far
		output_image = None
		progress = ProgressBar("Processing:")

		while finished_frames < nframes:
			video.set(cv2.CAP_PROP_POS_FRAMES, int(next_keyframe))  # move the cursor to the next keyframe
			success, image = video.read()  # read the next frame

			if not success:
				raise IOError("Cannot read from video file (frame {} out of {})".format(int(next_keyframe), int(video_total_frames)))

			if matte_type != 0:  # crop out the matting, if present
				image = MatteTrimmer.crop_image(image, cropping_bounds)

			image = cv2.resize(image, (width, height))  # resize to the output frame size

			# concatenate the frame onto the output image
			if output_image is None:
				output_image = image
			else:
				output_image = concatenate([output_image, image])

			finished_frames += 1
			next_keyframe += keyframe_interval  # advance the capture point, preserving the float precision

			if not quiet:
				progress.write(finished_frames / nframes)  # print the progress bar to the console

		video.release()  # close the video capture

		return output_image
|
	@staticmethod
	def average_image(image, direction):
		"""
		Averages the colors in an axis across an entire image

		Parameters:
			image (arr, x.y.c): image as 3-dimensional numpy array
			direction (str): direction to average frames ("horizontal" or "vertical")

		Returns:
			image, with pixel data averaged along provided axis
		"""
		height, width, depth = image.shape

		if direction == "horizontal":
			scale_height = 1
			scale_width = width
		elif direction == "vertical":
			scale_height = height
			scale_width = 1
		else:
			raise ValueError("Invalid direction specified")

		image = cv2.resize(image, (scale_width, scale_height))  # collapse the image along one axis, averaging the pixels
		image = cv2.resize(image, (width, height))  # stretch the averaged line back to the original dimensions

		return image
|
	@staticmethod
	def motion_blur(image, direction, blur_amount):
		"""
		Blurs the pixels in a given axis across an entire image.

		Parameters:
			image (arr, x.y.c): image as 3-dimensional numpy array
			direction (str): direction of stacked images for blurring ("horizontal" or "vertical")
			blur_amount (int): how much to blur the image, as the convolution kernel size

		Returns:
			image, with pixel data blurred along provided axis
		"""
		kernel = np.zeros((blur_amount, blur_amount))  # create an empty convolution kernel

		if direction == "horizontal":
			kernel[:, int((blur_amount - 1) / 2)] = np.ones(blur_amount)  # fill the center column
		elif direction == "vertical":
			kernel[int((blur_amount - 1) / 2), :] = np.ones(blur_amount)  # fill the center row
		else:
			raise ValueError("Invalid direction specified")

		kernel /= blur_amount  # normalize the kernel so overall brightness is preserved

		return cv2.filter2D(image, -1, kernel)  # filter without changing the image depth
|
	@staticmethod
	def nframes_from_interval(source, interval):
		"""
		Calculates the number of frames available in a video file for a given capture interval

		Parameters:
			source (str): filepath to source video file
			interval (float): capture frame every i seconds

		Returns:
			number of frames to capture at the given interval (int)
		"""
		video = cv2.VideoCapture(source)  # open the video file
		if not video.isOpened():
			raise FileNotFoundError("Source Video Not Found")

		frame_count = video.get(cv2.CAP_PROP_FRAME_COUNT)  # total number of frames
		fps = video.get(cv2.CAP_PROP_FPS)  # frames per second
		duration = frame_count / fps  # duration of the video, in seconds

		video.release()  # close the video capture

		return int(round(duration / interval))  # one capture per interval, rounded
|
	@staticmethod
	def interval_from_nframes(source, nframes):
		"""
		Calculates the capture interval, in seconds, for a video file given the
		number of frames to capture

		Parameters:
			source (str): filepath to source video file
			nframes (int): number of frames to capture from the video file

		Returns:
			time interval (seconds) between frame captures (float)
		"""
		video = cv2.VideoCapture(source)  # open the video file
		if not video.isOpened():
			raise FileNotFoundError("Source Video Not Found")

		frame_count = video.get(cv2.CAP_PROP_FRAME_COUNT)  # total number of frames
		fps = video.get(cv2.CAP_PROP_FPS)  # frames per second
		keyframe_interval = frame_count / nframes  # number of frames between captures

		video.release()  # close the video capture

		return keyframe_interval / fps  # seconds between captures
|
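# Example sketch, not called anywhere in this script: how FrameVis.visualize
# would typically be driven without the command line plumbing in main().
# The file paths and frame count here are hypothetical.
def _example_visualize_usage():
	fv = FrameVis()
	barcode = fv.visualize("movie.mp4", 1600, direction="horizontal", trim=True, quiet=True)  # 1600 strips, each 1 px wide by default
	cv2.imwrite("barcode.png", barcode)  # save the finished barcode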
|
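# Example sketch, not called anywhere in this script: the two post-processing
# helpers applied to a tiny synthetic "barcode" instead of a real one. The
# image is two one-pixel-wide frames, red and green, concatenated horizontally.
def _example_average_and_blur():
	strips = np.zeros((4, 2, 3), dtype=np.uint8)
	strips[:, 0] = (0, 0, 255)  # OpenCV stores channels as BGR, so this column is red
	strips[:, 1] = (0, 255, 0)  # and this one is green

	averaged = FrameVis.average_image(strips, "horizontal")  # each column collapses to its average color
	assert (averaged == strips).all()  # the columns were already uniform, so averaging is a no-op here

	blurred = FrameVis.motion_blur(strips, "horizontal", 3)  # 3x3 kernel with a single center column
	assert blurred.shape == strips.shape  # blurring never changes the image size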
|
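# Worked example of the sampling arithmetic used above, on a hypothetical
# 90-second clip (2160 frames at 24 fps). It mirrors nframes_from_interval
# and interval_from_nframes without needing an actual video file.
def _example_interval_math():
	frame_count, fps = 2160, 24.0
	duration = frame_count / fps  # 90.0 seconds

	interval = 9.0  # capturing one frame every 9 seconds...
	nframes = int(round(duration / interval))  # ...yields 10 frames total
	assert nframes == 10

	keyframe_interval = frame_count / nframes  # 216 frames between captures
	assert keyframe_interval / fps == 9.0  # which converts back to the 9-second interval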
|
class MatteTrimmer: |
	"""
	Functions for finding and removing black mattes around video frames
	"""
|
	@staticmethod
	def find_matrix_edges(matrix, threshold):
		"""
		Finds the start and end points of a 1D array above a given threshold

		Parameters:
			matrix (arr, 1.x): 1D array of data to check
			threshold (value): valid data is above this trigger level

		Returns:
			tuple with the array indices of data bounds, start and end
		"""
		matrix = np.asarray(matrix)  # accept lists and tuples as well as numpy arrays
		if matrix.ndim != 1:
			raise ValueError("Provided matrix is not the right size (must be 1D)")

		data_start = None
		data_end = None

		for value_id, value in enumerate(matrix):
			if value > threshold:
				if data_start is None:
					data_start = value_id  # first index above the threshold
				data_end = value_id  # last index seen above the threshold

		return (data_start, data_end)
|
	@staticmethod
	def find_larger_bound(first, second):
		"""
		Takes two sets of diagonal rectangular boundary coordinates and determines
		the set of rectangular boundary coordinates that contains both

		Parameters:
			first (arr, 1.2.2): pair of rectangular coordinates, in the form [(X,Y), (X,Y)]
			second (arr, 1.2.2): pair of rectangular coordinates, in the form [(X,Y), (X,Y)]

		Where for both arrays the first coordinate is in the top left-hand corner,
		and the second coordinate is in the bottom right-hand corner.

		Returns:
			numpy coordinate matrix containing both of the provided boundaries
		"""
		left_edge = min(first[0][0], second[0][0])
		right_edge = max(first[1][0], second[1][0])

		top_edge = min(first[0][1], second[0][1])
		bottom_edge = max(first[1][1], second[1][1])

		return np.array([[left_edge, top_edge], [right_edge, bottom_edge]])
|
	@staticmethod
	def valid_bounds(bounds):
		"""
		Checks if the frame bounds are a valid format

		Parameters:
			bounds (arr, 1.2.2): pair of rectangular coordinates, in the form [(X,Y), (X,Y)]

		Returns:
			True or False
		"""
		if bounds is None:
			return False  # bounds were never assigned

		for coordinate_pair in bounds:
			for value in coordinate_pair:
				if value is None:
					return False  # no data for this point

		if bounds[0][0] > bounds[1][0] or bounds[0][1] > bounds[1][1]:  # edges are flipped
			return False

		return True
|
	@staticmethod
	def determine_image_bounds(image, threshold):
		"""
		Determines if there are any hard mattes (black bars) surrounding
		an image on either the top (letterboxing) or the sides (pillarboxing)

		Parameters:
			image (arr, x.y.c): image as 3-dimensional numpy array
			threshold (8-bit int): min color channel value to judge as 'image present'

		Returns:
			success (bool): True or False if the bounds are valid
			image_bounds: numpy coordinate matrix with the two opposite corners of the
				image bounds, in the form [(X,Y), (X,Y)]
		"""
		height, width, depth = image.shape

		# check for letterboxing: sum each row, then find the first and last rows above the threshold
		horizontal_sums = np.sum(image, axis=(1, 2))
		hthreshold = (threshold * width * depth)  # a row counts as 'image' if its average channel value is above the threshold
		vertical_edges = MatteTrimmer.find_matrix_edges(horizontal_sums, hthreshold)

		# check for pillarboxing: sum each column, then find the first and last columns above the threshold
		vertical_sums = np.sum(image, axis=(0, 2))
		vthreshold = (threshold * height * depth)  # a column counts as 'image' if its average channel value is above the threshold
		horizontal_edges = MatteTrimmer.find_matrix_edges(vertical_sums, vthreshold)

		image_bounds = np.array([[horizontal_edges[0], vertical_edges[0]], [horizontal_edges[1], vertical_edges[1]]])

		return MatteTrimmer.valid_bounds(image_bounds), image_bounds
|
	@staticmethod
	def determine_video_bounds(source, nsamples, threshold):
		"""
		Determines if any matting exists in a video source

		Parameters:
			source (str): filepath to source video file
			nsamples (int): number of frames from the video to determine bounds,
				evenly spaced throughout the video
			threshold (8-bit int): min color channel value to judge as 'image present'

		Returns:
			success (bool): True or False if the bounds are valid
			video_bounds: numpy coordinate matrix with the two opposite corners of the
				video bounds, in the form [(X,Y), (X,Y)]
		"""
		video = cv2.VideoCapture(source)  # open the video file
		if not video.isOpened():
			raise FileNotFoundError("Source Video Not Found")

		video_total_frames = video.get(cv2.CAP_PROP_FRAME_COUNT)  # retrieve the total frame count from the metadata
		if not isinstance(nsamples, int) or nsamples < 1:
			raise ValueError("Number of samples must be a positive integer")
		keyframe_interval = video_total_frames / nsamples  # number of frames between samples

		success, image = video.read()  # confirm the file is readable
		if not success:
			raise IOError("Cannot read from video file")

		next_keyframe = keyframe_interval / 2  # frame number of the next sample, starting half an interval in
		video_bounds = None  # running union of the per-frame bounds

		for frame_number in range(nsamples):
			video.set(cv2.CAP_PROP_POS_FRAMES, int(next_keyframe))  # move the cursor to the next sampled frame
			success, image = video.read()  # read the next frame

			if not success:
				raise IOError("Cannot read from video file")

			next_keyframe += keyframe_interval  # advance the sample point whether or not this frame is usable

			success, frame_bounds = MatteTrimmer.determine_image_bounds(image, threshold)

			if not success:
				continue  # this frame's bounds are invalid (e.g. an all-black frame), skip it

			video_bounds = frame_bounds if video_bounds is None else MatteTrimmer.find_larger_bound(video_bounds, frame_bounds)

		video.release()  # close the video capture

		return MatteTrimmer.valid_bounds(video_bounds), video_bounds
|
	@staticmethod
	def crop_image(image, bounds):
		"""
		Crops a provided image by the coordinate bounds pair provided.

		Parameters:
			image (arr, x.y.c): image as 3-dimensional numpy array
			bounds (arr, 1.2.2): pair of rectangular coordinates, in the form [(X,Y), (X,Y)]

		Returns:
			image as 3-dimensional numpy array, cropped to the coordinate bounds
		"""
		return image[bounds[0][1]:bounds[1][1] + 1, bounds[0][0]:bounds[1][0] + 1]  # bounds are inclusive, but slicing excludes the end index
|
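# Example sketch, not called anywhere in this script: the matte-detection
# helpers applied to a synthetic frame with hypothetical sizes: a gray
# 40x60 image letterboxed to 60x60 by 10 px black bars on the top and bottom.
def _example_matte_detection():
	frame = np.zeros((60, 60, 3), dtype=np.uint8)
	frame[10:50, :] = 128  # visible image area, rows 10 through 49

	success, bounds = MatteTrimmer.determine_image_bounds(frame, 3)
	assert success and (bounds == np.array([[0, 10], [59, 49]])).all()  # full width, rows 10 through 49

	cropped = MatteTrimmer.crop_image(frame, bounds)
	assert cropped.shape[:2] == (40, 60)  # letterbox bars removed

	union = MatteTrimmer.find_larger_bound(bounds, np.array([[5, 5], [30, 30]]))
	assert (union == np.array([[0, 5], [59, 49]])).all()  # the rectangle containing both inputs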
|
class ProgressBar: |
	"""
	Generates a progress bar for the console output

	Args:
		pre (str): string to prepend before the progress bar
		bar_length (int): length of the progress bar itself, in characters
		print_elapsed (bool): option to print time elapsed or not

	Attributes:
		pre (str): string to prepend before the progress bar
		bar_length (int): length of the progress bar itself, in characters
		print_elapsed (bool): option to print time elapsed or not
		__start_time (float): start time for the progress bar, as a unix timestamp (set only if print_elapsed is True)
	"""
|
	def __init__(self, pre="", bar_length=25, print_elapsed=True):
		pre = (pre + '\t') if pre != "" else pre  # append a tab separator, if a pre-string is present
		self.pre = pre
		self.bar_length = bar_length
		self.print_elapsed = print_elapsed
		if self.print_elapsed:
			self.__start_time = time.time()  # store the start time as a unix timestamp
|
	def write(self, percent):
		"""Prints a progress bar to the console based on the input percentage (float)."""
		term_char = '\r' if percent < 1.0 else '\n'  # rewrite the line in place unless finished

		filled_size = int(round(self.bar_length * percent))  # number of 'filled' characters in the bar
		progress_bar = "#" * filled_size + " " * (self.bar_length - filled_size)  # the bar itself, as a string

		time_string = ""
		if self.print_elapsed:
			time_elapsed = time.time() - self.__start_time
			time_string = "\tTime Elapsed: {}".format(time.strftime("%H:%M:%S", time.gmtime(time_elapsed)))

		print("{}[{}]\t{:.2%}{}".format(self.pre, progress_bar, percent, time_string), end=term_char, flush=True)
|
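# Example sketch, not called anywhere in this script: drawing a ProgressBar
# for a fake four-step task.
def _example_progress_bar():
	progress = ProgressBar("Demo:")
	for step in range(1, 5):
		time.sleep(0.25)  # stand-in for real work
		progress.write(step / 4)  # redraws in place, ending with a newline at 100%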
|
|
def main():
	parser = argparse.ArgumentParser(description="video frame visualizer and movie barcode generator", add_help=False)  # use a custom '--help' so '-h' is free for height

	parser.add_argument("source", help="file path for the video file to be visualized", type=str)
	parser.add_argument("destination", help="file path output for the final image", type=str)
	parser.add_argument("-n", "--nframes", help="the number of frames in the visualization", type=int)
	parser.add_argument("-i", "--interval", help="interval between frames for the visualization, in seconds", type=float)
	parser.add_argument("-h", "--height", help="the height of each frame, in pixels", type=int, default=FrameVis.default_frame_height)
	parser.add_argument("-w", "--width", help="the output width of each frame, in pixels", type=int, default=FrameVis.default_frame_width)
	parser.add_argument("-d", "--direction", help="direction to concatenate frames, horizontal or vertical", type=str,
		choices=["horizontal", "vertical"], default=FrameVis.default_direction)
	parser.add_argument("-t", "--trim", help="detect and trim any hard matting (letterboxing or pillarboxing)", action='store_true', default=False)
	parser.add_argument("-a", "--average", help="average colors for each frame", action='store_true', default=False)
	parser.add_argument("-b", "--blur", help="apply motion blur to the frames (kernel size)", type=int, nargs='?', const=100, default=0)
	parser.add_argument("-q", "--quiet", help="mute console outputs", action='store_true', default=False)
	parser.add_argument("--help", action="help", help="show this help message and exit")

	args = parser.parse_args()
|
	# calculate nframes, if not directly specified
	if args.nframes is None:
		if args.interval is not None:  # calculate nframes from the interval, if provided
			args.nframes = FrameVis.nframes_from_interval(args.source, args.interval)
		else:
			parser.error("You must provide either an --(n)frames or --(i)nterval argument")

	if args.average and args.blur != 0:  # sanity check the mutually exclusive post-processing options
		parser.error("Cannot (a)verage and (b)lur, you must choose one or the other")

	fv = FrameVis()

	output_image = fv.visualize(args.source, args.nframes, height=args.height, width=args.width,
		direction=args.direction, trim=args.trim, quiet=args.quiet)
|
	# post-process the output image, if requested
	if args.average or args.blur != 0:
		if args.average:
			if not args.quiet:
				print("Averaging frame colors... ", end="", flush=True)
			output_image = fv.average_image(output_image, args.direction)

		if args.blur != 0:
			if not args.quiet:
				print("Adding motion blur to final frame... ", end="", flush=True)
			output_image = fv.motion_blur(output_image, args.direction, args.blur)

		if not args.quiet:
			print("done")

	cv2.imwrite(args.destination, output_image)  # save the visualization to file

	if not args.quiet:
		print("Visualization saved to {}".format(args.destination))
|
if __name__ == "__main__":
	main()