# rtmo_gpu.py
import os
from queue import Queue
from typing import List, Tuple, Union

import cv2
import numpy as np
import onnxruntime as ort
# Path to the mmdeploy TensorRT custom-op library (required by RTMO engines)
PLUGIN_LIB_PATHS = 'libmmdeploy_tensorrt_ops.so'
os.environ['ORT_TENSORRT_EXTRA_PLUGIN_LIB_PATHS'] = PLUGIN_LIB_PATHS

TRT_BACKEND = 'POLYGRAPHY'  # 'POLYGRAPHY' or 'PYCUDA'
DEBUG = False
# dictionary from https://github.com/Tau-J/rtmlib/blob/4b29101d54b611048ef165277cebfffff3030074/rtmlib/visualization/skeleton/coco17.py
coco17 = dict(
    name='coco17',
    keypoint_info={
        0: dict(name='nose', id=0, color=[51, 153, 255], swap=''),
        1: dict(name='left_eye', id=1, color=[51, 153, 255], swap='right_eye'),
        2: dict(name='right_eye', id=2, color=[51, 153, 255], swap='left_eye'),
        3: dict(name='left_ear', id=3, color=[51, 153, 255], swap='right_ear'),
        4: dict(name='right_ear', id=4, color=[51, 153, 255], swap='left_ear'),
        5: dict(name='left_shoulder', id=5, color=[0, 255, 0], swap='right_shoulder'),
        6: dict(name='right_shoulder', id=6, color=[255, 128, 0], swap='left_shoulder'),
        7: dict(name='left_elbow', id=7, color=[0, 255, 0], swap='right_elbow'),
        8: dict(name='right_elbow', id=8, color=[255, 128, 0], swap='left_elbow'),
        9: dict(name='left_wrist', id=9, color=[0, 255, 0], swap='right_wrist'),
        10: dict(name='right_wrist', id=10, color=[255, 128, 0], swap='left_wrist'),
        11: dict(name='left_hip', id=11, color=[0, 255, 0], swap='right_hip'),
        12: dict(name='right_hip', id=12, color=[255, 128, 0], swap='left_hip'),
        13: dict(name='left_knee', id=13, color=[0, 255, 0], swap='right_knee'),
        14: dict(name='right_knee', id=14, color=[255, 128, 0], swap='left_knee'),
        15: dict(name='left_ankle', id=15, color=[0, 255, 0], swap='right_ankle'),
        16: dict(name='right_ankle', id=16, color=[255, 128, 0], swap='left_ankle'),
    },
    skeleton_info={
        0: dict(link=('left_ankle', 'left_knee'), id=0, color=[0, 255, 0]),
        1: dict(link=('left_knee', 'left_hip'), id=1, color=[0, 255, 0]),
        2: dict(link=('right_ankle', 'right_knee'), id=2, color=[255, 128, 0]),
        3: dict(link=('right_knee', 'right_hip'), id=3, color=[255, 128, 0]),
        4: dict(link=('left_hip', 'right_hip'), id=4, color=[51, 153, 255]),
        5: dict(link=('left_shoulder', 'left_hip'), id=5, color=[51, 153, 255]),
        6: dict(link=('right_shoulder', 'right_hip'), id=6, color=[51, 153, 255]),
        7: dict(link=('left_shoulder', 'right_shoulder'), id=7, color=[51, 153, 255]),
        8: dict(link=('left_shoulder', 'left_elbow'), id=8, color=[0, 255, 0]),
        9: dict(link=('right_shoulder', 'right_elbow'), id=9, color=[255, 128, 0]),
        10: dict(link=('left_elbow', 'left_wrist'), id=10, color=[0, 255, 0]),
        11: dict(link=('right_elbow', 'right_wrist'), id=11, color=[255, 128, 0]),
        12: dict(link=('left_eye', 'right_eye'), id=12, color=[51, 153, 255]),
        13: dict(link=('nose', 'left_eye'), id=13, color=[51, 153, 255]),
        14: dict(link=('nose', 'right_eye'), id=14, color=[51, 153, 255]),
        15: dict(link=('left_eye', 'left_ear'), id=15, color=[51, 153, 255]),
        16: dict(link=('right_eye', 'right_ear'), id=16, color=[51, 153, 255]),
        17: dict(link=('left_ear', 'left_shoulder'), id=17, color=[51, 153, 255]),
        18: dict(link=('right_ear', 'right_shoulder'), id=18, color=[51, 153, 255]),
    })
# functions from https://github.com/Tau-J/rtmlib/blob/4b29101d54b611048ef165277cebfffff3030074/rtmlib/visualization/draw.py#L71
def draw_mmpose(img,
                keypoints,
                scores,
                keypoint_info,
                skeleton_info,
                kpt_thr=0.5,
                radius=2,
                line_width=2):
    assert len(keypoints.shape) == 2
    vis_kpt = [s >= kpt_thr for s in scores]

    link_dict = {}
    for i, kpt_info in keypoint_info.items():
        kpt_color = tuple(kpt_info['color'])
        link_dict[kpt_info['name']] = kpt_info['id']

        kpt = keypoints[i]
        if vis_kpt[i]:
            img = cv2.circle(img, (int(kpt[0]), int(kpt[1])), int(radius),
                             kpt_color, -1)

    for i, ske_info in skeleton_info.items():
        link = ske_info['link']
        pt0, pt1 = link_dict[link[0]], link_dict[link[1]]
        if vis_kpt[pt0] and vis_kpt[pt1]:
            link_color = ske_info['color']
            kpt0 = keypoints[pt0]
            kpt1 = keypoints[pt1]
            img = cv2.line(img, (int(kpt0[0]), int(kpt0[1])),
                           (int(kpt1[0]), int(kpt1[1])),
                           link_color,
                           thickness=line_width)
    return img
def draw_bbox(img, bboxes, bboxes_scores=None, color=None, person_id_list=None, line_width=2):
    default_color = (0, 255, 0)  # green, used when neither color nor scores are given
    for i, bbox in enumerate(bboxes):
        # Determine the color from the score if no explicit color is given
        if color is None and bboxes_scores is not None:
            # Interpolate between grey (score 0) and light green (score 1)
            score = bboxes_scores[i]
            start_color = np.array([128, 128, 128], dtype=np.float32)
            end_color = np.array([128, 255, 128], dtype=np.float32)
            box_color = tuple(int(c) for c in (1 - score) * start_color + score * end_color)
        else:
            box_color = color if color is not None else default_color

        # Draw the bounding box
        img = cv2.rectangle(img, (int(bbox[0]), int(bbox[1])),
                            (int(bbox[2]), int(bbox[3])), box_color, line_width)

        # Display the score just inside the top-right corner of the bounding box
        if bboxes_scores is not None:
            score_text = f'{bboxes_scores[i]:.2f}'
            text_size, _ = cv2.getTextSize(score_text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
            text_x = int(bbox[2]) - text_size[0]
            text_y = int(bbox[1]) + text_size[1]
            img = cv2.putText(img, score_text, (text_x, text_y),
                              cv2.FONT_HERSHEY_SIMPLEX, 0.5, box_color, 1, cv2.LINE_AA)

        # Display the person ID just above the top-right corner of the bounding box
        if person_id_list is not None:
            person_id_text = str(person_id_list[i])
            text_size, _ = cv2.getTextSize(person_id_text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2)
            text_x = int(bbox[2]) - text_size[0]
            text_y = int(bbox[1]) - text_size[1]
            img = cv2.putText(img, person_id_text, (text_x, text_y),
                              cv2.FONT_HERSHEY_SIMPLEX, 0.5, box_color, 2, cv2.LINE_AA)
    return img
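# A quick check of the score-to-color mapping above (a sketch, not part of
# the pipeline): score 0.0 maps to grey (128, 128, 128), score 1.0 to light
# green (128, 255, 128), and e.g. score 0.5 gives (128, 191, 128) after the
# int() truncation.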
# draw_skeleton from the same rtmlib source, with simplification to use onnxruntime only
def draw_skeleton(img,
                  keypoints,
                  scores,
                  kpt_thr=0.5,
                  radius=1,
                  line_width=2):
    num_keypoints = keypoints.shape[1]
    if num_keypoints == 17:
        skeleton_dict = coco17
    else:
        raise NotImplementedError(
            f'Only 17-keypoint (COCO) skeletons are supported, got {num_keypoints}')

    keypoint_info = skeleton_dict['keypoint_info']
    skeleton_info = skeleton_dict['skeleton_info']

    # Promote a single instance to a batch of one
    if len(keypoints.shape) == 2:
        keypoints = keypoints[None, :, :]
        scores = scores[None, :]

    num_instance = keypoints.shape[0]
    for i in range(num_instance):
        img = draw_mmpose(img, keypoints[i], scores[i], keypoint_info,
                          skeleton_info, kpt_thr, radius, line_width)
    return img
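# Usage sketch (keypoints/scores shaped (N, 17, 2)/(N, 17), as returned by
# RTMO_GPU.__call__ below; the variable names are illustrative):
#   frame = draw_skeleton(frame, keypoints, scores, kpt_thr=0.5)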
def is_onnx_model(model_path):
    try:
        ort.InferenceSession(model_path, providers=['CPUExecutionProvider'])
        return True
    except Exception:
        return False

def is_trt_engine(model_path):
    try:
        from polygraphy.backend.common import BytesFromPath
        from polygraphy.backend.trt import EngineFromBytes
        engine = EngineFromBytes(BytesFromPath(model_path))
        return engine is not None
    except Exception:
        return False

def get_onnx_input_shapes(model_path):
    from polygraphy.backend.onnx.loader import OnnxFromPath
    from polygraphy.backend.onnx import infer_shapes
    model = OnnxFromPath(model_path)()
    model = infer_shapes(model)
    input_shapes = {inp.name: inp.type.tensor_type.shape for inp in model.graph.input}
    return {name: [dim.dim_value if dim.dim_value > 0 else 'Dynamic' for dim in shape_proto.dim]
            for name, shape_proto in input_shapes.items()}

def get_trt_input_shapes(model_path):
    input_shapes = {}
    import tensorrt as trt
    with open(model_path, 'rb') as f, trt.Runtime(trt.Logger(trt.Logger.WARNING)) as runtime:
        engine = runtime.deserialize_cuda_engine(f.read())
        for binding in engine:
            if engine.binding_is_input(binding):
                input_shapes[binding] = engine.get_binding_shape(binding)
    return input_shapes
def get_model_format_and_input_shape(model):
    if is_onnx_model(model):
        model_format = 'onnx'
        input_shape = get_onnx_input_shapes(model)['input']
    elif is_trt_engine(model):
        model_format = 'engine'
        from polygraphy.backend.trt import load_plugins
        load_plugins(plugins=[PLUGIN_LIB_PATHS])
        input_shape = get_trt_input_shapes(model)['input']
    else:
        raise TypeError('Your model is neither an ONNX model nor a TensorRT engine!')
    return model_format, input_shape
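# Example of what this returns (values are illustrative): for an RTMO ONNX
# export with a static 640x640 input it yields ('onnx', [1, 3, 640, 640]);
# a dynamic batch dimension would appear as 'Dynamic' instead of 1.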
class RTMO_GPU(object):

    def preprocess(self, img: np.ndarray):
        """Letterbox an image for RTMO model inference.

        Args:
            img (np.ndarray): Input image in HWC (BGR) layout.

        Returns:
            tuple:
            - padded_img (np.ndarray): Resized and padded image.
            - ratio (float): Resize ratio applied to the original image.
        """
        if len(img.shape) == 3:
            padded_img = np.ones(
                (self.model_input_size[0], self.model_input_size[1], 3),
                dtype=np.uint8) * 114
        else:
            padded_img = np.ones(self.model_input_size, dtype=np.uint8) * 114

        ratio = min(self.model_input_size[0] / img.shape[0],
                    self.model_input_size[1] / img.shape[1])
        resized_img = cv2.resize(
            img,
            (int(img.shape[1] * ratio), int(img.shape[0] * ratio)),
            interpolation=cv2.INTER_LINEAR,
        ).astype(np.uint8)
        padded_shape = (int(img.shape[0] * ratio), int(img.shape[1] * ratio))
        padded_img[:padded_shape[0], :padded_shape[1]] = resized_img

        # normalize image
        if self.mean is not None:
            self.mean = np.array(self.mean)
            self.std = np.array(self.std)
            padded_img = (padded_img - self.mean) / self.std

        return padded_img, ratio
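    # Worked example of the letterboxing above (a sketch; shapes are
    # illustrative): a 1920x1080 frame fed to a 640x640 model input gives
    # ratio = min(640/1080, 640/1920) = 1/3, so the frame is resized to
    # 640x360 and pasted into the top-left of a 114-valued 640x640 canvas.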
    def postprocess(
        self,
        outputs: List[np.ndarray],
        ratio: float = 1.,
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Do postprocessing for RTMO model inference.

        Args:
            outputs (List[np.ndarray]): Outputs of the RTMO model.
            ratio (float): Resize ratio applied during preprocessing.

        Returns:
            tuple:
            - final_boxes (np.ndarray): Final bounding boxes.
            - final_boxes_scores (np.ndarray): Final bounding-box scores.
            - keypoints (np.ndarray): Keypoint coordinates per instance.
            - scores (np.ndarray): Keypoint scores per instance.
        """
        if not self.is_yolo_nas_pose:
            # RTMO: the exported ONNX graph already contains the NMS module
            det_outputs, pose_outputs = outputs
            final_boxes = det_outputs[0, :, :4]
            final_scores = det_outputs[0, :, 4]
            final_boxes /= ratio
            keep = final_scores > 0.3
            final_boxes = final_boxes[keep]
            final_boxes_scores = final_scores[keep]

            # decode pose outputs
            keypoints, scores = pose_outputs[0, :, :, :2], pose_outputs[0, :, :, 2]
            keypoints = keypoints / ratio
            keypoints = keypoints[keep]
            scores = scores[keep]
        else:
            # YOLO-NAS Pose: flat rows of [image_index, x1, y1, x2, y2, score, joints...]
            flat_predictions = outputs[0]
            if flat_predictions.shape[0] > 0:  # at least one person found
                mask = flat_predictions[:, 0] == 0
                final_boxes = flat_predictions[mask, 1:5]
                final_boxes_scores = flat_predictions[mask, 5]
                pred_joints = flat_predictions[mask, 6:].reshape((len(final_boxes), -1, 3))
                keypoints, scores = pred_joints[:, :, :2], pred_joints[:, :, -1]
                keypoints = keypoints / ratio
                final_boxes = final_boxes / ratio
            else:  # no detection
                final_boxes = np.zeros((0, 4))
                final_boxes_scores = np.zeros((0, 1))
                keypoints = np.zeros((0, 17, 2))
                scores = np.zeros((0, 17))

        return final_boxes, final_boxes_scores, keypoints, scores
    def inference(self, img: np.ndarray):
        """Run model inference on a single preprocessed image.

        Args:
            img (np.ndarray): Preprocessed image in HWC layout.

        Returns:
            outputs (List[np.ndarray]): Raw model outputs.
        """
        # build input to (1, 3, H, W)
        img = img.transpose(2, 0, 1)
        img = np.ascontiguousarray(img, dtype=np.float32 if not self.is_yolo_nas_pose else np.uint8)
        input = img[None, :, :, :]

        if self.model_format == 'onnx':
            # Create an IO Binding object
            io_binding = self.session.io_binding()

            if not self.is_yolo_nas_pose:
                # RTMO
                io_binding.bind_input(name='input', device_type='cpu', device_id=0,
                                      element_type=np.float32, shape=input.shape,
                                      buffer_ptr=input.ctypes.data)
                io_binding.bind_output(name='dets')
                io_binding.bind_output(name='keypoints')
            else:
                # YOLO-NAS Pose, flat format
                io_binding.bind_input(name='input', device_type='cpu', device_id=0,
                                      element_type=np.uint8, shape=input.shape,
                                      buffer_ptr=input.ctypes.data)
                io_binding.bind_output(name='graph2_flat_predictions')

            # Run inference with IO Binding
            self.session.run_with_iobinding(io_binding)

            # Retrieve the outputs from the IO Binding object
            outputs = [output.numpy() for output in io_binding.get_outputs()]
        else:  # 'engine'
            if TRT_BACKEND == 'POLYGRAPHY':
                if not self.session.is_active:
                    self.session.activate()
                outputs = self.session.infer(feed_dict={'input': input}, check_inputs=False)
                outputs = list(outputs.values())
            else:  # PYCUDA
                import pycuda.driver as cuda

                # Set the input shape dynamically
                self.context.set_binding_shape(0, input.shape)

                # Copy the input to pinned host memory, then to the device
                np.copyto(self.inputs[0]['host'], input.ravel())
                cuda.memcpy_htod_async(self.inputs[0]['device'], self.inputs[0]['host'], self.stream)

                # Run inference
                self.context.execute_async_v2(bindings=self.bindings, stream_handle=self.stream.handle)

                # Transfer predictions back from the GPU
                for output in self.outputs:
                    cuda.memcpy_dtoh_async(output['host'], output['device'], self.stream)

                # Synchronize the stream
                self.stream.synchronize()

                # Return the output values in their original shapes
                outputs = [out['host'].reshape(out['shape']) for out in self.outputs]

        return outputs
    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        # Standard context-manager signature; the arguments default to None
        # so the method can also be called directly
        if self.model_format == 'engine' and TRT_BACKEND == 'POLYGRAPHY':
            if self.session.is_active:
                self.session.deactivate()

    def __call__(self, image: np.ndarray):
        image, ratio = self.preprocess(image)
        outputs = self.inference(image)
        bboxes, bboxes_scores, keypoints, scores = self.postprocess(outputs, ratio)
        return bboxes, bboxes_scores, keypoints, scores
    def __init__(self,
                 model: str = None,
                 mean: tuple = None,
                 std: tuple = None,
                 device: str = 'cuda',
                 is_yolo_nas_pose=False,
                 batch_size=1,
                 plugin_path=PLUGIN_LIB_PATHS):
        self.batch_size = batch_size

        if not os.path.exists(model):
            raise FileNotFoundError(f'The specified model file was not found: {model}')
        self.model = model
        self.model_format, self.input_shape = get_model_format_and_input_shape(self.model)

        if self.model_format == 'onnx':
            providers = {'cpu': 'CPUExecutionProvider',
                         'cuda': [
                             #('TensorrtExecutionProvider', {
                             #    'trt_fp16_enable': True,
                             #    'trt_engine_cache_enable': True,
                             #    'trt_engine_cache_path': 'cache'}),
                             ('CUDAExecutionProvider', {
                                 'cudnn_conv_algo_search': 'DEFAULT',
                                 'cudnn_conv_use_max_workspace': True
                             }),
                             'OpenVINOExecutionProvider',
                             'CPUExecutionProvider']}
            self.session = ort.InferenceSession(path_or_bytes=model,
                                                providers=providers[device])
        else:  # 'engine'
            if TRT_BACKEND == 'POLYGRAPHY':
                from polygraphy.backend.common import BytesFromPath
                from polygraphy.backend.trt import EngineFromBytes, TrtRunner
                engine = EngineFromBytes(BytesFromPath(model))
                self.session = TrtRunner(engine)
            else:  # PYCUDA
                import ctypes
                import tensorrt as trt
                import pycuda.autoinit  # noqa: F401 (initializes the CUDA context)
                import pycuda.driver as cuda

                self.TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
                self.trt_model_path = model
                self.plugin_path = plugin_path

                # Load the custom plugin library
                ctypes.CDLL(self.plugin_path)

                # Load the TensorRT engine
                with open(self.trt_model_path, 'rb') as f:
                    engine_data = f.read()
                self.runtime = trt.Runtime(self.TRT_LOGGER)
                self.engine = self.runtime.deserialize_cuda_engine(engine_data)
                if self.engine is None:
                    raise RuntimeError('Failed to load the engine.')

                self.context = self.engine.create_execution_context()
                self.inputs = []
                self.outputs = []
                self.bindings = []
                self.stream = cuda.Stream()

                # Allocate memory for inputs and outputs
                for binding in self.engine:
                    binding_index = self.engine.get_binding_index(binding)
                    shape = self.engine.get_binding_shape(binding_index)
                    if shape[0] == -1:
                        # Resolve a dynamic batch dimension to the configured batch size
                        shape[0] = self.batch_size
                    size = trt.volume(shape)
                    dtype = trt.nptype(self.engine.get_binding_dtype(binding))

                    # Allocate pinned host and device buffers
                    host_mem = cuda.pagelocked_empty(size, dtype)
                    device_mem = cuda.mem_alloc(host_mem.nbytes)

                    # Append the device buffer to the device bindings
                    self.bindings.append(int(device_mem))

                    # Append to the appropriate list
                    if self.engine.binding_is_input(binding):
                        self.inputs.append({'host': host_mem, 'device': device_mem, 'shape': shape})
                    else:
                        self.outputs.append({'host': host_mem, 'device': device_mem, 'shape': shape})

        self.model_input_size = self.input_shape[2:4]  # input is (B, C, H, W)
        self.mean = mean
        self.std = std
        self.device = device
        self.is_yolo_nas_pose = is_yolo_nas_pose

        print(f"[I] Detected '{self.model_format.upper()}' model", end='')
        print(f", '{TRT_BACKEND.upper()}' backend is chosen for inference" if self.model_format == 'engine' else '')
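# Usage sketch for single-image inference (the model path is an assumption;
# any RTMO ONNX export or mmdeploy TensorRT engine with an 'input' tensor
# should fit):
#   body = RTMO_GPU(model='rtmo-s_640.onnx', device='cuda')
#   bboxes, bboxes_scores, keypoints, scores = body(frame)
#   frame = draw_skeleton(frame, keypoints, scores)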
class RTMO_GPU_Batch(RTMO_GPU):

    def preprocess_batch(self, imgs: List[np.ndarray]) -> Tuple[np.ndarray, List[float]]:
        """Preprocess a batch of images for RTMO model inference.

        Args:
            imgs (List[np.ndarray]): List of input images.

        Returns:
            tuple:
            - batch_img (np.ndarray): Batch of preprocessed images.
            - ratios (List[float]): Resize ratio used for each image.
        """
        batch_img = []
        ratios = []

        for img in imgs:
            preprocessed_img, ratio = super().preprocess(img)
            batch_img.append(preprocessed_img)
            ratios.append(ratio)

        # Stack along the first dimension to create a batch
        batch_img = np.stack(batch_img, axis=0)

        return batch_img, ratios
    def inference(self, batch_img: np.ndarray):
        """Override to handle batch inference.

        Args:
            batch_img (np.ndarray): Batch of preprocessed images.

        Returns:
            outputs (List[np.ndarray]): Raw model outputs for the batch.
        """
        # build input to (N, 3, H, W); match the dtype that will be bound below
        batch_img = batch_img.transpose(0, 3, 1, 2)  # NCHW format
        batch_img = np.ascontiguousarray(batch_img, dtype=np.float32 if not self.is_yolo_nas_pose else np.uint8)
        input = batch_img

        if self.model_format == 'onnx':
            # Create an IO Binding object
            io_binding = self.session.io_binding()

            if not self.is_yolo_nas_pose:
                # RTMO
                io_binding.bind_input(name='input', device_type='cpu', device_id=0,
                                      element_type=np.float32, shape=input.shape,
                                      buffer_ptr=input.ctypes.data)
                io_binding.bind_output(name='dets')
                io_binding.bind_output(name='keypoints')
            else:
                # YOLO-NAS Pose, flat format
                io_binding.bind_input(name='input', device_type='cpu', device_id=0,
                                      element_type=np.uint8, shape=input.shape,
                                      buffer_ptr=input.ctypes.data)
                io_binding.bind_output(name='graph2_flat_predictions')

            # Run inference with IO Binding
            self.session.run_with_iobinding(io_binding)

            # Retrieve the outputs from the IO Binding object
            outputs = [output.numpy() for output in io_binding.get_outputs()]
        else:  # 'engine'
            if TRT_BACKEND == 'POLYGRAPHY':
                if not self.session.is_active:
                    self.session.activate()
                outputs = self.session.infer(feed_dict={'input': input}, check_inputs=False)
                outputs = list(outputs.values())
            else:  # PYCUDA
                import pycuda.driver as cuda

                # Set the input shape dynamically
                self.context.set_binding_shape(0, input.shape)

                # Copy the input to pinned host memory, then to the device
                np.copyto(self.inputs[0]['host'], input.ravel())
                cuda.memcpy_htod_async(self.inputs[0]['device'], self.inputs[0]['host'], self.stream)

                # Run inference
                self.context.execute_async_v2(bindings=self.bindings, stream_handle=self.stream.handle)

                # Transfer predictions back from the GPU
                for output in self.outputs:
                    cuda.memcpy_dtoh_async(output['host'], output['device'], self.stream)

                # Synchronize the stream
                self.stream.synchronize()

                # Return the output values in their original shapes
                outputs = [out['host'].reshape(out['shape']) for out in self.outputs]

        return outputs
    def postprocess_batch(
        self,
        outputs: List[np.ndarray],
        ratios: List[float]
    ) -> Tuple[List[np.ndarray], List[np.ndarray], List[np.ndarray], List[np.ndarray]]:
        """Postprocess model outputs for a batch of images.

        Args:
            outputs (List[np.ndarray]): Outputs from the model for the batch.
            ratios (List[float]): Resize ratio used for each image.

        Returns:
            tuple of lists: bounding boxes, box scores, keypoints and
            keypoint scores for each image.
        """
        batch_keypoints = []
        batch_scores = []
        batch_bboxes = []
        batch_bboxes_scores = []

        b_dets, b_keypoints = outputs
        for i, ratio in enumerate(ratios):
            output = [np.expand_dims(b_dets[i], axis=0), np.expand_dims(b_keypoints[i], axis=0)]
            bboxes, bboxes_scores, keypoints, scores = super().postprocess(output, ratio)
            batch_keypoints.append(keypoints)
            batch_scores.append(scores)
            batch_bboxes.append(bboxes)
            batch_bboxes_scores.append(bboxes_scores)

        return batch_bboxes, batch_bboxes_scores, batch_keypoints, batch_scores

    def __batch_call__(self, images: List[np.ndarray]):
        batch_img, ratios = self.preprocess_batch(images)
        outputs = self.inference(batch_img)
        bboxes, bboxes_scores, keypoints, scores = self.postprocess_batch(outputs, ratios)
        return bboxes, bboxes_scores, keypoints, scores
    def free_unused_buffers(self, active_camera_ids: List):
        # Drop buffers and queues belonging to cameras that are no longer active
        for camera_id in list(self.buffers.keys()):
            if camera_id not in active_camera_ids:
                del self.buffers[camera_id]
                del self.in_queues[camera_id]
                del self.out_queues[camera_id]
                if DEBUG:
                    print(f'RTMO buffers for camera "{camera_id}" were freed.', flush=True)

    def __call__(self, image: np.ndarray, camera_id=0):
        # Initialize dedicated buffers & queues for the camera with id "camera_id"
        if camera_id not in self.buffers:
            self.buffers[camera_id] = []
            self.in_queues[camera_id] = Queue(maxsize=self.batch_size)
            self.out_queues[camera_id] = Queue(maxsize=self.batch_size)
            if DEBUG:
                print(f'RTMO buffers for camera "{camera_id}" were created.', flush=True)

        in_queue = self.in_queues[camera_id]
        out_queue = self.out_queues[camera_id]

        self.buffers[camera_id].append(image)
        in_queue.put(image)

        # Once a full batch has accumulated, run batched inference and queue
        # the per-frame results
        if len(self.buffers[camera_id]) == self.batch_size:
            b_bboxes, b_bboxes_scores, b_keypoints, b_scores = self.__batch_call__(self.buffers[camera_id])
            for i, (keypoints, scores) in enumerate(zip(b_keypoints, b_scores)):
                bboxes = b_bboxes[i]
                bboxes_scores = b_bboxes_scores[i]
                out_queue.put((bboxes, bboxes_scores, keypoints, scores))
            self.buffers[camera_id] = []

        # Results arrive with a latency of up to batch_size frames; until the
        # first batch completes, all returned values are None
        frame, bboxes, bboxes_scores, keypoints, scores = None, None, None, None, None
        if not out_queue.empty():
            bboxes, bboxes_scores, keypoints, scores = out_queue.get()
            frame = in_queue.get()

        return frame, bboxes, bboxes_scores, keypoints, scores
    def __init__(self,
                 model: str = None,
                 mean: tuple = None,
                 std: tuple = None,
                 device: str = 'cuda',
                 is_yolo_nas_pose=False,
                 plugin_path=PLUGIN_LIB_PATHS,
                 batch_size: int = 1):
        super().__init__(model,
                         mean,
                         std,
                         device,
                         is_yolo_nas_pose,
                         batch_size,
                         plugin_path)
        self.in_queues = dict()
        self.out_queues = dict()
        self.buffers = dict()
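# Usage sketch for the batched wrapper (model path and loop are illustrative):
# feed one frame per call and consume results when available, keeping in mind
# the batch_size-frame latency noted above.
#   body = RTMO_GPU_Batch(model='rtmo-s_640.engine', batch_size=4)
#   frame_out, bboxes, bboxes_scores, keypoints, scores = body(frame, camera_id=0)
#   if frame_out is not None:
#       frame_out = draw_skeleton(frame_out, keypoints, scores)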
def resize_to_fit_screen(image, screen_width, screen_height):
    # Get the dimensions of the image
    h, w = image.shape[:2]

    # Determine the scaling factor that fits the screen while preserving
    # the aspect ratio
    scale = min(screen_width / w, screen_height / h)

    # Calculate the new dimensions
    new_width = int(w * scale)
    new_height = int(h * scale)

    # Resize the image
    resized_image = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_AREA)

    return resized_image
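# Minimal end-to-end sketch tying the pieces together (the default model path,
# webcam index 0 and the 1280x720 screen size are assumptions; swap in your own):
if __name__ == '__main__':
    import sys

    model_path = sys.argv[1] if len(sys.argv) > 1 else 'rtmo-s_640.onnx'
    body = RTMO_GPU(model=model_path, device='cuda')

    cap = cv2.VideoCapture(0)
    while cap.isOpened():
        ok, frame = cap.read()
        if not ok:
            break
        bboxes, bboxes_scores, keypoints, scores = body(frame)
        frame = draw_bbox(frame, bboxes, bboxes_scores)
        frame = draw_skeleton(frame, keypoints, scores)
        cv2.imshow('RTMO', resize_to_fit_screen(frame, 1280, 720))
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    cap.release()
    cv2.destroyAllWindows()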