# detection.py
import cv2
import numpy as np
from ultralytics import YOLO
from insightface.app import FaceAnalysis
import torchreid
import torch
# Configuration
DETECTION_THRESHOLD = 0.75 # Confidence threshold for person detection
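# Cosine-similarity threshold for re-identification: a new embedding is matched to a
# known person only if its best similarity score exceeds this value (used further below)
REID_SIMILARITY_THRESHOLD = 0.6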
# =============================================================================
# MODEL INITIALIZATION
# =============================================================================
# Load YOLOv8 model with ByteTrack tracker for person detection and tracking
# YOLOv8 handles object detection while ByteTrack provides consistent tracking IDs
model = YOLO(r'detection.pt') # Replace with your trained model path
# Initialize InsightFace for facial feature extraction
# Uses buffalo_l model which provides high-quality face embeddings
face_app = FaceAnalysis(name='buffalo_l', providers=['CUDAExecutionProvider'])
face_app.prepare(ctx_id=0) # Prepare for GPU inference
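# Note: prepare() also accepts det_thresh and det_size arguments (det_size defaults to
# (640, 640)); a smaller det_size such as (320, 320) trades detection recall for speed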
# Initialize TorchReID for full-body person re-identification
# OSNet is a lightweight but effective model for person ReID
reid_extractor = torchreid.utils.FeatureExtractor(
model_name='osnet_x0_25',
model_path='osnet_x0_25_market1501.pth', # Pre-trained on Market1501 dataset
device='cuda'
)
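# The extractor accepts image file paths or RGB numpy arrays (a single image or a list)
# and returns a torch tensor of shape (N, feature_dim); OSNet models yield 512-dim features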
# =============================================================================
# GLOBAL VARIABLES FOR PERSON RE-IDENTIFICATION
# =============================================================================
# Storage for known person embeddings and their assigned global IDs
known_embeddings = [] # List of combined face+body embeddings
known_ids = [] # Corresponding global IDs for each embedding
next_global_id = 1 # Counter for assigning new global IDs
# Mapping from ByteTrack tracker IDs to global person IDs
# This helps maintain consistency when tracker IDs change
track_to_global = {}
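# Example: {3: 1, 7: 2} means ByteTrack tracks 3 and 7 currently correspond to global
# persons 1 and 2; the fallback path below reuses this mapping whenever a frame yields
# no usable embedding for a track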
# =============================================================================
# VIDEO INPUT/OUTPUT SETUP
# =============================================================================
# Initialize video capture and output writer
cap = cv2.VideoCapture("demo.mp4") # Input video file
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
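# Fail fast if the input could not be opened: cv2.VideoCapture does not raise on a
# missing file, it just reports zero width/height and read() returns False
if not cap.isOpened():
    raise RuntimeError("Could not open input video demo.mp4")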
# Create output video writer with same properties as input
out = cv2.VideoWriter("output.mp4", cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height))
# =============================================================================
# MAIN PROCESSING LOOP
# =============================================================================
while True:
ret, frame = cap.read()
if not ret:
break # End of video
# Run YOLOv8 detection with ByteTrack tracking
# persist=True maintains tracking across frames
results = model.track(frame, tracker="bytetrack.yaml", persist=True,
verbose=False, conf=DETECTION_THRESHOLD)
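    # model.track returns a list of Results objects (one per input image), so a single
    # frame yields a one-element list carrying the boxes and their tracker IDs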
# Process each detection result
for result in results:
# Extract bounding boxes in (x1, y1, x2, y2) format
boxes = result.boxes.xyxy.cpu().numpy()
# Extract tracking IDs if available
if result.boxes.id is not None:
track_ids = result.boxes.id.int().cpu().tolist()
else:
# No tracking IDs available, assign None for each detection
track_ids = [None] * len(boxes)
# Process each detected person
for box, track_id in zip(boxes, track_ids):
            x1, y1, x2, y2 = map(int, box)
            # Crop the person from the frame (coordinates clamped to the frame bounds)
            person_crop = frame[max(0, y1):min(height, y2), max(0, x1):min(width, x2)]
            if person_crop.size == 0:
                continue  # Skip degenerate boxes
# Initialize embedding variables
face_embedding = None
body_embedding = None
# =============================================================
# FACE EMBEDDING EXTRACTION
# =============================================================
# Extract face embedding using InsightFace
faces = face_app.get(person_crop)
if faces:
                # Use the first detected face (a person crop usually contains at most one)
face_embedding = faces[0].embedding
# =============================================================
# BODY EMBEDDING EXTRACTION
# =============================================================
# Extract body embedding using TorchReID
            try:
                # Resize to width 128, height 256 and convert BGR to RGB for the ReID model
                body_input = cv2.resize(person_crop, (128, 256))
                body_input = cv2.cvtColor(body_input, cv2.COLOR_BGR2RGB)
                # Extract features and convert to numpy
                body_embedding = reid_extractor(body_input)[0].cpu().numpy()
            except Exception:
                # Crop too small or otherwise invalid; fall back to the face embedding only
                pass
# =============================================================
# EMBEDDING COMBINATION AND PERSON MATCHING
# =============================================================
# Combine face and body embeddings for robust person representation
embedding = None
if face_embedding is not None and body_embedding is not None:
# Concatenate both embeddings for maximum distinctiveness
embedding = np.concatenate((face_embedding, body_embedding)).astype(np.float32)
elif face_embedding is not None:
# Use only face embedding if body embedding failed
embedding = face_embedding.astype(np.float32)
elif body_embedding is not None:
# Use only body embedding if face detection failed
embedding = body_embedding.astype(np.float32)
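            # With the defaults above (buffalo_l faces, osnet_x0_25 bodies) both embeddings are
            # 512-dimensional, so a combined vector is 1024-dimensional; the same-dimension
            # filter in the matching step keeps face-only, body-only, and combined vectors
            # from being compared against each other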
# Assign global ID based on embedding similarity
if embedding is not None:
match_found = False
# Search for similar embeddings among known people
if known_embeddings:
# Only compare embeddings of the same dimension
matching_embeddings = [
(emb, gid) for emb, gid in zip(known_embeddings, known_ids)
if emb.shape[0] == embedding.shape[0]
]
if matching_embeddings:
embs, gids = zip(*matching_embeddings)
embs = np.array(embs)
# Calculate cosine similarity with all known embeddings
sims = np.dot(embs, embedding) / (
np.linalg.norm(embs, axis=1) * np.linalg.norm(embedding) + 1e-6
)
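                        # sims[i] = <embs[i], embedding> / (||embs[i]|| * ||embedding||), so values
                        # lie in [-1, 1]; the 1e-6 term guards against division by zero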
# Find the best match
best_match = np.argmax(sims)
                        if sims[best_match] > REID_SIMILARITY_THRESHOLD:
global_id = gids[best_match]
match_found = True
# If no match found, assign new global ID
if not match_found:
global_id = next_global_id
next_global_id += 1
known_embeddings.append(embedding)
known_ids.append(global_id)
# Update tracker ID to global ID mapping
if track_id is not None:
track_to_global[track_id] = global_id
display_id = global_id
else:
                # No usable embedding available; fall back to the tracker ID
global_id = track_to_global.get(track_id, f"T{track_id}")
display_id = global_id
# =============================================================
# VISUALIZATION
# =============================================================
# Draw bounding box around detected person
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
# Display the global ID above the bounding box
cv2.putText(frame, f"ID {display_id}", (x1, y1 - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
# =============================================================================
# OUTPUT AND DISPLAY
# =============================================================================
# Show the frame with tracking results
cv2.imshow("Tracking + ReID", frame)
# Break loop if 'q' key is pressed
if cv2.waitKey(1) & 0xFF == ord('q'):
break
# Write frame to output video
out.write(frame)
# =============================================================================
# CLEANUP
# =============================================================================
# Release video capture and writer resources
cap.release()
out.release()
cv2.destroyAllWindows()