import random

import cv2
import numpy as np
from mediapipe.python.solutions import pose

SELECTED_JOINTS = {
    27: {
        'pose': [0, 11, 12, 13, 14, 15, 16],
        'hand': [0, 4, 5, 8, 9, 12, 13, 16, 17, 20],
    },  # 27
}
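
# For reference, the MediaPipe indices selected above correspond to:
#   pose: 0 nose, 11/12 shoulders, 13/14 elbows, 15/16 wrists
#   hand: 0 wrist, 4 thumb tip, 5/8 index MCP/tip, 9/12 middle MCP/tip,
#         13/16 ring MCP/tip, 17/20 pinky MCP/tip
# giving 7 pose joints + 10 per hand = 27 joints per frame.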

def pad(joints: np.ndarray, num_frames: int = 150) -> np.ndarray:
    '''
    Pad or truncate the joints to a fixed number of frames.

    Parameters
    ----------
    joints : np.ndarray
        The joints to pad, with shape (T, V, C).
    num_frames : int, default=150
        The target number of frames.

    Returns
    -------
    np.ndarray
        The padded joints, with shape (num_frames, V, C).
    '''
    if joints.shape[0] < num_frames:
        L = joints.shape[0]
        padded_joints = np.zeros((num_frames, joints.shape[1], joints.shape[2]))
        padded_joints[:L, :, :] = joints
        # Fill the remaining frames by repeating the clip cyclically.
        rest = num_frames - L
        num = int(np.ceil(rest / L))
        pad = np.concatenate([joints for _ in range(num)], 0)[:rest]
        padded_joints[L:, :, :] = pad
    else:
        padded_joints = joints[:num_frames]
    return padded_joints
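
# A minimal sketch of pad's behavior (the 60-frame clip is an assumed example):
#   >>> pad(np.ones((60, 27, 3)), num_frames=150).shape
#   (150, 27, 3)
# The first 60 frames are the clip itself; the remaining 90 repeat it
# cyclically rather than being left as zeros.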

def extract_joints(
    source: str,
    keypoints_detector,
    resize_to: tuple = (256, 256),
    num_joints: int = 27,
    num_frames: int = 150,
    num_bodies: int = 1,
    num_channels: int = 3,
) -> np.ndarray:
    '''
    Extract the joints from a video.

    Parameters
    ----------
    source : str
        The path to the video.
    keypoints_detector : mediapipe.solutions.holistic.Holistic
        The keypoints detector.
    resize_to : tuple, default=(256, 256)
        The size to resize each frame to.
    num_joints : int, default=27
        The number of joints to keep per frame.
    num_frames : int, default=150
        The number of frames to pad or truncate to.
    num_bodies : int, default=1
        The number of bodies.
    num_channels : int, default=3
        The number of channels per joint (x, y, visibility).

    Returns
    -------
    np.ndarray
        The extracted joints, with shape
        (num_channels, num_frames, num_joints, num_bodies).
    '''
    cap = cv2.VideoCapture(source)
    extracted_joints = []
    while cap.isOpened():
        success, image = cap.read()
        if not success:
            break
        image = cv2.resize(image, resize_to)
        image = cv2.flip(image, flipCode=1)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        frame_joints = []
        results = keypoints_detector.process(image)

        # Upper-body pose joints; zeros when no pose is detected.
        pose_joints = [(0.0, 0.0, 0.0)] * len(SELECTED_JOINTS[num_joints]['pose'])
        if results.pose_landmarks is not None:
            pose_joints = [
                (landmark.x * resize_to[0], landmark.y * resize_to[1], landmark.visibility)
                for i, landmark in enumerate(results.pose_landmarks.landmark)
                if i in SELECTED_JOINTS[num_joints]['pose']
            ]
        frame_joints.extend(pose_joints)

        # Left-hand joints; zeros when the hand is not detected.
        left_hand = [(0.0, 0.0, 0.0)] * len(SELECTED_JOINTS[num_joints]['hand'])
        if results.left_hand_landmarks is not None:
            left_hand = [
                (landmark.x * resize_to[0], landmark.y * resize_to[1], landmark.visibility)
                for i, landmark in enumerate(results.left_hand_landmarks.landmark)
                if i in SELECTED_JOINTS[num_joints]['hand']
            ]
        frame_joints.extend(left_hand)

        # Right-hand joints; zeros when the hand is not detected.
        right_hand = [(0.0, 0.0, 0.0)] * len(SELECTED_JOINTS[num_joints]['hand'])
        if results.right_hand_landmarks is not None:
            right_hand = [
                (landmark.x * resize_to[0], landmark.y * resize_to[1], landmark.visibility)
                for i, landmark in enumerate(results.right_hand_landmarks.landmark)
                if i in SELECTED_JOINTS[num_joints]['hand']
            ]
        frame_joints.extend(right_hand)

        assert len(frame_joints) == num_joints, \
            f'Expected {num_joints} joints, got {len(frame_joints)} joints.'
        extracted_joints.append(frame_joints)
    cap.release()

    extracted_joints = np.array(extracted_joints)
    extracted_joints = pad(extracted_joints, num_frames=num_frames)
    fp = np.zeros(
        (num_frames, num_joints, num_channels, num_bodies),
        dtype=np.float32,
    )
    fp[:, :, :, 0] = extracted_joints
    # (T, V, C, M) -> (C, T, V, M)
    return np.transpose(fp, [2, 0, 1, 3])
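
# Usage sketch (hedged; 'video.mp4' is an illustrative path, not a bundled
# asset). With a MediaPipe Holistic detector, the result is a
# (C, T, V, M) = (3, 150, 27, 1) array of (x, y, visibility) channels:
#   from mediapipe.python.solutions import holistic
#   with holistic.Holistic() as detector:
#       joints = extract_joints('video.mp4', detector)
#       assert joints.shape == (3, 150, 27, 1)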

def preprocess(
    source: str,
    keypoints_detector,
    normalization: bool = True,
    random_choose: bool = True,
    window_size: int = 120,
) -> np.ndarray:
    '''
    Preprocess a video into model inputs.

    Parameters
    ----------
    source : str
        The path to the video.
    keypoints_detector : mediapipe.solutions.holistic.Holistic
        The keypoints detector.
    normalization : bool, default=True
        Whether to normalize the data (currently a no-op: the normalization
        step below is commented out).
    random_choose : bool, default=True
        Whether to sample frames randomly instead of uniformly.
    window_size : int, default=120
        The number of frames to sample.

    Returns
    -------
    np.ndarray
        The processed inputs for the model.
    '''
    inputs = extract_joints(source=source, keypoints_detector=keypoints_detector)

    # Optional motion-difference transform, currently disabled:
    # T = inputs.shape[1]
    # ori_data = inputs
    # for t in range(T - 1):
    #     inputs[:, t, :, :] = ori_data[:, t + 1, :, :] - ori_data[:, t, :, :]
    # inputs[:, T - 1, :, :] = 0

    if random_choose:
        inputs = random_sample_np(inputs, window_size)
    else:
        inputs = uniform_sample_np(inputs, window_size)

    # Optional centering on the first joint, currently disabled:
    # if normalization:
    #     assert inputs.shape[0] == 3
    #     inputs[0, :, :, :] = inputs[0, :, :, :] - inputs[0, :, 0, 0].mean(axis=0)
    #     inputs[1, :, :, :] = inputs[1, :, :, :] - inputs[1, :, 0, 0].mean(axis=0)

    # Add a batch axis: (C, T, V, M) -> (1, C, T, V, M).
    return inputs[np.newaxis, :].astype(np.float32)
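
# End-to-end usage sketch (hedged; the path and Holistic settings are assumed
# for illustration). With the defaults, the output is
# (N, C, T, V, M) = (1, 3, 120, 27, 1):
#   from mediapipe.python.solutions import holistic
#   with holistic.Holistic(static_image_mode=False) as detector:
#       inputs = preprocess('video.mp4', detector)
#       assert inputs.shape == (1, 3, 120, 27, 1)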

def random_sample_np(data: np.ndarray, size: int) -> np.ndarray:
    '''
    Sample frames from the data randomly.

    Parameters
    ----------
    data : np.ndarray
        The data to sample, with shape (C, T, V, M).
    size : int
        The number of frames to sample.

    Returns
    -------
    np.ndarray
        The sampled data, with shape (C, size, V, M).
    '''
    C, T, V, M = data.shape
    if T == size:
        return data
    # Repeat the frame indices enough times that `size` of them can be drawn
    # without replacement, then keep the drawn indices in temporal order.
    interval = int(np.ceil(size / T))
    random_list = sorted(random.sample(list(range(T)) * interval, size))
    return data[:, random_list]

def uniform_sample_np(data: np.ndarray, size: int) -> np.ndarray:
    '''
    Sample frames from the data uniformly.

    Parameters
    ----------
    data : np.ndarray
        The data to sample, with shape (C, T, V, M).
    size : int
        The number of frames to sample.

    Returns
    -------
    np.ndarray
        The sampled data, with shape (C, size, V, M).
    '''
    C, T, V, M = data.shape
    if T == size:
        return data
    # Pick evenly spaced frame indices across the clip.
    interval = T / size
    uniform_list = [int(i * interval) for i in range(size)]
    return data[:, uniform_list]
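
# Worked example: sampling 4 frames uniformly from a 10-frame clip picks
# indices [0, 2, 5, 7] (interval = 10 / 4 = 2.5, truncated per frame):
#   >>> uniform_sample_np(np.zeros((3, 10, 27, 1)), 4).shape
#   (3, 4, 27, 1)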

def calculate_angle(
    shoulder: list,
    elbow: list,
    wrist: list,
) -> float:
    '''
    Calculate the angle at the elbow between the shoulder and the wrist.

    Parameters
    ----------
    shoulder : list
        Shoulder coordinates.
    elbow : list
        Elbow coordinates.
    wrist : list
        Wrist coordinates.

    Returns
    -------
    float
        Angle in degrees between the shoulder, elbow, and wrist.
    '''
    shoulder = np.array(shoulder)
    elbow = np.array(elbow)
    wrist = np.array(wrist)
    radians = np.arctan2(wrist[1] - elbow[1], wrist[0] - elbow[0]) \
        - np.arctan2(shoulder[1] - elbow[1], shoulder[0] - elbow[0])
    angle = np.abs(radians * 180.0 / np.pi)
    # Map reflex angles back into [0, 180].
    if angle > 180.0:
        angle = 360 - angle
    return angle
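
# Worked example: a right angle at the elbow. With the shoulder directly above
# the elbow and the wrist directly ahead of it:
#   >>> calculate_angle([0.0, 1.0], [0.0, 0.0], [1.0, 0.0])
#   90.0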

def do_hands_relax(
    pose_landmarks,
    angle_threshold: float = 160.0,
) -> bool:
    '''
    Check whether both hands are down (relaxed).

    Parameters
    ----------
    pose_landmarks
        The MediaPipe pose landmarks, or None if no pose was detected.
    angle_threshold : float, optional
        Angle threshold in degrees, by default 160.0.

    Returns
    -------
    bool
        True if the hands are down, False otherwise.
    '''
    if pose_landmarks is None:
        return True

    landmarks = pose_landmarks.landmark
    left_shoulder = [
        landmarks[pose.PoseLandmark.LEFT_SHOULDER.value].x,
        landmarks[pose.PoseLandmark.LEFT_SHOULDER.value].y,
        landmarks[pose.PoseLandmark.LEFT_SHOULDER.value].visibility,
    ]
    left_elbow = [
        landmarks[pose.PoseLandmark.LEFT_ELBOW.value].x,
        landmarks[pose.PoseLandmark.LEFT_ELBOW.value].y,
        landmarks[pose.PoseLandmark.LEFT_ELBOW.value].visibility,
    ]
    left_wrist = [
        landmarks[pose.PoseLandmark.LEFT_WRIST.value].x,
        landmarks[pose.PoseLandmark.LEFT_WRIST.value].y,
        landmarks[pose.PoseLandmark.LEFT_WRIST.value].visibility,
    ]
    left_angle = calculate_angle(left_shoulder, left_elbow, left_wrist)

    right_shoulder = [
        landmarks[pose.PoseLandmark.RIGHT_SHOULDER.value].x,
        landmarks[pose.PoseLandmark.RIGHT_SHOULDER.value].y,
        landmarks[pose.PoseLandmark.RIGHT_SHOULDER.value].visibility,
    ]
    right_elbow = [
        landmarks[pose.PoseLandmark.RIGHT_ELBOW.value].x,
        landmarks[pose.PoseLandmark.RIGHT_ELBOW.value].y,
        landmarks[pose.PoseLandmark.RIGHT_ELBOW.value].visibility,
    ]
    right_wrist = [
        landmarks[pose.PoseLandmark.RIGHT_WRIST.value].x,
        landmarks[pose.PoseLandmark.RIGHT_WRIST.value].y,
        landmarks[pose.PoseLandmark.RIGHT_WRIST.value].visibility,
    ]
    right_angle = calculate_angle(right_shoulder, right_elbow, right_wrist)

    # All six joints must be visible and both elbow angles below the threshold.
    is_visible = all(
        [
            left_shoulder[2] > 0,
            left_elbow[2] > 0,
            left_wrist[2] > 0,
            right_shoulder[2] > 0,
            right_elbow[2] > 0,
            right_wrist[2] > 0,
        ]
    )
    return all(
        [
            is_visible,
            left_angle < angle_threshold,
            right_angle < angle_threshold,
        ]
    )
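

if __name__ == '__main__':
    # Hedged end-to-end sketch: run the pipeline on a sample clip. The path
    # and the Holistic settings are illustrative assumptions, not fixed choices.
    from mediapipe.python.solutions import holistic

    with holistic.Holistic(
        static_image_mode=False,
        model_complexity=1,
    ) as detector:
        inputs = preprocess(
            source='video.mp4',  # assumed sample path
            keypoints_detector=detector,
        )
        print('model inputs:', inputs.shape, inputs.dtype)  # (1, 3, 120, 27, 1) float32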