amitesh11's picture
Upload 3019 files
369fac9 verified
raw
history blame
15 kB
import cv2, pickle
import mediapipe as mp
import numpy as np
import pandas as pd
from .utils import (
calculate_angle,
extract_important_keypoints,
get_static_file_url,
get_drawing_color,
)
mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose
def analyze_knee_angle(
mp_results,
stage: str,
angle_thresholds: list,
knee_over_toe: bool = False,
draw_to_image: tuple = None,
) -> dict:
"""Calculate angle of each knee while performer at the DOWN position
Args:
mp_results (): MediaPipe Pose results
stage (str): stage of the exercise
angle_thresholds (list): lower and upper limits for the knee angles
knee_over_toe (bool): if knee_over_toe error occur, ignore knee angles. Default to False
draw_to_image (tuple, optional): Contains an OpenCV frame and its dimension. Defaults to None.
Returns:
dict: Statistic from analyze knee angles
"""
results = {
"error": None,
"right": {"error": None, "angle": None},
"left": {"error": None, "angle": None},
}
landmarks = mp_results.pose_landmarks.landmark
# Calculate right knee angle
right_hip = [
landmarks[mp_pose.PoseLandmark.RIGHT_HIP.value].x,
landmarks[mp_pose.PoseLandmark.RIGHT_HIP.value].y,
]
right_knee = [
landmarks[mp_pose.PoseLandmark.RIGHT_KNEE.value].x,
landmarks[mp_pose.PoseLandmark.RIGHT_KNEE.value].y,
]
right_ankle = [
landmarks[mp_pose.PoseLandmark.RIGHT_ANKLE.value].x,
landmarks[mp_pose.PoseLandmark.RIGHT_ANKLE.value].y,
]
results["right"]["angle"] = calculate_angle(right_hip, right_knee, right_ankle)
# Calculate left knee angle
left_hip = [
landmarks[mp_pose.PoseLandmark.LEFT_HIP.value].x,
landmarks[mp_pose.PoseLandmark.LEFT_HIP.value].y,
]
left_knee = [
landmarks[mp_pose.PoseLandmark.LEFT_KNEE.value].x,
landmarks[mp_pose.PoseLandmark.LEFT_KNEE.value].y,
]
left_ankle = [
landmarks[mp_pose.PoseLandmark.LEFT_ANKLE.value].x,
landmarks[mp_pose.PoseLandmark.LEFT_ANKLE.value].y,
]
results["left"]["angle"] = calculate_angle(left_hip, left_knee, left_ankle)
# Draw to image
if draw_to_image is not None and stage != "down":
(image, video_dimensions) = draw_to_image
# Visualize angles
cv2.putText(
image,
str(int(results["right"]["angle"])),
tuple(np.multiply(right_knee, video_dimensions).astype(int)),
cv2.FONT_HERSHEY_COMPLEX,
0.5,
(255, 255, 255),
1,
cv2.LINE_AA,
)
cv2.putText(
image,
str(int(results["left"]["angle"])),
tuple(np.multiply(left_knee, video_dimensions).astype(int)),
cv2.FONT_HERSHEY_COMPLEX,
0.5,
(255, 255, 255),
1,
cv2.LINE_AA,
)
if stage != "down":
return results
# Ignore checking for knee angle error if knee_over_toe error occur
if knee_over_toe:
return results
# Evaluation
results["error"] = False
if angle_thresholds[0] <= results["right"]["angle"] <= angle_thresholds[1]:
results["right"]["error"] = False
else:
results["right"]["error"] = True
results["error"] = True
if angle_thresholds[0] <= results["left"]["angle"] <= angle_thresholds[1]:
results["left"]["error"] = False
else:
results["left"]["error"] = True
results["error"] = True
# Draw to image
if draw_to_image is not None:
(image, video_dimensions) = draw_to_image
if results["error"]:
cv2.rectangle(image, (0, 50), (120, 100), (245, 117, 16), -1)
cv2.putText(
image,
"KNEE ANGLE ERROR",
(10, 62),
cv2.FONT_HERSHEY_COMPLEX,
0.3,
(0, 0, 0),
1,
cv2.LINE_AA,
)
cv2.putText(
image,
"LEFT KNEE" if results["left"]["error"] else "RIGHT KNEE",
(10, 82),
cv2.FONT_HERSHEY_COMPLEX,
0.3,
(255, 255, 255),
1,
cv2.LINE_AA,
)
right_color = (255, 255, 255) if not results["right"]["error"] else (0, 0, 255)
left_color = (255, 255, 255) if not results["left"]["error"] else (0, 0, 255)
# Visualize angles
cv2.putText(
image,
str(int(results["right"]["angle"])),
tuple(np.multiply(right_knee, video_dimensions).astype(int)),
cv2.FONT_HERSHEY_COMPLEX,
0.5,
right_color,
1,
cv2.LINE_AA,
)
cv2.putText(
image,
str(int(results["left"]["angle"])),
tuple(np.multiply(left_knee, video_dimensions).astype(int)),
cv2.FONT_HERSHEY_COMPLEX,
0.5,
left_color,
1,
cv2.LINE_AA,
)
return results
class LungeDetection:
STAGE_ML_MODEL_PATH = get_static_file_url("model/lunge_stage_model.pkl")
ERR_ML_MODEL_PATH = get_static_file_url("model/lunge_err_model.pkl")
INPUT_SCALER_PATH = get_static_file_url("model/lunge_input_scaler.pkl")
PREDICTION_PROB_THRESHOLD = 0.8
KNEE_ANGLE_THRESHOLD = [60, 125]
def __init__(self) -> None:
self.init_important_landmarks()
self.load_machine_learning_model()
self.current_stage = ""
self.counter = 0
self.results = []
self.has_error = False
def init_important_landmarks(self) -> None:
"""
Determine Important landmarks for lunge detection
"""
self.important_landmarks = [
"NOSE",
"LEFT_SHOULDER",
"RIGHT_SHOULDER",
"LEFT_HIP",
"RIGHT_HIP",
"LEFT_KNEE",
"RIGHT_KNEE",
"LEFT_ANKLE",
"RIGHT_ANKLE",
"LEFT_HEEL",
"RIGHT_HEEL",
"LEFT_FOOT_INDEX",
"RIGHT_FOOT_INDEX",
]
# Generate all columns of the data frame
self.headers = ["label"] # Label column
for lm in self.important_landmarks:
self.headers += [
f"{lm.lower()}_x",
f"{lm.lower()}_y",
f"{lm.lower()}_z",
f"{lm.lower()}_v",
]
def load_machine_learning_model(self) -> None:
"""
Load machine learning model
"""
if (
not self.STAGE_ML_MODEL_PATH
or not self.INPUT_SCALER_PATH
or not self.ERR_ML_MODEL_PATH
):
raise Exception("Cannot found lunge files for prediction")
try:
with open(self.ERR_ML_MODEL_PATH, "rb") as f:
self.err_model = pickle.load(f)
with open(self.STAGE_ML_MODEL_PATH, "rb") as f:
self.stage_model = pickle.load(f)
with open(self.INPUT_SCALER_PATH, "rb") as f2:
self.input_scaler = pickle.load(f2)
except Exception as e:
raise Exception(f"Error loading model, {e}")
def handle_detected_results(self, video_name: str) -> tuple:
"""
Save frame as evidence
"""
file_name, _ = video_name.split(".")
save_folder = get_static_file_url("images")
for index, error in enumerate(self.results):
try:
image_name = f"{file_name}_{index}.jpg"
cv2.imwrite(f"{save_folder}/{file_name}_{index}.jpg", error["frame"])
self.results[index]["frame"] = image_name
except Exception as e:
print("ERROR cannot save frame: " + str(e))
self.results[index]["frame"] = None
return self.results, self.counter
def clear_results(self) -> None:
self.results = []
self.counter = 0
self.current_stage = ""
self.has_error = False
def detect(self, mp_results, image, timestamp) -> None:
"""
Make Lunge Errors detection
"""
try:
video_dimensions = [image.shape[1], image.shape[0]]
# * Model prediction for LUNGE counter
# Extract keypoints from frame for the input
row = extract_important_keypoints(mp_results, self.important_landmarks)
X = pd.DataFrame([row], columns=self.headers[1:])
X = pd.DataFrame(self.input_scaler.transform(X))
# Make prediction and its probability
stage_predicted_class = self.stage_model.predict(X)[0]
stage_prediction_probabilities = self.stage_model.predict_proba(X)[0]
stage_prediction_probability = round(
stage_prediction_probabilities[stage_prediction_probabilities.argmax()],
2,
)
# Evaluate stage prediction for counter
if (
stage_predicted_class == "I"
and stage_prediction_probability >= self.PREDICTION_PROB_THRESHOLD
):
self.current_stage = "init"
elif (
stage_predicted_class == "M"
and stage_prediction_probability >= self.PREDICTION_PROB_THRESHOLD
):
self.current_stage = "mid"
elif (
stage_predicted_class == "D"
and stage_prediction_probability >= self.PREDICTION_PROB_THRESHOLD
):
if self.current_stage in ["init", "mid"]:
self.counter += 1
self.current_stage = "down"
# Check out errors from a rep to reduce repeated warning
errors_from_this_rep = map(
lambda el: el["stage"],
filter(lambda el: el["counter"] == self.counter, self.results),
)
# Analyze lunge pose
# Knee over toe
k_o_t_error = None
err_predicted_class = None
err_prediction_probabilities = None
err_prediction_probability = None
if self.current_stage == "down":
err_predicted_class = self.err_model.predict(X)[0]
err_prediction_probabilities = self.err_model.predict_proba(X)[0]
err_prediction_probability = round(
err_prediction_probabilities[err_prediction_probabilities.argmax()],
2,
)
if (
err_predicted_class == "L"
and err_prediction_probability >= self.PREDICTION_PROB_THRESHOLD
):
k_o_t_error = "Incorrect"
self.has_error = True
# Limit save error frames saved in a rep
if (
len(self.results) == 0
or "knee over toe" not in errors_from_this_rep
):
self.results.append(
{
"stage": f"knee over toe",
"frame": image,
"timestamp": timestamp,
"counter": self.counter,
}
)
elif (
err_predicted_class == "C"
and err_prediction_probability >= self.PREDICTION_PROB_THRESHOLD
):
k_o_t_error = "Correct"
self.has_error = False
else:
self.has_error = False
# Analyze lunge pose
# * Knee angle
analyzed_results = analyze_knee_angle(
mp_results=mp_results,
stage=self.current_stage,
angle_thresholds=self.KNEE_ANGLE_THRESHOLD,
knee_over_toe=(k_o_t_error == "Incorrect"),
draw_to_image=(image, video_dimensions),
)
# Stage management for saving results
self.has_error = (
analyzed_results["error"] if not self.has_error else self.has_error
)
if analyzed_results["error"]:
# Limit save error frames saved in a rep
if len(self.results) == 0 or "knee angle" not in errors_from_this_rep:
self.results.append(
{
"stage": f"knee angle",
"frame": image,
"timestamp": timestamp,
"counter": self.counter,
}
)
# Visualization
# Draw landmarks and connections
landmark_color, connection_color = get_drawing_color(self.has_error)
mp_drawing.draw_landmarks(
image,
mp_results.pose_landmarks,
mp_pose.POSE_CONNECTIONS,
mp_drawing.DrawingSpec(
color=landmark_color, thickness=2, circle_radius=2
),
mp_drawing.DrawingSpec(
color=connection_color, thickness=2, circle_radius=1
),
)
# Status box
cv2.rectangle(image, (0, 0), (325, 40), (245, 117, 16), -1)
# Display Stage prediction for count
cv2.putText(
image,
"COUNT",
(10, 12),
cv2.FONT_HERSHEY_COMPLEX,
0.5,
(0, 0, 0),
1,
cv2.LINE_AA,
)
cv2.putText(
image,
f'{str(self.counter)}, {stage_predicted_class.split(" ")[0]}, {str(stage_prediction_probability)}',
(5, 30),
cv2.FONT_HERSHEY_COMPLEX,
0.5,
(255, 255, 255),
1,
cv2.LINE_AA,
)
# Display KNEE_OVER_TOE error prediction
cv2.putText(
image,
"KNEE_OVER_TOE",
(145, 12),
cv2.FONT_HERSHEY_COMPLEX,
0.5,
(0, 0, 0),
1,
cv2.LINE_AA,
)
cv2.putText(
image,
f"{err_predicted_class}, {err_prediction_probability}, {k_o_t_error}",
(135, 30),
cv2.FONT_HERSHEY_COMPLEX,
0.5,
(255, 255, 255),
1,
cv2.LINE_AA,
)
except Exception as e:
print(f"Error while detecting lunge errors: {e}")