import cv2
import numpy as np
from ultralytics import YOLO
from insightface.app import FaceAnalysis
import torchreid

# Configuration
DETECTION_THRESHOLD = 0.75  # Confidence threshold for person detection
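# Cosine-similarity threshold for ReID matching (the same 0.6 value used
# inline in the matching step below, lifted into a named constant).
SIMILARITY_THRESHOLD = 0.6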
# =============================================================================
# MODEL INITIALIZATION
# =============================================================================
# Load YOLOv8 with the ByteTrack tracker for person detection and tracking.
# YOLOv8 handles object detection while ByteTrack provides consistent track IDs.
model = YOLO(r'detection.pt')  # Replace with your trained model path
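# Note (assumption): any YOLOv8 detection checkpoint can be used here; with a
# stock COCO model such as 'yolov8n.pt', pass classes=[0] to model.track()
# below so that only the 'person' class is kept.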
# Initialize InsightFace for facial feature extraction.
# Uses the buffalo_l model pack, which provides high-quality face embeddings.
face_app = FaceAnalysis(name='buffalo_l', providers=['CUDAExecutionProvider'])
face_app.prepare(ctx_id=0)  # ctx_id=0 prepares the first GPU
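# Note (assumption): on a CPU-only machine, InsightFace can fall back to
# providers=['CPUExecutionProvider'] together with face_app.prepare(ctx_id=-1);
# a negative ctx_id selects CPU inference.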
# Initialize TorchReID for full-body person re-identification.
# OSNet is a lightweight but effective architecture for person ReID.
reid_extractor = torchreid.utils.FeatureExtractor(
    model_name='osnet_x0_25',
    model_path='osnet_x0_25_market1501.pth',  # Pre-trained on Market1501
    device='cuda'
)
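# Note (assumption): FeatureExtractor also accepts a list of images, so all
# crops from one frame could be embedded in a single batched call, e.g.
# reid_extractor([crop1, crop2, ...]), which is typically faster than one
# call per detection.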
# =============================================================================
# GLOBAL VARIABLES FOR PERSON RE-IDENTIFICATION
# =============================================================================
# Storage for known person embeddings and their assigned global IDs
known_embeddings = []  # List of combined face+body embeddings
known_ids = []         # Corresponding global IDs for each embedding
next_global_id = 1     # Counter for assigning new global IDs

# Mapping from ByteTrack tracker IDs to global person IDs.
# This keeps labels consistent when tracker IDs change.
track_to_global = {}
# =============================================================================
# VIDEO INPUT/OUTPUT SETUP
# =============================================================================
# Initialize video capture and read the input stream's properties.
cap = cv2.VideoCapture("demo.mp4")  # Input video file
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
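# Defensive fallback (assumption): some containers report FPS as 0, which
# would make the VideoWriter below produce an unplayable file; 30 is an
# arbitrary default.
if fps <= 0:
    fps = 30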
# Create the output video writer with the same properties as the input.
out = cv2.VideoWriter("output.mp4", cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height))
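# Note: the "mp4v" fourcc works with most OpenCV builds; if the output must
# play in browsers, an H.264 fourcc such as "avc1" may be needed, depending
# on how OpenCV was built.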
# =============================================================================
# MAIN PROCESSING LOOP
# =============================================================================
while True:
    ret, frame = cap.read()
    if not ret:
        break  # End of video

    # Run YOLOv8 detection with ByteTrack tracking.
    # persist=True carries tracker state across frames.
    results = model.track(frame, tracker="bytetrack.yaml", persist=True,
                          verbose=False, conf=DETECTION_THRESHOLD)
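    # Note: "bytetrack.yaml" is the tracker config bundled with the
    # ultralytics package; "botsort.yaml" is the other built-in option.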
    # Process each detection result
    for result in results:
        # Extract bounding boxes in (x1, y1, x2, y2) format
        boxes = result.boxes.xyxy.cpu().numpy()

        # Extract tracking IDs if available
        if result.boxes.id is not None:
            track_ids = result.boxes.id.int().cpu().tolist()
        else:
            # No tracking IDs available; assign None for each detection
            track_ids = [None] * len(boxes)

        # Process each detected person
        for box, track_id in zip(boxes, track_ids):
            x1, y1, x2, y2 = map(int, box)
            # Clamp coordinates to the frame so the slice below cannot wrap
            # around on negative indices.
            x1, y1 = max(0, x1), max(0, y1)
            x2, y2 = min(frame.shape[1], x2), min(frame.shape[0], y2)

            # Crop the person from the frame; skip degenerate boxes.
            person_crop = frame[y1:y2, x1:x2]
            if person_crop.size == 0:
                continue
            # Initialize embedding variables
            face_embedding = None
            body_embedding = None

            # =============================================================
            # FACE EMBEDDING EXTRACTION
            # =============================================================
            # Extract a face embedding using InsightFace.
            faces = face_app.get(person_crop)
            if faces:
                # Pick the face with the highest detector score, since
                # InsightFace does not guarantee any particular ordering.
                best_face = max(faces, key=lambda f: f.det_score)
                face_embedding = best_face.embedding
            # =============================================================
            # BODY EMBEDDING EXTRACTION
            # =============================================================
            # Extract a body embedding using TorchReID.
            try:
                # Resize to 128x256 (cv2.resize takes (width, height)) and
                # convert BGR -> RGB for the ReID model.
                body_input = cv2.resize(person_crop, (128, 256))
                body_input = cv2.cvtColor(body_input, cv2.COLOR_BGR2RGB)
                # Extract features and convert to numpy
                body_embedding = reid_extractor(body_input)[0].cpu().numpy()
            except Exception:
                # Crop too small or otherwise invalid; leave body_embedding
                # as None and fall back to the face embedding alone.
                pass
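            # Note (assumption): torchreid's FeatureExtractor applies its own
            # resize and normalization internally, so the manual resize above
            # mainly guards against degenerate crops.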
            # =============================================================
            # EMBEDDING COMBINATION AND PERSON MATCHING
            # =============================================================
            # Combine face and body embeddings into one person representation.
            embedding = None
            if face_embedding is not None and body_embedding is not None:
                # Concatenate both embeddings for maximum distinctiveness
                embedding = np.concatenate((face_embedding, body_embedding)).astype(np.float32)
            elif face_embedding is not None:
                # Use only the face embedding if body extraction failed
                embedding = face_embedding.astype(np.float32)
            elif body_embedding is not None:
                # Use only the body embedding if face detection failed
                embedding = body_embedding.astype(np.float32)
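            # Note (assumption): the two embeddings live on different scales,
            # so the larger-magnitude half can dominate the cosine score of
            # the concatenation. A common refinement (not applied here) is to
            # L2-normalize each half first, e.g.:
            #   face_n = face_embedding / (np.linalg.norm(face_embedding) + 1e-6)
            #   body_n = body_embedding / (np.linalg.norm(body_embedding) + 1e-6)
            #   embedding = np.concatenate((face_n, body_n)).astype(np.float32)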
            # Assign a global ID based on embedding similarity.
            if embedding is not None:
                match_found = False

                # Search for similar embeddings among known people.
                if known_embeddings:
                    # Only compare embeddings of the same dimension, since a
                    # face-only vector cannot be compared to a face+body one.
                    matching_embeddings = [
                        (emb, gid) for emb, gid in zip(known_embeddings, known_ids)
                        if emb.shape[0] == embedding.shape[0]
                    ]
                    if matching_embeddings:
                        embs, gids = zip(*matching_embeddings)
                        embs = np.array(embs)
                        # Cosine similarity against all known embeddings:
                        # cos(a, b) = (a . b) / (|a| * |b|); the 1e-6 term
                        # avoids division by zero.
                        sims = np.dot(embs, embedding) / (
                            np.linalg.norm(embs, axis=1) * np.linalg.norm(embedding) + 1e-6
                        )
                        # Find the best match
                        best_match = np.argmax(sims)
                        if sims[best_match] > SIMILARITY_THRESHOLD:
                            global_id = gids[best_match]
                            match_found = True
                # If no match was found, register a new global ID.
                if not match_found:
                    global_id = next_global_id
                    next_global_id += 1
                    known_embeddings.append(embedding)
                    known_ids.append(global_id)
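                # Note: the gallery of known embeddings grows without bound;
                # for long videos a practical refinement (not implemented
                # here) is to cap its size or keep a running mean per ID.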
                # Update the tracker-ID -> global-ID mapping.
                if track_id is not None:
                    track_to_global[track_id] = global_id
                display_id = global_id
            else:
                # No usable embedding; fall back to the tracker ID (or "?"
                # when ByteTrack did not assign one either).
                if track_id is not None:
                    display_id = track_to_global.get(track_id, f"T{track_id}")
                else:
                    display_id = "?"
            # =============================================================
            # VISUALIZATION
            # =============================================================
            # Draw a bounding box around the detected person.
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            # Display the global ID above the bounding box.
            cv2.putText(frame, f"ID {display_id}", (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
    # =============================================================================
    # OUTPUT AND DISPLAY
    # =============================================================================
    # Write the annotated frame to the output video first, so the frame shown
    # when quitting is not dropped from the file.
    out.write(frame)

    # Show the frame with tracking results.
    cv2.imshow("Tracking + ReID", frame)

    # Break the loop if the 'q' key is pressed.
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
# =============================================================================
# CLEANUP
# =============================================================================
# Release video capture and writer resources.
cap.release()
out.release()
cv2.destroyAllWindows()