from flask import Flask, request, jsonify
import cv2
import numpy as np
import tensorflow as tf
from transformers import BlipProcessor, BlipForConditionalGeneration, CLIPProcessor, CLIPModel
import torch
import os
import requests
from tempfile import NamedTemporaryFile
import gc
import tensorflow_hub as hub
import logging
from PIL import Image

# Configure logging
logging.basicConfig(level=logging.ERROR)

# Ensure that Hugging Face uses the appropriate cache directory
os.environ['TRANSFORMERS_CACHE'] = '/app/cache'
os.environ['HF_HOME'] = '/app/cache'

movenet_model_path = '/models/movenet/movenet_lightning'
# Keypoint dictionary for reference
KEYPOINT_DICT = {
    'nose': 0,
    'left_eye': 1,
    'right_eye': 2,
    'left_ear': 3,
    'right_ear': 4,
    'left_shoulder': 5,
    'right_shoulder': 6,
    'left_elbow': 7,
    'right_elbow': 8,
    'left_wrist': 9,
    'right_wrist': 10,
    'left_hip': 11,
    'right_hip': 12,
    'left_knee': 13,
    'right_knee': 14,
    'left_ankle': 15,
    'right_ankle': 16
}
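
# A minimal sketch of how KEYPOINT_DICT indexes MoveNet output, assuming the usual
# [1, 1, 17, 3] output tensor where each keypoint is [y, x, confidence] in normalized
# image coordinates (this mirrors the per-frame loop further below):
#
#   outputs = movenet_model.signatures['serving_default'](input_tensor)
#   person = outputs['output_0'][0][0].numpy()              # shape (17, 3)
#   nose_y, nose_x, nose_conf = person[KEYPOINT_DICT['nose']]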
app = Flask(__name__)

# NOTE: the route path below is an assumption; the original file never registered the
# handler with Flask, so process_video() was not reachable as an endpoint.
@app.route('/process', methods=['POST'])
def process_video():
    try:
        # Clear previous cache
        gc.collect()
        torch.cuda.empty_cache()

        # Get the video URL and fighter measurements from the request
        video_url = request.json.get('videoURL')
        height = request.json.get('height')
        weight = request.json.get('weight')
        wingspan = request.json.get('wingspan')
        if not video_url:
            return jsonify({"error": "No video URL provided"}), 400
        if not all([height, weight, wingspan]):
            return jsonify({"error": "Height, weight, and wingspan are required"}), 400

        # Download the video from the S3 URL
        with NamedTemporaryFile(delete=False, suffix=".mp4") as temp_video_file:
            response = requests.get(video_url)
            if response.status_code != 200:
                return jsonify({"error": "Failed to download video from the provided URL"}), 400
            temp_video_file.write(response.content)
            video_path = temp_video_file.name

        # Open the video file and extract up to 60 frames
        cap = cv2.VideoCapture(video_path)
        frames = []
        success, frame = cap.read()
        frame_count = 0
        while success and frame_count < 60:
            frames.append(frame)
            success, frame = cap.read()
            frame_count += 1
        cap.release()
        os.remove(video_path)
        # Check if the model path exists and load the MoveNet model
        if not os.path.exists(movenet_model_path):
            # Download the model from TensorFlow Hub
            movenet_model = hub.load("https://tfhub.dev/google/movenet/singlepose/lightning/4")
        else:
            movenet_model = tf.saved_model.load(movenet_model_path)
        # Process each frame with MoveNet to get keypoints and detect stance.
        # MoveNet returns keypoints as [y, x, confidence] in normalized image coordinates.
        movenet_results = []
        stances = []
        guard_up = []
        for frame in frames:
            # MoveNet Lightning expects a 192x192 int32 input tensor
            input_tensor = tf.image.resize_with_pad(tf.convert_to_tensor(frame, dtype=tf.uint8), 192, 192)
            input_tensor = tf.cast(input_tensor, dtype=tf.int32)
            input_tensor = tf.expand_dims(input_tensor, axis=0)
            outputs = movenet_model.signatures['serving_default'](input_tensor)
            # output_0 has shape [1, 1, 17, 3]; take the (17, 3) keypoint array
            keypoints = outputs['output_0'][0][0].numpy().tolist()
            movenet_results.append(keypoints)

            # Detect stance from the horizontal (x) positions of ankles and wrists
            left_ankle = keypoints[KEYPOINT_DICT['left_ankle']]
            right_ankle = keypoints[KEYPOINT_DICT['right_ankle']]
            left_wrist = keypoints[KEYPOINT_DICT['left_wrist']]
            right_wrist = keypoints[KEYPOINT_DICT['right_wrist']]
            if right_ankle[1] < left_ankle[1] and right_wrist[1] < left_wrist[1]:
                stance = "orthodox"
            elif left_ankle[1] < right_ankle[1] and left_wrist[1] < right_wrist[1]:
                stance = "southpaw"
            else:
                stance = "unknown"
            stances.append(stance)

            # Detect if the guard is up (both wrists vertically near the nose)
            nose = keypoints[KEYPOINT_DICT['nose']]
            guard_threshold = 0.1  # Normalized-coordinate distance to consider a hand near the head
            left_hand_near_head = abs(left_wrist[0] - nose[0]) < guard_threshold
            right_hand_near_head = abs(right_wrist[0] - nose[0]) < guard_threshold
            guard_up.append(left_hand_near_head and right_hand_near_head)

        # Free up memory used by MoveNet
        del movenet_model
        gc.collect()
        # Generate captions for all 60 frames using BLIP
        captions = []
        blip_model = BlipForConditionalGeneration.from_pretrained('Salesforce/blip-image-captioning-base').to('cuda' if torch.cuda.is_available() else 'cpu')
        blip_processor = BlipProcessor.from_pretrained('Salesforce/blip-image-captioning-base')
        for frame in frames:
            # Convert the BGR OpenCV frame to an RGB PIL image
            frame_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            inputs = blip_processor(images=frame_pil, return_tensors="pt").to('cuda' if torch.cuda.is_available() else 'cpu')
            with torch.no_grad():
                caption = blip_model.generate(**inputs)
            captions.append(blip_processor.decode(caption[0], skip_special_tokens=True))

        # Free up memory used by BLIP
        del blip_model, blip_processor
        torch.cuda.empty_cache()
        gc.collect()
        # Use CLIP to score each frame's similarity to a Muay Thai jab prompt that includes the stance
        clip_results = []
        clip_model = CLIPModel.from_pretrained('openai/clip-vit-base-patch32').to('cuda' if torch.cuda.is_available() else 'cpu')
        clip_processor = CLIPProcessor.from_pretrained('openai/clip-vit-base-patch32')
        for i, frame in enumerate(frames):
            # Convert the BGR OpenCV frame to an RGB PIL image
            frame_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            stance = stances[i]
            prompt = (f"A person performing a Muay Thai jab in {stance} stance, "
                      f"{height} inches tall, weighing {weight} lbs, with a {wingspan} cm wingspan.")
            text_inputs = clip_processor(text=[prompt], return_tensors="pt").to('cuda' if torch.cuda.is_available() else 'cpu')
            image_inputs = clip_processor(images=frame_pil, return_tensors="pt").to('cuda' if torch.cuda.is_available() else 'cpu')
            with torch.no_grad():
                image_features = clip_model.get_image_features(**image_inputs)
                text_features = clip_model.get_text_features(**text_inputs)
                similarity = torch.nn.functional.cosine_similarity(image_features, text_features)
            clip_results.append(similarity.item())

        # Free up memory used by CLIP
        del clip_model, clip_processor
        torch.cuda.empty_cache()
        gc.collect()
        # Calculate an overall score from the CLIP similarities and the guard detection
        avg_clip_similarity = sum(clip_results) / len(clip_results) if clip_results else 0
        guard_score = sum(guard_up) / len(guard_up) if guard_up else 0
        overall_score = (avg_clip_similarity + guard_score) / 2

        # Scale the overall score to a range of 0 - 10
        overall_score = max(0, min(overall_score * 10, 10))

        # Return combined results
        response = {
            "movenet_results": movenet_results,
            "blip_captions": captions,
            "clip_similarities": clip_results,
            "stances": stances,
            "overall_score": overall_score,
            "guard_score": guard_score
        }
        return jsonify(response)
    except Exception as e:
        logging.error(str(e))
        return jsonify({"error": str(e)}), 500
if __name__ == '__main__':
    # Clear any cache before starting the Flask server
    gc.collect()
    torch.cuda.empty_cache()
    # Start the Flask app
    app.run(host='0.0.0.0', port=7860)
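
# Example request against the endpoint above, shown as a sketch. It assumes the route is
# mounted at /process (see the decorator note above); the URL and measurements below are
# hypothetical placeholders, not real data:
#
#   import requests
#   payload = {
#       "videoURL": "https://example.com/jab-clip.mp4",  # hypothetical, publicly reachable MP4
#       "height": 70,        # inches
#       "weight": 155,       # pounds
#       "wingspan": 180      # centimeters
#   }
#   resp = requests.post("http://localhost:7860/process", json=payload)
#   print(resp.json()["overall_score"])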