import argparse import os import cv2 import numpy as np from loguru import logger import onnxruntime from yolox.data.data_augment import preproc as preprocess from yolox.utils import mkdir, multiclass_nms, demo_postprocess, vis from yolox.utils.visualize import plot_tracking from yolox.tracker.byte_tracker import BYTETracker from yolox.tracking_utils.timer import Timer def make_parser(): parser = argparse.ArgumentParser("onnxruntime inference sample") parser.add_argument( "-m", "--model", type=str, default="bytetrack_s.onnx", help="Input your onnx model.", ) parser.add_argument( "-i", "--video_path", type=str, default='../../videos/palace.mp4', help="Path to your input image.", ) parser.add_argument( "-o", "--output_dir", type=str, default='.', help="Path to your output directory.", ) parser.add_argument( "-s", "--score_thr", type=float, default=0.1, help="Score threshould to filter the result.", ) parser.add_argument( "-n", "--nms_thr", type=float, default=0.7, help="NMS threshould.", ) parser.add_argument( "--input_shape", type=str, default="608,1088", help="Specify an input shape for inference.", ) parser.add_argument( "--with_p6", action="store_true", help="Whether your model uses p6 in FPN/PAN.", ) # tracking args parser.add_argument("--track_thresh", type=float, default=0.5, help="tracking confidence threshold") parser.add_argument("--track_buffer", type=int, default=30, help="the frames for keep lost tracks") parser.add_argument("--match_thresh", type=int, default=0.8, help="matching threshold for tracking") parser.add_argument('--min-box-area', type=float, default=10, help='filter out tiny boxes') parser.add_argument("--mot20", dest="mot20", default=False, action="store_true", help="test mot20.") return parser class Predictor(object): def __init__(self, args): self.rgb_means = (0.485, 0.456, 0.406) self.std = (0.229, 0.224, 0.225) self.args = args self.session = onnxruntime.InferenceSession(args.model) self.input_shape = tuple(map(int, args.input_shape.split(','))) def inference(self, ori_img, timer): img_info = {"id": 0} height, width = ori_img.shape[:2] img_info["height"] = height img_info["width"] = width img_info["raw_img"] = ori_img img, ratio = preprocess(ori_img, self.input_shape, self.rgb_means, self.std) img_info["ratio"] = ratio ort_inputs = {self.session.get_inputs()[0].name: img[None, :, :, :]} timer.tic() output = self.session.run(None, ort_inputs) predictions = demo_postprocess(output[0], self.input_shape, p6=self.args.with_p6)[0] boxes = predictions[:, :4] scores = predictions[:, 4:5] * predictions[:, 5:] boxes_xyxy = np.ones_like(boxes) boxes_xyxy[:, 0] = boxes[:, 0] - boxes[:, 2]/2. boxes_xyxy[:, 1] = boxes[:, 1] - boxes[:, 3]/2. boxes_xyxy[:, 2] = boxes[:, 0] + boxes[:, 2]/2. boxes_xyxy[:, 3] = boxes[:, 1] + boxes[:, 3]/2. boxes_xyxy /= ratio dets = multiclass_nms(boxes_xyxy, scores, nms_thr=self.args.nms_thr, score_thr=self.args.score_thr) return dets[:, :-1], img_info def imageflow_demo(predictor, args): cap = cv2.VideoCapture(args.video_path) width = cap.get(cv2.CAP_PROP_FRAME_WIDTH) # float height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT) # float fps = cap.get(cv2.CAP_PROP_FPS) save_folder = args.output_dir os.makedirs(save_folder, exist_ok=True) save_path = os.path.join(save_folder, args.video_path.split("/")[-1]) logger.info(f"video save_path is {save_path}") vid_writer = cv2.VideoWriter( save_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (int(width), int(height)) ) tracker = BYTETracker(args, frame_rate=30) timer = Timer() frame_id = 0 results = [] while True: if frame_id % 20 == 0: logger.info('Processing frame {} ({:.2f} fps)'.format(frame_id, 1. / max(1e-5, timer.average_time))) ret_val, frame = cap.read() if ret_val: outputs, img_info = predictor.inference(frame, timer) online_targets = tracker.update(outputs, [img_info['height'], img_info['width']], [img_info['height'], img_info['width']]) online_tlwhs = [] online_ids = [] online_scores = [] for t in online_targets: tlwh = t.tlwh tid = t.track_id vertical = tlwh[2] / tlwh[3] > 1.6 if tlwh[2] * tlwh[3] > args.min_box_area and not vertical: online_tlwhs.append(tlwh) online_ids.append(tid) online_scores.append(t.score) timer.toc() results.append((frame_id + 1, online_tlwhs, online_ids, online_scores)) online_im = plot_tracking(img_info['raw_img'], online_tlwhs, online_ids, frame_id=frame_id + 1, fps=1. / timer.average_time) vid_writer.write(online_im) ch = cv2.waitKey(1) if ch == 27 or ch == ord("q") or ch == ord("Q"): break else: break frame_id += 1 if __name__ == '__main__': args = make_parser().parse_args() predictor = Predictor(args) imageflow_demo(predictor, args)