import numpy as np
import mmcv
from pathlib import Path
from collections import namedtuple
import cv2 as cv
from tqdm import tqdm
from mmengine.registry import init_default_scope
from mmengine.visualization import Visualizer
from mmpose.apis import inference_topdown, init_model
from mmdet.apis import inference_detector, init_detector
from .utils import filter_by_catgory, filter_by_score, Timer
from .apis import build_onnx_model_and_task_processor, inference_onnx_model

# Result containers shared by both inferencer classes; defined once at module
# level instead of being rebuilt on every process_one_image call.
DetInst = namedtuple('DetInst', ['bboxes', 'scores', 'labels'])
PoseInst = namedtuple('PoseInst', ['keypoints', 'pts_scores'])

class PoseInferencer:
    """Two-stage top-down pose inferencer: an mmdet detector finds person
    boxes, then an mmpose model predicts keypoints inside each box.
    """
def __init__(self,
det_cfg,
pose_cfg,
device='cpu') -> None:
        # build the detection and pose models from configs and checkpoints
self.det_model_cfg = det_cfg.model_cfg
self.det_model_ckpt = det_cfg.model_ckpt
self.pose_model_cfg = pose_cfg.model_cfg
self.pose_model_ckpt = pose_cfg.model_ckpt
self.detector = init_detector(self.det_model_cfg,
self.det_model_ckpt,
device=device)
self.pose_model = init_model(self.pose_model_cfg,
self.pose_model_ckpt,
device=device)
        # counts videos processed so far; used for progress reporting
        self.video_count = 0

    def process_one_image(self, img):
        # inference with detector
        init_default_scope('mmdet')
det_result = inference_detector(self.detector, img)
det_inst = det_result.pred_instances.cpu().numpy()
bboxes, scores, labels = (det_inst.bboxes,
det_inst.scores,
det_inst.labels)
        # filter detections by confidence (threshold 0.5), then keep only
        # the 'person' category
        bboxes, scores, labels = filter_by_score(bboxes, scores,
                                                 labels, 0.5)
        bboxes, scores, labels = filter_by_catgory(bboxes, scores, labels,
                                                   ['person'])
# inference with pose model
init_default_scope('mmpose')
pose_result = inference_topdown(self.pose_model, img, bboxes)
        if len(pose_result) == 0:
            # no detections: return a single all-zero placeholder instance
keypoints = np.zeros((1, 17, 2))
pts_scores = np.zeros((1, 17))
bboxes = np.zeros((1, 4))
scores = np.zeros((1, ))
labels = np.zeros((1, ))
else:
keypoints = np.concatenate([r.pred_instances.keypoints
for r in pose_result])
pts_scores = np.concatenate([r.pred_instances.keypoint_scores
for r in pose_result])
        return DetInst(bboxes, scores, labels), PoseInst(keypoints, pts_scores)

    def inference_video(self, video_path):
        """Run the detector and pose model on every frame of a video.

        Returns:
            all_det: a list of DetInst, one entry per frame.
            all_pose: a list of PoseInst, one entry per frame (see the
                namedtuple definitions at module level).
        """
video_reader = mmcv.VideoReader(video_path)
all_pose, all_det = [], []
for frame in tqdm(video_reader):
            # run detection and pose estimation on this frame
det, pose = self.process_one_image(frame)
all_pose.append(pose)
all_det.append(det)
return all_det, all_pose
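

# A minimal usage sketch for PoseInferencer, assuming config objects that
# expose .model_cfg/.model_ckpt attributes (SimpleNamespace is just a
# stand-in; the paths are hypothetical, not files shipped with this project):
#
#     from types import SimpleNamespace
#
#     det_cfg = SimpleNamespace(model_cfg='configs/det_config.py',
#                               model_ckpt='checkpoints/det.pth')
#     pose_cfg = SimpleNamespace(model_cfg='configs/pose_config.py',
#                                model_ckpt='checkpoints/pose.pth')
#     inferencer = PoseInferencer(det_cfg, pose_cfg, device='cuda:0')
#     all_det, all_pose = inferencer.inference_video('demo.mp4')
#     # all_pose[i].keypoints -> (num_person, 17, 2) array for frame i
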
class PoseInferencerV2:
    """V2: runs the detection model with an ONNX backend while keeping the
    PyTorch pose model.
    """

    def __init__(self,
det_cfg,
pose_cfg,
device='cpu') -> None:
        # build the ONNX detection model and the PyTorch pose model
self.det_deploy_cfg = det_cfg.deploy_cfg
self.det_model_cfg = det_cfg.model_cfg
self.det_backend_files = det_cfg.backend_files
self.pose_model_cfg = pose_cfg.model_cfg
self.pose_model_ckpt = pose_cfg.model_ckpt
self.detector, self.task_processor = \
build_onnx_model_and_task_processor(self.det_model_cfg,
self.det_deploy_cfg,
self.det_backend_files,
device)
self.pose_model = init_model(self.pose_model_cfg,
self.pose_model_ckpt,
device)
        # counts videos processed so far; used for progress reporting
        self.video_count = 0

    def process_one_image(self, img):
        # inference with detector (ONNX backend)
        init_default_scope('mmdet')
det_result = inference_onnx_model(self.detector,
self.task_processor,
self.det_deploy_cfg,
img)
det_inst = det_result[0].pred_instances.cpu().numpy()
bboxes, scores, labels = (det_inst.bboxes,
det_inst.scores,
det_inst.labels)
        # filter detections by confidence (threshold 0.5), then keep only
        # the 'person' category
        bboxes, scores, labels = filter_by_score(bboxes, scores,
                                                 labels, 0.5)
        bboxes, scores, labels = filter_by_catgory(bboxes, scores, labels,
                                                   ['person'])
# inference with pose model
init_default_scope('mmpose')
pose_result = inference_topdown(self.pose_model, img, bboxes)
        if len(pose_result) == 0:
            # no detections: return a single all-zero placeholder instance
keypoints = np.zeros((1, 17, 2))
pts_scores = np.zeros((1, 17))
bboxes = np.zeros((1, 4))
scores = np.zeros((1, ))
labels = np.zeros((1, ))
else:
keypoints = np.concatenate([r.pred_instances.keypoints
for r in pose_result])
pts_scores = np.concatenate([r.pred_instances.keypoint_scores
for r in pose_result])
        return DetInst(bboxes, scores, labels), PoseInst(keypoints, pts_scores)

    def inference_video(self, video_path):
        """Run the detector and pose model on every frame of a video.

        Returns:
            all_det: a list of DetInst, one entry per frame.
            all_pose: a list of PoseInst, one entry per frame (see the
                namedtuple definitions at module level).
        """
video_reader = mmcv.VideoReader(video_path)
all_pose, all_det = [], []
count = self.video_count + 1
for frame in tqdm(video_reader, desc=f'Inference video {count}'):
            # run detection and pose estimation on this frame
det, pose = self.process_one_image(frame)
all_pose.append(pose)
all_det.append(det)
self.video_count += 1
        return all_det, all_pose
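

# A hedged smoke-test sketch for PoseInferencerV2. All paths and file names
# below are hypothetical placeholders; deploy_cfg and backend_files are
# expected to come from an ONNX export of the detection model.
if __name__ == '__main__':
    from types import SimpleNamespace

    det_cfg = SimpleNamespace(
        deploy_cfg='deploy/detection_onnx_static.py',   # hypothetical path
        model_cfg='configs/det_config.py',              # hypothetical path
        backend_files=['work_dirs/end2end.onnx'])       # hypothetical path
    pose_cfg = SimpleNamespace(
        model_cfg='configs/pose_config.py',             # hypothetical path
        model_ckpt='checkpoints/pose.pth')              # hypothetical path

    inferencer = PoseInferencerV2(det_cfg, pose_cfg, device='cpu')
    all_det, all_pose = inferencer.inference_video('demo.mp4')
    # one DetInst and one PoseInst per frame
    print(f'frames: {len(all_det)}, '
          f'first-frame keypoints: {all_pose[0].keypoints.shape}')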