|
import os |
|
import numpy as np |
|
from typing import List, Tuple |
|
import onnxruntime as ort |
|
import cv2 |
|
from queue import Queue |
|
os.environ['ORT_TENSORRT_EXTRA_PLUGIN_LIB_PATHS']='libmmdeploy_tensorrt_ops.so' |
|
|
|
|
|
coco17 = dict(name='coco17', |
|
keypoint_info={ |
|
0: |
|
dict(name='nose', id=0, color=[51, 153, 255], swap=''), |
|
1: |
|
dict(name='left_eye', |
|
id=1, |
|
color=[51, 153, 255], |
|
swap='right_eye'), |
|
2: |
|
dict(name='right_eye', |
|
id=2, |
|
color=[51, 153, 255], |
|
swap='left_eye'), |
|
3: |
|
dict(name='left_ear', |
|
id=3, |
|
color=[51, 153, 255], |
|
swap='right_ear'), |
|
4: |
|
dict(name='right_ear', |
|
id=4, |
|
color=[51, 153, 255], |
|
swap='left_ear'), |
|
5: |
|
dict(name='left_shoulder', |
|
id=5, |
|
color=[0, 255, 0], |
|
swap='right_shoulder'), |
|
6: |
|
dict(name='right_shoulder', |
|
id=6, |
|
color=[255, 128, 0], |
|
swap='left_shoulder'), |
|
7: |
|
dict(name='left_elbow', |
|
id=7, |
|
color=[0, 255, 0], |
|
swap='right_elbow'), |
|
8: |
|
dict(name='right_elbow', |
|
id=8, |
|
color=[255, 128, 0], |
|
swap='left_elbow'), |
|
9: |
|
dict(name='left_wrist', |
|
id=9, |
|
color=[0, 255, 0], |
|
swap='right_wrist'), |
|
10: |
|
dict(name='right_wrist', |
|
id=10, |
|
color=[255, 128, 0], |
|
swap='left_wrist'), |
|
11: |
|
dict(name='left_hip', |
|
id=11, |
|
color=[0, 255, 0], |
|
swap='right_hip'), |
|
12: |
|
dict(name='right_hip', |
|
id=12, |
|
color=[255, 128, 0], |
|
swap='left_hip'), |
|
13: |
|
dict(name='left_knee', |
|
id=13, |
|
color=[0, 255, 0], |
|
swap='right_knee'), |
|
14: |
|
dict(name='right_knee', |
|
id=14, |
|
color=[255, 128, 0], |
|
swap='left_knee'), |
|
15: |
|
dict(name='left_ankle', |
|
id=15, |
|
color=[0, 255, 0], |
|
swap='right_ankle'), |
|
16: |
|
dict(name='right_ankle', |
|
id=16, |
|
color=[255, 128, 0], |
|
swap='left_ankle') |
|
}, |
|
skeleton_info={ |
|
0: |
|
dict(link=('left_ankle', 'left_knee'), |
|
id=0, |
|
color=[0, 255, 0]), |
|
1: |
|
dict(link=('left_knee', 'left_hip'), id=1, color=[0, 255, |
|
0]), |
|
2: |
|
dict(link=('right_ankle', 'right_knee'), |
|
id=2, |
|
color=[255, 128, 0]), |
|
3: |
|
dict(link=('right_knee', 'right_hip'), |
|
id=3, |
|
color=[255, 128, 0]), |
|
4: |
|
dict(link=('left_hip', 'right_hip'), |
|
id=4, |
|
color=[51, 153, 255]), |
|
5: |
|
dict(link=('left_shoulder', 'left_hip'), |
|
id=5, |
|
color=[51, 153, 255]), |
|
6: |
|
dict(link=('right_shoulder', 'right_hip'), |
|
id=6, |
|
color=[51, 153, 255]), |
|
7: |
|
dict(link=('left_shoulder', 'right_shoulder'), |
|
id=7, |
|
color=[51, 153, 255]), |
|
8: |
|
dict(link=('left_shoulder', 'left_elbow'), |
|
id=8, |
|
color=[0, 255, 0]), |
|
9: |
|
dict(link=('right_shoulder', 'right_elbow'), |
|
id=9, |
|
color=[255, 128, 0]), |
|
10: |
|
dict(link=('left_elbow', 'left_wrist'), |
|
id=10, |
|
color=[0, 255, 0]), |
|
11: |
|
dict(link=('right_elbow', 'right_wrist'), |
|
id=11, |
|
color=[255, 128, 0]), |
|
12: |
|
dict(link=('left_eye', 'right_eye'), |
|
id=12, |
|
color=[51, 153, 255]), |
|
13: |
|
dict(link=('nose', 'left_eye'), id=13, color=[51, 153, 255]), |
|
14: |
|
dict(link=('nose', 'right_eye'), id=14, color=[51, 153, |
|
255]), |
|
15: |
|
dict(link=('left_eye', 'left_ear'), |
|
id=15, |
|
color=[51, 153, 255]), |
|
16: |
|
dict(link=('right_eye', 'right_ear'), |
|
id=16, |
|
color=[51, 153, 255]), |
|
17: |
|
dict(link=('left_ear', 'left_shoulder'), |
|
id=17, |
|
color=[51, 153, 255]), |
|
18: |
|
dict(link=('right_ear', 'right_shoulder'), |
|
id=18, |
|
color=[51, 153, 255]) |
|
}) |
|
|
|
|
|
def draw_mmpose(img, |
|
keypoints, |
|
scores, |
|
keypoint_info, |
|
skeleton_info, |
|
kpt_thr=0.5, |
|
radius=2, |
|
line_width=2): |
|
assert len(keypoints.shape) == 2 |
|
|
|
vis_kpt = [s >= kpt_thr for s in scores] |
|
|
|
link_dict = {} |
|
for i, kpt_info in keypoint_info.items(): |
|
kpt_color = tuple(kpt_info['color']) |
|
link_dict[kpt_info['name']] = kpt_info['id'] |
|
|
|
kpt = keypoints[i] |
|
|
|
if vis_kpt[i]: |
|
img = cv2.circle(img, (int(kpt[0]), int(kpt[1])), int(radius), |
|
kpt_color, -1) |
|
|
|
for i, ske_info in skeleton_info.items(): |
|
link = ske_info['link'] |
|
pt0, pt1 = link_dict[link[0]], link_dict[link[1]] |
|
|
|
if vis_kpt[pt0] and vis_kpt[pt1]: |
|
link_color = ske_info['color'] |
|
kpt0 = keypoints[pt0] |
|
kpt1 = keypoints[pt1] |
|
|
|
img = cv2.line(img, (int(kpt0[0]), int(kpt0[1])), |
|
(int(kpt1[0]), int(kpt1[1])), |
|
link_color, |
|
thickness=line_width) |
|
|
|
return img |
|
|
|
|
|
def draw_skeleton(img, |
|
keypoints, |
|
scores, |
|
kpt_thr=0.5, |
|
radius=2, |
|
line_width=2): |
|
num_keypoints = keypoints.shape[1] |
|
|
|
if num_keypoints == 17: |
|
skeleton = 'coco17' |
|
else: |
|
raise NotImplementedError |
|
|
|
skeleton_dict = eval(f'{skeleton}') |
|
keypoint_info = skeleton_dict['keypoint_info'] |
|
skeleton_info = skeleton_dict['skeleton_info'] |
|
|
|
if len(keypoints.shape) == 2: |
|
keypoints = keypoints[None, :, :] |
|
scores = scores[None, :, :] |
|
|
|
num_instance = keypoints.shape[0] |
|
if skeleton in ['coco17']: |
|
for i in range(num_instance): |
|
img = draw_mmpose(img, keypoints[i], scores[i], keypoint_info, |
|
skeleton_info, kpt_thr, radius, line_width) |
|
else: |
|
raise NotImplementedError |
|
return img |
|
|
|
def is_onnx_model(model_path): |
|
try: |
|
ort.InferenceSession(model_path, providers=["CPUExecutionProvider"]) |
|
return True |
|
except Exception as e: |
|
return False |
|
|
|
def is_trt_engine(model_path): |
|
try: |
|
from polygraphy.backend.common import BytesFromPath |
|
from polygraphy.backend.trt import EngineFromBytes |
|
engine = EngineFromBytes(BytesFromPath(model_path)) |
|
return engine is not None |
|
except Exception: |
|
return False |
|
|
|
def get_onnx_input_shapes(model_path): |
|
from polygraphy.backend.onnx.loader import OnnxFromPath |
|
from polygraphy.backend.onnx import infer_shapes |
|
model = OnnxFromPath(model_path)() |
|
model = infer_shapes(model) |
|
input_shapes = {inp.name: inp.type.tensor_type.shape for inp in model.graph.input} |
|
return {name: [dim.dim_value if dim.dim_value > 0 else 'Dynamic' for dim in shape_proto.dim] |
|
for name, shape_proto in input_shapes.items()} |
|
|
|
def get_trt_input_shapes(model_path): |
|
input_shapes = {} |
|
import tensorrt as trt |
|
with open(model_path, "rb") as f, trt.Runtime(trt.Logger(trt.Logger.WARNING)) as runtime: |
|
engine = runtime.deserialize_cuda_engine(f.read()) |
|
for binding in engine: |
|
if engine.binding_is_input(binding): |
|
input_shapes[binding] = engine.get_binding_shape(binding) |
|
return input_shapes |
|
|
|
class RTMO_GPU(object): |
|
|
|
def preprocess(self, img: np.ndarray): |
|
"""Do preprocessing for RTMPose model inference. |
|
|
|
Args: |
|
img (np.ndarray): Input image in shape. |
|
|
|
Returns: |
|
tuple: |
|
- resized_img (np.ndarray): Preprocessed image. |
|
- center (np.ndarray): Center of image. |
|
- scale (np.ndarray): Scale of image. |
|
""" |
|
if len(img.shape) == 3: |
|
padded_img = np.ones( |
|
(self.model_input_size[0], self.model_input_size[1], 3), |
|
dtype=np.uint8) * 114 |
|
else: |
|
padded_img = np.ones(self.model_input_size, dtype=np.uint8) * 114 |
|
|
|
ratio = min(self.model_input_size[0] / img.shape[0], |
|
self.model_input_size[1] / img.shape[1]) |
|
resized_img = cv2.resize( |
|
img, |
|
(int(img.shape[1] * ratio), int(img.shape[0] * ratio)), |
|
interpolation=cv2.INTER_LINEAR, |
|
).astype(np.uint8) |
|
padded_shape = (int(img.shape[0] * ratio), int(img.shape[1] * ratio)) |
|
padded_img[:padded_shape[0], :padded_shape[1]] = resized_img |
|
|
|
|
|
if self.mean is not None: |
|
self.mean = np.array(self.mean) |
|
self.std = np.array(self.std) |
|
padded_img = (padded_img - self.mean) / self.std |
|
|
|
return padded_img, ratio |
|
|
|
def postprocess( |
|
self, |
|
outputs: List[np.ndarray], |
|
ratio: float = 1., |
|
) -> Tuple[np.ndarray, np.ndarray]: |
|
"""Do postprocessing for RTMO model inference. |
|
|
|
Args: |
|
outputs (List[np.ndarray]): Outputs of RTMO model. |
|
ratio (float): Ratio of preprocessing. |
|
|
|
Returns: |
|
tuple: |
|
- final_boxes (np.ndarray): Final bounding boxes. |
|
- final_scores (np.ndarray): Final scores. |
|
""" |
|
|
|
if not self.is_yolo_nas_pose: |
|
|
|
det_outputs, pose_outputs = outputs |
|
|
|
|
|
pack_dets = (det_outputs[0, :, :4], det_outputs[0, :, 4]) |
|
final_boxes, final_scores = pack_dets |
|
final_boxes /= ratio |
|
isscore = final_scores > 0.3 |
|
isbbox = [i for i in isscore] |
|
|
|
|
|
|
|
keypoints, scores = pose_outputs[0, :, :, :2], pose_outputs[0, :, :, 2] |
|
keypoints = keypoints / ratio |
|
|
|
keypoints = keypoints[isbbox] |
|
scores = scores[isbbox] |
|
else: |
|
|
|
flat_predictions = outputs[0] |
|
if flat_predictions.shape[0] > 0: |
|
mask = flat_predictions[:, 0] == 0 |
|
pred_bboxes = flat_predictions[mask, 1:5] |
|
pred_joints = flat_predictions[mask, 6:].reshape((len(pred_bboxes), -1, 3)) |
|
keypoints, scores = pred_joints[:,:,:2], pred_joints[:,:,-1] |
|
keypoints = keypoints / ratio |
|
else: |
|
keypoints, scores = np.zeros((0, 17, 2)), np.zeros((0, 17)) |
|
|
|
return keypoints, scores |
|
|
|
def inference(self, img: np.ndarray): |
|
"""Inference model. |
|
|
|
Args: |
|
img (np.ndarray): Input image in shape. |
|
|
|
Returns: |
|
outputs (np.ndarray): Output of RTMPose model. |
|
""" |
|
|
|
|
|
img = img.transpose(2, 0, 1) |
|
img = np.ascontiguousarray(img, dtype=np.float32 if not self.is_yolo_nas_pose else np.uint8) |
|
input = img[None, :, :, :] |
|
|
|
if self.model_format == 'onnx': |
|
|
|
|
|
io_binding = self.session.io_binding() |
|
|
|
if not self.is_yolo_nas_pose: |
|
|
|
io_binding.bind_input(name='input', device_type='cpu', device_id=0, element_type=np.float32, shape=input.shape, buffer_ptr=input.ctypes.data) |
|
io_binding.bind_output(name='dets') |
|
io_binding.bind_output(name='keypoints') |
|
else: |
|
|
|
io_binding.bind_input(name='input', device_type='cpu', device_id=0, element_type=np.uint8, shape=input.shape, buffer_ptr=input.ctypes.data) |
|
io_binding.bind_output(name='graph2_flat_predictions') |
|
|
|
|
|
self.session.run_with_iobinding(io_binding) |
|
|
|
|
|
outputs = [output.numpy() for output in io_binding.get_outputs()] |
|
|
|
else: |
|
|
|
if not self.session.is_active: |
|
self.session.activate() |
|
|
|
outputs = self.session.infer(feed_dict={'input': input}, check_inputs=False) |
|
outputs = [output for output in outputs.values()] |
|
|
|
return outputs |
|
|
|
def __exit__(self): |
|
if self.model_format == 'engine': |
|
if self.session.is_active: |
|
self.session.deactivate() |
|
|
|
def __call__(self, image: np.ndarray): |
|
image, ratio = self.preprocess(image) |
|
|
|
|
|
outputs = self.inference(image) |
|
|
|
keypoints, scores = self.postprocess(outputs, ratio) |
|
|
|
return keypoints, scores |
|
|
|
def __init__(self, |
|
model: str = None, |
|
mean: tuple = None, |
|
std: tuple = None, |
|
device: str = 'cuda', |
|
is_yolo_nas_pose = False): |
|
|
|
if not os.path.exists(model): |
|
|
|
raise FileNotFoundError(f"The specified ONNX model file was not found: {model}") |
|
|
|
self.model = model |
|
if is_onnx_model(model): |
|
self.model_format = 'onnx' |
|
self.input_shape = get_onnx_input_shapes(self.model)['input'] |
|
elif is_trt_engine(model): |
|
self.model_format = 'engine' |
|
from polygraphy.backend.common import BytesFromPath |
|
from polygraphy.backend.trt import EngineFromBytes, TrtRunner, load_plugins |
|
load_plugins(plugins=['libmmdeploy_tensorrt_ops.so']) |
|
self.input_shape = get_trt_input_shapes(self.model)['input'] |
|
else: |
|
raise TypeError("Your model is neither ONNX nor Engine !") |
|
|
|
|
|
if self.model_format == 'onnx': |
|
|
|
providers = {'cpu': 'CPUExecutionProvider', |
|
'cuda': [ |
|
|
|
|
|
|
|
|
|
('CUDAExecutionProvider', { |
|
'cudnn_conv_algo_search': 'DEFAULT', |
|
'cudnn_conv_use_max_workspace': True |
|
}), |
|
'OpenVINOExecutionProvider', |
|
'CPUExecutionProvider']} |
|
|
|
self.session = ort.InferenceSession(path_or_bytes=model, |
|
providers=providers[device]) |
|
|
|
else: |
|
engine = EngineFromBytes(BytesFromPath(model)) |
|
self.session = TrtRunner(engine) |
|
|
|
self.model_input_size = self.input_shape[2:4] |
|
self.mean = mean |
|
self.std = std |
|
self.device = device |
|
self.is_yolo_nas_pose = is_yolo_nas_pose |
|
|
|
class RTMO_GPU_Batch(RTMO_GPU): |
|
def preprocess_batch(self, imgs: List[np.ndarray]) -> Tuple[np.ndarray, List[float]]: |
|
"""Process a batch of images for RTMPose model inference. |
|
|
|
Args: |
|
imgs (List[np.ndarray]): List of input images. |
|
|
|
Returns: |
|
tuple: |
|
- batch_img (np.ndarray): Batch of preprocessed images. |
|
- ratios (List[float]): Ratios used for preprocessing each image. |
|
""" |
|
batch_img = [] |
|
ratios = [] |
|
|
|
for img in imgs: |
|
preprocessed_img, ratio = super().preprocess(img) |
|
batch_img.append(preprocessed_img) |
|
ratios.append(ratio) |
|
|
|
|
|
batch_img = np.stack(batch_img, axis=0) |
|
|
|
return batch_img, ratios |
|
|
|
def inference(self, batch_img: np.ndarray): |
|
"""Override to handle batch inference. |
|
|
|
Args: |
|
batch_img (np.ndarray): Batch of preprocessed images. |
|
|
|
Returns: |
|
outputs (List[np.ndarray]): Outputs of RTMPose model for each image. |
|
""" |
|
batch_img = batch_img.transpose(0, 3, 1, 2) |
|
batch_img = np.ascontiguousarray(batch_img, dtype=np.float32) |
|
|
|
input = batch_img |
|
|
|
if self.model_format == 'onnx': |
|
|
|
|
|
io_binding = self.session.io_binding() |
|
|
|
if not self.is_yolo_nas_pose: |
|
|
|
io_binding.bind_input(name='input', device_type='cpu', device_id=0, element_type=np.float32, shape=input.shape, buffer_ptr=input.ctypes.data) |
|
io_binding.bind_output(name='dets') |
|
io_binding.bind_output(name='keypoints') |
|
else: |
|
|
|
io_binding.bind_input(name='input', device_type='cpu', device_id=0, element_type=np.uint8, shape=input.shape, buffer_ptr=input.ctypes.data) |
|
io_binding.bind_output(name='graph2_flat_predictions') |
|
|
|
|
|
self.session.run_with_iobinding(io_binding) |
|
|
|
|
|
outputs = [output.numpy() for output in io_binding.get_outputs()] |
|
|
|
else: |
|
|
|
if not self.session.is_active: |
|
self.session.activate() |
|
|
|
outputs = self.session.infer(feed_dict={'input': input}, check_inputs=False) |
|
outputs = [output for output in outputs.values()] |
|
|
|
return outputs |
|
|
|
def postprocess_batch( |
|
self, |
|
outputs: List[np.ndarray], |
|
ratios: List[float] |
|
) -> Tuple[List[np.ndarray], List[np.ndarray]]: |
|
"""Process outputs for a batch of images. |
|
|
|
Args: |
|
outputs (List[np.ndarray]): Outputs from the model for each image. |
|
ratios (List[float]): Ratios used for preprocessing each image. |
|
|
|
Returns: |
|
List[Tuple[np.ndarray, np.ndarray]]: keypoints and scores for each image. |
|
""" |
|
batch_keypoints = [] |
|
batch_scores = [] |
|
|
|
b_dets, b_keypoints = outputs |
|
for i, ratio in enumerate(ratios): |
|
output = [np.expand_dims(b_dets[i], axis=0), np.expand_dims(b_keypoints[i],axis=0)] |
|
keypoints, scores = super().postprocess(output, ratio) |
|
batch_keypoints.append(keypoints) |
|
batch_scores.append(scores) |
|
|
|
return batch_keypoints, batch_scores |
|
|
|
def __batch_call__(self, images: List[np.ndarray]): |
|
batch_img, ratios = self.preprocess_batch(images) |
|
outputs = self.inference(batch_img) |
|
keypoints, scores = self.postprocess_batch(outputs, ratios) |
|
return keypoints, scores |
|
|
|
def __call__(self, image: np.array): |
|
self.buffer.append(image) |
|
|
|
if len(self.buffer) == self.batch_size: |
|
b_keypoints, b_scores = self.__batch_call__(self.buffer) |
|
for keypoints, scores in zip(b_keypoints, b_scores): |
|
self.out_queue.put((keypoints, scores)) |
|
self.buffer = [] |
|
|
|
keypoints, scores = None, None |
|
if not self.out_queue.empty(): |
|
keypoints, scores = self.out_queue.get() |
|
|
|
return keypoints, scores |
|
|
|
|
|
def __init__(self, |
|
model: str = None, |
|
mean: tuple = None, |
|
std: tuple = None, |
|
device: str = 'cuda', |
|
is_yolo_nas_pose = False, |
|
batch_size: int = 1): |
|
super().__init__(model, |
|
mean, |
|
std, |
|
device, |
|
is_yolo_nas_pose) |
|
|
|
self.batch_size = batch_size |
|
self.out_queue = Queue(maxsize=self.batch_size) |
|
self.buffer = [] |
|
|
|
def resize_to_fit_screen(image, screen_width, screen_height): |
|
|
|
h, w = image.shape[:2] |
|
|
|
|
|
aspect_ratio = w / h |
|
|
|
|
|
scale = min(screen_width / w, screen_height / h) |
|
|
|
|
|
new_width = int(w * scale) |
|
new_height = int(h * scale) |
|
|
|
|
|
resized_image = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_AREA) |
|
|
|
return resized_image |