from flask import Flask, request, jsonify
import cv2
import numpy as np
import tensorflow as tf
from transformers import BlipProcessor, BlipForConditionalGeneration, CLIPProcessor, CLIPModel
import torch
import os
import requests
from tempfile import NamedTemporaryFile
import gc
import tensorflow_hub as hub
# Ensure that Hugging Face uses the appropriate cache directory
os.environ['TRANSFORMERS_CACHE'] = '/app/cache'
os.environ['HF_HOME'] = '/app/cache'
movenet_model_path = '/models/movenet/movenet_lightning'
# Check if the model path exists
if not os.path.exists(movenet_model_path):
    # Download the model from TensorFlow Hub
    movenet_model = hub.load("https://tfhub.dev/google/movenet/singlepose/lightning/4")
else:
    movenet_model = tf.saved_model.load(movenet_model_path)
# Load BLIP model
blip_model = BlipForConditionalGeneration.from_pretrained('Salesforce/blip-image-captioning-large')
blip_processor = BlipProcessor.from_pretrained('Salesforce/blip-image-captioning-large')
# Load CLIP model
clip_model = CLIPModel.from_pretrained('openai/clip-vit-large-patch14')
clip_processor = CLIPProcessor.from_pretrained('openai/clip-vit-large-patch14')
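# The models are used for inference only, so switching them to eval mode is a
# reasonable default here (generate() and the torch.no_grad() blocks below
# already avoid gradient tracking, so this is belt-and-braces).
blip_model.eval()
clip_model.eval()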
# Keypoint dictionary for reference
KEYPOINT_DICT = {
    'nose': 0,
    'left_eye': 1,
    'right_eye': 2,
    'left_ear': 3,
    'right_ear': 4,
    'left_shoulder': 5,
    'right_shoulder': 6,
    'left_elbow': 7,
    'right_elbow': 8,
    'left_wrist': 9,
    'right_wrist': 10,
    'left_hip': 11,
    'right_hip': 12,
    'left_knee': 13,
    'right_knee': 14,
    'left_ankle': 15,
    'right_ankle': 16
}
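# Illustrative helper (a sketch, not wired into the handler below), assuming
# each MoveNet keypoint is a [y, x, confidence] triple in normalized image
# coordinates: the planar distance between two joints, ignoring confidence.
def joint_distance(kp_a, kp_b):
    return float(np.linalg.norm(np.array(kp_a[:2]) - np.array(kp_b[:2])))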
app = Flask(__name__)
# The '/process' route path is an assumption; any POST endpoint name would do
@app.route('/process', methods=['POST'])
def process_video():
    try:
        # Clear previous cache
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        # Get the video URL and fighter measurements from the request
        video_url = request.json.get('videoURL')
        height = request.json.get('height')
        weight = request.json.get('weight')
        wingspan = request.json.get('wingspan')
        if not video_url:
            return jsonify({"error": "No video URL provided"}), 400
        if not all([height, weight, wingspan]):
            return jsonify({"error": "Height, weight, and wingspan are required"}), 400
        # Download the video from the S3 URL
        with NamedTemporaryFile(delete=False, suffix=".mp4") as temp_video_file:
            response = requests.get(video_url)
            if response.status_code != 200:
                return jsonify({"error": "Failed to download video from the provided URL"}), 400
            temp_video_file.write(response.content)
            video_path = temp_video_file.name
        # Open the video file
        cap = cv2.VideoCapture(video_path)
        frames = []
        # Extract up to 20 frames from the video
        success, frame = cap.read()
        frame_count = 0
        while success and frame_count < 20:
            frames.append(frame)
            success, frame = cap.read()
            frame_count += 1
        cap.release()
        os.remove(video_path)
        # Process each frame with MoveNet (to get keypoints and detect stance)
        movenet_results = []
        stances = []
        hip_rotations = []
        arm_extensions = []
        stepping_jabs = []
        guard_up = []
        hand_returned = []
        hips_width_apart = []
        leg_angle_correct = []
        punch_started = False
        initial_left_wrist = None
        initial_right_wrist = None
        for frame_index, frame in enumerate(frames):
            # MoveNet Lightning expects a 192x192 int32 input
            input_tensor = tf.image.resize_with_pad(tf.convert_to_tensor(frame, dtype=tf.uint8), 192, 192)
            input_tensor = tf.cast(input_tensor, dtype=tf.int32)
            input_tensor = tf.expand_dims(input_tensor, axis=0)
            keypoints = movenet_model.signatures['serving_default'](input_tensor)
            # Output shape is [1, 1, 17, 3]; each keypoint is [y, x, confidence]
            keypoints_3d = keypoints['output_0'][0][0].numpy().tolist()
            movenet_results.append(keypoints_3d)
            # Detect stance based on keypoints (using ankles and wrists);
            # index 1 is the horizontal (x) coordinate
            left_ankle = keypoints_3d[KEYPOINT_DICT['left_ankle']]
            right_ankle = keypoints_3d[KEYPOINT_DICT['right_ankle']]
            left_wrist = keypoints_3d[KEYPOINT_DICT['left_wrist']]
            right_wrist = keypoints_3d[KEYPOINT_DICT['right_wrist']]
            if right_ankle[1] < left_ankle[1] and right_wrist[1] < left_wrist[1]:
                stance = "orthodox"
            elif left_ankle[1] < right_ankle[1] and left_wrist[1] < right_wrist[1]:
                stance = "southpaw"
            else:
                stance = "unknown"
            stances.append(stance)
            # Detect if guard is up (both hands near eye level at the side of the head);
            # index 0 is the vertical (y) coordinate
            nose = keypoints_3d[KEYPOINT_DICT['nose']]
            guard_threshold = 0.1  # Threshold distance to consider hands near the head
            left_hand_near_head = abs(left_wrist[0] - nose[0]) < guard_threshold
            right_hand_near_head = abs(right_wrist[0] - nose[0]) < guard_threshold
            guard_up.append(left_hand_near_head and right_hand_near_head)
            # Determine if the punch has started (based on horizontal wrist movement)
            if frame_index > 0:
                previous_left_wrist = movenet_results[frame_index - 1][KEYPOINT_DICT['left_wrist']]
                previous_right_wrist = movenet_results[frame_index - 1][KEYPOINT_DICT['right_wrist']]
                if stance == "orthodox" and (left_wrist[1] - previous_left_wrist[1]) > 0.05:
                    punch_started = True
                    if initial_left_wrist is None:
                        initial_left_wrist = left_wrist
                elif stance == "southpaw" and (right_wrist[1] - previous_right_wrist[1]) > 0.05:
                    punch_started = True
                    if initial_right_wrist is None:
                        initial_right_wrist = right_wrist
            # Detect hip rotation (based on left and right hips, considering stance and punch start)
            left_hip = keypoints_3d[KEYPOINT_DICT['left_hip']]
            right_hip = keypoints_3d[KEYPOINT_DICT['right_hip']]
            if punch_started:
                if stance == "orthodox":
                    hip_rotation = right_hip[1] - left_hip[1]  # Right hip should move forward
                elif stance == "southpaw":
                    hip_rotation = left_hip[1] - right_hip[1]  # Left hip should move forward
                else:
                    hip_rotation = 0
            else:
                hip_rotation = 0
            hip_rotations.append(hip_rotation)
            # Detect full arm extension (lead wrist-to-shoulder distance,
            # ignoring the confidence channel, considering stance)
            left_shoulder = keypoints_3d[KEYPOINT_DICT['left_shoulder']]
            right_shoulder = keypoints_3d[KEYPOINT_DICT['right_shoulder']]
            if stance == "orthodox":
                lead_arm_extension = np.linalg.norm(np.array(left_wrist[:2]) - np.array(left_shoulder[:2]))
            elif stance == "southpaw":
                lead_arm_extension = np.linalg.norm(np.array(right_wrist[:2]) - np.array(right_shoulder[:2]))
            else:
                lead_arm_extension = 0
            arm_extensions.append(lead_arm_extension)
            # Detect stepping with the jab (horizontal lead-ankle movement, considering stance and punch start)
            if punch_started and frame_index > 0:
                previous_left_ankle = movenet_results[frame_index - 1][KEYPOINT_DICT['left_ankle']]
                previous_right_ankle = movenet_results[frame_index - 1][KEYPOINT_DICT['right_ankle']]
                if stance == "orthodox":
                    step_movement = (left_ankle[1] - previous_left_ankle[1]) > 0.05  # Lead foot is left
                elif stance == "southpaw":
                    step_movement = (right_ankle[1] - previous_right_ankle[1]) > 0.05  # Lead foot is right
                else:
                    step_movement = False
                stepping_jabs.append(step_movement)
            else:
                stepping_jabs.append(False)
            # Detect if the hand returns to the initial position after the punch
            if punch_started:
                if stance == "orthodox" and initial_left_wrist is not None:
                    hand_returned.append(np.linalg.norm(np.array(left_wrist[:2]) - np.array(initial_left_wrist[:2])) < 0.05)
                elif stance == "southpaw" and initial_right_wrist is not None:
                    hand_returned.append(np.linalg.norm(np.array(right_wrist[:2]) - np.array(initial_right_wrist[:2])) < 0.05)
                else:
                    hand_returned.append(False)
            else:
                hand_returned.append(False)
            # Detect if the hips are roughly shoulder width apart (within 10%)
            shoulder_width = abs(left_shoulder[1] - right_shoulder[1])
            hips_width = abs(left_hip[1] - right_hip[1])
            hips_width_apart.append(0.9 * shoulder_width < hips_width < 1.1 * shoulder_width)
            # Detect if the back leg is angled roughly 45 degrees outward
            # (arctan2 takes the vertical difference first, then the horizontal)
            if stance == "orthodox":
                right_leg_angle = np.arctan2(right_ankle[0] - right_hip[0], right_ankle[1] - right_hip[1]) * 180 / np.pi
                leg_angle_correct.append(40 <= right_leg_angle <= 50)
            elif stance == "southpaw":
                left_leg_angle = np.arctan2(left_ankle[0] - left_hip[0], left_ankle[1] - left_hip[1]) * 180 / np.pi
                leg_angle_correct.append(40 <= left_leg_angle <= 50)
            else:
                leg_angle_correct.append(False)
        # Generate captions for the extracted frames using BLIP
        captions = []
        for frame in frames:
            # OpenCV frames are BGR; the processor expects RGB
            inputs = blip_processor(images=cv2.cvtColor(frame, cv2.COLOR_BGR2RGB), return_tensors="pt")
            with torch.no_grad():
                caption = blip_model.generate(**inputs)
            captions.append(blip_processor.decode(caption[0], skip_special_tokens=True))
        # Use CLIP to assess the similarity of frames to a Muay Thai jab prompt, including stance
        clip_results = []
        for i, frame in enumerate(frames):
            stance = stances[i]
            prompt = f"A person performing a Muay Thai jab in {stance} stance with a height of {height} in, a weight of {weight} lbs, and a wingspan of {wingspan} cm, with hip rotation of {hip_rotations[i]:.2f}, arm extension of {arm_extensions[i]:.2f}, {'stepping forward' if stepping_jabs[i] else 'not stepping'}, {'guard up' if guard_up[i] else 'guard down'}, {'hand returned to initial position' if hand_returned[i] else 'hand not returned'}, {'hips shoulder width apart' if hips_width_apart[i] else 'hips not shoulder width apart'}, and {'correct leg angle' if leg_angle_correct[i] else 'incorrect leg angle'}"
            # Truncate to CLIP's 77-token context window; this prompt can exceed it
            text_inputs = clip_processor(text=[prompt], return_tensors="pt", padding=True, truncation=True)
            image_inputs = clip_processor(images=cv2.cvtColor(frame, cv2.COLOR_BGR2RGB), return_tensors="pt")
            with torch.no_grad():
                image_features = clip_model.get_image_features(**image_inputs)
                text_features = clip_model.get_text_features(**text_inputs)
            similarity = torch.nn.functional.cosine_similarity(image_features, text_features)
            clip_results.append(similarity.item())
        # Calculate a score from the CLIP similarities and the pose heuristics
        avg_clip_similarity = sum(clip_results) / len(clip_results) if clip_results else 0
        guard_score = sum(guard_up) / len(guard_up) if guard_up else 0
        hand_return_score = sum(hand_returned) / len(hand_returned) if hand_returned else 0
        hips_width_score = sum(hips_width_apart) / len(hips_width_apart) if hips_width_apart else 0
        leg_angle_score = sum(leg_angle_correct) / len(leg_angle_correct) if leg_angle_correct else 0
        overall_score = (avg_clip_similarity + guard_score + hand_return_score + hips_width_score + leg_angle_score) / 5
        # Scale the overall score to a range of 0 - 10
        overall_score = max(0, min(overall_score * 10, 10))
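        # Worked example with illustrative numbers: an average CLIP similarity
        # of 0.25 with all four pose scores at 0.8 gives a raw mean of
        # (0.25 + 0.8 * 4) / 5 = 0.69, which scales to an overall score of 6.9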
        # Return combined results
        response = {
            "movenet_results": movenet_results,
            "blip_captions": captions,
            "clip_similarities": clip_results,
            "stances": stances,
            "hip_rotations": hip_rotations,
            "arm_extensions": arm_extensions,
            "stepping_jabs": stepping_jabs,
            "hips_width_apart": hips_width_apart,
            "leg_angle_correct": leg_angle_correct,
            "overall_score": overall_score,
            "guard_score": guard_score,
            "hand_return_score": hand_return_score,
            "hips_width_score": hips_width_score,
            "leg_angle_score": leg_angle_score,
        }
        return jsonify(response)
    except Exception as e:
        return jsonify({"error": str(e)}), 500
if __name__ == '__main__':
    # Clear any cache before starting the Flask server
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    # Start the Flask app
    app.run(host='0.0.0.0', port=7860)
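# Example client call (a sketch; assumes the server is reachable on
# localhost:7860, '/process' is the registered route, and the video URL is
# publicly downloadable):
#
#   import requests
#   payload = {
#       "videoURL": "https://example.com/jab.mp4",  # hypothetical URL
#       "height": 70,     # inches
#       "weight": 155,    # pounds
#       "wingspan": 180,  # centimeters
#   }
#   r = requests.post("http://localhost:7860/process", json=payload)
#   print(r.json().get("overall_score"))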