tommulder's picture
Gesture simplification
afecd35
raw
history blame
14.1 kB
import cv2
import numpy as np
import tempfile
import os
from collections import defaultdict
from typing import Dict, List, Tuple, Optional
from fastapi import FastAPI, UploadFile, File, HTTPException, Form
from fastapi.responses import ORJSONResponse
from fastapi.encoders import jsonable_encoder
from .models import Gesture, GestureResponse, GESTURE_MAPPING, FULL_GESTURE_MAPPING, PRODUCTION_GESTURE_MAPPING
from .config import get_logfire_token, is_monitoring_enabled
# Import the gesture detection components
from .main_controller import MainController
# Configure logfire monitoring if token is available
logfire = None
if is_monitoring_enabled():
try:
import logfire
logfire.configure(token=get_logfire_token())
logfire.instrument_fastapi = logfire.instrument_fastapi
except ImportError:
logfire = None
app = FastAPI(default_response_class=ORJSONResponse)
# Instrument FastAPI with logfire if monitoring is enabled
if logfire is not None:
logfire.instrument_fastapi(app, capture_headers=True)
def process_video_for_gestures(video_path: str, detector_path: str = "models/hand_detector.onnx",
classifier_path: str = "models/crops_classifier.onnx",
frame_skip: int = 1) -> List[Gesture]:
"""
Process a video file to detect gestures using the MainController.
Parameters
----------
video_path : str
Path to the video file to process
detector_path : str
Path to the hand detection ONNX model
classifier_path : str
Path to the gesture classification ONNX model
frame_skip : int
Number of frames to skip between processing (1 = process every frame, 3 = process every 3rd frame)
Returns
-------
List[Gesture]
List of detected gestures with duration and confidence
"""
# Create monitoring span for video processing
span_context = None
if logfire is not None:
span_context = logfire.span('process_video_for_gestures',
video_path=video_path,
detector_path=detector_path,
classifier_path=classifier_path)
span_context.__enter__()
try:
# Initialize the main controller
if logfire is not None:
with logfire.span('initialize_controller'):
controller = MainController(detector_path, classifier_path)
else:
controller = MainController(detector_path, classifier_path)
# Open video file
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
raise ValueError(f"Could not open video file: {video_path}")
# Get video properties for monitoring
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = cap.get(cv2.CAP_PROP_FPS)
if logfire is not None:
logfire.info('Video properties',
total_frames=total_frames,
fps=fps,
duration_seconds=total_frames/fps if fps > 0 else 0)
# Track gestures per hand ID
gesture_tracks: Dict[int, List[Tuple[int, float]]] = defaultdict(list) # {hand_id: [(gesture_id, confidence), ...]}
frame_count = 0
processed_frames = 0
detection_stats = {
'frames_with_detections': 0,
'total_detections': 0,
'gesture_counts': defaultdict(int)
}
try:
while True:
ret, frame = cap.read()
if not ret:
break
# Skip frames based on frame_skip parameter
if frame_count % frame_skip == 0:
# Process frame through the controller
bboxes, ids, labels = controller(frame)
processed_frames += 1
if bboxes is not None and ids is not None and labels is not None:
detection_stats['frames_with_detections'] += 1
detection_stats['total_detections'] += len(bboxes)
# Track gestures for each detected hand
for i in range(len(bboxes)):
hand_id = int(ids[i])
gesture_id = labels[i]
if gesture_id is not None:
# Get confidence from bbox (assuming it's the last element)
confidence = 0.8 # Default confidence, could be extracted from bbox if available
gesture_tracks[hand_id].append((gesture_id, confidence))
detection_stats['gesture_counts'][gesture_id] += 1
# Log individual detections for debugging
if logfire is not None:
gesture_name = FULL_GESTURE_MAPPING.get(gesture_id, f"unknown_{gesture_id}")
logfire.debug('Hand detection',
frame=frame_count,
hand_id=hand_id,
gesture_id=gesture_id,
gesture_name=gesture_name,
confidence=confidence,
bbox=bboxes[i].tolist() if len(bboxes[i]) >= 4 else None)
else:
# Advance tracker on skipped frames to keep state consistent
controller.update(np.empty((0, 5)), None)
frame_count += 1
# Log progress every 100 frames
if frame_count % 100 == 0 and logfire is not None:
progress = (frame_count / total_frames) * 100 if total_frames > 0 else 0
logfire.info('Processing progress',
frame=frame_count,
total_frames=total_frames,
progress_percent=round(progress, 2))
finally:
cap.release()
# Log final detection statistics
if logfire is not None:
logfire.info('Detection statistics',
total_frames=frame_count,
processed_frames=processed_frames,
frame_skip=frame_skip,
frames_with_detections=detection_stats['frames_with_detections'],
total_detections=detection_stats['total_detections'],
detection_rate=detection_stats['frames_with_detections']/processed_frames if processed_frames > 0 else 0,
gesture_counts=dict(detection_stats['gesture_counts']))
# Process gesture tracks to find continuous gestures
detected_gestures = []
for hand_id, gesture_sequence in gesture_tracks.items():
if not gesture_sequence:
continue
# Group consecutive identical gestures
current_gesture = None
current_duration = 0
current_confidence = 0.0
for gesture_id, confidence in gesture_sequence:
if current_gesture is None or current_gesture != gesture_id:
# Save previous gesture if it was significant
# Adjust minimum duration based on frame skip
min_duration = max(5, frame_skip * 2) # At least 2 processed frames
if current_gesture is not None and current_duration >= min_duration:
gesture_name = PRODUCTION_GESTURE_MAPPING.get(current_gesture, f"unknown_{current_gesture}")
avg_confidence = current_confidence / current_duration if current_duration > 0 else 0.0
# Scale duration back to original frame count
scaled_duration = current_duration * frame_skip
detected_gestures.append(Gesture(
gesture=gesture_name,
duration=scaled_duration,
confidence=avg_confidence
))
# Log significant gesture detection
if logfire is not None:
logfire.info('Significant gesture detected',
hand_id=hand_id,
gesture=gesture_name,
duration_frames=current_duration,
confidence=avg_confidence)
# Start new gesture
current_gesture = gesture_id
current_duration = 1
current_confidence = confidence
else:
# Continue current gesture
current_duration += 1
current_confidence += confidence
# Don't forget the last gesture
min_duration = max(5, frame_skip * 2) # At least 2 processed frames
if current_gesture is not None and current_duration >= min_duration:
gesture_name = PRODUCTION_GESTURE_MAPPING.get(current_gesture, f"unknown_{current_gesture}")
avg_confidence = current_confidence / current_duration if current_duration > 0 else 0.0
# Scale duration back to original frame count
scaled_duration = current_duration * frame_skip
detected_gestures.append(Gesture(
gesture=gesture_name,
duration=scaled_duration,
confidence=avg_confidence
))
# Log final gesture detection
if logfire is not None:
logfire.info('Final gesture detected',
hand_id=hand_id,
gesture=gesture_name,
duration_frames=current_duration,
confidence=avg_confidence)
# Log final results
if logfire is not None:
logfire.info('Video processing completed',
total_gestures_detected=len(detected_gestures),
unique_hands=len(gesture_tracks),
gestures=[{'gesture': g.gesture, 'duration': g.duration, 'confidence': g.confidence} for g in detected_gestures])
return detected_gestures
finally:
if span_context is not None:
span_context.__exit__(None, None, None)
@app.get("/health")
async def health():
"""Health check endpoint."""
if logfire is not None:
logfire.info('Health check requested')
return {"message": "OK"}
@app.post("/gestures", response_model=GestureResponse)
async def detect_gestures(video: UploadFile = File(...), frame_skip: int = Form(1)):
"""
Detect gestures in an uploaded video file.
Parameters
----------
video : UploadFile
The video file to process
frame_skip : int
Number of frames to skip between processing (1 = process every frame, 3 = process every 3rd frame)
Returns
-------
GestureResponse
Response containing detected gestures with duration and confidence
"""
# Log request details
if logfire is not None:
logfire.info('Gesture detection request received',
filename=video.filename,
content_type=video.content_type,
content_length=video.size if hasattr(video, 'size') else 'unknown')
# Validate file type
if not video.content_type.startswith('video/'):
if logfire is not None:
logfire.warning('Invalid file type received', content_type=video.content_type)
raise HTTPException(status_code=400, detail="File must be a video")
# Create temporary file to save uploaded video
with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_file:
try:
# Write uploaded content to temporary file
content = await video.read()
temp_file.write(content)
temp_file.flush()
if logfire is not None:
logfire.info('Video file saved for processing',
temp_file=temp_file.name,
file_size_bytes=len(content))
# Process the video with frame skip parameter
gestures = process_video_for_gestures(temp_file.name, frame_skip=frame_skip)
if logfire is not None:
logfire.info('Gesture detection completed successfully',
total_gestures=len(gestures),
gestures=[g.gesture for g in gestures])
return GestureResponse(gestures=gestures)
except Exception as e:
if logfire is not None:
logfire.error('Error processing video',
error=str(e),
error_type=type(e).__name__,
temp_file=temp_file.name)
raise HTTPException(status_code=500, detail=f"Error processing video: {str(e)}")
finally:
# Clean up temporary file
if os.path.exists(temp_file.name):
os.unlink(temp_file.name)
if logfire is not None:
logfire.debug('Temporary file cleaned up', temp_file=temp_file.name)