# openvino_notebooks / utils / engine3js.py
# (file uploaded via huggingface_hub by malvika2003; revision db5855f)
import math
from operator import itemgetter
import cv2
import numpy as np
from pythreejs import *
class Engine3js:
    """
    Thin wrapper around pythreejs that sets up a camera, a scene and a
    renderer for drawing 3D content in a Jupyter notebook.

    The implementation of these interfaces depends on Pythreejs, so make
    sure you install and authorize the Jupyter Widgets plug-in correctly.

    Attributes:
        view_width, view_height: Size of the view window.
        position: Position of the camera.
        lookAtPos: The point at which the camera looks at.
        axis_size: The size of axis.
        grid_length: grid size, length == width.
        grid_num: grid number.
        grid: If use grid.
        axis: If use axis.
    """

    def __init__(
        self,
        view_width=455,
        view_height=256,
        position=None,
        lookAtPos=None,
        axis_size=300,
        grid_length=600,
        grid_num=20,
        grid=False,
        axis=False,
    ):
        # Fix: the original used mutable list defaults ([300, 100, 0] and
        # [0, 0, 0]), which are shared across calls; fall back to fresh
        # lists instead. Behavior for callers is unchanged.
        if position is None:
            position = [300, 100, 0]
        if lookAtPos is None:
            lookAtPos = [0, 0, 0]
        self.view_width = view_width
        self.view_height = view_height
        self.position = position
        # set the camera
        self.cam = PerspectiveCamera(position=self.position, aspect=self.view_width / self.view_height)
        self.cam.lookAt(lookAtPos)
        # x,y,z axis
        self.axis = AxesHelper(axis_size)  # axes length
        # set grid size
        self.gridHelper = GridHelper(grid_length, grid_num)
        # set scene
        self.scene = Scene(
            children=[
                self.cam,
                DirectionalLight(position=[3, 5, 1], intensity=0.6),
                AmbientLight(intensity=0.5),
            ]
        )
        # add axis or grid
        if grid:
            self.scene.add(self.gridHelper)
        if axis:
            self.scene.add(self.axis)
        # render the objects in scene
        self.renderer = Renderer(
            camera=self.cam,
            scene=self.scene,
            controls=[OrbitControls(controlling=self.cam)],
            width=self.view_width,
            height=self.view_height,
        )

    def get_width(self):
        """Return the width of the render view in pixels."""
        return self.view_width

    def plot(self):
        """Render the current scene with the configured camera."""
        self.renderer.render(self.scene, self.cam)

    def scene_add(self, object):
        """Add a pythreejs object to the scene."""
        self.scene.add(object)

    def scene_remove(self, object):
        """Remove a pythreejs object from the scene."""
        self.scene.remove(object)
class Geometry:
    """
    Base class for drawable objects: holds a geometry buffer, a material
    and a human-readable name. Subclasses fill ``geometry``/``material``
    with pythreejs objects.
    """

    def __init__(self, name="geometry"):
        # Populated by subclasses with pythreejs objects.
        self.geometry = None
        self.material = None
        self.name = name

    def get_Name(self):
        """Return the geometry's name.

        Bug fix: the original definition omitted ``self``, so calling it
        on an instance raised ``TypeError``.
        """
        return self.name
class Skeleton(Geometry):
    """
    Geometry for drawing human body poses as colored line segments.

    Bug fix: ``body_edges`` previously defaulted to a shared mutable list.
    """

    def __init__(self, name="skeleton", lineWidth=3, body_edges=None):
        """
        :param name: geometry name.
        :param lineWidth: width of the limb lines.
        :param body_edges: sequence of (start, end) keypoint index pairs
            describing which keypoints are connected by a limb.
        """
        super(Skeleton, self).__init__(name)
        self.material = LineBasicMaterial(vertexColors="VertexColors", linewidth=lineWidth)
        # 19 per-vertex RGB colors: the (red, red, green, blue) pattern
        # repeated over the limb segment vertices — same values as the
        # original hand-written table.
        self.colorSet = BufferAttribute(
            np.array(
                ([[1, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 1]] * 5)[:19],
                dtype=np.float32,
            ),
            normalized=False,
        )
        # Avoid a shared mutable default argument.
        self.body_edges = [] if body_edges is None else body_edges

    def __call__(self, poses_3d):
        """Build one LineSegments object per pose in ``poses_3d``.

        :param poses_3d: iterable of poses, each indexable by keypoint id
            returning a 3D point.
        :return: list of pythreejs LineSegments, one per pose.
        """
        poses = []
        for pose_position_tmp in poses_3d:
            bones = []
            for edge in self.body_edges:
                # put pair of points as limbs
                bones.append(pose_position_tmp[edge[0]])
                bones.append(pose_position_tmp[edge[1]])
            bones = np.asarray(bones, dtype=np.float32)
            # You can find the api in https://github.com/jupyter-widgets/pythreejs
            self.geometry = BufferGeometry(
                attributes={
                    "position": BufferAttribute(bones, normalized=False),
                    # It defines limbs' color
                    "color": self.colorSet,
                }
            )
            pose = LineSegments(self.geometry, self.material)
            poses.append(pose)
        return poses

    def plot(self, pose_points=None):
        """Alias for ``__call__``."""
        return self.__call__(pose_points)
class Cloudpoint(Geometry):
    """
    Geometry for drawing a point cloud, optionally with connecting line
    segments (e.g. a face mesh).

    Bug fixes:
      * the line geometry referenced the undefined global name ``line``
        instead of ``self.line`` (NameError whenever ``line`` was set);
      * ``points`` previously defaulted to a shared mutable list.
    """

    def __init__(self, name="cloudpoint", points=None, point_size=5, line=None, points_color="blue"):
        """
        :param points: initial point storage (kept for compatibility;
            drawing uses the array passed to ``__call__``).
        :param point_size: rendered size of each point.
        :param line: optional vertex array drawn as red line segments
            alongside the points.
        :param points_color: single color applied to all points.
        """
        super(Cloudpoint, self).__init__(name)
        self.material = PointsMaterial(size=point_size, color=points_color)
        # Avoid a shared mutable default argument.
        self.points = [] if points is None else points
        self.line = line

    @staticmethod
    def _vertex_colors():
        """Fresh per-vertex RGB color attribute: 19 vertices in the
        (red, red, green, blue) repeating pattern, matching the original
        hand-written table."""
        return BufferAttribute(
            np.array(
                ([[1, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 1]] * 5)[:19],
                dtype=np.float32,
            ),
            normalized=False,
        )

    def __call__(self, points_3d):
        """Build a Points object from ``points_3d``; when ``self.line`` is
        set, also build a red LineSegments mesh and return both.

        :param points_3d: array of 3D point coordinates.
        :return: a Points object, or ``[points, line_segments]`` when a
            line array was supplied.
        """
        self.geometry = BufferGeometry(
            attributes={
                "position": BufferAttribute(points_3d, normalized=False),
                # It defines points' vertices' color
                "color": self._vertex_colors(),
            },
        )
        cloud_points = Points(self.geometry, self.material)

        if self.line is not None:
            g1 = BufferGeometry(
                attributes={
                    # Fix: was the undefined name ``line``.
                    "position": BufferAttribute(self.line, normalized=False),
                    # It defines limbs' color
                    "color": self._vertex_colors(),
                },
            )
            m1 = LineBasicMaterial(color="red", linewidth=3)
            facemesh = LineSegments(g1, m1)
            return [cloud_points, facemesh]
        return cloud_points
class Box_bounding(Geometry):
    """
    Geometry for drawing a wireframe bounding box.

    Bug fix: this was originally declared with ``def`` instead of
    ``class``, so it was a plain function (with ``Geometry`` as its
    parameter), not a Geometry subclass, and could never be instantiated.
    """

    def __init__(self, name="Box", lineWidth=3):
        super(Box_bounding, self).__init__(name)
        self.material = LineBasicMaterial(vertexColors="VertexColors", linewidth=lineWidth)
        # Edge list for the box; populated by callers.
        self.edge = []

    def __call__(self, points=None):
        # Not implemented yet.
        pass
class Pose:
    """A single detected 2D human pose: keypoints, confidence, a tracking
    id and per-axis smoothing filters for the 3D translation."""

    # Number of keypoints in the internal (OpenPose-style) layout.
    num_kpts = 18
    kpt_names = [
        "neck", "nose",
        "l_sho", "l_elb", "l_wri", "l_hip", "l_knee", "l_ank",
        "r_sho", "r_elb", "r_wri", "r_hip", "r_knee", "r_ank",
        "r_eye", "l_eye", "r_ear", "l_ear",
    ]
    # Per-keypoint tolerance (OKS-style), scaled down by 10.
    sigmas = (
        np.array(
            [0.79, 0.26, 0.79, 0.72, 0.62, 1.07, 0.87, 0.89,
             0.79, 0.72, 0.62, 1.07, 0.87, 0.89, 0.25, 0.25, 0.35, 0.35],
            dtype=np.float32,
        )
        / 10.0
    )
    vars = (sigmas * 2) ** 2
    # Last tracking id handed out; -1 means none yet.
    last_id = -1
    color = [0, 224, 255]

    def __init__(self, keypoints, confidence):
        super().__init__()
        self.keypoints = keypoints
        self.confidence = confidence
        # Bounding box over the detected keypoints only (x == -1 marks a
        # missing keypoint).
        detected = keypoints[keypoints[:, 0] != -1].astype(np.int32)
        self.bbox = cv2.boundingRect(detected)
        self.id = None
        # One filter per translation coordinate (x, y, z).
        self.translation_filter = [OneEuroFilter(freq=80, beta=0.01) for _ in range(3)]

    def update_id(self, id=None):
        """Assign the given tracking id, or mint a fresh one when None."""
        if id is not None:
            self.id = id
        else:
            Pose.last_id += 1
            self.id = Pose.last_id

    def filter(self, translation):
        """Smooth a 3-component translation with the per-axis filters."""
        return [flt(coord) for flt, coord in zip(self.translation_filter, translation)]
def get_similarity(a, b, threshold=0.5):
    """Count keypoints of poses ``a`` and ``b`` that match under an
    OKS-like similarity measure.

    :param a: pose with ``keypoints`` and ``bbox``.
    :param b: pose with ``keypoints`` and ``bbox``.
    :param threshold: per-keypoint similarity cut-off.
    :return: number of keypoints whose similarity exceeds ``threshold``.
    """
    num_similar_kpt = 0
    # Normalization area: the larger of the two bounding boxes (invariant
    # over the loop, so hoisted).
    area = max(a.bbox[2] * a.bbox[3], b.bbox[2] * b.bbox[3])
    for kpt_id in range(Pose.num_kpts):
        # Skip keypoints missing in either pose (x == -1).
        if a.keypoints[kpt_id, 0] == -1 or b.keypoints[kpt_id, 0] == -1:
            continue
        distance = np.sum((a.keypoints[kpt_id] - b.keypoints[kpt_id]) ** 2)
        similarity = np.exp(-distance / (2 * (area + np.spacing(1)) * Pose.vars[kpt_id]))
        if similarity > threshold:
            num_similar_kpt += 1
    return num_similar_kpt
def propagate_ids(previous_poses, current_poses, threshold=3):
    """Propagate poses ids from previous frame results. Id is propagated,
    if there are at least `threshold` similar keypoints between pose from previous frame and current.
    :param previous_poses: poses from previous frame with ids
    :param current_poses: poses from current frame to assign ids
    :param threshold: minimal number of similar keypoints between poses
    :return: None
    """
    current_poses_sorted_ids = list(range(len(current_poses)))
    current_poses_sorted_ids = sorted(
        current_poses_sorted_ids,
        key=lambda pose_id: current_poses[pose_id].confidence,
        reverse=True,
    )  # match confident poses first
    # mask[i] == 1 while previous pose i has not yet been claimed by a
    # (more confident) current pose.
    mask = np.ones(len(previous_poses), dtype=np.int32)
    for current_pose_id in current_poses_sorted_ids:
        best_matched_id = None  # index of the best previous pose in previous_poses
        best_matched_pose_id = None  # tracking id carried by that previous pose
        best_matched_iou = 0  # best similar-keypoint count so far
        for previous_pose_id in range(len(previous_poses)):
            if not mask[previous_pose_id]:
                # Already matched to a more confident current pose.
                continue
            iou = get_similarity(current_poses[current_pose_id], previous_poses[previous_pose_id])
            if iou > best_matched_iou:
                best_matched_iou = iou
                best_matched_pose_id = previous_poses[previous_pose_id].id
                best_matched_id = previous_pose_id
        if best_matched_iou >= threshold:
            # Claim the previous pose so no other current pose reuses it.
            mask[best_matched_id] = 0
        else:  # pose not similar to any previous
            best_matched_pose_id = None
        # update_id(None) mints a fresh id for unmatched poses.
        current_poses[current_pose_id].update_id(best_matched_pose_id)
        # Carry the smoothing filters over so the 3D translation stays
        # stable across frames for the same person.
        if best_matched_pose_id is not None:
            current_poses[current_pose_id].translation_filter = previous_poses[best_matched_id].translation_filter
# Height used to scale the network's normalized 3D coordinates
# (multiplied into every coordinate read from the feature maps).
AVG_PERSON_HEIGHT = 180

# pelvis (body center) is missing, id == 2
# Maps the 18 internal keypoint ids to the 19-keypoint panoptic ordering.
map_id_to_panoptic = [1, 0, 9, 10, 11, 3, 4, 5, 12, 13, 14, 6, 7, 8, 15, 16, 17, 18]

# Keypoint id triplets (panoptic ids) forming limbs; used to refine the
# 3D read-out at confident limb keypoints.
limbs = [[18, 17, 1], [16, 15, 1], [5, 4, 3], [8, 7, 6], [11, 10, 9], [14, 13, 12]]
def get_root_relative_poses(inference_results):
    """Extract 2D poses and root-relative 3D poses from network outputs.

    :param inference_results: tuple (features, heatmap, paf_map) where
        ``features`` is the 3D coordinate map (3 channels per keypoint),
        ``heatmap`` the keypoint confidence maps and ``paf_map`` the part
        affinity fields.
    :return: (poses_3d, poses_2d, features_shape); coordinates are in the
        19-keypoint panoptic layout, 3D values scaled by AVG_PERSON_HEIGHT.
    """
    features, heatmap, paf_map = inference_results

    upsample_ratio = 4
    # Last heatmap channel is background, hence heatmap[0:-1].
    found_poses = extract_poses(heatmap[0:-1], paf_map, upsample_ratio)[0]
    # scale coordinates to features space
    found_poses[:, 0:-1:3] /= upsample_ratio
    found_poses[:, 1:-1:3] /= upsample_ratio

    poses_2d = []
    num_kpt_panoptic = 19
    num_kpt = 18
    # Repack each found pose from the internal 18-keypoint layout into the
    # 19-keypoint panoptic layout (x, y, conf per keypoint + pose conf).
    for pose_id in range(found_poses.shape[0]):
        if found_poses[pose_id, 5] == -1:  # skip pose if is not found neck
            continue
        pose_2d = np.ones(num_kpt_panoptic * 3 + 1, dtype=np.float32) * -1  # +1 for pose confidence
        for kpt_id in range(num_kpt):
            if found_poses[pose_id, kpt_id * 3 + 2] != -1:
                x_2d, y_2d = found_poses[pose_id, kpt_id * 3 : kpt_id * 3 + 2]
                conf = found_poses[pose_id, kpt_id * 3 + 2]
                pose_2d[map_id_to_panoptic[kpt_id] * 3] = x_2d  # just repacking
                pose_2d[map_id_to_panoptic[kpt_id] * 3 + 1] = y_2d
                pose_2d[map_id_to_panoptic[kpt_id] * 3 + 2] = conf
        pose_2d[-1] = found_poses[pose_id, -1]
        poses_2d.append(pose_2d)

    keypoint_treshold = 0.1
    # One row per pose: (x, y, z, conf) per panoptic keypoint.
    poses_3d = np.ones((len(poses_2d), num_kpt_panoptic * 4), dtype=np.float32) * -1
    for pose_id in range(len(poses_3d)):
        # Only read the 3D maps when the root keypoint is confident enough.
        if poses_2d[pose_id][2] > keypoint_treshold:
            neck_2d = poses_2d[pose_id][:2].astype(int)
            # read all pose coordinates at neck location
            for kpt_id in range(num_kpt_panoptic):
                map_3d = features[kpt_id * 3 : (kpt_id + 1) * 3]
                poses_3d[pose_id][kpt_id * 4] = map_3d[0, neck_2d[1], neck_2d[0]] * AVG_PERSON_HEIGHT
                poses_3d[pose_id][kpt_id * 4 + 1] = map_3d[1, neck_2d[1], neck_2d[0]] * AVG_PERSON_HEIGHT
                poses_3d[pose_id][kpt_id * 4 + 2] = map_3d[2, neck_2d[1], neck_2d[0]] * AVG_PERSON_HEIGHT
                poses_3d[pose_id][kpt_id * 4 + 3] = poses_2d[pose_id][kpt_id * 3 + 2]
            # refine keypoints coordinates at corresponding limbs locations
            for limb in limbs:
                for kpt_id_from in limb:
                    if poses_2d[pose_id][kpt_id_from * 3 + 2] > keypoint_treshold:
                        # Re-read the whole limb at the first confident
                        # keypoint's 2D location, then stop for this limb.
                        for kpt_id_where in limb:
                            kpt_from_2d = poses_2d[pose_id][kpt_id_from * 3 : kpt_id_from * 3 + 2].astype(int)
                            map_3d = features[kpt_id_where * 3 : (kpt_id_where + 1) * 3]
                            poses_3d[pose_id][kpt_id_where * 4] = map_3d[0, kpt_from_2d[1], kpt_from_2d[0]] * AVG_PERSON_HEIGHT
                            poses_3d[pose_id][kpt_id_where * 4 + 1] = map_3d[1, kpt_from_2d[1], kpt_from_2d[0]] * AVG_PERSON_HEIGHT
                            poses_3d[pose_id][kpt_id_where * 4 + 2] = map_3d[2, kpt_from_2d[1], kpt_from_2d[0]] * AVG_PERSON_HEIGHT
                        break

    return poses_3d, np.array(poses_2d), features.shape
# Tracking state: poses from the previous video frame (used when is_video).
previous_poses_2d = []


def parse_poses(inference_results, input_scale, stride, fx, is_video=False):
    """Convert raw network outputs into image-space 2D poses and
    translated 3D poses.

    :param inference_results: (features, heatmap, paf_map) network outputs.
    :param input_scale: scale that was applied to the network input image.
    :param stride: network output stride.
    :param fx: camera focal length in pixels.
    :param is_video: when True, track pose ids across frames and smooth
        translations (updates module-level ``previous_poses_2d``).
    :return: (translated_poses_3d, poses_2d_scaled) as numpy arrays.
    """
    global previous_poses_2d
    poses_3d, poses_2d, features_shape = get_root_relative_poses(inference_results)
    poses_2d_scaled = []
    # Rescale 2D keypoints from feature-map space back to image space.
    for pose_2d in poses_2d:
        num_kpt = (pose_2d.shape[0] - 1) // 3
        pose_2d_scaled = np.ones(pose_2d.shape[0], dtype=np.float32) * -1  # +1 for pose confidence
        for kpt_id in range(num_kpt):
            if pose_2d[kpt_id * 3 + 2] != -1:
                pose_2d_scaled[kpt_id * 3] = int(pose_2d[kpt_id * 3] * stride / input_scale)
                pose_2d_scaled[kpt_id * 3 + 1] = int(pose_2d[kpt_id * 3 + 1] * stride / input_scale)
                pose_2d_scaled[kpt_id * 3 + 2] = pose_2d[kpt_id * 3 + 2]
        pose_2d_scaled[-1] = pose_2d[-1]
        poses_2d_scaled.append(pose_2d_scaled)

    if is_video:  # track poses ids
        current_poses_2d = []
        for pose_id in range(len(poses_2d_scaled)):
            pose_keypoints = np.ones((Pose.num_kpts, 2), dtype=np.int32) * -1
            for kpt_id in range(Pose.num_kpts):
                if poses_2d_scaled[pose_id][kpt_id * 3 + 2] != -1.0:  # keypoint is found
                    pose_keypoints[kpt_id, 0] = int(poses_2d_scaled[pose_id][kpt_id * 3 + 0])
                    pose_keypoints[kpt_id, 1] = int(poses_2d_scaled[pose_id][kpt_id * 3 + 1])
            pose = Pose(pose_keypoints, poses_2d_scaled[pose_id][-1])
            current_poses_2d.append(pose)
        propagate_ids(previous_poses_2d, current_poses_2d)
        previous_poses_2d = current_poses_2d

    translated_poses_3d = []
    # translate poses: estimate a global translation for each
    # root-relative 3D pose from its 2D projection.
    for pose_id in range(len(poses_3d)):
        pose_3d = poses_3d[pose_id].reshape((-1, 4)).transpose()
        pose_2d = poses_2d[pose_id][:-1].reshape((-1, 3)).transpose()
        num_valid = np.count_nonzero(pose_2d[2] != -1)
        # Collect only keypoints detected in 2D (conf != -1).
        pose_3d_valid = np.zeros((3, num_valid), dtype=np.float32)
        pose_2d_valid = np.zeros((2, num_valid), dtype=np.float32)
        valid_id = 0
        for kpt_id in range(pose_3d.shape[1]):
            if pose_2d[2, kpt_id] == -1:
                continue
            pose_3d_valid[:, valid_id] = pose_3d[0:3, kpt_id]
            pose_2d_valid[:, valid_id] = pose_2d[0:2, kpt_id]
            valid_id += 1

        # Center 2D keypoints around the feature-map center (acts as the
        # principal point).
        pose_2d_valid[0] = pose_2d_valid[0] - features_shape[2] / 2
        pose_2d_valid[1] = pose_2d_valid[1] - features_shape[1] / 2
        mean_3d = np.expand_dims(pose_3d_valid.mean(axis=1), axis=1)
        mean_2d = np.expand_dims(pose_2d_valid.mean(axis=1), axis=1)
        # Ratio of the centered 3D (x, y) spread to the centered 2D spread
        # gives the scale used to back-project the 2D mean.
        numerator = np.trace(
            np.dot(
                (pose_3d_valid[:2, :] - mean_3d[:2, :]).transpose(),
                pose_3d_valid[:2, :] - mean_3d[:2, :],
            )
        ).sum()
        numerator = np.sqrt(numerator)
        denominator = np.sqrt(
            np.trace(
                np.dot(
                    (pose_2d_valid[:2, :] - mean_2d[:2, :]).transpose(),
                    pose_2d_valid[:2, :] - mean_2d[:2, :],
                )
            ).sum()
        )
        # z comes from the focal length expressed in feature-map units.
        mean_2d = np.array([mean_2d[0, 0], mean_2d[1, 0], fx * input_scale / stride])
        mean_3d = np.array([mean_3d[0, 0], mean_3d[1, 0], 0])
        translation = numerator / denominator * mean_2d - mean_3d

        if is_video:
            # Temporal smoothing via the pose's OneEuro filters.
            translation = current_poses_2d[pose_id].filter(translation)
        for kpt_id in range(19):
            pose_3d[0, kpt_id] = pose_3d[0, kpt_id] + translation[0]
            pose_3d[1, kpt_id] = pose_3d[1, kpt_id] + translation[1]
            pose_3d[2, kpt_id] = pose_3d[2, kpt_id] + translation[2]
        translated_poses_3d.append(pose_3d.transpose().reshape(-1))

    return np.array(translated_poses_3d), np.array(poses_2d_scaled)
def get_alpha(rate=30, cutoff=1):
tau = 1 / (2 * math.pi * cutoff)
te = 1 / rate
return 1 / (1 + tau / te)
class LowPassFilter:
    """Exponential smoothing: y_t = alpha * x_t + (1 - alpha) * y_{t-1}."""

    def __init__(self):
        # Last filtered value; None until the first sample arrives.
        self.x_previous = None

    def __call__(self, x, alpha=0.5):
        if self.x_previous is None:
            # First sample passes through unchanged and seeds the state.
            self.x_previous = x
            return x
        smoothed = alpha * x + (1 - alpha) * self.x_previous
        self.x_previous = smoothed
        return smoothed
class OneEuroFilter:
    """One-Euro filter: a low-pass filter whose cutoff adapts to the
    signal's speed, trading jitter for lag (fast motion -> less lag)."""

    def __init__(self, freq=15, mincutoff=1, beta=1, dcutoff=1):
        """
        :param freq: sampling frequency in Hz.
        :param mincutoff: minimum cutoff frequency for the signal filter.
        :param beta: speed coefficient — how strongly the cutoff grows
            with the signal's derivative.
        :param dcutoff: cutoff frequency for the derivative filter.
        """
        self.freq = freq
        self.mincutoff = mincutoff
        self.beta = beta
        self.dcutoff = dcutoff
        # Separate low-pass filters for the signal and its derivative.
        self.filter_x = LowPassFilter()
        self.filter_dx = LowPassFilter()
        self.x_previous = None
        self.dx = None

    def __call__(self, x):
        # Finite-difference derivative; zero on the very first sample.
        self.dx = 0 if self.dx is None else (x - self.x_previous) * self.freq
        dx_smoothed = self.filter_dx(self.dx, get_alpha(self.freq, self.dcutoff))
        # Cutoff grows with the (smoothed) speed of the signal.
        cutoff = self.mincutoff + self.beta * abs(dx_smoothed)
        x_filtered = self.filter_x(x, get_alpha(self.freq, cutoff))
        self.x_previous = x
        return x_filtered
# Pairs of keypoint ids (internal 18-keypoint layout) connected by each limb.
BODY_PARTS_KPT_IDS = [
    [1, 2],
    [1, 5],
    [2, 3],
    [3, 4],
    [5, 6],
    [6, 7],
    [1, 8],
    [8, 9],
    [9, 10],
    [1, 11],
    [11, 12],
    [12, 13],
    [1, 0],
    [0, 14],
    [14, 16],
    [0, 15],
    [15, 17],
    [2, 16],
    [5, 17],
]
# For the limb at the same index above, the pair of PAF channel indices
# holding its x- and y-component fields.
BODY_PARTS_PAF_IDS = (
    [12, 13],
    [20, 21],
    [14, 15],
    [16, 17],
    [22, 23],
    [24, 25],
    [0, 1],
    [2, 3],
    [4, 5],
    [6, 7],
    [8, 9],
    [10, 11],
    [28, 29],
    [30, 31],
    [34, 35],
    [32, 33],
    [36, 37],
    [18, 19],
    [26, 27],
)
def linspace2d(start, stop, n=10):
    """Return ``n`` evenly spaced 2D points from ``start`` to ``stop``,
    both endpoints included.

    :param start: array of shape (2,), the first point.
    :param stop: array of shape (2,), the last point.
    :param n: number of samples.
    :return: array of shape (2, n) whose columns are the sampled points.
    """
    step = 1 / (n - 1) * (stop - start)
    return step[:, None] * np.arange(n) + start[:, None]
def extract_keypoints(heatmap, all_keypoints, total_keypoint_num):
    """Find peaks of one keypoint heatmap and append them to ``all_keypoints``.

    :param heatmap: 2D confidence map for a single keypoint type. NOTE:
        modified in place (values below 0.1 are zeroed).
    :param all_keypoints: output list; one list of
        (x, y, score, global_id) tuples is appended for this keypoint type.
    :param total_keypoint_num: keypoints found so far across all types,
        used to assign consecutive global ids.
    :return: number of keypoints kept for this heatmap.
    """
    heatmap[heatmap < 0.1] = 0
    heatmap_with_borders = np.pad(heatmap, [(2, 2), (2, 2)], mode="constant")
    # Shifted views of the padded map used to compare each pixel against
    # its 4-neighborhood.
    heatmap_center = heatmap_with_borders[1 : heatmap_with_borders.shape[0] - 1, 1 : heatmap_with_borders.shape[1] - 1]
    heatmap_left = heatmap_with_borders[1 : heatmap_with_borders.shape[0] - 1, 2 : heatmap_with_borders.shape[1]]
    heatmap_right = heatmap_with_borders[1 : heatmap_with_borders.shape[0] - 1, 0 : heatmap_with_borders.shape[1] - 2]
    heatmap_up = heatmap_with_borders[2 : heatmap_with_borders.shape[0], 1 : heatmap_with_borders.shape[1] - 1]
    heatmap_down = heatmap_with_borders[0 : heatmap_with_borders.shape[0] - 2, 1 : heatmap_with_borders.shape[1] - 1]

    # A peak is strictly greater than all four neighbors.
    heatmap_peaks = (heatmap_center > heatmap_left) & (heatmap_center > heatmap_right) & (heatmap_center > heatmap_up) & (heatmap_center > heatmap_down)
    # Crop back to the original heatmap extent.
    heatmap_peaks = heatmap_peaks[1 : heatmap_center.shape[0] - 1, 1 : heatmap_center.shape[1] - 1]
    keypoints = list(zip(np.nonzero(heatmap_peaks)[1], np.nonzero(heatmap_peaks)[0]))  # (w, h)
    keypoints = sorted(keypoints, key=itemgetter(0))

    # Greedy non-maximum suppression: drop peaks closer than 6 px to an
    # already kept peak (keypoints are processed in x order).
    suppressed = np.zeros(len(keypoints), np.uint8)
    keypoints_with_score_and_id = []
    keypoint_num = 0
    for i in range(len(keypoints)):
        if suppressed[i]:
            continue
        for j in range(i + 1, len(keypoints)):
            if math.sqrt((keypoints[i][0] - keypoints[j][0]) ** 2 + (keypoints[i][1] - keypoints[j][1]) ** 2) < 6:
                suppressed[j] = 1
        # (x, y, score, global id)
        keypoint_with_score_and_id = (
            keypoints[i][0],
            keypoints[i][1],
            heatmap[keypoints[i][1], keypoints[i][0]],
            total_keypoint_num + keypoint_num,
        )
        keypoints_with_score_and_id.append(keypoint_with_score_and_id)
        keypoint_num += 1
    all_keypoints.append(keypoints_with_score_and_id)
    return keypoint_num
def group_keypoints(all_keypoints_by_type, pafs, pose_entry_size=20, min_paf_score=0.05):
    """Group detected keypoints into per-person pose entries using PAFs.

    :param all_keypoints_by_type: list (one entry per keypoint type) of
        (x, y, score, global_id) tuples as built by ``extract_keypoints``.
    :param pafs: part affinity fields, shape (channels, h, w).
    :param pose_entry_size: 18 keypoint slots + pose score + keypoint count.
    :param min_paf_score: minimal PAF alignment for a sample point to count.
    :return: (pose_entries, all_keypoints) where each pose entry holds
        global keypoint ids (or -1), the pose score at [-2] and the number
        of keypoints at [-1].
    """
    pose_entries = []
    # Flatten so that global keypoint ids index rows directly.
    all_keypoints = np.array([item for sublist in all_keypoints_by_type for item in sublist])
    for part_id in range(len(BODY_PARTS_PAF_IDS)):
        part_pafs = pafs[BODY_PARTS_PAF_IDS[part_id]]
        kpts_a = all_keypoints_by_type[BODY_PARTS_KPT_IDS[part_id][0]]
        kpts_b = all_keypoints_by_type[BODY_PARTS_KPT_IDS[part_id][1]]
        num_kpts_a = len(kpts_a)
        num_kpts_b = len(kpts_b)
        kpt_a_id = BODY_PARTS_KPT_IDS[part_id][0]
        kpt_b_id = BODY_PARTS_KPT_IDS[part_id][1]

        if num_kpts_a == 0 and num_kpts_b == 0:  # no keypoints for such body part
            continue
        elif num_kpts_a == 0:  # body part has just 'b' keypoints
            for i in range(num_kpts_b):
                num = 0
                for j in range(len(pose_entries)):  # check if already in some pose, was added by another body part
                    if pose_entries[j][kpt_b_id] == kpts_b[i][3]:
                        num += 1
                        continue
                if num == 0:
                    # Not in any pose yet: start a new single-keypoint entry.
                    pose_entry = np.ones(pose_entry_size) * -1
                    pose_entry[kpt_b_id] = kpts_b[i][3]  # keypoint idx
                    pose_entry[-1] = 1  # num keypoints in pose
                    pose_entry[-2] = kpts_b[i][2]  # pose score
                    pose_entries.append(pose_entry)
            continue
        elif num_kpts_b == 0:  # body part has just 'a' keypoints
            for i in range(num_kpts_a):
                num = 0
                for j in range(len(pose_entries)):
                    if pose_entries[j][kpt_a_id] == kpts_a[i][3]:
                        num += 1
                        continue
                if num == 0:
                    pose_entry = np.ones(pose_entry_size) * -1
                    pose_entry[kpt_a_id] = kpts_a[i][3]
                    pose_entry[-1] = 1
                    pose_entry[-2] = kpts_a[i][2]
                    pose_entries.append(pose_entry)
            continue

        # Score every candidate connection between an 'a' and a 'b' keypoint.
        connections = []
        for i in range(num_kpts_a):
            kpt_a = np.array(kpts_a[i][0:2])
            for j in range(num_kpts_b):
                kpt_b = np.array(kpts_b[j][0:2])
                mid_point = [(), ()]
                mid_point[0] = (
                    int(round((kpt_a[0] + kpt_b[0]) * 0.5)),
                    int(round((kpt_a[1] + kpt_b[1]) * 0.5)),
                )
                mid_point[1] = mid_point[0]
                # Unit vector from a to b.
                vec = [kpt_b[0] - kpt_a[0], kpt_b[1] - kpt_a[1]]
                vec_norm = math.sqrt(vec[0] ** 2 + vec[1] ** 2)
                if vec_norm == 0:
                    continue
                vec[0] /= vec_norm
                vec[1] /= vec_norm
                # PAF alignment at the midpoint (cheap pre-check).
                cur_point_score = vec[0] * part_pafs[0, mid_point[0][1], mid_point[0][0]] + vec[1] * part_pafs[1, mid_point[1][1], mid_point[1][0]]

                height_n = pafs.shape[1] // 2
                success_ratio = 0
                point_num = 10  # number of points to integration over paf
                ratio = 0
                if cur_point_score > -100:
                    passed_point_score = 0
                    passed_point_num = 0
                    # Integrate PAF alignment along the segment a -> b.
                    x, y = linspace2d(kpt_a, kpt_b)
                    for point_idx in range(point_num):
                        px = int(x[point_idx])
                        py = int(y[point_idx])
                        paf = part_pafs[:, py, px]
                        cur_point_score = vec[0] * paf[0] + vec[1] * paf[1]
                        if cur_point_score > min_paf_score:
                            passed_point_score += cur_point_score
                            passed_point_num += 1
                            success_ratio = passed_point_num / point_num
                    if passed_point_num > 0:
                        ratio = passed_point_score / passed_point_num
                    # Penalize connections longer than half the map height.
                    ratio += min(height_n / vec_norm - 1, 0)
                if ratio > 0 and success_ratio > 0.8:
                    score_all = ratio + kpts_a[i][2] + kpts_b[j][2]
                    connections.append([i, j, ratio, score_all])
        if len(connections) > 0:
            connections = sorted(connections, key=itemgetter(2), reverse=True)

        # Greedy one-to-one assignment: take best-scoring connections first,
        # each keypoint used at most once.
        num_connections = min(num_kpts_a, num_kpts_b)
        has_kpt_a = np.zeros(num_kpts_a, dtype=np.int32)
        has_kpt_b = np.zeros(num_kpts_b, dtype=np.int32)
        filtered_connections = []
        for row in range(len(connections)):
            if len(filtered_connections) == num_connections:
                break
            i, j, cur_point_score = connections[row][0:3]
            if not has_kpt_a[i] and not has_kpt_b[j]:
                # Store global keypoint ids, not local indices.
                filtered_connections.append([kpts_a[i][3], kpts_b[j][3], cur_point_score])
                has_kpt_a[i] = 1
                has_kpt_b[j] = 1
        connections = filtered_connections
        if len(connections) == 0:
            continue

        if part_id == 0:
            # First limb: every connection seeds a new pose entry.
            pose_entries = [np.ones(pose_entry_size) * -1 for _ in range(len(connections))]
            for i in range(len(connections)):
                pose_entries[i][BODY_PARTS_KPT_IDS[0][0]] = connections[i][0]
                pose_entries[i][BODY_PARTS_KPT_IDS[0][1]] = connections[i][1]
                pose_entries[i][-1] = 2
                pose_entries[i][-2] = np.sum(all_keypoints[connections[i][0:2], 2]) + connections[i][2]
        elif part_id == 17 or part_id == 18:
            # Ear-shoulder links: only fill missing slots of existing poses.
            kpt_a_id = BODY_PARTS_KPT_IDS[part_id][0]
            kpt_b_id = BODY_PARTS_KPT_IDS[part_id][1]
            for i in range(len(connections)):
                for j in range(len(pose_entries)):
                    if pose_entries[j][kpt_a_id] == connections[i][0] and pose_entries[j][kpt_b_id] == -1:
                        pose_entries[j][kpt_b_id] = connections[i][1]
                    elif pose_entries[j][kpt_b_id] == connections[i][1] and pose_entries[j][kpt_a_id] == -1:
                        pose_entries[j][kpt_a_id] = connections[i][0]
                continue
        else:
            kpt_a_id = BODY_PARTS_KPT_IDS[part_id][0]
            kpt_b_id = BODY_PARTS_KPT_IDS[part_id][1]
            for i in range(len(connections)):
                num = 0
                # Extend every pose that already contains keypoint 'a'.
                for j in range(len(pose_entries)):
                    if pose_entries[j][kpt_a_id] == connections[i][0]:
                        pose_entries[j][kpt_b_id] = connections[i][1]
                        num += 1
                        pose_entries[j][-1] += 1
                        pose_entries[j][-2] += all_keypoints[connections[i][1], 2] + connections[i][2]
                if num == 0:
                    # Otherwise start a new pose entry from this connection.
                    pose_entry = np.ones(pose_entry_size) * -1
                    pose_entry[kpt_a_id] = connections[i][0]
                    pose_entry[kpt_b_id] = connections[i][1]
                    pose_entry[-1] = 2
                    pose_entry[-2] = np.sum(all_keypoints[connections[i][0:2], 2]) + connections[i][2]
                    pose_entries.append(pose_entry)

    # Drop poses with too few keypoints or too low average score.
    filtered_entries = []
    for i in range(len(pose_entries)):
        if pose_entries[i][-1] < 3 or (pose_entries[i][-2] / pose_entries[i][-1] < 0.2):
            continue
        filtered_entries.append(pose_entries[i])
    pose_entries = np.asarray(filtered_entries)
    return pose_entries, all_keypoints
def extract_poses(heatmaps, pafs, upsample_ratio):
    """Upsample heatmaps/PAFs, detect and group keypoints, and pack each
    pose as a flat (x, y, conf) * num_keypoints + pose_conf array.

    :param heatmaps: keypoint confidence maps, shape (k, h, w).
    :param pafs: part affinity fields, shape (c, h, w).
    :param upsample_ratio: spatial upsampling factor applied before peak
        extraction.
    :return: (found_poses, None); an empty (0, 0) array when nothing found.
    """
    # cv2.resize works channel-last, so transpose around the resize.
    heatmaps = np.transpose(heatmaps, (1, 2, 0))
    pafs = np.transpose(pafs, (1, 2, 0))
    heatmaps = cv2.resize(heatmaps, dsize=None, fx=upsample_ratio, fy=upsample_ratio)
    pafs = cv2.resize(pafs, dsize=None, fx=upsample_ratio, fy=upsample_ratio)
    heatmaps = np.transpose(heatmaps, (2, 0, 1))
    pafs = np.transpose(pafs, (2, 0, 1))

    num_keypoints = heatmaps.shape[0]
    all_keypoints_by_type = []
    total_keypoints_num = 0
    for kpt_idx in range(num_keypoints):
        total_keypoints_num += extract_keypoints(heatmaps[kpt_idx], all_keypoints_by_type, total_keypoints_num)

    pose_entries, all_keypoints = group_keypoints(all_keypoints_by_type, pafs)

    found_poses = []
    for pose_entry in pose_entries:
        if len(pose_entry) == 0:
            continue
        # Missing keypoints stay at -1.
        pose_keypoints = np.ones((num_keypoints * 3 + 1), dtype=np.float32) * -1
        for kpt_id in range(num_keypoints):
            if pose_entry[kpt_id] != -1.0:
                global_id = int(pose_entry[kpt_id])
                pose_keypoints[kpt_id * 3 + 0] = all_keypoints[global_id, 0]
                pose_keypoints[kpt_id * 3 + 1] = all_keypoints[global_id, 1]
                pose_keypoints[kpt_id * 3 + 2] = all_keypoints[global_id, 2]
        # Pose confidence lives at slot 18 of the entry.
        pose_keypoints[-1] = pose_entry[18]
        found_poses.append(pose_keypoints)

    if not found_poses:
        return np.array(found_poses, dtype=np.float32).reshape((0, 0)), None
    return np.array(found_poses, dtype=np.float32), None