import random

import cv2
import numpy as np
from mediapipe.python.solutions import pose

SELECTED_JOINTS = {
    27: {
        'pose': [0, 11, 12, 13, 14, 15, 16],
        'hand': [0, 4, 5, 8, 9, 12, 13, 16, 17, 20],
    },  # 27
}


def pad(joints: np.ndarray, num_frames: int = 150) -> np.ndarray:
    '''
    Pad (or truncate) the joints to a fixed number of frames.

    Parameters
    ----------
    joints : np.ndarray
        The joints to pad, shaped (T, V, C).
    num_frames : int, default=150
        The number of frames to pad to.

    Returns
    -------
    np.ndarray
        The padded joints, shaped (num_frames, V, C).
    '''
    if joints.shape[0] < num_frames:
        L = joints.shape[0]
        padded_joints = np.zeros((num_frames, joints.shape[1], joints.shape[2]))
        padded_joints[:L, :, :] = joints
        # Fill the remainder by cycling through the clip from the start.
        rest = num_frames - L
        num = int(np.ceil(rest / L))
        tail = np.concatenate([joints for _ in range(num)], axis=0)[:rest]
        padded_joints[L:, :, :] = tail
    else:
        padded_joints = joints[:num_frames]
    return padded_joints


def extract_joints(
    source: str,
    keypoints_detector,
    resize_to: tuple = (256, 256),
    num_joints: int = 27,
    num_frames: int = 150,
    num_bodies: int = 1,
    num_channels: int = 3,
) -> np.ndarray:
    '''
    Extract the joints from the video.

    Parameters
    ----------
    source : str
        The path to the video.
    keypoints_detector : mediapipe.solutions.holistic.Holistic
        The keypoints detector.
    resize_to : tuple, default=(256, 256)
        The size to which each frame is resized.
    num_joints : int, default=27
        The number of joints.
    num_frames : int, default=150
        The number of frames.
    num_bodies : int, default=1
        The number of bodies.
    num_channels : int, default=3
        The number of channels.

    Returns
    -------
    np.ndarray
        The extracted joints, shaped (C, T, V, M).
    '''
    cap = cv2.VideoCapture(source)
    extracted_joints = []
    while cap.isOpened():
        success, image = cap.read()
        if not success:
            break
        image = cv2.resize(image, resize_to)
        image = cv2.flip(image, flipCode=1)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        frame_joints = []
        results = keypoints_detector.process(image)

        # Body joints: fall back to zeros when no pose is detected.
        pose_joints = [(0.0, 0.0, 0.0)] * len(SELECTED_JOINTS[num_joints]['pose'])
        if results.pose_landmarks is not None:
            pose_joints = [
                (landmark.x * resize_to[0], landmark.y * resize_to[1], landmark.visibility)
                for i, landmark in enumerate(results.pose_landmarks.landmark)
                if i in SELECTED_JOINTS[num_joints]['pose']
            ]
        frame_joints.extend(pose_joints)

        left_hand = [(0.0, 0.0, 0.0)] * len(SELECTED_JOINTS[num_joints]['hand'])
        if results.left_hand_landmarks is not None:
            left_hand = [
                (landmark.x * resize_to[0], landmark.y * resize_to[1], landmark.visibility)
                for i, landmark in enumerate(results.left_hand_landmarks.landmark)
                if i in SELECTED_JOINTS[num_joints]['hand']
            ]
        frame_joints.extend(left_hand)

        right_hand = [(0.0, 0.0, 0.0)] * len(SELECTED_JOINTS[num_joints]['hand'])
        if results.right_hand_landmarks is not None:
            right_hand = [
                (landmark.x * resize_to[0], landmark.y * resize_to[1], landmark.visibility)
                for i, landmark in enumerate(results.right_hand_landmarks.landmark)
                if i in SELECTED_JOINTS[num_joints]['hand']
            ]
        frame_joints.extend(right_hand)

        assert len(frame_joints) == num_joints, \
            f'Expected {num_joints} joints, got {len(frame_joints)} joints.'
        extracted_joints.append(frame_joints)
    cap.release()

    extracted_joints = np.array(extracted_joints)
    extracted_joints = pad(extracted_joints, num_frames=num_frames)

    fp = np.zeros(
        (num_frames, num_joints, num_channels, num_bodies),
        dtype=np.float32,
    )
    fp[:, :, :, 0] = extracted_joints
    # (T, V, C, M) -> (C, T, V, M)
    return np.transpose(fp, [2, 0, 1, 3])
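# Quick sanity check (illustrative, not part of the pipeline): with the
# defaults above, `extract_joints` yields a channels-first array of shape
# (C, T, V, M) = (3, 150, 27, 1), and `pad` cycles a short clip from frame 0.
# The shapes follow directly from the code; the random input is made up.
#
#   >>> joints = np.random.rand(60, 27, 3)
#   >>> padded = pad(joints, num_frames=150)
#   >>> padded.shape
#   (150, 27, 3)
#   >>> np.array_equal(padded[60], joints[0])  # padding restarts at frame 0
#   True
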
def preprocess(
    source: str,
    keypoints_detector,
    normalization: bool = True,
    random_choose: bool = True,
    window_size: int = 120,
) -> np.ndarray:
    '''
    Preprocess the video.

    Parameters
    ----------
    source : str
        The path to the video.
    keypoints_detector : mediapipe.solutions.holistic.Holistic
        The keypoints detector.
    normalization : bool, default=True
        Whether to normalize the data.
    random_choose : bool, default=True
        Whether to randomly sample the data.
    window_size : int, default=120
        The window size.

    Returns
    -------
    np.ndarray
        The processed inputs for the model.
    '''
    inputs = extract_joints(source=source, keypoints_detector=keypoints_detector)

    # Convert absolute positions into frame-to-frame motion. The snapshot
    # ensures each difference reads the original, untouched frames.
    T = inputs.shape[1]
    ori_data = inputs.copy()
    for t in range(T - 1):
        inputs[:, t, :, :] = ori_data[:, t + 1, :, :] - ori_data[:, t, :, :]
    inputs[:, T - 1, :, :] = 0

    if random_choose:
        inputs = random_sample_np(inputs, window_size)
    else:
        inputs = uniform_sample_np(inputs, window_size)

    if normalization:
        assert inputs.shape[0] == 3
        # Center x and y on the first joint of the first body.
        inputs[0, :, :, :] = inputs[0, :, :, :] - inputs[0, :, 0, 0].mean(axis=0)
        inputs[1, :, :, :] = inputs[1, :, :, :] - inputs[1, :, 0, 0].mean(axis=0)

    return inputs[np.newaxis, :].astype(np.float32)


def random_sample_np(data: np.ndarray, size: int) -> np.ndarray:
    '''
    Sample the data randomly.

    Parameters
    ----------
    data : np.ndarray
        The data to sample.
    size : int
        The size of the data to sample.

    Returns
    -------
    np.ndarray
        The sampled data.
    '''
    C, T, V, M = data.shape
    if T == size:
        return data
    # Repeat the frame indices enough times to cover `size`, then draw a
    # sorted random subset so temporal order is preserved.
    interval = int(np.ceil(size / T))
    random_list = sorted(random.sample(list(range(T)) * interval, size))
    return data[:, random_list]


def uniform_sample_np(data: np.ndarray, size: int) -> np.ndarray:
    '''
    Sample the data uniformly.

    Parameters
    ----------
    data : np.ndarray
        The data to sample.
    size : int
        The size of the data to sample.

    Returns
    -------
    np.ndarray
        The sampled data.
    '''
    C, T, V, M = data.shape
    if T == size:
        return data
    interval = T / size
    uniform_list = [int(i * interval) for i in range(size)]
    return data[:, uniform_list]


def calculate_angle(
    shoulder: list,
    elbow: list,
    wrist: list,
) -> float:
    '''
    Calculate the angle between the shoulder, elbow, and wrist.

    Parameters
    ----------
    shoulder : list
        Shoulder coordinates.
    elbow : list
        Elbow coordinates.
    wrist : list
        Wrist coordinates.

    Returns
    -------
    float
        Angle in degrees between the shoulder, elbow, and wrist.
    '''
    shoulder = np.array(shoulder)
    elbow = np.array(elbow)
    wrist = np.array(wrist)

    radians = np.arctan2(wrist[1] - elbow[1], wrist[0] - elbow[0]) \
        - np.arctan2(shoulder[1] - elbow[1], shoulder[0] - elbow[0])
    angle = np.abs(radians * 180.0 / np.pi)
    if angle > 180.0:
        angle = 360 - angle
    return angle
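# Worked example for `calculate_angle` (illustrative coordinates): the angle
# is measured at the elbow, in the image plane, from the 2-D atan2 difference.
#
#   straight arm:  calculate_angle([0, 0], [1, 0], [2, 0])  ->  ~180.0
#   right angle:   calculate_angle([0, 0], [1, 0], [1, 1])  ->  ~90.0
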
def do_hands_relax(
    pose_landmarks: list,
    angle_threshold: float = 160.0,
) -> bool:
    '''
    Check whether the hands are down (relaxed).

    Parameters
    ----------
    pose_landmarks : list
        Pose landmarks.
    angle_threshold : float, default=160.0
        Angle threshold in degrees.

    Returns
    -------
    bool
        True if the hands are down, False otherwise.
    '''
    if pose_landmarks is None:
        return True

    landmarks = pose_landmarks.landmark
    left_shoulder = [
        landmarks[pose.PoseLandmark.LEFT_SHOULDER.value].x,
        landmarks[pose.PoseLandmark.LEFT_SHOULDER.value].y,
        landmarks[pose.PoseLandmark.LEFT_SHOULDER.value].visibility,
    ]
    left_elbow = [
        landmarks[pose.PoseLandmark.LEFT_ELBOW.value].x,
        landmarks[pose.PoseLandmark.LEFT_ELBOW.value].y,
        landmarks[pose.PoseLandmark.LEFT_ELBOW.value].visibility,
    ]
    left_wrist = [
        landmarks[pose.PoseLandmark.LEFT_WRIST.value].x,
        landmarks[pose.PoseLandmark.LEFT_WRIST.value].y,
        landmarks[pose.PoseLandmark.LEFT_WRIST.value].visibility,
    ]
    left_angle = calculate_angle(left_shoulder, left_elbow, left_wrist)

    right_shoulder = [
        landmarks[pose.PoseLandmark.RIGHT_SHOULDER.value].x,
        landmarks[pose.PoseLandmark.RIGHT_SHOULDER.value].y,
        landmarks[pose.PoseLandmark.RIGHT_SHOULDER.value].visibility,
    ]
    right_elbow = [
        landmarks[pose.PoseLandmark.RIGHT_ELBOW.value].x,
        landmarks[pose.PoseLandmark.RIGHT_ELBOW.value].y,
        landmarks[pose.PoseLandmark.RIGHT_ELBOW.value].visibility,
    ]
    right_wrist = [
        landmarks[pose.PoseLandmark.RIGHT_WRIST.value].x,
        landmarks[pose.PoseLandmark.RIGHT_WRIST.value].y,
        landmarks[pose.PoseLandmark.RIGHT_WRIST.value].visibility,
    ]
    right_angle = calculate_angle(right_shoulder, right_elbow, right_wrist)

    # Require every joint to be visible before trusting the angles.
    is_visible = all(
        [
            left_shoulder[2] > 0,
            left_elbow[2] > 0,
            left_wrist[2] > 0,
            right_shoulder[2] > 0,
            right_elbow[2] > 0,
            right_wrist[2] > 0,
        ]
    )
    return all(
        [
            is_visible,
            left_angle < angle_threshold,
            right_angle < angle_threshold,
        ]
    )
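
# Usage sketch, assuming the MediaPipe Holistic solution as the keypoints
# detector (as the docstrings above state). The video path is hypothetical.
if __name__ == '__main__':
    from mediapipe.python.solutions import holistic

    with holistic.Holistic(
        static_image_mode=False,
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5,
    ) as detector:
        # (N, C, T, V, M) = (1, 3, 120, 27, 1) with the default window size.
        inputs = preprocess(source='video.mp4', keypoints_detector=detector)
        print(inputs.shape)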