# SAM-SLR-V1/utils/data.py
import random

import cv2
import numpy as np
from mediapipe.python.solutions import pose
# MediaPipe Holistic landmark indices kept for the 27-joint skeleton:
# 7 upper-body pose landmarks plus 10 landmarks for each hand.
SELECTED_JOINTS = {
    27: {
        'pose': [0, 11, 12, 13, 14, 15, 16],
        'hand': [0, 4, 5, 8, 9, 12, 13, 16, 17, 20],
    },
}
def pad(joints: np.ndarray, num_frames: int = 150) -> np.ndarray:
'''
Add padding to the joints.
Parameters
----------
joints : np.ndarray
The joints to pad.
    num_frames : int, default=150
        The target number of frames after padding.
Returns
-------
np.ndarray
The padded joints.
'''
    if joints.shape[0] < num_frames:
        L = joints.shape[0]
        padded_joints = np.zeros((num_frames, joints.shape[1], joints.shape[2]))
        padded_joints[:L, :, :] = joints
        # Fill the remaining frames by repeating the clip cyclically.
        rest = num_frames - L
        num = int(np.ceil(rest / L))
        tiled = np.concatenate([joints for _ in range(num)], 0)[:rest]
        padded_joints[L:, :, :] = tiled
    else:
        padded_joints = joints[:num_frames]
    return padded_joints
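
# Illustration (made-up shapes): a 60-frame clip of 27 joints padded to the
# default 150 frames keeps the original frames and repeats the clip
# cyclically to fill the rest:
#   joints = np.random.rand(60, 27, 3)
#   pad(joints).shape  # -> (150, 27, 3)
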
def extract_joints(
source: str,
keypoints_detector,
resize_to: tuple = (256, 256),
num_joints: int = 27,
num_frames: int = 150,
num_bodies: int = 1,
num_channels: int = 3,
) -> np.ndarray:
'''
Extract the joints from the video.
Parameters
----------
source : str
The path to the video.
keypoints_detector : mediapipe.solutions.holistic.Holistic
The keypoints detector.
    resize_to : tuple, default=(256, 256)
        The size (width, height) to resize each frame to.
    num_joints : int, default=27
        The number of selected joints (must be a key of SELECTED_JOINTS).
    num_frames : int, default=150
        The number of output frames; shorter clips are padded, longer ones truncated.
    num_bodies : int, default=1
        The number of bodies (signers) per frame.
    num_channels : int, default=3
        The number of channels per joint (x, y, visibility).
Returns
-------
np.ndarray
The extracted joints.
'''
cap = cv2.VideoCapture(source)
extracted_joints = []
while cap.isOpened():
success, image = cap.read()
if not success:
break
        # Mirror the frame and convert BGR -> RGB for MediaPipe.
        image = cv2.resize(image, resize_to)
        image = cv2.flip(image, flipCode=1)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        frame_joints = []
        results = keypoints_detector.process(image)
        # Upper-body pose joints (zeros when no pose is detected). The local
        # name avoids shadowing the imported `pose` module.
        pose_joints = [(0.0, 0.0, 0.0)] * len(SELECTED_JOINTS[num_joints]['pose'])
        if results.pose_landmarks is not None:
            pose_joints = [
                (landmark.x * resize_to[0], landmark.y * resize_to[1], landmark.visibility)
                for i, landmark in enumerate(results.pose_landmarks.landmark)
                if i in SELECTED_JOINTS[num_joints]['pose']
            ]
        frame_joints.extend(pose_joints)
left_hand = [(0.0, 0.0, 0.0)] * len(SELECTED_JOINTS[num_joints]['hand'])
if results.left_hand_landmarks is not None:
left_hand = [
(landmark.x * resize_to[0], landmark.y * resize_to[1], landmark.visibility)
for i, landmark in enumerate(results.left_hand_landmarks.landmark)
if i in SELECTED_JOINTS[num_joints]['hand']
]
frame_joints.extend(left_hand)
right_hand = [(0.0, 0.0, 0.0)] * len(SELECTED_JOINTS[num_joints]['hand'])
if results.right_hand_landmarks is not None:
right_hand = [
(landmark.x * resize_to[0], landmark.y * resize_to[1], landmark.visibility)
for i, landmark in enumerate(results.right_hand_landmarks.landmark)
if i in SELECTED_JOINTS[num_joints]['hand']
]
frame_joints.extend(right_hand)
        assert len(frame_joints) == num_joints, \
            f'Expected {num_joints} joints, got {len(frame_joints)} joints.'
        extracted_joints.append(frame_joints)
    cap.release()
    extracted_joints = np.array(extracted_joints)
    extracted_joints = pad(extracted_joints, num_frames=num_frames)
    # Arrange as (T, V, C, M), then transpose to (C, T, V, M).
    fp = np.zeros(
        (num_frames, num_joints, num_channels, num_bodies),
        dtype=np.float32,
    )
    fp[:, :, :, 0] = extracted_joints
    return np.transpose(fp, [2, 0, 1, 3])
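
# The returned array follows the (C, T, V, M) skeleton layout common to
# ST-GCN-style models: with the defaults, (3, 150, 27, 1) = 3 channels
# (x, y, visibility), 150 frames, 27 joints, 1 body.
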
def preprocess(
source: str,
keypoints_detector,
normalization: bool = True,
random_choose: bool = True,
window_size: int = 120,
) -> np.ndarray:
'''
Preprocess the video.
Parameters
----------
source : str
The path to the video.
keypoints_detector : mediapipe.solutions.holistic.Holistic
The keypoints detector.
    normalization : bool, default=True
        Whether to normalize the data (currently unused; the normalization
        step below is commented out).
    random_choose : bool, default=True
        Whether to sample frames randomly (otherwise uniformly).
    window_size : int, default=120
        The number of frames to sample from the clip.
Returns
-------
    np.ndarray
        The processed model inputs, shaped (1, C, T, V, M).
'''
inputs = extract_joints(source=source, keypoints_detector=keypoints_detector)
    # Optional motion (frame-difference) features, kept disabled:
    # T = inputs.shape[1]
    # ori_data = inputs
    # for t in range(T - 1):
    #     inputs[:, t, :, :] = ori_data[:, t + 1, :, :] - ori_data[:, t, :, :]
    # inputs[:, T - 1, :, :] = 0
if random_choose:
inputs = random_sample_np(inputs, window_size)
else:
inputs = uniform_sample_np(inputs, window_size)
    # Optional centering of the x/y channels on the first joint, kept disabled:
    # if normalization:
    #     assert inputs.shape[0] == 3
    #     inputs[0, :, :, :] = inputs[0, :, :, :] - inputs[0, :, 0, 0].mean(axis=0)
    #     inputs[1, :, :, :] = inputs[1, :, :, :] - inputs[1, :, 0, 0].mean(axis=0)
    # Add a batch dimension: (C, T, V, M) -> (1, C, T, V, M).
    return inputs[np.newaxis, :].astype(np.float32)
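
# With the default arguments, preprocess() returns a single-sample batch of
# shape (1, 3, 120, 27, 1), ready to be fed to the recognition model.
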
def random_sample_np(data: np.ndarray, size: int) -> np.ndarray:
'''
Sample the data randomly.
Parameters
----------
data : np.ndarray
The data to sample.
    size : int
        The number of frames to keep.
Returns
-------
np.ndarray
The sampled data.
'''
    C, T, V, M = data.shape
    if T == size:
        return data
    interval = int(np.ceil(size / T))
    # Sort the sampled indices so temporal order is preserved.
    random_list = sorted(random.sample(list(range(T)) * interval, size))
    return data[:, random_list]
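
# Sketch of the index sampling (assuming T=5 frames, size=8): the pool
# list(range(5)) * 2 has 10 entries, so random.sample can draw 8 of them,
# e.g. sorted -> [0, 0, 1, 2, 3, 3, 4, 4]; each index repeats at most
# ceil(size / T) times.
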
def uniform_sample_np(data: np.ndarray, size: int) -> np.ndarray:
'''
Sample the data uniformly.
Parameters
----------
data : np.ndarray
The data to sample.
    size : int
        The number of frames to keep.
Returns
-------
np.ndarray
The sampled data.
'''
C, T, V, M = data.shape
if T == size:
return data
interval = T / size
uniform_list = [int(i * interval) for i in range(size)]
return data[:, uniform_list]
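
# E.g. downsampling T=10 frames to size=4 picks the evenly spaced indices
# [int(i * 2.5) for i in range(4)] -> [0, 2, 5, 7].
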
def calculate_angle(
shoulder: list,
elbow: list,
wrist: list,
) -> float:
'''
Calculate the angle between the shoulder, elbow, and wrist.
Parameters
----------
shoulder : list
Shoulder coordinates.
elbow : list
Elbow coordinates.
wrist : list
Wrist coordinates.
Returns
-------
    float
        Angle at the elbow in degrees, in the range [0, 180].
'''
shoulder = np.array(shoulder)
elbow = np.array(elbow)
wrist = np.array(wrist)
radians = np.arctan2(wrist[1] - elbow[1], wrist[0] - elbow[0]) \
- np.arctan2(shoulder[1] - elbow[1], shoulder[0] - elbow[0])
angle = np.abs(radians * 180.0 / np.pi)
if angle > 180.0:
angle = 360 - angle
return angle
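
# Quick check: shoulder (0, 0), elbow (0, 1), wrist (1, 1) form a right
# angle at the elbow:
#   calculate_angle([0, 0], [0, 1], [1, 1])  # -> 90.0
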
def do_hands_relax(
pose_landmarks: list,
angle_threshold: float = 160.0,
) -> bool:
    '''
    Check whether both hands are down (arms relaxed).
    Parameters
    ----------
    pose_landmarks : list
        Pose landmarks from the detector; None means no pose was detected.
    angle_threshold : float, optional
        Elbow angle threshold in degrees, by default 160.0.
    Returns
    -------
    bool
        True if the hands are down (also when no pose is detected), False otherwise.
    '''
if pose_landmarks is None:
return True
landmarks = pose_landmarks.landmark
left_shoulder = [
landmarks[pose.PoseLandmark.LEFT_SHOULDER.value].x,
landmarks[pose.PoseLandmark.LEFT_SHOULDER.value].y,
landmarks[pose.PoseLandmark.LEFT_SHOULDER.value].visibility,
]
    left_elbow = [
        landmarks[pose.PoseLandmark.LEFT_ELBOW.value].x,
        landmarks[pose.PoseLandmark.LEFT_ELBOW.value].y,
        landmarks[pose.PoseLandmark.LEFT_ELBOW.value].visibility,
    ]
    left_wrist = [
        landmarks[pose.PoseLandmark.LEFT_WRIST.value].x,
        landmarks[pose.PoseLandmark.LEFT_WRIST.value].y,
        landmarks[pose.PoseLandmark.LEFT_WRIST.value].visibility,
    ]
left_angle = calculate_angle(left_shoulder, left_elbow, left_wrist)
right_shoulder = [
landmarks[pose.PoseLandmark.RIGHT_SHOULDER.value].x,
landmarks[pose.PoseLandmark.RIGHT_SHOULDER.value].y,
landmarks[pose.PoseLandmark.RIGHT_SHOULDER.value].visibility,
]
    right_elbow = [
        landmarks[pose.PoseLandmark.RIGHT_ELBOW.value].x,
        landmarks[pose.PoseLandmark.RIGHT_ELBOW.value].y,
        landmarks[pose.PoseLandmark.RIGHT_ELBOW.value].visibility,
    ]
    right_wrist = [
        landmarks[pose.PoseLandmark.RIGHT_WRIST.value].x,
        landmarks[pose.PoseLandmark.RIGHT_WRIST.value].y,
        landmarks[pose.PoseLandmark.RIGHT_WRIST.value].visibility,
    ]
right_angle = calculate_angle(right_shoulder, right_elbow, right_wrist)
is_visible = all(
[
left_shoulder[2] > 0,
left_elbow[2] > 0,
left_wrist[2] > 0,
right_shoulder[2] > 0,
right_elbow[2] > 0,
right_wrist[2] > 0,
]
)
return all(
[
is_visible,
left_angle < angle_threshold,
right_angle < angle_threshold,
]
)
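

if __name__ == '__main__':
    # Minimal end-to-end sketch, assuming mediapipe is installed; the video
    # path 'example.mp4' is a stand-in for a real clip.
    from mediapipe.python.solutions import holistic

    with holistic.Holistic(static_image_mode=False) as detector:
        model_inputs = preprocess(
            source='example.mp4',
            keypoints_detector=detector,
            random_choose=False,
        )
    print(model_inputs.shape)  # expected: (1, 3, 120, 27, 1)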