VNext / app.py
osanseviero's picture
osanseviero HF staff
Update app.py
acc3d84
import atexit
import bisect
import multiprocessing as mp
from collections import deque
import cv2
import torch
import argparse
import glob
import multiprocessing as mp
import numpy as np
import os
import tempfile
import time
import warnings
import cv2
import subprocess
import tqdm
import gradio as gr
# Upper bound on the frame budget that `inference` passes to `run_on_video`.
TOTAL_FRAMES = 60
# Install the VNext package from GitHub at import time. The return code is not
# checked, so a failed install only surfaces at the imports that follow
# (presumably this package provides the `detectron2` modules -- confirm).
subprocess.run(["pip", "install", "git+https://github.com/wjf5203/VNext.git"])
# Clone the repository as well, so that the config file read by `setup_cfg`
# (under the local "VNext/configs/" directory) exists on disk. The return code
# is not checked here either.
subprocess.run(["git", "clone", "https://github.com/wjf5203/VNext"])
from detectron2.data import MetadataCatalog
from detectron2.data.detection_utils import read_image
from detectron2.engine.defaults import DefaultPredictor
from detectron2.utils.video_visualizer import VideoVisualizer
from detectron2.utils.visualizer import ColorMode, Visualizer
from detectron2.config import get_cfg
from detectron2.utils.logger import setup_logger
def test_opencv_video_format(codec, file_ext):
    """Probe whether OpenCV produces a video file for a codec/container pair.

    Thirty blank 10x10 colour frames are written to a throw-away file inside a
    temporary directory; the probe succeeds when that file exists afterwards.

    Args:
        codec: Four-character codec code (e.g. ``"x264"``), unpacked into
            ``cv2.VideoWriter_fourcc``.
        file_ext: File extension including the leading dot (e.g. ``".mkv"``).

    Returns:
        ``True`` if the output file exists after the writer is released,
        ``False`` otherwise.
    """
    with tempfile.TemporaryDirectory(prefix="video_format_test") as tmp_dir:
        filename = os.path.join(tmp_dir, "test_file" + file_ext)
        writer = cv2.VideoWriter(
            filename=filename,
            fourcc=cv2.VideoWriter_fourcc(*codec),
            fps=float(30),
            frameSize=(10, 10),
            isColor=True,
        )
        try:
            # The same blank frame is reused for every write.
            blank_frame = np.zeros((10, 10, 3), np.uint8)
            for _ in range(30):
                writer.write(blank_frame)
        finally:
            # Always release the writer so the temporary directory can be
            # cleaned up even if a write fails.
            writer.release()
        return os.path.isfile(filename)
def setup_cfg(cfg):
    """Build the frozen detectron2 config used by the demo.

    Args:
        cfg: Accepted for interface compatibility only; its value is never
            read. A fresh config from ``get_cfg()`` is always used.

    Returns:
        The frozen config, loaded from the Mask R-CNN quick-schedule YAML in
        the local ``VNext`` checkout, with the test-time score thresholds set
        to 0.5.
    """
    score_threshold = 0.5

    config = get_cfg()
    # For Panoptic-DeepLab, `add_panoptic_deeplab_config(config)` from
    # detectron2.projects.panoptic_deeplab would have to be applied here.
    config.merge_from_file(
        "VNext/configs/quick_schedules/mask_rcnn_R_50_FPN_inference_acc_test.yaml"
    )

    # Apply the same score threshold to every builtin model family.
    config.MODEL.RETINANET.SCORE_THRESH_TEST = score_threshold
    config.MODEL.ROI_HEADS.SCORE_THRESH_TEST = score_threshold
    config.MODEL.PANOPTIC_FPN.COMBINE.INSTANCES_CONFIDENCE_THRESH = score_threshold

    config.freeze()
    return config
# Module-level predictor, built once at import time and called by
# `run_on_video` for every frame. The `{}` argument is a placeholder:
# `setup_cfg` does not read its parameter.
predictor = DefaultPredictor(setup_cfg({}))
# Metadata handed to the VideoVisualizer in `run_on_video`.
# NOTE(review): "__unused" looks like a placeholder catalog entry, so the
# visualizer presumably has no dataset-specific class names -- confirm.
metadata = MetadataCatalog.get("__unused")
def run_on_video(video, total_frames):
    """Yield visualized predictions for the leading frames of a video.

    Each frame is read from ``video``, passed to the module-level
    ``predictor``, and the predictions are drawn onto the frame.

    Args:
        video: An opened capture object exposing ``isOpened()`` and ``read()``
            (``inference`` passes a ``cv2.VideoCapture``).
        total_frames: Number of frames to process. Exactly this many frames
            are yielded when the video has at least that many. A non-positive
            value never matches the frame counter, so the whole video is
            processed.

    Yields:
        One BGR image per processed frame, with the predictions drawn on it.

    Raises:
        ValueError: If a prediction dict contains none of ``"panoptic_seg"``,
            ``"instances"`` or ``"sem_seg"``.
    """
    video_visualizer = VideoVisualizer(metadata, ColorMode.IMAGE)

    def _frame_from_video(capture):
        # Read until the capture closes or a read fails.
        while capture.isOpened():
            success, frame = capture.read()
            if not success:
                break
            yield frame

    def process_predictions(frame, predictions):
        # The visualizer draws on RGB images; OpenCV frames are BGR.
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        if "panoptic_seg" in predictions:
            panoptic_seg, segments_info = predictions["panoptic_seg"]
            vis_frame = video_visualizer.draw_panoptic_seg_predictions(
                frame, panoptic_seg.to("cpu"), segments_info
            )
        elif "instances" in predictions:
            instances = predictions["instances"].to("cpu")
            vis_frame = video_visualizer.draw_instance_predictions(frame, instances)
        elif "sem_seg" in predictions:
            vis_frame = video_visualizer.draw_sem_seg(
                frame, predictions["sem_seg"].argmax(dim=0).to("cpu")
            )
        else:
            # Previously this fell through to an UnboundLocalError.
            raise ValueError(
                "predictions contain none of 'panoptic_seg', 'instances', 'sem_seg'"
            )
        # Convert the drawn RGB image back to OpenCV's BGR format.
        return cv2.cvtColor(vis_frame.get_image(), cv2.COLOR_RGB2BGR)

    for count, frame in enumerate(_frame_from_video(video), start=1):
        yield process_predictions(frame, predictor(frame))
        # Check the budget only after yielding, so the frame that reaches the
        # limit is still emitted (checking first dropped it) and no extra
        # frame is read and discarded.
        if count == total_frames:
            return
def inference(video):
    """Run the predictor over an input video and return an H.264 mp4 path.

    The input is opened with OpenCV, and the frames produced by
    ``run_on_video`` are written to an intermediate file in a private
    temporary directory. That file is then re-encoded with ffmpeg
    (``libx264``) into a persistent temporary ``.mp4``.

    Args:
        video: Value accepted by ``cv2.VideoCapture`` (the path of the
            uploaded video).

    Returns:
        Path of the re-encoded output video. The file is created with
        ``delete=False`` and is not removed by this function.

    Raises:
        ValueError: If the input video cannot be opened.
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    capture = cv2.VideoCapture(video)
    if not capture.isOpened():
        capture.release()
        raise ValueError(f"Could not open video: {video!r}")

    try:
        width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
        frames_per_second = capture.get(cv2.CAP_PROP_FPS)
        num_frames = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
        print(num_frames)
        # Cap the frame budget at TOTAL_FRAMES. A non-positive count (the
        # container did not report one) is capped too, so such inputs are not
        # processed without bound.
        if num_frames <= 0 or num_frames > TOTAL_FRAMES:
            num_frames = TOTAL_FRAMES

        codec, file_ext = (
            ("x264", ".mkv") if test_opencv_video_format("x264", ".mkv") else ("mp4v", ".mp4")
        )
        print(codec, file_ext)

        # A per-call temporary directory keeps concurrent requests from
        # overwriting each other's intermediate file and removes it afterwards.
        with tempfile.TemporaryDirectory(prefix="vnext_result") as work_dir:
            # Use the container extension that was probed with the codec.
            output_fname = os.path.join(work_dir, "result" + file_ext)
            output_file = cv2.VideoWriter(
                filename=output_fname,
                fourcc=cv2.VideoWriter_fourcc(*codec),
                fps=float(frames_per_second),
                frameSize=(width, height),
                isColor=True,
            )
            try:
                for vis_frame in tqdm.tqdm(run_on_video(capture, num_frames), total=num_frames):
                    output_file.write(vis_frame)
            finally:
                output_file.release()

            out_file = tempfile.NamedTemporaryFile(suffix="out.mp4", delete=False)
            # Only the reserved path is needed; close our handle before ffmpeg
            # overwrites the file (-y).
            out_file.close()
            # Pass an argument list so paths containing spaces stay intact.
            subprocess.run(
                [
                    "ffmpeg", "-y", "-loglevel", "quiet", "-stats",
                    "-i", output_fname,
                    "-c:v", "libx264",
                    out_file.name,
                ],
                check=True,
            )
    finally:
        capture.release()

    return out_file.name
# Build the Gradio UI around `inference` (one video input, one mp4 video
# output, two example clips) and launch it immediately in debug mode.
# Note: `video_interface` is bound to the return value of `.launch(...)`,
# not to the `gr.Interface` object itself.
video_interface = gr.Interface(
fn=inference,
inputs=[
gr.Video(type="file"),
],
outputs=gr.Video(type="file", format="mp4"),
examples=[
["inps.mp4"], ["example_3.mp4"],
],
allow_flagging=False,
allow_screenshot=False,
title="VNext",
description="demo for <a href='https://github.com/wjf5203/VNext'>wjf5203/VNext</a>"
).launch(debug=True)