import numpy as np
from scipy.spatial import distance as dist
from utils.labels import pose_id_part, pose_id_part_openpose, rev_pose_id_part_openpose, rev_pose_id_part
import cv2
import os
import json
import logging

logger = logging.getLogger(__name__)  # used by initialize_video_recorder below

def rescale_bb(boxes, pad, im_width, im_height):
    """
    Rescale in place the bounding box coordinates (expressed as percentages) to the new image width and height
    Args:
        :boxes (numpy.ndarray): Array of bounding box coordinates expressed in percentage [y_min, x_min, y_max, x_max]
        :pad (tuple): The first element is the right padding and the second element is the bottom padding (both applied
            by the resize_preserving_ar() function); the third element is the shape of the image after resizing without
            the padding (useful for the coordinate changes)
        :im_width (int): The new image width
        :im_height (int): The new image height
    Returns:
    """
    right_padding = pad[0]
    bottom_padding = pad[1]
    if bottom_padding != 0:
        for box in boxes:
            y_min, y_max = box[0] * im_height, box[2] * im_height  # to pixels
            box[0], box[2] = y_min / (im_height - bottom_padding), y_max / (im_height - bottom_padding)  # back to percentage
    if right_padding != 0:
        for box in boxes:
            x_min, x_max = box[1] * im_width, box[3] * im_width  # to pixels
            box[1], box[3] = x_min / (im_width - right_padding), x_max / (im_width - right_padding)  # back to percentage

def rescale_key_points(key_points, pad, im_width, im_height):
    """
    Rescale in place the key point coordinates (expressed as percentages) to the new image width and height
    Args:
        :key_points (numpy.ndarray): Array of key points for each person, where each point holds [y, x] in its first
            two positions, expressed in percentage
        :pad (tuple): The first element is the right padding and the second element is the bottom padding (both applied
            by the resize_preserving_ar() function); the third element is the shape of the image after resizing without
            the padding (useful for the coordinate changes)
        :im_width (int): The new image width
        :im_height (int): The new image height
    Returns:
    """
    right_padding = pad[0]
    bottom_padding = pad[1]
    if bottom_padding != 0:
        for aux in key_points:
            for point in aux:  # point[0] is y, point[1] is x
                y = point[0] * im_height
                point[0] = y / (im_height - bottom_padding)
    if right_padding != 0:
        for aux in key_points:
            for point in aux:
                x = point[1] * im_width
                point[1] = x / (im_width - right_padding)

def change_coordinates_aspect_ratio(aux_key_points_array, img_person, img_person_resized):
    """
    Map key point coordinates from the resized (batched) image back to the original image
    Args:
        :aux_key_points_array (list): list of key points as [y, x, confidence] in the resized image
        :img_person (numpy.ndarray): the original image (height, width, channels)
        :img_person_resized (numpy.ndarray): the resized image with a leading batch dimension (batch, height, width, channels)
    Returns:
        :aux_key_points_array_ratio (numpy.ndarray): the key points rescaled to the original image
    """
    aux_key_points_array_ratio = []
    # index 0 of the resized tensor is the batch dimension
    ratio_h, ratio_w = img_person.shape[0] / img_person_resized.shape[1], img_person.shape[1] / img_person_resized.shape[2]
    for elem in aux_key_points_array:
        aux = np.zeros(3)
        aux[0] = int(elem[0] * ratio_h)
        aux[1] = int(elem[1] * ratio_w)  # x scales with the width ratio
        aux[2] = int(elem[2])
        aux_key_points_array_ratio.append(aux)
    aux_key_points_array_ratio = np.array(aux_key_points_array_ratio, dtype=int)
    return aux_key_points_array_ratio

def parse_output_pose(heatmaps, offsets, threshold):
    """
    Parse the output pose (auxiliary function for tflite models)
    Args:
        :heatmaps (numpy.ndarray): 9x9x17 probability of appearance of each keypoint in each cell of the 9x9 grid,
            used to locate the position of the joints
        :offsets (numpy.ndarray): 9x9x34 offsets used to refine the keypoint positions (the first 17 channels are
            x coordinates, the second 17 are y coordinates)
        :threshold (float): minimum heatmap probability for a keypoint to be marked as valid
    Returns:
        :pose_kps (numpy.ndarray): array of shape (17, 3) with the two coordinates and a validity flag per keypoint
    """
    joint_num = heatmaps.shape[-1]
    pose_kps = np.zeros((joint_num, 3), np.uint32)
    for i in range(joint_num):
        joint_heatmap = heatmaps[..., i]
        max_val_pos = np.squeeze(np.argwhere(joint_heatmap == np.max(joint_heatmap)))
        remap_pos = np.array(max_val_pos / 8 * 257, dtype=np.int32)  # map the 9x9 grid cell to 257x257 pixels
        pose_kps[i, 0] = int(remap_pos[0] + offsets[max_val_pos[0], max_val_pos[1], i])
        pose_kps[i, 1] = int(remap_pos[1] + offsets[max_val_pos[0], max_val_pos[1], i + joint_num])
        max_prob = np.max(joint_heatmap)
        if max_prob > threshold:
            if pose_kps[i, 0] < 257 and pose_kps[i, 1] < 257:
                pose_kps[i, 2] = 1
    return pose_kps
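
# Illustrative usage sketch (not part of the original module): exercising parse_output_pose with synthetic
# tensors shaped like a 257x257 PoseNet-style head over a 9x9 grid. The single-joint heatmap with a fake
# peak and the all-zero offsets are assumptions made purely for the demo.
def _example_parse_output_pose():
    heatmaps = np.zeros((9, 9, 1), dtype=np.float32)
    heatmaps[4, 4, 0] = 0.9  # fake peak for the only joint
    offsets = np.zeros((9, 9, 2), dtype=np.float32)  # one x-offset channel + one y-offset channel
    kps = parse_output_pose(heatmaps, offsets, threshold=0.5)
    # the peak cell (4, 4) is remapped to pixel (128, 128) and flagged as valid
    print(kps)  # -> [[128 128   1]]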

def retrieve_xyz_from_detection(points_list, point_cloud_img):
    """
    Retrieve the xyz of the list of points passed as input (if we have the point cloud of the image)
    Args:
        :points_list (list): list of points for which we want to retrieve xyz information
        :point_cloud_img (numpy.ndarray): numpy array containing XYZRGBA information of the image
    Returns:
        :xyz (list): list of lists of 3D points with XYZ information (left camera origin (0, 0, 0))
    """
    xyz = [[point_cloud_img[point[1], point[0], 0],
            point_cloud_img[point[1], point[0], 1],
            point_cloud_img[point[1], point[0], 2]] for point in points_list]
    return xyz

def retrieve_xyz_pose_points(point_cloud_image, key_points_score, key_points):
    """Retrieve the key points from the point cloud to get the XYZ position in 3D space
    Args:
        :point_cloud_image (numpy.ndarray): numpy array containing the XYZ information of each pixel
        :key_points_score (list): list of confidence scores of each person's key points
        :key_points (list): list of key points of each person, expressed in percentage
    Returns:
        :xyz_pose: a list of lists with the XYZ 3D coordinates and the score of each key point of each person
    """
    xyz_pose = []
    for i in range(len(key_points_score)):
        xyz_pose_aux = []
        for j in range(len(key_points_score[i])):
            x = int(key_points[i][j][0] * point_cloud_image.shape[0]) - 1
            y = int(key_points[i][j][1] * point_cloud_image.shape[1]) - 1
            xyz_pose_aux.append([point_cloud_image[x, y, 0], point_cloud_image[x, y, 1], point_cloud_image[x, y, 2], key_points_score[i][j]])
        xyz_pose.append(xyz_pose_aux)
    return xyz_pose

def compute_distance(points_list, min_distance=1.5):
    """
    Compute the distance between each pair of points and find the pairs that are closer to each other than a certain
    distance expressed in meters.
    Args:
        :points_list (list): list of points expressed in xyz 3D coordinates (meters)
        :min_distance (float): minimum threshold for distances (if the L2 distance between two points is lower than
            this value it is considered a violation)
            (default is 1.5)
    Returns:
        :distance_matrix: matrix containing the distances between each pair of points (zeros on the diagonal)
        :violate: set of indexes of the points that violate the minimum distance threshold
        :couple_points: list of the couples of points that violate the min_distance threshold (to keep track of each couple)
    """
    if points_list is None or len(points_list) < 2:
        return None, None, None
    else:  # two or more points
        violate = set()
        couple_points = []
        aux = np.array(points_list)
        distance_matrix = dist.cdist(aux, aux, 'euclidean')
        for i in range(0, distance_matrix.shape[0]):  # loop over the upper triangular part of the distance matrix
            for j in range(i + 1, distance_matrix.shape[1]):
                if distance_matrix[i, j] < min_distance:
                    violate.add(i)
                    violate.add(j)
                    couple_points.append((i, j))
        return distance_matrix, violate, couple_points
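
# Illustrative usage sketch (not part of the original module): three hypothetical people in camera space
# (meters); only the first two are closer than the 1.5 m threshold.
def _example_compute_distance():
    pts = [[0.0, 0.0, 1.0], [0.5, 0.0, 1.0], [3.0, 0.0, 1.0]]
    distance_matrix, violate, couples = compute_distance(pts, min_distance=1.5)
    print(sorted(violate))  # -> [0, 1]
    print(couples)          # -> [(0, 1)]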

def initialize_video_recorder(output_path, output_depth_path, fps, shape):
    """Initialize the OpenCV video writers used to write each image/frame to a single video
    Args:
        :output_path (str): The file location where the recorded video will be saved
        :output_depth_path (str): The file location where the recorded video with depth information will be saved
        :fps (int): The frames per second of the output videos
        :shape (tuple): The dimension of the output video (width, height)
    Returns:
        :writer (cv2.VideoWriter): The video writer used to save the video
        :writer_depth (cv2.VideoWriter): The video writer used to save the video with depth information
    """
    if not os.path.isdir(os.path.split(output_path)[0]):
        logger.error("Invalid path for the video writer; folder does not exist")
        exit(1)
    fourcc = cv2.VideoWriter_fourcc(*"MJPG")
    writer = cv2.VideoWriter(output_path, fourcc, fps, shape, True)
    writer_depth = None
    if output_depth_path:
        if not os.path.isdir(os.path.split(output_depth_path)[0]):
            logger.error("Invalid path for the depth video writer; folder does not exist")
            exit(1)
        writer_depth = cv2.VideoWriter(output_depth_path, fourcc, fps, shape, True)
    return writer, writer_depth

def delete_items_from_array_aux(arr, i):
    """
    Auxiliary function that deletes the item at a certain index from a numpy array
    Args:
        :arr (numpy.ndarray): Array of arrays where each element corresponds to the four coordinates of a bounding box expressed in percentage
        :i (int): Index of the element to be deleted
    Returns:
        :arr_ret: the array without the element at index i
    """
    aux = arr.tolist()
    aux.pop(i)
    arr_ret = np.array(aux)
    return arr_ret

def fit_plane_least_square(xyz):
    """
    Find the plane z = a*x + b*y + c that best fits the xyz points using least squares
    Args:
        :xyz (numpy.ndarray): array of shape (n, 3) with the 3D points
    Returns:
        :d: the plane offset such that normal . p + d = 0 for every point p on the plane
        :normal: the unit normal of the fitted plane
        :point: a point on the fitted plane
    """
    (rows, cols) = xyz.shape
    g = np.ones((rows, 3))
    g[:, 0] = xyz[:, 0]  # X
    g[:, 1] = xyz[:, 1]  # Y
    z = xyz[:, 2]
    (a, b, c), _, rank, s = np.linalg.lstsq(g, z, rcond=None)
    normal = (a, b, -1)
    nn = np.linalg.norm(normal)
    normal = normal / nn
    point = np.array([0.0, 0.0, c])
    d = -point.dot(normal)
    return d, normal, point
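
# Illustrative usage sketch (not part of the original module): fitting the horizontal plane z = 2 from four
# hypothetical coplanar points; the recovered unit normal is (0, 0, -1) up to floating point noise.
def _example_fit_plane():
    xyz = np.array([[0.0, 0.0, 2.0], [1.0, 0.0, 2.0],
                    [0.0, 1.0, 2.0], [1.0, 1.0, 2.0]])
    d, normal, point = fit_plane_least_square(xyz)
    print(d, normal, point)  # -> d ~ 2.0, normal ~ [0. 0. -1.], point ~ [0. 0. 2.]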

#
# def plot_plane(data, normal, d):
#     from mpl_toolkits.mplot3d import Axes3D
#     import matplotlib.pyplot as plt
#
#     fig = plt.figure()
#     ax = fig.gca(projection='3d')
#
#     # plot fitted plane
#     maxx = np.max(data[:, 0])
#     maxy = np.max(data[:, 1])
#     minx = np.min(data[:, 0])
#     miny = np.min(data[:, 1])
#
#     # compute needed points for plane plotting
#     xx, yy = np.meshgrid([minx - 10, maxx + 10], [miny - 10, maxy + 10])
#     z = (-normal[0] * xx - normal[1] * yy - d) * 1. / normal[2]
#
#     # plot plane
#     ax.plot_surface(xx, yy, z, alpha=0.2)
#
#     ax.set_xlabel('x')
#     ax.set_ylabel('y')
#     ax.set_zlabel('z')
#     plt.show()
#
#     return

def shape_to_np(shape, dtype="int"):
    """
    Function used for the dlib facial detector; it takes the facial landmarks detected for the face region and
    converts the landmark (x, y)-coordinates to a NumPy array
    Args:
        :shape (dlib.full_object_detection): the landmarks predicted by the dlib shape predictor
        :dtype (str): the dtype of the output array
            (Default is "int")
    Returns:
        :coordinates (numpy.ndarray): array of shape (68, 2) with the x, y coordinates of the landmarks
    """
    # initialize the array of (x, y)-coordinates
    coordinates = np.zeros((68, 2), dtype=dtype)
    # loop over the 68 facial landmarks and convert them to a 2-tuple of (x, y)-coordinates
    for i in range(0, 68):
        coordinates[i] = (shape.part(i).x, shape.part(i).y)
    # return the array of (x, y)-coordinates
    return coordinates

def rect_to_bb(rect):
    """
    Function used for the dlib facial detector; it converts dlib's rectangle to a tuple (x, y, w, h) where x and y
    represent the x_min and y_min coordinates while w and h represent the width and the height
    Args:
        :rect (dlib.rectangle): dlib rectangle object that represents the region of the image where a face is detected
    Returns:
        :res (tuple): tuple that represents the region of the image where a face is detected in the form (x, y, w, h)
    """
    # take a bounding box predicted by dlib and convert it to the format (x, y, w, h) as we would normally do with OpenCV
    x = rect.left()
    y = rect.top()
    w = rect.right() - x
    h = rect.bottom() - y
    # return a tuple of (x, y, w, h)
    res = x, y, w, h
    return res

def enlarge_bb(y_min, x_min, y_max, x_max, im_width, im_height):
    """
    Enlarge the bounding box to include more background margin (used for face detection)
    Args:
        :y_min (int): the top y coordinate of the bounding box
        :x_min (int): the left x coordinate of the bounding box
        :y_max (int): the bottom y coordinate of the bounding box
        :x_max (int): the right x coordinate of the bounding box
        :im_width (int): The width of the image
        :im_height (int): The height of the image
    Returns:
        :y_min (int): the top y coordinate of the bounding box after enlarging
        :x_min (int): the left x coordinate of the bounding box after enlarging
        :y_max (int): the bottom y coordinate of the bounding box after enlarging
        :x_max (int): the right x coordinate of the bounding box after enlarging
    """
    y_min = int(max(0, y_min - abs(y_min - y_max) / 10))
    y_max = int(min(im_height, y_max + abs(y_min - y_max) / 10))
    x_min = int(max(0, x_min - abs(x_min - x_max) / 5))
    x_max = int(min(im_width, x_max + abs(x_min - x_max) / 4))
    return y_min, x_min, y_max, x_max

def linear_assignment(cost_matrix):
    """
    Solve the linear assignment problem on cost_matrix, preferring the lap package and falling back to scipy;
    returns an array of matched (row, column) index pairs
    """
    try:
        import lap
        _, x, y = lap.lapjv(cost_matrix, extend_cost=True)
        return np.array([[y[i], i] for i in x if i >= 0])
    except ImportError:
        from scipy.optimize import linear_sum_assignment
        x, y = linear_sum_assignment(cost_matrix)
        return np.array(list(zip(x, y)))
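
# Illustrative usage sketch (not part of the original module): solving a tiny assignment problem on a made-up
# cost matrix; the minimal-cost matching pairs row 0 with column 0 and row 1 with column 1, whichever backend runs.
def _example_linear_assignment():
    cost = np.array([[1.0, 2.0], [2.0, 1.0]])
    print(linear_assignment(cost))  # -> [[0 0]
                                    #     [1 1]]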

def iou_batch(bb_test, bb_gt):
    """
    From SORT: computes the IOU between two batches of bboxes, each box in the form [x1, y1, x2, y2]
    Args:
        :bb_test (numpy.ndarray): array of detection boxes
        :bb_gt (numpy.ndarray): array of ground-truth/tracker boxes
    Returns:
        :o (numpy.ndarray): matrix of IOU values of shape (len(bb_test), len(bb_gt))
    """
    bb_gt = np.expand_dims(bb_gt, 0)
    bb_test = np.expand_dims(bb_test, 1)
    xx1 = np.maximum(bb_test[..., 0], bb_gt[..., 0])
    yy1 = np.maximum(bb_test[..., 1], bb_gt[..., 1])
    xx2 = np.minimum(bb_test[..., 2], bb_gt[..., 2])
    yy2 = np.minimum(bb_test[..., 3], bb_gt[..., 3])
    w = np.maximum(0., xx2 - xx1)
    h = np.maximum(0., yy2 - yy1)
    wh = w * h
    o = wh / ((bb_test[..., 2] - bb_test[..., 0]) * (bb_test[..., 3] - bb_test[..., 1])
              + (bb_gt[..., 2] - bb_gt[..., 0]) * (bb_gt[..., 3] - bb_gt[..., 1]) - wh)
    return o
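
# Illustrative usage sketch (not part of the original module): IOU between one hypothetical detection and two
# ground-truth boxes in [x1, y1, x2, y2] form; identical boxes give 1, the partial overlap gives 1/7.
def _example_iou_batch():
    bb_test = np.array([[0.0, 0.0, 2.0, 2.0]])
    bb_gt = np.array([[0.0, 0.0, 2.0, 2.0], [1.0, 1.0, 3.0, 3.0]])
    print(iou_batch(bb_test, bb_gt))  # -> [[1.         0.14285714]]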

def convert_bbox_to_z(bbox):
    """
    Takes a bounding box in the form [x1, y1, x2, y2] and returns z in the form [x, y, s, r] where x, y is the centre
    of the box, s is the scale/area and r is the aspect ratio
    Args:
        :bbox (numpy.ndarray): bounding box in the corner form [x1, y1, x2, y2]
    Returns:
        :z (numpy.ndarray): array of shape (4, 1) in the centre form [x, y, s, r]
    """
    w = bbox[2] - bbox[0]
    h = bbox[3] - bbox[1]
    x = bbox[0] + w / 2.
    y = bbox[1] + h / 2.
    s = w * h  # scale is just the area
    r = w / float(h) if float(h) != 0 else w  # guard against zero-height boxes
    return np.array([x, y, s, r]).reshape((4, 1))

def convert_x_to_bbox(x, score=None):
    """
    Takes a bounding box in the centre form [x, y, s, r] and returns it in the form
    [x1, y1, x2, y2] where x1, y1 is the top left and x2, y2 is the bottom right
    Args:
        :x (numpy.ndarray): bounding box in the centre form [x, y, s, r]
        :score (float): optional detection score appended to the output box
            (Default is None)
    Returns:
        :bbox (numpy.ndarray): array of shape (1, 4), or (1, 5) when a score is given
    """
    w = np.sqrt(x[2] * x[3])  # s * r = w^2
    h = x[2] / w
    if score is None:
        return np.array([x[0] - w / 2., x[1] - h / 2., x[0] + w / 2., x[1] + h / 2.]).reshape((1, 4))
    else:
        return np.array([x[0] - w / 2., x[1] - h / 2., x[0] + w / 2., x[1] + h / 2., score]).reshape((1, 5))
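
# Illustrative usage sketch (not part of the original module): the two conversions above are inverses of each
# other, up to floating point, for any box with positive height.
def _example_bbox_roundtrip():
    bbox = np.array([0.0, 0.0, 2.0, 4.0])  # [x1, y1, x2, y2]
    z = convert_bbox_to_z(bbox)             # centre (1, 2), area 8, aspect ratio 0.5
    print(convert_x_to_bbox(z))             # -> [[0. 0. 2. 4.]]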

def associate_detections_to_trackers(detections, trackers, iou_threshold=0.3):
    """
    Assigns detections to tracked objects (both represented as bounding boxes)
    Args:
        :detections (numpy.ndarray): array of detection boxes in the form [x1, y1, x2, y2]
        :trackers (numpy.ndarray): array of predicted tracker boxes in the same form
        :iou_threshold (float): minimum IOU for a detection-tracker pair to be matched
            (Default is 0.3)
    Returns:
        :matches: array of matched (detection, tracker) index pairs
        :unmatched_detections: array of indexes of the detections without a matching tracker
        :unmatched_trackers: array of indexes of the trackers without a matching detection
    """
    if len(trackers) == 0:
        return np.empty((0, 2), dtype=int), np.arange(len(detections)), np.empty((0, 5), dtype=int)
    iou_matrix = iou_batch(detections, trackers)
    if min(iou_matrix.shape) > 0:
        a = (iou_matrix > iou_threshold).astype(np.int32)
        if a.sum(1).max() == 1 and a.sum(0).max() == 1:  # unambiguous one-to-one assignment
            matched_indices = np.stack(np.where(a), axis=1)
        else:
            matched_indices = linear_assignment(-iou_matrix)
    else:
        matched_indices = np.empty(shape=(0, 2))
    unmatched_detections = []
    for d, det in enumerate(detections):
        if d not in matched_indices[:, 0]:
            unmatched_detections.append(d)
    unmatched_trackers = []
    for t, trk in enumerate(trackers):
        if t not in matched_indices[:, 1]:
            unmatched_trackers.append(t)
    # filter out matches with low IOU
    matches = []
    for m in matched_indices:
        if iou_matrix[m[0], m[1]] < iou_threshold:
            unmatched_detections.append(m[0])
            unmatched_trackers.append(m[1])
        else:
            matches.append(m.reshape(1, 2))
    if len(matches) == 0:
        matches = np.empty((0, 2), dtype=int)
    else:
        matches = np.concatenate(matches, axis=0)
    return matches, np.array(unmatched_detections), np.array(unmatched_trackers)
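
# Illustrative usage sketch (not part of the original module): one hypothetical detection overlaps the first
# tracker well above the default IOU threshold, while the second tracker goes unmatched.
def _example_associate():
    detections = np.array([[0.0, 0.0, 2.0, 2.0]])
    trackers = np.array([[0.1, 0.1, 2.1, 2.1], [5.0, 5.0, 6.0, 6.0]])
    matches, unmatched_det, unmatched_trk = associate_detections_to_trackers(detections, trackers)
    print(matches)        # -> [[0 0]]
    print(unmatched_det)  # -> []
    print(unmatched_trk)  # -> [1]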

def find_face_from_key_points(key_points, bboxes, image, person=None, openpose=False, gazefollow=True):
    """
    Find the face region of a person starting from its pose key points
    Args:
        :key_points (list): the key points of the person, with the confidence in the third position
        :bboxes (list): the bounding box of the person
        :image (numpy.ndarray): the image containing the person
        :person: optional person object; when given, the face crop and its coordinates are stored on it
        :openpose (bool): True if the key points follow the OpenPose notation
        :gazefollow (bool): True to use the gazefollow-style face crop
    Returns:
        :face_image, coordinates: the face crop and its [y_min, x_min, y_max, x_max] coordinates; None when a person
            object is given (the results are stored on it), and (None, []) when no valid face point is found
    """
    im_width, im_height = image.shape[1], image.shape[0]
    # 0 nose, 1/2 left/right eye, 3/4 left/right ear
    # 5/6 leftShoulder/rightShoulder
    # 7/8 leftElbow/rightElbow
    # 9/10 leftWrist/rightWrist
    # 11/12 leftHip/rightHip
    # 13/14 leftKnee/rightKnee
    # 15/16 leftAnkle/rightAnkle
    face_points = key_points[:7]
    if openpose:
        face_points = [point for point in key_points[:7] if point[2] > 0.0]  # keep only the detected points
    if len(face_points) == 0:
        return None, []
    if not gazefollow:
        ct = compute_centroid(face_points)
        x_min, y_min = ct[0] - 10, ct[1] - 15
        x_max, y_max = ct[0] + 10, ct[1] + 10
        y_min_bbox = y_min
    else:
        if len(face_points) == 1:
            return None, []
        x_min, y_min, _ = np.amin(face_points, axis=0)
        x_max, y_max, _ = np.amax(face_points, axis=0)
        aux_diff = y_max - y_min
        if aux_diff < 50:  # face box too short: derive the height from the width (ratio of x_max - x_min)
            y_max += (x_max - x_min) / 1.4
            y_min -= (x_max - x_min) / 1.2
        y_min_bbox = int(y_min)
    y_min, x_min, y_max, x_max = enlarge_bb(y_min_bbox, x_min, y_max, x_max, im_width, im_height)
    face_image = image[y_min:y_max, x_min:x_max]
    if person is not None:
        person.update_faces(face_image)
        person.update_faces_coordinates([y_min, x_min, y_max, x_max])
        return None
    else:
        return face_image, [y_min, x_min, y_max, x_max]

def compute_interaction_cosine(head_position, target_position, gaze_direction):
    """
    Computes the interaction between two people using the angle of view.
    The interaction is measured as the cosine of the angle formed by the line from person A to B and the gaze
    direction of person A.
    Args:
        :head_position (list): list of pixel coordinates [x, y] that represents the position of the head of person A
        :target_position (list): list of pixel coordinates [x, y] that represents the position of the head of person B
        :gaze_direction (list): list that represents the gaze direction of the head of person A in the form [gx, gy]
    Returns:
        :val (float): value that describes the quantity of interaction
    """
    if head_position == target_position:
        return 0  # or -1
    else:
        # angle of the line joining observer -> target
        direction = np.arctan2((target_position[1] - head_position[1]), (target_position[0] - head_position[0]))
        direction_gaze = np.arctan2(gaze_direction[1], gaze_direction[0])
        # difference between the observer -> target direction and the gazing direction
        difference = direction - direction_gaze
        val = np.cos(difference)
        if val < 0:
            return 0
        else:
            return val
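
# Illustrative usage sketch (not part of the original module): person A at (0, 0) looking straight at person B
# at (1, 0) gives the maximum interaction score; looking directly away is clamped to 0.
def _example_interaction_cosine():
    print(compute_interaction_cosine([0, 0], [1, 0], [1, 0]))   # -> 1.0
    print(compute_interaction_cosine([0, 0], [1, 0], [-1, 0]))  # -> 0 (looking away)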

def compute_attention_from_vectors(list_objects):
    """
    Compute the pairwise attention matrix between people from their head-pose direction vectors
    Args:
        :list_objects (list): list of tracked person objects exposing get_id(), get_key_points() and
            get_poses_vector_norm()
    Returns:
        :attention_matrix, id_list: the attention matrix as a list of lists, and the list of person ids
    """
    dict_person = dict()
    id_list = []
    for obj in list_objects:
        if len(obj.get_key_points()) > 0:
            id_list.append(obj.get_id())
            # head centre: mean of the nose, eye and ear key points (x, y only)
            aux = [obj.get_key_points()[-1][j][:2] for j in [0, 2, 1, 4, 3]]
            dict_person[obj.get_id()] = [obj.get_poses_vector_norm()[-1], np.mean(aux, axis=0).tolist()]
    # note: the indexing below assumes that the person ids are the consecutive integers 0..n-1
    attention_matrix = np.zeros((len(dict_person), len(dict_person)), dtype=np.float32)
    for i in range(attention_matrix.shape[0]):
        for j in range(attention_matrix.shape[1]):
            if i == j:
                continue
            attention_matrix[i][j] = compute_interaction_cosine(dict_person[i][1], dict_person[j][1], dict_person[i][0])
    return attention_matrix.tolist(), id_list

def compute_attention_ypr(list_objects):
    """
    Print the latest yaw, pitch and roll values of each tracked person
    Args:
        :list_objects (list): list of tracked person objects exposing get_id() and get_poses_ypr()
    Returns:
    """
    for obj in list_objects:
        if len(obj.get_key_points()) > 0:
            print("Object ID: ", obj.get_id(), "yaw: ", obj.get_poses_ypr()[-1][0], "pitch: ", obj.get_poses_ypr()[-1][1], "roll: ",
                  obj.get_poses_ypr()[-1][2])

def save_key_points_to_json(ids, kpts, path_json, openpose=False):
    """
    Save key points to .json according to the OpenPose output format
    Args:
        :ids (list): the id of each person
        :kpts (list): the key points of each person
        :path_json (str): path of the output .json file
        :openpose (bool): True if the key points are already in the OpenPose notation
    Returns:
    """
    dict_file = {"version": 1.3}
    list_dict_person = []
    for j in range(len(kpts)):
        dict_person = {"person_id": [int(ids[j])],
                       "face_keypoints_2d": [],
                       "hand_left_keypoints_2d": [],
                       "hand_right_keypoints_2d": [],
                       "pose_keypoints_3d": [],
                       "face_keypoints_3d": [],
                       "hand_left_keypoints_3d": [],
                       "hand_right_keypoints_3d": []}
        kpts_openpose = np.zeros((25, 3))
        for i, point in enumerate(kpts[j]):
            if openpose:
                idx_op = rev_pose_id_part_openpose[pose_id_part_openpose[i]]
            else:
                idx_op = rev_pose_id_part_openpose[pose_id_part[i]]
            kpts_openpose[idx_op] = [point[1], point[0], point[2]]  # x, y, conf
        list_kpts_openpose = list(np.concatenate(kpts_openpose).ravel())
        dict_person["pose_keypoints_2d"] = list_kpts_openpose
        list_dict_person.append(dict_person)
    dict_file["people"] = list_dict_person
    # serialize and write the json file
    json_object = json.dumps(dict_file, indent=4)
    with open(path_json, "w") as outfile:
        outfile.write(json_object)

def json_to_poses(json_data):
    """
    Parse the poses, confidences and ids from OpenPose-style json data
    Args:
        :json_data (dict): the parsed content of an OpenPose .json file
    Returns:
        :poses, confidences, ids: the key points as (x, y, conf) tuples, the confidence values and the person ids
    """
    poses = []
    confidences = []
    ids = []
    for arr in json_data["people"]:
        ids.append(arr["person_id"])
        confidences.append(arr["pose_keypoints_2d"][2::3])
        aux = arr["pose_keypoints_2d"][2::3]
        arr = np.delete(arr["pose_keypoints_2d"], slice(2, None, 3))
        poses.append(list(zip(arr[::2], arr[1::2], aux)))
    return poses, confidences, ids

def parse_json1(aux):
    """
    Parse the key points (as [y, x, conf]) and the id of each person from OpenPose-style json data
    """
    list_kpts = []
    id_list = []
    for person in aux['people']:
        flat_kpts = person['pose_keypoints_2d']
        aux_kpts = [[flat_kpts[i + 1], flat_kpts[i], flat_kpts[i + 2]] for i in range(0, 75, 3)]  # 25 points
        list_kpts.append(aux_kpts)
        id_list.append(person['person_id'])
    return list_kpts, id_list

def load_poses_from_json1(json_filename):
    """
    Args:
        :json_filename (str): path of the .json file to load
    Returns:
        :list_kpts, id_list: the key points and the id of each person
    """
    with open(json_filename) as data_file:
        loaded = json.load(data_file)
    return parse_json1(loaded)

def load_poses_from_json(json_filename):
    """
    Args:
        :json_filename (str): path of the .json file to load
    Returns:
        :poses, conf, ids: the poses, confidences and ids of each person, or (None, None, None) when no pose is found
    """
    with open(json_filename) as data_file:
        loaded = json.load(data_file)
    poses, conf, ids = json_to_poses(loaded)
    if len(poses) < 1:
        return None, None, None
    else:
        return poses, conf, ids

def compute_head_features(img, pose, conf, open_pose=True):
    """
    Compute the head centroid and the head key points normalized with respect to the centroid
    Args:
        :img (numpy.ndarray): the image containing the person
        :pose (list): the key points of the person
        :conf (list): the confidence value of each key point
        :open_pose (bool): True if the key points follow the OpenPose notation
    Returns:
        :flat_list, conf_list, centroid: the flattened normalized face points, their confidences and the head
            centroid; (None, None, None) when no face joint is detected
    """
    joints = [0, 15, 16, 17, 18] if open_pose else [0, 2, 1, 4, 3]
    n_joints_set = [pose[joint] for joint in joints if joint_set(pose[joint])]
    if len(n_joints_set) < 1:
        return None, None, None  # keep the same arity as the valid-output case
    centroid = compute_centroid(n_joints_set)
    max_dist = max([dist_2D([j[0], j[1]], centroid) for j in n_joints_set])
    new_repr = [(np.array([pose[joint][0], pose[joint][1]]) - np.array(centroid)) for joint in joints] if open_pose else [
        (np.array(pose[i]) - np.array(centroid)) for i in range(len(n_joints_set))]
    result = []
    for i in range(0, 5):
        if joint_set(pose[joints[i]]):
            if max_dist != 0.0:
                result.append([new_repr[i][0] / max_dist, new_repr[i][1] / max_dist])
            else:
                result.append([new_repr[i][0], new_repr[i][1]])
        else:
            result.append([0, 0])
    flat_list = [item for sublist in result for item in sublist]
    conf_list = []
    for j in joints:
        conf_list.append(conf[j])
    return flat_list, conf_list, centroid

def compute_body_features(pose, conf):
    """
    Compute the head centroid and all the body key points normalized with respect to it
    Args:
        :pose (list): the key points of the person in OpenPose notation
        :conf (list): the confidence value of each key point
    Returns:
        :flat_list, centroid: the flattened normalized key points followed by their confidences, and the head
            centroid; (None, None) when no face joint is detected
    """
    joints = [0, 15, 16, 17, 18]
    alljoints = range(0, 25)
    n_joints_set = [pose[joint] for joint in joints if joint_set(pose[joint])]
    if len(n_joints_set) < 1:
        return None, None
    centroid = compute_centroid(n_joints_set)
    n_joints_set = [pose[joint] for joint in alljoints if joint_set(pose[joint])]
    max_dist = max([dist_2D(j, centroid) for j in n_joints_set])
    new_repr = [(np.array(pose[joint]) - np.array(centroid)) for joint in alljoints]
    result = []
    for i in range(0, 25):
        if joint_set(pose[i]):
            result.append([new_repr[i][0] / max_dist, new_repr[i][1] / max_dist])
        else:
            result.append([0, 0])
    flat_list = [item for sublist in result for item in sublist]
    for j in alljoints:
        flat_list.append(conf[j])
    return flat_list, centroid

def compute_centroid(points):
    """
    Compute the mean x and y of the points; points with a third (confidence) component are used only when the
    confidence is positive
    Args:
        :points (list): list of points as [x, y] or [x, y, confidence]
    Returns:
        :centroid (list): the [mean_x, mean_y] centroid, or [None, None] when no valid point is given
    """
    x, y = [], []
    for point in points:
        if len(point) == 3:
            if point[2] > 0.0:
                x.append(point[0])
                y.append(point[1])
        else:
            x.append(point[0])
            y.append(point[1])
    if x == [] or y == []:
        return [None, None]
    mean_x = np.mean(x)
    mean_y = np.mean(y)
    return [mean_x, mean_y]

def joint_set(p):
    """
    Return True when the key point p has been detected (i.e. it is not the (0, 0) placeholder)
    Args:
        :p (list): a key point with the two coordinates in its first two positions
    Returns:
        :bool
    """
    return p[0] != 0.0 or p[1] != 0.0

def dist_2D(p1, p2):
    """
    Euclidean distance between two 2D points
    Args:
        :p1 (list): the first point
        :p2 (list): the second point
    Returns:
        :the L2 distance between p1 and p2
    """
    p1 = np.array(p1)
    p2 = np.array(p2)
    squared_dist = np.sum((p1 - p2) ** 2, axis=0)
    return np.sqrt(squared_dist)

def compute_head_centroid(pose):
    """
    Compute the centroid of the head key points (nose, eyes and ears in OpenPose notation)
    Args:
        :pose (list): the key points of the person
    Returns:
        :centroid (list): the [x, y] head centroid, or [None, None] when no head joint is detected
    """
    joints = [0, 15, 16, 17, 18]
    n_joints_set = [pose[joint] for joint in joints if joint_set(pose[joint])]
    centroid = compute_centroid(n_joints_set)
    return centroid

def head_direction_to_json(path_json, norm_list, unc_list, ids_list, file_name):
    """Save the head direction of each person (normalized direction, head centre and uncertainty) to a .json file"""
    dict_file = {}
    list_dict_person = []
    for k, i in enumerate(norm_list):
        dict_person = {"id_person": [ids_list[k]],
                       "norm_xy": [i[0][0].item(), i[0][1].item()],  # from numpy to native python types for json serialization
                       "center_xy": [int(i[1][0]), int(i[1][1])],
                       "uncertainty": [unc_list[k].item()]}
        list_dict_person.append(dict_person)
    dict_file["people"] = list_dict_person
    json_object = json.dumps(dict_file, indent=4)
    with open(path_json, "w") as outfile:
        outfile.write(json_object)

def ypr_to_json(path_json, yaw_list, pitch_list, roll_list, yaw_u_list, pitch_u_list, roll_u_list, ids_list, center_xy):
    """Save the yaw, pitch and roll of each person (with their uncertainties and the head centre) to a .json file"""
    dict_file = {}
    list_dict_person = []
    for k in range(len(yaw_list)):
        dict_person = {"id_person": [ids_list[k]],
                       "yaw": [yaw_list[k].item()],
                       "yaw_u": [yaw_u_list[k].item()],
                       "pitch": [pitch_list[k].item()],
                       "pitch_u": [pitch_u_list[k].item()],
                       "roll": [roll_list[k].item()],
                       "roll_u": [roll_u_list[k].item()],
                       "center_xy": [int(center_xy[k][0]), int(center_xy[k][1])]}
        list_dict_person.append(dict_person)
    dict_file["people"] = list_dict_person
    json_object = json.dumps(dict_file, indent=4)
    with open(path_json, "w") as outfile:
        outfile.write(json_object)

def save_keypoints_image(img, poses, suffix_, path_save=''):
    """
    Save the image with the head key points drawn on it
    Args:
        :img (numpy.ndarray): the input image
        :poses (list): the key points of each person
        :suffix_ (str): suffix used to build the output file name
        :path_save (str): folder where the image is saved
    Returns:
    """
    aux = img.copy()
    for point in poses:
        for i, p in enumerate(point):
            if i in [0, 15, 16, 17, 18]:  # head joints only
                cv2.circle(aux, (int(p[0]), int(p[1])), 2, (0, 255, 0), 2)
    cv2.imwrite(os.path.join(path_save, suffix_ + '.jpg'), aux)

def unit_vector(vector):
    """
    Returns the unit vector of the vector.
    Args:
        :vector: the input vector
    Returns:
        :the normalized vector
    """
    return vector / np.linalg.norm(vector)

def angle_between(v1, v2):
    """
    Returns the angle in radians between vectors 'v1' and 'v2'::
        angle_between((1, 0, 0), (0, 1, 0))
        1.5707963267948966
        angle_between((1, 0, 0), (1, 0, 0))
        0.0
        angle_between((1, 0, 0), (-1, 0, 0))
        3.141592653589793
    """
    # normalize in case the inputs are not already unit vectors
    v1_u = unit_vector(tuple(v1))
    v2_u = unit_vector(tuple(v2))
    angle = np.arccos(np.clip(np.dot(v1_u, v2_u), -1.0, 1.0))
    # project-specific wrap: angles above 1.80 rad are reduced by 1.80 (so the pi example above actually returns ~1.34)
    return angle if angle < 1.80 else angle - 1.80

def centroid_constraint(centroid, centroid_det, gazefollow=False):  # x y
    """
    Check whether a detected centroid is close enough to the ground truth centroid
    Args:
        :centroid (list): the ground truth [x, y] centroid
        :centroid_det (list): the detected [x, y] centroid
        :gazefollow (bool): True to use the wider tolerance used for gazefollow data
    Returns:
        :bool: True when the detected centroid is near the ground truth centroid
    """
    if centroid_det == [None, None]:
        return False
    if not gazefollow:
        if 0 < centroid_det[0] < 143 and 0 < centroid_det[1] < 24:  # centroid in the overprinted text of the hour in the video
            return False
        if 0 < centroid_det[1] < 4:
            return False
        if centroid[0] - 3 < centroid_det[0] < centroid[0] + 3 and centroid[1] - 3 < centroid_det[1] < centroid[1] + 3:
            # detected centroid near the gt centroid
            return True
        else:
            return False
    else:
        if int(centroid[0] - 30) < int(centroid_det[0]) < int(centroid[0] + 30) and int(centroid[1] - 30) < int(centroid_det[1]) < int(centroid[1] + 30):
            # detected centroid near the gt centroid
            return True
        else:
            return False

def initialize_video_reader(path_video):
    """
    Open the video source with OpenCV, exiting when it cannot be opened
    Args:
        :path_video (str): path of the input video
    Returns:
        :cap (cv2.VideoCapture): the opened video capture
    """
    cap = cv2.VideoCapture(path_video)
    if cap is None or not cap.isOpened():
        print('Warning: unable to open video source: ', path_video)
        exit(-1)
    return cap

def distance_skeletons(kpts1, kpts2, dst_type):
    """
    Compute the distance between two skeletons
    # TODO complete the 'three_centroids' variant
    Args:
        :kpts1 (list): the key points of the first skeleton
        :kpts2 (list): the key points of the second skeleton
        :dst_type (str): the distance type; one of 'all_points', 'head_centroid' or 'three_centroids'
    Returns:
        :res_dist (float): the distance between the two skeletons
    """
    if len(kpts1) != len(kpts2):
        print("Error: different notation used for the two sets of key points")
        exit(-1)
    # convert to the openpose notation if needed
    if len(kpts1) == len(kpts2) == 17:
        kpts1, kpts2 = kpt_centernet_to_openpose(kpts1), kpt_centernet_to_openpose(kpts2)
    if len(kpts1) != 25 or len(kpts2) != 25:
        print("Error: expected 25 key points in openpose notation")
        exit(-1)
    res_dist = 0
    if dst_type == 'all_points':
        for i, _ in enumerate(kpts1):
            res_dist += dist_2D(kpts1[i][:2], kpts2[i][:2])
        res_dist /= 25
        return res_dist
    elif dst_type == 'head_centroid':
        top1_c, top2_c = compute_head_centroid(kpts1), compute_head_centroid(kpts2)
        if top1_c == [None, None] or top2_c == [None, None]:
            res_dist = 900  # large sentinel distance when a head centroid is missing
        else:
            res_dist = dist_2D(top1_c[:2], top2_c[:2])
        return res_dist
    elif dst_type == 'three_centroids':
        # TODO compute the distance between the top, middle and bottom centroids of the two skeletons
        # top1_c, top2_c = compute_centroid(kpts1[0, 15, 16, 17, 18]), compute_centroid(kpts2[0, 15, 16, 17, 18])
        # mid1_c, mid2_c = compute_centroid(kpts1[2, 5, 9, 12]), compute_centroid(kpts2[2, 5, 9, 12])
        # btm1_c, btm2_c = compute_centroid(kpts1[9, 12, 10, 13]), compute_centroid(kpts2[9, 12, 10, 13])
        # res_dist = dist_2D(top1_c[:2], top2_c[:2]) + dist_2D(mid1_c[:2], mid2_c[:2]) + dist_2D(btm1_c[:2], btm2_c[:2])
        # res_dist /= 3
        # return res_dist
        return None
    else:
        print("dst_type not valid")
        exit(-1)

def kpt_openpose_to_centernet(kpts):
    """
    Convert key points from the OpenPose notation to the CenterNet notation
    # TODO test
    Args:
        :kpts (list): the key points in OpenPose notation
    Returns:
        :kpts_centernet (numpy.ndarray): the key points in CenterNet notation
    """
    kpts_centernet = np.zeros((17, 3))  # CenterNet uses the 17 COCO key points
    for i, point in enumerate(kpts):
        idx_op = rev_pose_id_part[pose_id_part_openpose[i]]
        kpts_centernet[idx_op] = [point[0], point[1], point[2]]
    return kpts_centernet

def kpt_centernet_to_openpose(kpts):
    """
    Convert key points from the CenterNet notation to the OpenPose notation (also swapping the two coordinates)
    # TODO test
    Args:
        :kpts (list): the key points in CenterNet notation
    Returns:
        :kpts_openpose (numpy.ndarray): the key points in OpenPose notation
    """
    kpts_openpose = np.zeros((25, 3))
    for i, point in enumerate(kpts):
        idx_op = rev_pose_id_part_openpose[pose_id_part[i]]
        kpts_openpose[idx_op] = [point[1], point[0], point[2]]
    return kpts_openpose

def non_maxima_aux(det, kpt, threshold=15):  # threshold in pixels
    """
    Suppress duplicated skeletons whose head centroids are closer than threshold pixels, keeping the first one
    """
    indexes_to_delete = []
    if len(kpt) == 0 or len(det) == 0:
        return [], []
    if len(kpt) == 1 or len(det) == 1:
        return det, kpt
    kpt_res = kpt.copy()
    det_res_aux = det.copy()
    for i in range(0, len(kpt)):
        for j in range(i, len(kpt)):
            if i == j:
                continue
            d = distance_skeletons(kpt[i], kpt[j], 'head_centroid')  # avoid shadowing the scipy 'dist' import
            if d < threshold:
                if j not in indexes_to_delete:
                    indexes_to_delete.append(j)
    indexes_to_delete = sorted(indexes_to_delete, reverse=True)
    for index in indexes_to_delete:
        kpt_res.pop(index)
    det_res_aux = list(np.delete(det_res_aux, indexes_to_delete, axis=0))
    det_res = np.array(det_res_aux)
    return det_res, kpt_res

def compute_centroid_list(points):
    """
    Compute the centroid of a flat key point list in the form [x0, y0, conf0, x1, y1, conf1, ...], using only the
    points with positive confidence
    Args:
        :points (list): the flat list of key point values
    Returns:
        :centroid (list): the [mean_x, mean_y] centroid, or [None, None] when no valid point is given
    """
    x, y = [], []
    for i in range(0, len(points), 3):
        if points[i + 2] > 0.0:  # openpose confidence
            x.append(points[i])
            y.append(points[i + 1])
    if x == [] or y == []:
        return [None, None]
    mean_x = np.mean(x)
    mean_y = np.mean(y)
    return [mean_x, mean_y]

def normalize_wrt_maximum_distance_point(points, file_name=''):
    """
    Normalize a flat key point list [x0, y0, conf0, ...] in place: each coordinate is centred on the centroid and
    divided by the maximum distance from the centroid along the same axis
    """
    centroid = compute_centroid_list(points)
    max_dist_x, max_dist_y = 0, 0
    for i in range(0, len(points), 3):
        if points[i + 2] > 0.0:  # take only the valid openpose key points (a missing point is (0, 0, 0))
            distance_x = abs(points[i] - centroid[0])
            distance_y = abs(points[i + 1] - centroid[1])
            if distance_x > max_dist_x:
                max_dist_x = distance_x
            if distance_y > max_dist_y:
                max_dist_y = distance_y
        elif points[i + 2] == 0.0:  # check for centernet people on the image borders with confidence 0
            points[i] = 0
            points[i + 1] = 0
    for i in range(0, len(points), 3):
        if points[i + 2] > 0.0:
            if max_dist_x != 0.0:
                points[i] = (points[i] - centroid[0]) / max_dist_x
            if max_dist_y != 0.0:
                points[i + 1] = (points[i + 1] - centroid[1]) / max_dist_y
            if max_dist_x == 0.0:  # only one valid point: it becomes (0, 0, confidence)
                points[i] = 0.0
            if max_dist_y == 0.0:
                points[i + 1] = 0.0
    return points

def retrieve_interest_points(kpts, detector):
    """
    Take only the five face key points (nose, eyes, ears) in the notation of the given detector
    Args:
        :kpts (list): the key points of the person
        :detector (str): the detector used ('centernet', 'zedcam', or openpose notation by default)
    Returns:
        :res_kpts (list): the flat list of face key point values
    """
    res_kpts = []
    if detector == 'centernet':
        face_points = [0, 1, 2, 3, 4]
        for index in face_points:
            res_kpts.append(kpts[index][1])
            res_kpts.append(kpts[index][0])
            res_kpts.append(kpts[index][2])
    elif detector == 'zedcam':
        face_points = [0, 14, 15, 16, 17]
        for index in face_points:
            res_kpts.append(kpts[index][0])
            res_kpts.append(kpts[index][1])
            res_kpts.append(kpts[index][2])
    else:  # openpose notation
        face_points = [0, 16, 15, 18, 17]
        for index in face_points:
            res_kpts.append(kpts[index][0])
            res_kpts.append(kpts[index][1])
            res_kpts.append(kpts[index][2])
    return res_kpts

def create_bbox_from_openpose_keypoints(data):
    """
    Create a bounding box for each person from its OpenPose key points, using the min/max of the valid coordinates
    """
    bbox = list()
    ids = list()
    kpt = list()
    kpt_scores = list()
    for person in data['people']:
        ids.append(person['person_id'][0])
        kpt_temp = list()
        kpt_score_temp = list()
        # create the bbox from the min/max of each dimension
        x, y = [], []
        for i in pose_id_part_openpose:
            if i < 25:
                # key points and their scores
                kpt_temp.append([int(person['pose_keypoints_2d'][i * 3]), int(person['pose_keypoints_2d'][(i * 3) + 1]),
                                 person['pose_keypoints_2d'][(i * 3) + 2]])
                kpt_score_temp.append(person['pose_keypoints_2d'][(i * 3) + 2])
                # use only the key points with confidence != 0
                if person['pose_keypoints_2d'][(3 * i) + 2] != 0:
                    x.append(int(person['pose_keypoints_2d'][3 * i]))
                    y.append(int(person['pose_keypoints_2d'][(3 * i) + 1]))
        kpt_scores.append(kpt_score_temp)
        kpt.append(kpt_temp)
        xmax = max(x)
        xmin = min(x)
        ymax = max(y)
        ymin = min(y)
        bbox.append([xmin, ymin, xmax, ymax, 1])  # the last value is for compatibility with centernet detections
    return bbox, kpt, kpt_scores  # the scores are not meant to be used

def atoi(text):
    """Convert a string chunk to int when it is made of digits, otherwise return it unchanged"""
    return int(text) if text.isdigit() else text

def natural_keys(text):
    """
    alist.sort(key=natural_keys) sorts in human order
    http://nedbatchelder.com/blog/200712/human_sorting.html
    (See Toothy's implementation in the comments)
    """
    import re
    return [atoi(c) for c in re.split(r'(\d+)', text)]
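
# Illustrative usage sketch (not part of the original module): human-order sorting of hypothetical frame file
# names, so 'img10' sorts after 'img2' instead of before it as in plain lexicographic order.
def _example_natural_sort():
    frames = ['img10.jpg', 'img2.jpg', 'img1.jpg']
    print(sorted(frames, key=natural_keys))  # -> ['img1.jpg', 'img2.jpg', 'img10.jpg']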