from flask import Flask, request, jsonify import cv2 import numpy as np import tensorflow as tf from transformers import BlipProcessor, BlipForConditionalGeneration, CLIPProcessor, CLIPModel import torch import os import requests from tempfile import NamedTemporaryFile import gc import tensorflow_hub as hub import logging from PIL import Image # Configure logging logging.basicConfig(level=logging.ERROR) # Ensure that Hugging Face uses the appropriate cache directory os.environ['TRANSFORMERS_CACHE'] = '/app/cache' os.environ['HF_HOME'] = '/app/cache' movenet_model_path = '/models/movenet/movenet_lightning' # Keypoint dictionary for reference KEYPOINT_DICT = { 'nose': 0, 'left_eye': 1, 'right_eye': 2, 'left_ear': 3, 'right_ear': 4, 'left_shoulder': 5, 'right_shoulder': 6, 'left_elbow': 7, 'right_elbow': 8, 'left_wrist': 9, 'right_wrist': 10, 'left_hip': 11, 'right_hip': 12, 'left_knee': 13, 'right_knee': 14, 'left_ankle': 15, 'right_ankle': 16 } app = Flask(__name__) @app.route('/process_video', methods=['POST']) def process_video(): try: # Clear previous cache gc.collect() torch.cuda.empty_cache() # Get the video URL from the request video_url = request.json.get('videoURL') height = request.json.get('height') weight = request.json.get('weight') wingspan = request.json.get('wingspan') if not video_url: return jsonify({"error": "No video URL provided"}), 400 if not all([height, weight, wingspan]): return jsonify({"error": "Height, weight, and wingspan are required"}), 400 # Download the video from the S3 URL with NamedTemporaryFile(delete=False, suffix=".mp4") as temp_video_file: response = requests.get(video_url) if response.status_code != 200: return jsonify({"error": "Failed to download video from the provided URL"}), 400 temp_video_file.write(response.content) video_path = temp_video_file.name # Open the video file cap = cv2.VideoCapture(video_path) frames = [] # Extract 60 frames from the video success, frame = cap.read() frame_count = 0 while success and frame_count < 60: frames.append(frame) success, frame = cap.read() frame_count += 1 cap.release() os.remove(video_path) # Check if the model path exists and load MoveNet model if not os.path.exists(movenet_model_path): # Download the model from TensorFlow Hub movenet_model = hub.load("https://tfhub.dev/google/movenet/singlepose/lightning/4") else: movenet_model = tf.saved_model.load(movenet_model_path) # Process each frame with MoveNet (to get 3D keypoints and detect stance) movenet_results = [] stances = [] guard_up = [] for frame_index, frame in enumerate(frames): input_tensor = tf.image.resize_with_pad(tf.convert_to_tensor(frame, dtype=tf.uint8), 256, 256) input_tensor = tf.cast(input_tensor, dtype=tf.int32) # Cast to int32 instead of float32 input_tensor = tf.expand_dims(input_tensor, axis=0) keypoints = movenet_model.signatures['serving_default'](input_tensor) keypoints_3d = keypoints['output_0'][0].numpy().tolist() # Assuming the model returns 3D keypoints movenet_results.append(keypoints_3d) # Detect stance based on keypoints (using ankles and wrists) left_ankle = keypoints_3d[KEYPOINT_DICT['left_ankle']] right_ankle = keypoints_3d[KEYPOINT_DICT['right_ankle']] left_wrist = keypoints_3d[KEYPOINT_DICT['left_wrist']] right_wrist = keypoints_3d[KEYPOINT_DICT['right_wrist']] if right_ankle[0] < left_ankle[0] and right_wrist[0] < left_wrist[0]: stance = "orthodox" elif left_ankle[0] < right_ankle[0] and left_wrist[0] < right_wrist[0]: stance = "southpaw" else: stance = "unknown" stances.append(stance) # Detect if guard is up (both hands near eye level at the side of the head) nose = keypoints_3d[KEYPOINT_DICT['nose']] guard_threshold = 0.1 # Threshold distance to consider hands near the head left_hand_near_head = abs(left_wrist[1] - nose[1]) < guard_threshold right_hand_near_head = abs(right_wrist[1] - nose[1]) < guard_threshold guard_up.append(left_hand_near_head and right_hand_near_head) # Free up memory used by MoveNet del movenet_model gc.collect() # Generate captions for all 60 frames using BLIP captions = [] blip_model = BlipForConditionalGeneration.from_pretrained('Salesforce/blip-image-captioning-base').to('cuda' if torch.cuda.is_available() else 'cpu') blip_processor = BlipProcessor.from_pretrained('Salesforce/blip-image-captioning-base') for frame in frames: frame_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) # Convert frame to PIL image inputs = blip_processor(images=frame_pil, return_tensors="pt").to('cuda' if torch.cuda.is_available() else 'cpu') with torch.no_grad(): caption = blip_model.generate(**inputs) captions.append(blip_processor.decode(caption[0], skip_special_tokens=True)) # Free up memory used by BLIP del blip_model, blip_processor torch.cuda.empty_cache() gc.collect() # Use CLIP to assess the similarity of frames to a Muay Thai jab prompt, including stance clip_results = [] clip_model = CLIPModel.from_pretrained('openai/clip-vit-base-patch32').to('cuda' if torch.cuda.is_available() else 'cpu') clip_processor = CLIPProcessor.from_pretrained('openai/clip-vit-base-patch32') for i, frame in enumerate(frames): frame_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) # Convert frame to PIL image stance = stances[i] prompt = f"A person performing a Muay Thai jab in {stance} stance at {height} in in height, {weight} lbs in weight, and a wingspan of {wingspan} cm." text_inputs = clip_processor(text=[prompt], return_tensors="pt").to('cuda' if torch.cuda.is_available() else 'cpu') image_inputs = clip_processor(images=frame_pil, return_tensors="pt").to('cuda' if torch.cuda.is_available() else 'cpu') with torch.no_grad(): image_features = clip_model.get_image_features(**image_inputs) text_features = clip_model.get_text_features(**text_inputs) similarity = torch.nn.functional.cosine_similarity(image_features, text_features) clip_results.append(similarity.item()) # Free up memory used by CLIP del clip_model, clip_processor torch.cuda.empty_cache() gc.collect() # Calculate score based on CLIP results and BLIP captions avg_clip_similarity = sum(clip_results) / len(clip_results) if clip_results else 0 guard_score = sum(guard_up) / len(guard_up) if guard_up else 0 overall_score = (avg_clip_similarity + guard_score) / 2 # Scale the overall score to a range of 0 - 10 overall_score = max(0, min(overall_score * 10, 10)) # Return combined results response = { "movenet_results": movenet_results, "blip_captions": captions, "clip_similarities": clip_results, "stances": stances, "overall_score": overall_score, "guard_score": guard_score } return jsonify(response) except Exception as e: logging.error(str(e)) return jsonify({"error": str(e)}), 500 if __name__ == '__main__': # Clear any cache before starting the Flask server gc.collect() torch.cuda.empty_cache() # Start the Flask app app.run(host='0.0.0.0', port=7860)