# Copyright (c) Facebook, Inc. and its affiliates.
# Modified by Bowen Cheng from: https://github.com/facebookresearch/detectron2/blob/master/demo/demo.py
import argparse
import glob
import multiprocessing as mp
import os

# fmt: off
import sys
sys.path.insert(1, os.path.join(sys.path[0], '..'))
# fmt: on

import tempfile
import time
import warnings

import cv2
import numpy as np
import tqdm
from torch.cuda.amp import autocast

from detectron2.config import get_cfg
from detectron2.data.detection_utils import read_image
from detectron2.projects.deeplab import add_deeplab_config
from detectron2.utils.logger import setup_logger

from mask2former import add_maskformer2_config
from mask2former_video import add_maskformer2_video_config
from predictor import VisualizationDemo

# constants
WINDOW_NAME = "mask2former video demo"


def setup_cfg(args):
    """Build a frozen detectron2 config from ``args.config_file`` and ``args.opts``.

    Args:
        args: parsed CLI namespace with ``config_file`` (path) and ``opts``
            (list of KEY VALUE override pairs).

    Returns:
        A frozen ``CfgNode``.
    """
    cfg = get_cfg()
    add_deeplab_config(cfg)
    add_maskformer2_config(cfg)
    add_maskformer2_video_config(cfg)
    cfg.merge_from_file(args.config_file)
    cfg.merge_from_list(args.opts)
    cfg.freeze()
    return cfg


def _str2bool(value):
    """Parse a CLI boolean value leniently ("true"/"1"/"yes" -> True).

    BUGFIX: the original ``--save-frames`` flag had ``default=False`` with no
    ``type``, so any supplied value — including the string "False" — was truthy.
    This converter keeps the original one-value CLI shape but parses it correctly.
    """
    return str(value).strip().lower() in ("1", "true", "yes", "y", "t")


def get_parser():
    """Create the argument parser for the video demo script."""
    parser = argparse.ArgumentParser(description="maskformer2 demo for builtin configs")
    parser.add_argument(
        "--config-file",
        default="configs/youtubevis_2019/video_maskformer2_R50_bs16_8ep.yaml",
        metavar="FILE",
        help="path to config file",
    )
    parser.add_argument("--video-input", help="Path to video file.")
    parser.add_argument(
        "--input",
        nargs="+",
        help="A list of space separated input images; "
        "or a single glob pattern such as 'directory/*.jpg'"
        "this will be treated as frames of a video",
    )
    parser.add_argument(
        "--output",
        help="A file or directory to save output visualizations. "
        "If not given, will show output in an OpenCV window.",
    )
    parser.add_argument(
        "--save-frames",
        type=_str2bool,
        default=False,
        help="Save frame level image outputs.",
    )
    parser.add_argument(
        "--confidence-threshold",
        type=float,
        default=0.5,
        help="Minimum score for instance predictions to be shown",
    )
    parser.add_argument(
        "--opts",
        help="Modify config options using the command-line 'KEY VALUE' pairs",
        default=[],
        nargs=argparse.REMAINDER,
    )
    return parser


def test_opencv_video_format(codec, file_ext):
    """Return True if this OpenCV build can write a video with the given codec.

    Writes 30 dummy frames to a temporary file and checks the file exists.

    Args:
        codec: four-character codec string, e.g. "mp4v".
        file_ext: file extension including the dot, e.g. ".mp4".
    """
    with tempfile.TemporaryDirectory(prefix="video_format_test") as tmp_dir:
        filename = os.path.join(tmp_dir, "test_file" + file_ext)
        writer = cv2.VideoWriter(
            filename=filename,
            fourcc=cv2.VideoWriter_fourcc(*codec),
            fps=float(30),
            frameSize=(10, 10),
            isColor=True,
        )
        # Plain loop instead of a side-effect-only list comprehension.
        for _ in range(30):
            writer.write(np.zeros((10, 10, 3), np.uint8))
        writer.release()
        return os.path.isfile(filename)


def _save_outputs(args, visualized_output, frame_names=None):
    """Write optional per-frame images plus a ``visualization.mp4`` to args.output.

    Args:
        args: parsed CLI namespace (uses ``output`` and ``save_frames``).
        visualized_output: list of detectron2 ``VisImage``-like objects
            (must expose ``.height``, ``.width``, ``.save()``, ``.get_image()``).
        frame_names: optional list of source paths; when given, per-frame
            images reuse the source basenames, otherwise they are numbered.
    """
    if args.save_frames:
        for idx, _vis_output in enumerate(visualized_output):
            if frame_names is not None:
                out_filename = os.path.join(args.output, os.path.basename(frame_names[idx]))
            else:
                out_filename = os.path.join(args.output, f"{idx}.jpg")
            _vis_output.save(out_filename)

    # BUGFIX: the original also did `cap = cv2.VideoCapture(-1)` here, which
    # pointlessly opens the default camera device (and can hang on headless
    # machines) only to release it unused. Removed.
    H, W = visualized_output[0].height, visualized_output[0].width
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(
        os.path.join(args.output, "visualization.mp4"), fourcc, 10.0, (W, H), True
    )
    for _vis_output in visualized_output:
        # VisImage stores RGB; OpenCV writers expect BGR, hence the channel flip.
        frame = _vis_output.get_image()[:, :, ::-1]
        out.write(frame)
    out.release()


if __name__ == "__main__":
    mp.set_start_method("spawn", force=True)
    args = get_parser().parse_args()
    setup_logger(name="fvcore")
    logger = setup_logger()
    logger.info("Arguments: " + str(args))

    cfg = setup_cfg(args)
    demo = VisualizationDemo(cfg)

    if args.output:
        os.makedirs(args.output, exist_ok=True)

    if args.input:
        if len(args.input) == 1:
            args.input = glob.glob(os.path.expanduser(args.input[0]))
            assert args.input, "The input path(s) was not found"
        # Each image file is treated as one frame of the video.
        vid_frames = [read_image(path, format="BGR") for path in args.input]

        start_time = time.time()
        with autocast():
            predictions, visualized_output = demo.run_on_video(vid_frames)
        logger.info(
            "detected {} instances per frame in {:.2f}s".format(
                len(predictions["pred_scores"]), time.time() - start_time
            )
        )

        if args.output:
            _save_outputs(args, visualized_output, frame_names=args.input)
    elif args.video_input:
        video = cv2.VideoCapture(args.video_input)
        vid_frames = []
        while video.isOpened():
            success, frame = video.read()
            if not success:
                break
            vid_frames.append(frame)
        # BUGFIX: the original never released the capture handle.
        video.release()

        start_time = time.time()
        with autocast():
            predictions, visualized_output = demo.run_on_video(vid_frames)
        logger.info(
            "detected {} instances per frame in {:.2f}s".format(
                len(predictions["pred_scores"]), time.time() - start_time
            )
        )

        if args.output:
            _save_outputs(args, visualized_output)