import cv2
import numpy as np
import tempfile
import os
from collections import defaultdict
from typing import Dict, List, Tuple, Optional

from fastapi import FastAPI, UploadFile, File, HTTPException, Form
from fastapi.responses import ORJSONResponse
from fastapi.encoders import jsonable_encoder

from .models import Gesture, GestureResponse, GESTURE_MAPPING, FULL_GESTURE_MAPPING, PRODUCTION_GESTURE_MAPPING
from .config import get_logfire_token, is_monitoring_enabled

# Import the gesture detection components
from .main_controller import MainController

# Configure logfire monitoring if token is available
logfire = None
if is_monitoring_enabled():
    try:
        import logfire
        logfire.configure(token=get_logfire_token())
    except ImportError:
        logfire = None

app = FastAPI(default_response_class=ORJSONResponse)

# Instrument FastAPI with logfire if monitoring is enabled
if logfire is not None:
    logfire.instrument_fastapi(app, capture_headers=True)


def process_video_for_gestures(video_path: str, detector_path: str = "models/hand_detector.onnx",
                               classifier_path: str = "models/crops_classifier.onnx",
                               frame_skip: int = 1) -> List[Gesture]:
    """
    Process a video file to detect gestures using the MainController.

    Parameters
    ----------
    video_path : str
        Path to the video file to process
    detector_path : str
        Path to the hand detection ONNX model
    classifier_path : str
        Path to the gesture classification ONNX model
    frame_skip : int
        Frame stride: process every ``frame_skip``-th frame
        (1 = process every frame, 3 = process every 3rd frame)

    Returns
    -------
    List[Gesture]
        List of detected gestures with duration and confidence
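
    Examples
    --------
    A minimal usage sketch, assuming the ONNX models exist at the default
    paths and ``sample.mp4`` is any readable video file::

        gestures = process_video_for_gestures("sample.mp4", frame_skip=3)
        for g in gestures:
            print(g.gesture, g.duration, g.confidence)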
| """ | |
    # Create monitoring span for video processing
    span_context = None
    if logfire is not None:
        span_context = logfire.span('process_video_for_gestures',
                                    video_path=video_path,
                                    detector_path=detector_path,
                                    classifier_path=classifier_path)
        span_context.__enter__()

    try:
        # Initialize the main controller
        if logfire is not None:
            with logfire.span('initialize_controller'):
                controller = MainController(detector_path, classifier_path)
        else:
            controller = MainController(detector_path, classifier_path)

        # Open video file
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError(f"Could not open video file: {video_path}")

        # Get video properties for monitoring
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        if logfire is not None:
            logfire.info('Video properties',
                         total_frames=total_frames,
                         fps=fps,
                         duration_seconds=total_frames / fps if fps > 0 else 0)

        # Track gestures per hand ID: {hand_id: [(gesture_id, confidence), ...]}
        gesture_tracks: Dict[int, List[Tuple[int, float]]] = defaultdict(list)
        frame_count = 0
        processed_frames = 0
        detection_stats = {
            'frames_with_detections': 0,
            'total_detections': 0,
            'gesture_counts': defaultdict(int)
        }
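
        # Example shape of gesture_tracks after a run (illustrative values only,
        # not from a real video): hand 0 held gesture 1 for two processed frames,
        # then switched to gesture 5; hand 1 showed gesture 3 once:
        #   {0: [(1, 0.8), (1, 0.8), (5, 0.8)], 1: [(3, 0.8)]}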

        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                # Skip frames based on frame_skip parameter
                if frame_count % frame_skip == 0:
                    # Process frame through the controller
                    bboxes, ids, labels = controller(frame)
                    processed_frames += 1

                    if bboxes is not None and ids is not None and labels is not None:
                        detection_stats['frames_with_detections'] += 1
                        detection_stats['total_detections'] += len(bboxes)

                        # Track gestures for each detected hand
                        for i in range(len(bboxes)):
                            hand_id = int(ids[i])
                            gesture_id = labels[i]
                            if gesture_id is not None:
                                # Default confidence; could be replaced with a real score if the detector exposes one
                                confidence = 0.8
                                gesture_tracks[hand_id].append((gesture_id, confidence))
                                detection_stats['gesture_counts'][gesture_id] += 1

                                # Log individual detections for debugging
                                if logfire is not None:
                                    gesture_name = FULL_GESTURE_MAPPING.get(gesture_id, f"unknown_{gesture_id}")
                                    logfire.debug('Hand detection',
                                                  frame=frame_count,
                                                  hand_id=hand_id,
                                                  gesture_id=gesture_id,
                                                  gesture_name=gesture_name,
                                                  confidence=confidence,
                                                  bbox=bboxes[i].tolist() if len(bboxes[i]) >= 4 else None)
                else:
                    # Advance tracker on skipped frames to keep state consistent
                    controller.update(np.empty((0, 5)), None)

                frame_count += 1

                # Log progress every 100 frames
                if frame_count % 100 == 0 and logfire is not None:
                    progress = (frame_count / total_frames) * 100 if total_frames > 0 else 0
                    logfire.info('Processing progress',
                                 frame=frame_count,
                                 total_frames=total_frames,
                                 progress_percent=round(progress, 2))
        finally:
            cap.release()

        # Log final detection statistics
        if logfire is not None:
            logfire.info('Detection statistics',
                         total_frames=frame_count,
                         processed_frames=processed_frames,
                         frame_skip=frame_skip,
                         frames_with_detections=detection_stats['frames_with_detections'],
                         total_detections=detection_stats['total_detections'],
                         detection_rate=detection_stats['frames_with_detections'] / processed_frames if processed_frames > 0 else 0,
                         gesture_counts=dict(detection_stats['gesture_counts']))
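
        # Worked example of the grouping below (illustrative numbers, not from a real
        # run): with frame_skip=3, min_duration = max(5, 3 * 2) = 6 processed frames,
        # so a hand holding the same gesture for 6 consecutive processed frames is
        # kept and reported with duration 6 * 3 = 18 original video frames, while a
        # run of only 4 processed frames is discarded.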

        # Process gesture tracks to find continuous gestures
        detected_gestures = []
        for hand_id, gesture_sequence in gesture_tracks.items():
            if not gesture_sequence:
                continue

            # Group consecutive identical gestures
            current_gesture = None
            current_duration = 0
            current_confidence = 0.0

            for gesture_id, confidence in gesture_sequence:
                if current_gesture is None or current_gesture != gesture_id:
                    # Save previous gesture if it was significant
                    # Adjust minimum duration based on frame skip
                    min_duration = max(5, frame_skip * 2)  # At least 5 processed frames (2 * frame_skip when frame_skip > 2)
                    if current_gesture is not None and current_duration >= min_duration:
                        gesture_name = PRODUCTION_GESTURE_MAPPING.get(current_gesture, f"unknown_{current_gesture}")
                        avg_confidence = current_confidence / current_duration if current_duration > 0 else 0.0
                        # Scale duration back to original frame count
                        scaled_duration = current_duration * frame_skip
                        detected_gestures.append(Gesture(
                            gesture=gesture_name,
                            duration=scaled_duration,
                            confidence=avg_confidence
                        ))
                        # Log significant gesture detection
                        if logfire is not None:
                            logfire.info('Significant gesture detected',
                                         hand_id=hand_id,
                                         gesture=gesture_name,
                                         duration_frames=current_duration,
                                         confidence=avg_confidence)
                    # Start new gesture
                    current_gesture = gesture_id
                    current_duration = 1
                    current_confidence = confidence
                else:
                    # Continue current gesture
                    current_duration += 1
                    current_confidence += confidence

            # Don't forget the last gesture
            min_duration = max(5, frame_skip * 2)  # At least 5 processed frames (2 * frame_skip when frame_skip > 2)
            if current_gesture is not None and current_duration >= min_duration:
                gesture_name = PRODUCTION_GESTURE_MAPPING.get(current_gesture, f"unknown_{current_gesture}")
                avg_confidence = current_confidence / current_duration if current_duration > 0 else 0.0
                # Scale duration back to original frame count
                scaled_duration = current_duration * frame_skip
                detected_gestures.append(Gesture(
                    gesture=gesture_name,
                    duration=scaled_duration,
                    confidence=avg_confidence
                ))
                # Log final gesture detection
                if logfire is not None:
                    logfire.info('Final gesture detected',
                                 hand_id=hand_id,
                                 gesture=gesture_name,
                                 duration_frames=current_duration,
                                 confidence=avg_confidence)

        # Log final results
        if logfire is not None:
            logfire.info('Video processing completed',
                         total_gestures_detected=len(detected_gestures),
                         unique_hands=len(gesture_tracks),
                         gestures=[{'gesture': g.gesture, 'duration': g.duration, 'confidence': g.confidence}
                                   for g in detected_gestures])

        return detected_gestures
    finally:
        if span_context is not None:
            span_context.__exit__(None, None, None)


@app.get("/health")  # route path is an assumption
async def health():
    """Health check endpoint."""
    if logfire is not None:
        logfire.info('Health check requested')
    return {"message": "OK"}


@app.post("/detect_gestures", response_model=GestureResponse)  # route path is an assumption
async def detect_gestures(video: UploadFile = File(...), frame_skip: int = Form(1)):
    """
    Detect gestures in an uploaded video file.

    Parameters
    ----------
    video : UploadFile
        The video file to process
    frame_skip : int
        Frame stride: process every ``frame_skip``-th frame
        (1 = process every frame, 3 = process every 3rd frame)

    Returns
    -------
    GestureResponse
        Response containing detected gestures with duration and confidence
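
    Examples
    --------
    A request sketch against the route assumed above, using any local video
    file and a default local server address (also an assumption)::

        curl -X POST -F "video=@sample.mp4;type=video/mp4" -F "frame_skip=3" http://localhost:8000/detect_gestures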
| """ | |
    # Log request details
    if logfire is not None:
        logfire.info('Gesture detection request received',
                     filename=video.filename,
                     content_type=video.content_type,
                     content_length=video.size if hasattr(video, 'size') else 'unknown')

    # Validate file type (guard against a missing content type as well)
    if not video.content_type or not video.content_type.startswith('video/'):
        if logfire is not None:
            logfire.warning('Invalid file type received', content_type=video.content_type)
        raise HTTPException(status_code=400, detail="File must be a video")

    # Create temporary file to save uploaded video
    with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_file:
        try:
            # Write uploaded content to temporary file
            content = await video.read()
            temp_file.write(content)
            temp_file.flush()

            if logfire is not None:
                logfire.info('Video file saved for processing',
                             temp_file=temp_file.name,
                             file_size_bytes=len(content))

            # Process the video with frame skip parameter
            gestures = process_video_for_gestures(temp_file.name, frame_skip=frame_skip)

            if logfire is not None:
                logfire.info('Gesture detection completed successfully',
                             total_gestures=len(gestures),
                             gestures=[g.gesture for g in gestures])

            return GestureResponse(gestures=gestures)
        except Exception as e:
            if logfire is not None:
                logfire.error('Error processing video',
                              error=str(e),
                              error_type=type(e).__name__,
                              temp_file=temp_file.name)
            raise HTTPException(status_code=500, detail=f"Error processing video: {str(e)}")
        finally:
            # Clean up temporary file
            if os.path.exists(temp_file.name):
                os.unlink(temp_file.name)
                if logfire is not None:
                    logfire.debug('Temporary file cleaned up', temp_file=temp_file.name)
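

# Local run sketch. Assumptions: "app.api" is a hypothetical import string (use the
# actual module path of this file) and uvicorn is installed in the environment.
#
#   uvicorn app.api:app --host 0.0.0.0 --port 8000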