pesi
/

rtmo / rtmo_gpu.py
Luigi
Unify batched and non-batched versions
42892de
raw
history blame
22.9 kB
import os
import numpy as np
from typing import List, Tuple
import onnxruntime as ort
import cv2
from queue import Queue
# Set at import time, before any ONNX Runtime session is created below.
# NOTE(review): presumably this lets ONNX Runtime's TensorRT execution provider
# find the mmdeploy custom-op plugin library; the library must be resolvable by
# the dynamic loader under this name — confirm against the deployment setup.
os.environ['ORT_TENSORRT_EXTRA_PLUGIN_LIB_PATHS']='libmmdeploy_tensorrt_ops.so'
# dictionary from https://github.com/Tau-J/rtmlib/blob/4b29101d54b611048ef165277cebfffff3030074/rtmlib/visualization/skeleton/coco17.py
# COCO 17-keypoint layout used by the drawing helpers.
#   keypoint_info: index -> {name, id, color, swap}; ``swap`` is the name of
#                  the left/right counterpart ('' for the nose).
#   skeleton_info: index -> {link, id, color}; ``link`` is a pair of keypoint
#                  names joined by a line.
# Every entry receives its own fresh ``color`` list, as in a hand-written
# literal, so mutating one entry never affects another.
coco17 = {
    'name': 'coco17',
    'keypoint_info': {
        idx: {'name': name, 'id': idx, 'color': list(color), 'swap': swap}
        for idx, (name, color, swap) in enumerate([
            ('nose', (51, 153, 255), ''),
            ('left_eye', (51, 153, 255), 'right_eye'),
            ('right_eye', (51, 153, 255), 'left_eye'),
            ('left_ear', (51, 153, 255), 'right_ear'),
            ('right_ear', (51, 153, 255), 'left_ear'),
            ('left_shoulder', (0, 255, 0), 'right_shoulder'),
            ('right_shoulder', (255, 128, 0), 'left_shoulder'),
            ('left_elbow', (0, 255, 0), 'right_elbow'),
            ('right_elbow', (255, 128, 0), 'left_elbow'),
            ('left_wrist', (0, 255, 0), 'right_wrist'),
            ('right_wrist', (255, 128, 0), 'left_wrist'),
            ('left_hip', (0, 255, 0), 'right_hip'),
            ('right_hip', (255, 128, 0), 'left_hip'),
            ('left_knee', (0, 255, 0), 'right_knee'),
            ('right_knee', (255, 128, 0), 'left_knee'),
            ('left_ankle', (0, 255, 0), 'right_ankle'),
            ('right_ankle', (255, 128, 0), 'left_ankle'),
        ])
    },
    'skeleton_info': {
        idx: {'link': (start, end), 'id': idx, 'color': list(color)}
        for idx, (start, end, color) in enumerate([
            ('left_ankle', 'left_knee', (0, 255, 0)),
            ('left_knee', 'left_hip', (0, 255, 0)),
            ('right_ankle', 'right_knee', (255, 128, 0)),
            ('right_knee', 'right_hip', (255, 128, 0)),
            ('left_hip', 'right_hip', (51, 153, 255)),
            ('left_shoulder', 'left_hip', (51, 153, 255)),
            ('right_shoulder', 'right_hip', (51, 153, 255)),
            ('left_shoulder', 'right_shoulder', (51, 153, 255)),
            ('left_shoulder', 'left_elbow', (0, 255, 0)),
            ('right_shoulder', 'right_elbow', (255, 128, 0)),
            ('left_elbow', 'left_wrist', (0, 255, 0)),
            ('right_elbow', 'right_wrist', (255, 128, 0)),
            ('left_eye', 'right_eye', (51, 153, 255)),
            ('nose', 'left_eye', (51, 153, 255)),
            ('nose', 'right_eye', (51, 153, 255)),
            ('left_eye', 'left_ear', (51, 153, 255)),
            ('right_eye', 'right_ear', (51, 153, 255)),
            ('left_ear', 'left_shoulder', (51, 153, 255)),
            ('right_ear', 'right_shoulder', (51, 153, 255)),
        ])
    },
}
# functions from https://github.com/Tau-J/rtmlib/blob/4b29101d54b611048ef165277cebfffff3030074/rtmlib/visualization/draw.py#L71
def draw_mmpose(img,
                keypoints,
                scores,
                keypoint_info,
                skeleton_info,
                kpt_thr=0.5,
                radius=2,
                line_width=2):
    """Draw the keypoints and limbs of a single instance onto ``img``.

    Args:
        img: Image handed to ``cv2.circle`` / ``cv2.line``.
        keypoints: 2-D array indexed by keypoint; columns 0 and 1 are used as
            the x and y pixel coordinates.
        scores: Per-keypoint confidences, indexable like ``keypoints``.
        keypoint_info: Mapping ``index -> {'name', 'id', 'color', ...}``.
        skeleton_info: Mapping ``index -> {'link': (name_a, name_b), 'color'}``.
        kpt_thr: A keypoint is drawn only when its score is ``>= kpt_thr``; a
            limb is drawn only when both of its endpoints pass.
        radius: Radius of the filled keypoint circles.
        line_width: Thickness of the limb lines.

    Returns:
        The image returned by the last OpenCV drawing call, or ``img`` itself
        when nothing was drawn.
    """
    assert len(keypoints.shape) == 2

    visible = [score >= kpt_thr for score in scores]
    name_to_id = {}

    # Pass 1: draw the joints and collect the name -> id table for the limbs.
    for idx, info in keypoint_info.items():
        color = tuple(info['color'])
        name_to_id[info['name']] = info['id']
        point = keypoints[idx]
        if not visible[idx]:
            continue
        center = (int(point[0]), int(point[1]))
        img = cv2.circle(img, center, int(radius), color, -1)

    # Pass 2: draw every limb whose two endpoints are both visible.
    for info in skeleton_info.values():
        link = info['link']
        start_id, end_id = name_to_id[link[0]], name_to_id[link[1]]
        if not (visible[start_id] and visible[end_id]):
            continue
        color = info['color']
        start = keypoints[start_id]
        end = keypoints[end_id]
        img = cv2.line(img,
                       (int(start[0]), int(start[1])),
                       (int(end[0]), int(end[1])),
                       color,
                       thickness=line_width)
    return img
# with simplification to use onnxruntime only
def draw_skeleton(img,
                  keypoints,
                  scores,
                  kpt_thr=0.5,
                  radius=2,
                  line_width=2):
    """Draw the skeleton of every detected instance onto ``img``.

    Args:
        img: Image to draw on.
        keypoints: Either ``(num_instances, num_keypoints, 2)`` or, for a
            single instance, ``(num_keypoints, 2)``.
        scores: Keypoint confidences matching ``keypoints`` without the last
            axis, i.e. ``(num_instances, num_keypoints)`` or ``(num_keypoints,)``.
        kpt_thr: Minimum score for a keypoint to be drawn.
        radius: Radius of the keypoint circles.
        line_width: Thickness of the limb lines.

    Returns:
        The image with all instances drawn.

    Raises:
        NotImplementedError: If the keypoint count is not 17 (only the COCO-17
            layout is available).
    """
    keypoints = np.asarray(keypoints)
    scores = np.asarray(scores)

    # Promote a single instance to a batch of one *before* reading the
    # keypoint count; otherwise shape[1] would be the coordinate axis.
    if keypoints.ndim == 2:
        keypoints = keypoints[None, ...]
        scores = scores[None, ...]

    num_keypoints = keypoints.shape[1]
    if num_keypoints != 17:
        raise NotImplementedError(
            f'Unsupported number of keypoints: {num_keypoints}')

    # Direct reference to the layout table; no eval() on a built-up name.
    skeleton_dict = coco17
    keypoint_info = skeleton_dict['keypoint_info']
    skeleton_info = skeleton_dict['skeleton_info']

    for i in range(keypoints.shape[0]):
        img = draw_mmpose(img, keypoints[i], scores[i], keypoint_info,
                          skeleton_info, kpt_thr, radius, line_width)
    return img
def is_onnx_model(model_path):
    """Tell whether ONNX Runtime can open ``model_path`` as a model.

    A throw-away session is created with the CPU execution provider only; any
    exception raised while doing so is treated as "not an ONNX model".

    Args:
        model_path: Path of the candidate model file.

    Returns:
        bool: True if the session was created, False otherwise.
    """
    try:
        ort.InferenceSession(model_path, providers=["CPUExecutionProvider"])
    except Exception:
        return False
    return True
def is_trt_engine(model_path):
    """Tell whether ``model_path`` can be wrapped as a TensorRT engine loader.

    Any exception (including Polygraphy not being installed) yields False.

    NOTE(review): only the Polygraphy loader object is constructed here; it is
    never invoked. If Polygraphy loaders are lazy (presumably they are —
    confirm), the file is not actually deserialized and this returns True for
    any path whenever Polygraphy is importable.

    Args:
        model_path: Path of the candidate engine file.

    Returns:
        bool: True if the loader object was built, False on any error.
    """
    try:
        from polygraphy.backend.common import BytesFromPath
        from polygraphy.backend.trt import EngineFromBytes
        loader = EngineFromBytes(BytesFromPath(model_path))
    except Exception:
        return False
    return loader is not None
def get_onnx_input_shapes(model_path):
    """Read the input tensor shapes of an ONNX model.

    The model is loaded with Polygraphy and passed through its shape
    inference before the graph inputs are inspected.

    Args:
        model_path: Path of the ONNX file.

    Returns:
        dict: ``input name -> list of dimensions``. A dimension whose
        ``dim_value`` is not positive is reported as the string ``'Dynamic'``.
    """
    from polygraphy.backend.onnx.loader import OnnxFromPath
    from polygraphy.backend.onnx import infer_shapes

    onnx_model = infer_shapes(OnnxFromPath(model_path)())

    shapes = {}
    for graph_input in onnx_model.graph.input:
        dims = graph_input.type.tensor_type.shape.dim
        shapes[graph_input.name] = [
            d.dim_value if d.dim_value > 0 else 'Dynamic' for d in dims
        ]
    return shapes
def get_trt_input_shapes(model_path):
    """Read the input tensor shapes of a serialized TensorRT engine.

    The name-based tensor API (``num_io_tensors`` / ``get_tensor_name`` /
    ``get_tensor_mode`` / ``get_tensor_shape``) is used when the installed
    TensorRT exposes it; otherwise the legacy binding API is used.

    Args:
        model_path: Path of the serialized engine file.

    Returns:
        dict: ``input name -> shape`` as returned by TensorRT.

    Raises:
        RuntimeError: If TensorRT could not deserialize the file.
    """
    import tensorrt as trt

    input_shapes = {}
    with open(model_path, "rb") as f, trt.Runtime(trt.Logger(trt.Logger.WARNING)) as runtime:
        engine = runtime.deserialize_cuda_engine(f.read())
        if engine is None:
            # Fail with a clear message rather than on the iteration below.
            raise RuntimeError(
                f"TensorRT could not deserialize the engine file: {model_path}")

        # Query the engine while the runtime that produced it is still open.
        if hasattr(engine, "num_io_tensors"):
            for index in range(engine.num_io_tensors):
                name = engine.get_tensor_name(index)
                if engine.get_tensor_mode(name) == trt.TensorIOMode.INPUT:
                    input_shapes[name] = engine.get_tensor_shape(name)
        else:
            for binding in engine:
                if engine.binding_is_input(binding):
                    input_shapes[binding] = engine.get_binding_shape(binding)
    return input_shapes
class RTMO_GPU(object):
    """Single-image pose estimator for an RTMO or YOLO-NAS-Pose model.

    The model file may be an ONNX graph (run through ONNX Runtime) or a
    serialized TensorRT engine (run through Polygraphy's ``TrtRunner``).
    Calling the instance with an image returns ``(keypoints, scores)``.

    The instance can be used as a context manager; leaving the ``with`` block
    deactivates the TensorRT runner if one is active.
    """

    # RTMO detections are kept only when their score is strictly above this.
    SCORE_THR = 0.3

    def preprocess(self, img: np.ndarray):
        """Letterbox ``img`` to the model input size and optionally normalize.

        The image is resized with its aspect ratio preserved, placed in the
        top-left corner of a canvas filled with the value 114, and normalized
        with ``self.mean`` / ``self.std`` when a mean was configured.

        Args:
            img (np.ndarray): Input image, ``(H, W, 3)`` or ``(H, W)``.

        Returns:
            tuple:
            - padded_img (np.ndarray): Letterboxed (and normalized) image.
            - ratio (float): Resize factor applied to the input image.
        """
        target_h, target_w = self.model_input_size[0], self.model_input_size[1]
        if len(img.shape) == 3:
            canvas_shape = (target_h, target_w, 3)
        else:
            canvas_shape = tuple(self.model_input_size)
        padded_img = np.full(canvas_shape, 114, dtype=np.uint8)

        ratio = min(target_h / img.shape[0], target_w / img.shape[1])
        new_h, new_w = int(img.shape[0] * ratio), int(img.shape[1] * ratio)
        resized_img = cv2.resize(
            img,
            (new_w, new_h),
            interpolation=cv2.INTER_LINEAR,
        ).astype(np.uint8)
        padded_img[:new_h, :new_w] = resized_img

        # Normalize without touching instance state on every call.
        if self.mean is not None:
            padded_img = padded_img - np.asarray(self.mean)
            if self.std is not None:
                padded_img = padded_img / np.asarray(self.std)
        return padded_img, ratio

    def postprocess(
        self,
        outputs: List[np.ndarray],
        ratio: float = 1.,
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Decode raw model outputs into keypoints in original-image pixels.

        Args:
            outputs (List[np.ndarray]): Raw model outputs. For RTMO this is
                ``[dets, keypoints]``: ``dets[0, :, 4]`` holds the detection
                score and ``keypoints[0, :, :, :3]`` holds x, y, score per
                joint. For YOLO-NAS-Pose it is ``[flat_predictions]``, where
                column 0 is compared against 0 to select rows and columns
                6 onward hold x, y, score triples.
            ratio (float): Resize factor returned by :meth:`preprocess`.

        Returns:
            tuple:
            - keypoints (np.ndarray): ``(num_instances, num_joints, 2)``.
            - scores (np.ndarray): ``(num_instances, num_joints)``.
        """
        if not self.is_yolo_nas_pose:
            # RTMO: the exported graph already contains NMS, so only the score
            # threshold is applied. The caller's buffers are left unmodified.
            det_outputs, pose_outputs = outputs
            keep = det_outputs[0, :, 4] > self.SCORE_THR
            keypoints = pose_outputs[0, :, :, :2][keep] / ratio
            scores = pose_outputs[0, :, :, 2][keep]
        else:
            # NAS Pose, flat prediction format.
            flat_predictions = outputs[0]
            if flat_predictions.shape[0] > 0:  # at least one person found
                mask = flat_predictions[:, 0] == 0
                pred_bboxes = flat_predictions[mask, 1:5]
                pred_joints = flat_predictions[mask, 6:].reshape((len(pred_bboxes), -1, 3))
                keypoints, scores = pred_joints[:, :, :2], pred_joints[:, :, -1]
                keypoints = keypoints / ratio
            else:  # no detection
                keypoints, scores = np.zeros((0, 17, 2)), np.zeros((0, 17))
        return keypoints, scores

    def inference(self, img: np.ndarray):
        """Run the model on one preprocessed image.

        Args:
            img (np.ndarray): Preprocessed image in ``(H, W, C)`` layout.

        Returns:
            list: Raw output arrays of the model.
        """
        # Build the (1, C, H, W) input; NAS-Pose takes uint8, RTMO float32.
        dtype = np.uint8 if self.is_yolo_nas_pose else np.float32
        chw = np.ascontiguousarray(img.transpose(2, 0, 1), dtype=dtype)
        input_tensor = chw[None, :, :, :]

        if self.model_format == 'onnx':
            io_binding = self.session.io_binding()
            # input_tensor stays referenced until the run completes, so the
            # raw pointer handed to ONNX Runtime remains valid.
            io_binding.bind_input(name='input', device_type='cpu', device_id=0,
                                  element_type=dtype, shape=input_tensor.shape,
                                  buffer_ptr=input_tensor.ctypes.data)
            if not self.is_yolo_nas_pose:
                # RTMO
                io_binding.bind_output(name='dets')
                io_binding.bind_output(name='keypoints')
            else:
                # NAS Pose, flat format
                io_binding.bind_output(name='graph2_flat_predictions')
            self.session.run_with_iobinding(io_binding)
            outputs = [output.numpy() for output in io_binding.get_outputs()]
        else:  # 'engine'
            if not self.session.is_active:
                self.session.activate()
            outputs = self.session.infer(feed_dict={'input': input_tensor}, check_inputs=False)
            outputs = list(outputs.values())
        return outputs

    def __enter__(self):
        """Return the estimator itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Deactivate the TensorRT runner, if any; never suppresses errors.

        All parameters are optional so a direct ``obj.__exit__()`` call keeps
        working.
        """
        if self.model_format == 'engine':
            if self.session.is_active:
                self.session.deactivate()

    def __call__(self, image: np.ndarray):
        """Estimate poses in ``image``; returns ``(keypoints, scores)``."""
        image, ratio = self.preprocess(image)
        outputs = self.inference(image)
        keypoints, scores = self.postprocess(outputs, ratio)
        return keypoints, scores

    def __init__(self,
                 model: str = None,
                 mean: tuple = None,
                 std: tuple = None,
                 device: str = 'cuda',
                 is_yolo_nas_pose = False):
        """Load the model and prepare an inference session.

        Args:
            model (str): Path of an ONNX file or a serialized TensorRT engine.
            mean (tuple): Per-channel mean to subtract, or None to skip
                normalization.
            std (tuple): Per-channel divisor applied after the mean.
            device (str): ``'cuda'`` or ``'cpu'``; selects the ONNX Runtime
                execution providers (ignored for TensorRT engines).
            is_yolo_nas_pose (bool): True for a YOLO-NAS-Pose model, False for
                RTMO.

        Raises:
            FileNotFoundError: If ``model`` is None or does not exist.
            TypeError: If the file is neither an ONNX model nor an engine.
        """
        if model is None or not os.path.exists(model):
            raise FileNotFoundError(f"The specified ONNX model file was not found: {model}")

        self.model = model
        if is_onnx_model(model):
            self.model_format = 'onnx'
            self.input_shape = get_onnx_input_shapes(self.model)['input']
            # Every entry is a list: a bare string would be walked character
            # by character as a sequence of provider names.
            providers = {'cpu': ['CPUExecutionProvider'],
                         'cuda': [
                             #('TensorrtExecutionProvider', {
                             #    'trt_fp16_enable':True,
                             #    'trt_engine_cache_enable':True,
                             #    'trt_engine_cache_path':'cache'}),
                             ('CUDAExecutionProvider', {
                                 'cudnn_conv_algo_search': 'DEFAULT',
                                 'cudnn_conv_use_max_workspace': True
                             }),
                             'OpenVINOExecutionProvider',
                             'CPUExecutionProvider']}
            self.session = ort.InferenceSession(path_or_bytes=model,
                                                providers=providers[device])
        elif is_trt_engine(model):
            self.model_format = 'engine'
            from polygraphy.backend.common import BytesFromPath
            from polygraphy.backend.trt import EngineFromBytes, TrtRunner, load_plugins
            # The custom-op plugins must be registered before deserializing.
            load_plugins(plugins=['libmmdeploy_tensorrt_ops.so'])
            self.input_shape = get_trt_input_shapes(self.model)['input']
            engine = EngineFromBytes(BytesFromPath(model))
            self.session = TrtRunner(engine)
        else:
            raise TypeError("Your model is neither ONNX nor Engine !")

        self.model_input_size = self.input_shape[2:4]  # B, C, H, W,
        # Convert once here so preprocess() does not rewrite instance state.
        self.mean = None if mean is None else np.array(mean)
        self.std = None if std is None else np.array(std)
        self.device = device
        self.is_yolo_nas_pose = is_yolo_nas_pose
class RTMO_GPU_Batch(RTMO_GPU):
    """Batched variant of :class:`RTMO_GPU` with a frame-by-frame interface.

    Frames passed to ``__call__`` are buffered until ``batch_size`` of them
    are available, then run through the model in one go. Results are handed
    back one per call in arrival order, so each result lags its frame by up
    to ``batch_size - 1`` calls; ``(None, None)`` is returned while the first
    batch is still filling up.
    """

    def preprocess_batch(self, imgs: List[np.ndarray]) -> Tuple[np.ndarray, List[float]]:
        """Preprocess a list of images into one stacked batch.

        Args:
            imgs (List[np.ndarray]): Input images.

        Returns:
            tuple:
            - batch_img (np.ndarray): Preprocessed images stacked on axis 0.
            - ratios (List[float]): Resize factor used for each image.
        """
        batch_img = []
        ratios = []
        for img in imgs:
            preprocessed_img, ratio = super().preprocess(img)
            batch_img.append(preprocessed_img)
            ratios.append(ratio)
        # Stack along the first dimension to create a batch
        batch_img = np.stack(batch_img, axis=0)
        return batch_img, ratios

    def inference(self, batch_img: np.ndarray):
        """Run the model on a whole batch.

        Args:
            batch_img (np.ndarray): Preprocessed images, ``(N, H, W, C)``.

        Returns:
            list: Raw output arrays of the model for the whole batch.
        """
        # The buffer dtype must match the element type bound below: uint8 for
        # NAS-Pose, float32 for RTMO (as in the single-image path).
        dtype = np.uint8 if self.is_yolo_nas_pose else np.float32
        input_tensor = np.ascontiguousarray(batch_img.transpose(0, 3, 1, 2), dtype=dtype)  # NCHW

        if self.model_format == 'onnx':
            io_binding = self.session.io_binding()
            io_binding.bind_input(name='input', device_type='cpu', device_id=0,
                                  element_type=dtype, shape=input_tensor.shape,
                                  buffer_ptr=input_tensor.ctypes.data)
            if not self.is_yolo_nas_pose:
                # RTMO
                io_binding.bind_output(name='dets')
                io_binding.bind_output(name='keypoints')
            else:
                # NAS Pose, flat format
                io_binding.bind_output(name='graph2_flat_predictions')
            self.session.run_with_iobinding(io_binding)
            outputs = [output.numpy() for output in io_binding.get_outputs()]
        else:  # 'engine'
            if not self.session.is_active:
                self.session.activate()
            outputs = self.session.infer(feed_dict={'input': input_tensor}, check_inputs=False)
            outputs = list(outputs.values())
        return outputs

    def postprocess_batch(
        self,
        outputs: List[np.ndarray],
        ratios: List[float]
    ) -> Tuple[List[np.ndarray], List[np.ndarray]]:
        """Split batched model outputs into per-image keypoints and scores.

        Args:
            outputs (List[np.ndarray]): Raw batched outputs of the model.
            ratios (List[float]): Resize factor used for each image.

        Returns:
            tuple:
            - batch_keypoints (List[np.ndarray]): Keypoints per image.
            - batch_scores (List[np.ndarray]): Keypoint scores per image.
        """
        batch_keypoints = []
        batch_scores = []

        if self.is_yolo_nas_pose:
            # Single flat output for the whole batch.
            # NOTE(review): column 0 is taken to be the image index within the
            # batch (the single-image decoder selects rows where it equals 0)
            # — confirm against the exported model.
            flat_predictions = outputs[0]
            for i, ratio in enumerate(ratios):
                rows = flat_predictions[flat_predictions[:, 0] == i]  # boolean indexing copies
                rows[:, 0] = 0  # the single-image decoder looks for index 0
                keypoints, scores = super().postprocess([rows], ratio)
                batch_keypoints.append(keypoints)
                batch_scores.append(scores)
            return batch_keypoints, batch_scores

        b_dets, b_keypoints = outputs
        for i, ratio in enumerate(ratios):
            # Re-add the batch axis expected by the single-image decoder.
            output = [np.expand_dims(b_dets[i], axis=0), np.expand_dims(b_keypoints[i], axis=0)]
            keypoints, scores = super().postprocess(output, ratio)
            batch_keypoints.append(keypoints)
            batch_scores.append(scores)
        return batch_keypoints, batch_scores

    def __batch_call__(self, images: List[np.ndarray]):
        """Run preprocess, inference and postprocess on a full batch."""
        batch_img, ratios = self.preprocess_batch(images)
        outputs = self.inference(batch_img)
        keypoints, scores = self.postprocess_batch(outputs, ratios)
        return keypoints, scores

    def __call__(self, image: np.ndarray):
        """Queue one frame and return the oldest pending result, if any.

        Args:
            image (np.ndarray): Next input frame.

        Returns:
            tuple: ``(keypoints, scores)`` of the oldest frame whose result
            has not been returned yet, or ``(None, None)`` when none is ready.
        """
        self.buffer.append(image)
        if len(self.buffer) == self.batch_size:
            b_keypoints, b_scores = self.__batch_call__(self.buffer)
            for keypoints, scores in zip(b_keypoints, b_scores):
                self.out_queue.put((keypoints, scores))
            self.buffer = []

        if self.out_queue.empty():
            return None, None
        return self.out_queue.get()

    def __init__(self,
                 model: str = None,
                 mean: tuple = None,
                 std: tuple = None,
                 device: str = 'cuda',
                 is_yolo_nas_pose = False,
                 batch_size: int = 1):
        """Load the model and set up the frame buffer and result queue.

        Args:
            model (str): Path of the ONNX model or TensorRT engine.
            mean (tuple): Per-channel mean, or None to skip normalization.
            std (tuple): Per-channel standard deviation.
            device (str): ``'cuda'`` or ``'cpu'``.
            is_yolo_nas_pose (bool): True for YOLO-NAS-Pose, False for RTMO.
            batch_size (int): Number of frames collected per model run.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1; the buffer could
                then never reach the batch size and no result would appear.
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")
        super().__init__(model,
                         mean,
                         std,
                         device,
                         is_yolo_nas_pose)
        self.batch_size = batch_size
        # At most one batch of results is pending at any time.
        self.out_queue = Queue(maxsize=self.batch_size)
        self.buffer = []
def resize_to_fit_screen(image, screen_width, screen_height):
    """Scale ``image`` to fit a screen while keeping its aspect ratio.

    The image is scaled by the largest factor for which neither side exceeds
    the screen, so a smaller image is enlarged and a larger one is shrunk.

    Args:
        image: Image array whose first two axes are height and width.
        screen_width: Available width in pixels.
        screen_height: Available height in pixels.

    Returns:
        The resized image produced by ``cv2.resize`` with ``INTER_AREA``.
    """
    h, w = image.shape[:2]

    # One factor for both axes keeps the aspect ratio.
    scale = min(screen_width / w, screen_height / h)

    # Clamp to one pixel: cv2.resize rejects a zero-sized target, which the
    # truncation could otherwise produce for extreme aspect ratios.
    new_width = max(1, int(w * scale))
    new_height = max(1, int(h * scale))

    return cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_AREA)