Mask2Former / demo_video /
Ahsen Khaliq
add files
raw history blame
No virus
7.63 kB
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
# reference:
import atexit
import bisect
import multiprocessing as mp
from collections import deque
import cv2
import torch
from visualizer import TrackVisualizer
from import MetadataCatalog
from detectron2.engine.defaults import DefaultPredictor
from detectron2.structures import Instances
from detectron2.utils.video_visualizer import VideoVisualizer
from detectron2.utils.visualizer import ColorMode
class VisualizationDemo(object):
def __init__(self, cfg, instance_mode=ColorMode.IMAGE, parallel=False):
cfg (CfgNode):
instance_mode (ColorMode):
parallel (bool): whether to run the model in different processes from visualization.
Useful since the visualization logic can be slow.
self.metadata = MetadataCatalog.get(
cfg.DATASETS.TEST[0] if len(cfg.DATASETS.TEST) else "__unused"
self.cpu_device = torch.device("cpu")
self.instance_mode = instance_mode
self.parallel = parallel
if parallel:
num_gpu = torch.cuda.device_count()
self.predictor = AsyncPredictor(cfg, num_gpus=num_gpu)
self.predictor = VideoPredictor(cfg)
def run_on_video(self, frames):
frames (List[np.ndarray]): a list of images of shape (H, W, C) (in BGR order).
This is the format used by OpenCV.
predictions (dict): the output of the model.
vis_output (VisImage): the visualized image output.
vis_output = None
predictions = self.predictor(frames)
image_size = predictions["image_size"]
pred_scores = predictions["pred_scores"]
pred_labels = predictions["pred_labels"]
pred_masks = predictions["pred_masks"]
frame_masks = list(zip(*pred_masks))
total_vis_output = []
for frame_idx in range(len(frames)):
frame = frames[frame_idx][:, :, ::-1]
visualizer = TrackVisualizer(frame, self.metadata, instance_mode=self.instance_mode)
ins = Instances(image_size)
if len(pred_scores) > 0:
ins.scores = pred_scores
ins.pred_classes = pred_labels
ins.pred_masks = torch.stack(frame_masks[frame_idx], dim=0)
vis_output = visualizer.draw_instance_predictions(predictions=ins)
return predictions, total_vis_output
class VideoPredictor(DefaultPredictor):
Create a simple end-to-end predictor with the given config that runs on
single device for a single input image.
Compared to using the model directly, this class does the following additions:
1. Load checkpoint from `cfg.MODEL.WEIGHTS`.
2. Always take BGR image as the input and apply conversion defined by `cfg.INPUT.FORMAT`.
3. Apply resizing defined by `cfg.INPUT.{MIN,MAX}_SIZE_TEST`.
4. Take one input image and produce a single output, instead of a batch.
If you'd like to do anything more fancy, please refer to its source code
as examples to build and use the model manually.
metadata (Metadata): the metadata of the underlying dataset, obtained from
pred = DefaultPredictor(cfg)
inputs = cv2.imread("input.jpg")
outputs = pred(inputs)
def __call__(self, frames):
original_image (np.ndarray): an image of shape (H, W, C) (in BGR order).
predictions (dict):
the output of the model for one image only.
See :doc:`/tutorials/models` for details about the format.
with torch.no_grad(): #
input_frames = []
for original_image in frames:
# Apply pre-processing to image.
if self.input_format == "RGB":
# whether the model expects BGR inputs or RGB
original_image = original_image[:, :, ::-1]
height, width = original_image.shape[:2]
image = self.aug.get_transform(original_image).apply_image(original_image)
image = torch.as_tensor(image.astype("float32").transpose(2, 0, 1))
inputs = {"image": input_frames, "height": height, "width": width}
predictions = self.model([inputs])
return predictions
class AsyncPredictor:
A predictor that runs the model asynchronously, possibly on >1 GPUs.
Because rendering the visualization takes considerably amount of time,
this helps improve throughput when rendering videos.
class _StopToken:
class _PredictWorker(mp.Process):
def __init__(self, cfg, task_queue, result_queue):
self.cfg = cfg
self.task_queue = task_queue
self.result_queue = result_queue
def run(self):
predictor = VideoPredictor(self.cfg)
while True:
task = self.task_queue.get()
if isinstance(task, AsyncPredictor._StopToken):
idx, data = task
result = predictor(data)
self.result_queue.put((idx, result))
def __init__(self, cfg, num_gpus: int = 1):
cfg (CfgNode):
num_gpus (int): if 0, will run on CPU
num_workers = max(num_gpus, 1)
self.task_queue = mp.Queue(maxsize=num_workers * 3)
self.result_queue = mp.Queue(maxsize=num_workers * 3)
self.procs = []
for gpuid in range(max(num_gpus, 1)):
cfg = cfg.clone()
cfg.MODEL.DEVICE = "cuda:{}".format(gpuid) if num_gpus > 0 else "cpu"
AsyncPredictor._PredictWorker(cfg, self.task_queue, self.result_queue)
self.put_idx = 0
self.get_idx = 0
self.result_rank = []
self.result_data = []
for p in self.procs:
def put(self, image):
self.put_idx += 1
self.task_queue.put((self.put_idx, image))
def get(self):
self.get_idx += 1 # the index needed for this request
if len(self.result_rank) and self.result_rank[0] == self.get_idx:
res = self.result_data[0]
del self.result_data[0], self.result_rank[0]
return res
while True:
# make sure the results are returned in the correct order
idx, res = self.result_queue.get()
if idx == self.get_idx:
return res
insert = bisect.bisect(self.result_rank, idx)
self.result_rank.insert(insert, idx)
self.result_data.insert(insert, res)
def __len__(self):
return self.put_idx - self.get_idx
def __call__(self, image):
return self.get()
def shutdown(self):
for _ in self.procs:
def default_buffer_size(self):
return len(self.procs) * 5