"""
reid.py - Simplified Dog Re-Identification with Body Part Crops
Uses ResNet50 features on full image + body part crops
"""
from dataclasses import dataclass
import numpy as np
import cv2
import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as transforms
from sklearn.metrics.pairwise import cosine_similarity
from typing import Dict, List, Optional, Tuple
import time
from tracking import Track
@dataclass
class DogFeatures:
"""Container for dog features including body parts"""
full_features: np.ndarray
head_features: Optional[np.ndarray] = None
torso_features: Optional[np.ndarray] = None
rear_features: Optional[np.ndarray] = None
color_histogram: Optional[np.ndarray] = None
position: Tuple[float, float] = (0, 0)
timestamp: float = 0
confidence: float = 0
@dataclass
class BodyPartCrops:
"""Container for body part image crops"""
full_image: np.ndarray
head_crop: Optional[np.ndarray] = None
torso_crop: Optional[np.ndarray] = None
rear_crop: Optional[np.ndarray] = None
    crop_confidences: Optional[Dict[str, float]] = None
class BodyPartDogReID:
"""
Dog ReID using body part crops + ResNet features
"""
def __init__(self,
similarity_threshold: float = 0.7,
device: str = 'cuda',
use_body_parts: bool = True):
"""
Initialize ReID system with body part support
Args:
            similarity_threshold: Minimum weighted similarity required to match an existing dog
device: 'cuda' or 'cpu'
use_body_parts: Whether to use body part crops
"""
self.device = device if torch.cuda.is_available() else 'cpu'
self.similarity_threshold = similarity_threshold
self.use_body_parts = use_body_parts
# Initialize ResNet50 for feature extraction
self.resnet = models.resnet50(weights='IMAGENET1K_V1')
        self.resnet = nn.Sequential(*list(self.resnet.children())[:-1])  # drop the FC head; keep the 2048-D pooled embedding
self.resnet.to(self.device).eval()
# Image preprocessing
self.transform = transforms.Compose([
transforms.ToPILImage(),
transforms.Resize((224, 224)),
transforms.ToTensor(),
transforms.Normalize(
mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]
)
])
# Feature weights for different parts
self.part_weights = {
'full': 0.4,
'head': 0.3,
'torso': 0.2,
'rear': 0.1
}
# Dog database
self.dog_database: Dict[int, List[DogFeatures]] = {}
self.dog_images: Dict[int, List[np.ndarray]] = {}
self.dog_body_parts: Dict[int, List[BodyPartCrops]] = {}
# Tracking
self.track_to_dog: Dict[int, int] = {}
self.next_dog_id = 1
self.last_positions: Dict[int, Tuple[float, float, float]] = {}
def extract_body_parts(self, image: np.ndarray, bbox: List[float]) -> BodyPartCrops:
"""
Extract body part crops from dog image
Simple geometric division - can be improved with detection
Args:
image: Full dog image crop
            bbox: Bounding box of dog (currently unused by the geometric split)
Returns:
BodyPartCrops object
"""
h, w = image.shape[:2]
crops = BodyPartCrops(full_image=image)
confidences = {}
try:
# Head: Top 35% of image
head_h = int(h * 0.35)
head_crop = image[:head_h, :]
if head_crop.size > 0:
crops.head_crop = head_crop
confidences['head'] = self._validate_crop(head_crop)
# Torso: Middle 40% of image
torso_start = int(h * 0.25)
torso_end = int(h * 0.65)
torso_crop = image[torso_start:torso_end, :]
if torso_crop.size > 0:
crops.torso_crop = torso_crop
confidences['torso'] = self._validate_crop(torso_crop)
# Rear: Bottom 40% of image
rear_start = int(h * 0.60)
rear_crop = image[rear_start:, :]
if rear_crop.size > 0:
crops.rear_crop = rear_crop
confidences['rear'] = self._validate_crop(rear_crop)
crops.crop_confidences = confidences
except Exception as e:
print(f"Error extracting body parts: {e}")
return crops
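    # Worked example of the geometric split above (illustrative only): for a
    # 300-pixel-tall crop, the head band covers rows 0-104 (top 35%), the
    # torso band rows 75-194 (25%-65%), and the rear band rows 180-299
    # (bottom 40%). The bands overlap slightly, so pixels near a boundary
    # contribute to both neighbouring parts.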
def _validate_crop(self, crop: np.ndarray) -> float:
"""
Validate crop quality (simple version)
Returns confidence score 0-1
"""
if crop.size == 0:
return 0.0
# Check if crop has sufficient content (not too uniform)
gray = cv2.cvtColor(crop, cv2.COLOR_BGR2GRAY) if len(crop.shape) == 3 else crop
std_dev = np.std(gray)
# Higher standard deviation = more texture/content
confidence = min(1.0, std_dev / 50.0)
return confidence
def extract_resnet_features(self, image: np.ndarray) -> Optional[np.ndarray]:
"""Extract ResNet50 features from image"""
if image is None or image.size == 0:
return None
try:
# Convert BGR to RGB
img_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
# Preprocess
img_tensor = self.transform(img_rgb).unsqueeze(0).to(self.device)
# Extract features
with torch.no_grad():
features = self.resnet(img_tensor)
features = features.squeeze().cpu().numpy()
# L2 normalize
features = features / (np.linalg.norm(features) + 1e-7)
return features
except Exception as e:
print(f"Feature extraction error: {e}")
return None
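    # Note: with the FC head removed, the ResNet-50 backbone yields a single
    # 2048-dimensional pooled vector per crop. Because that vector is
    # L2-normalised above, the cosine similarity computed later is simply the
    # dot product of two unit vectors (identical crops score 1.0).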
def extract_all_features(self, crops: BodyPartCrops) -> DogFeatures:
"""
Extract features from full image and all body parts
Args:
crops: Body part crops
Returns:
DogFeatures object
"""
# Extract features from full image (required)
full_features = self.extract_resnet_features(crops.full_image)
if full_features is None:
            full_features = np.zeros(2048)  # fallback matching the ResNet-50 embedding size
# Extract features from body parts if available
head_features = None
torso_features = None
rear_features = None
if self.use_body_parts:
if crops.head_crop is not None:
head_features = self.extract_resnet_features(crops.head_crop)
if crops.torso_crop is not None:
torso_features = self.extract_resnet_features(crops.torso_crop)
if crops.rear_crop is not None:
rear_features = self.extract_resnet_features(crops.rear_crop)
# Extract color histogram
color_hist = self.extract_color_histogram(crops.full_image)
return DogFeatures(
full_features=full_features,
head_features=head_features,
torso_features=torso_features,
rear_features=rear_features,
color_histogram=color_hist,
timestamp=time.time()
)
def extract_color_histogram(self, image: np.ndarray) -> np.ndarray:
"""Extract color histogram features"""
try:
hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
hist_h = cv2.calcHist([hsv], [0], None, [50], [0, 180])
hist_s = cv2.calcHist([hsv], [1], None, [60], [0, 256])
hist_v = cv2.calcHist([hsv], [2], None, [60], [0, 256])
hist = np.concatenate([hist_h, hist_s, hist_v])
hist = cv2.normalize(hist, hist).flatten()
return hist
except Exception as e:
print(f"Color extraction error: {e}")
            return np.zeros(170)  # 50 + 60 + 60 bins, matching the concatenated histogram
def compare_features(self, features1: DogFeatures, features2: DogFeatures) -> float:
"""
Compare two feature sets including body parts
Args:
features1: First dog's features
features2: Second dog's features
Returns:
Similarity score 0-1
"""
scores = {}
weights = {}
# Always compare full image features
full_sim = cosine_similarity(
features1.full_features.reshape(1, -1),
features2.full_features.reshape(1, -1)
)[0, 0]
scores['full'] = full_sim
weights['full'] = self.part_weights['full']
# Compare body parts if available in both
if self.use_body_parts:
if features1.head_features is not None and features2.head_features is not None:
head_sim = cosine_similarity(
features1.head_features.reshape(1, -1),
features2.head_features.reshape(1, -1)
)[0, 0]
scores['head'] = head_sim
weights['head'] = self.part_weights['head']
if features1.torso_features is not None and features2.torso_features is not None:
torso_sim = cosine_similarity(
features1.torso_features.reshape(1, -1),
features2.torso_features.reshape(1, -1)
)[0, 0]
scores['torso'] = torso_sim
weights['torso'] = self.part_weights['torso']
if features1.rear_features is not None and features2.rear_features is not None:
rear_sim = cosine_similarity(
features1.rear_features.reshape(1, -1),
features2.rear_features.reshape(1, -1)
)[0, 0]
scores['rear'] = rear_sim
weights['rear'] = self.part_weights['rear']
# Compare color histograms if available
if features1.color_histogram is not None and features2.color_histogram is not None:
color_sim = cv2.compareHist(
features1.color_histogram.astype(np.float32),
features2.color_histogram.astype(np.float32),
cv2.HISTCMP_CORREL
)
scores['color'] = color_sim
weights['color'] = 0.1
# Calculate weighted average
if scores:
total_weight = sum(weights.values())
weighted_score = sum(scores[k] * weights[k] for k in scores.keys())
return weighted_score / total_weight if total_weight > 0 else 0.0
return 0.0
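    # Fusion example (illustrative, using the default part_weights plus the
    # fixed 0.1 colour weight): if the full/head/torso/rear/colour similarities
    # were 0.80, 0.90, 0.70, 0.60 and 0.50, the combined score would be
    # (0.4*0.80 + 0.3*0.90 + 0.2*0.70 + 0.1*0.60 + 0.1*0.50) / 1.1 ~= 0.76.
    # Parts missing from either dog simply drop out of both sums.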
def calculate_temporal_score(self, track: Track, dog_id: int) -> float:
"""Calculate temporal/position coherence score"""
if dog_id not in self.last_positions:
return 1.0
last_x, last_y, last_time = self.last_positions[dog_id]
bbox = track.bbox
curr_x = (bbox[0] + bbox[2]) / 2
curr_y = (bbox[1] + bbox[3]) / 2
curr_time = time.time()
time_diff = curr_time - last_time
distance = np.sqrt((curr_x - last_x)**2 + (curr_y - last_y)**2)
        max_speed = 500  # assumed maximum apparent speed, in pixels per second
max_distance = max_speed * time_diff
if distance <= max_distance:
return 1.0
else:
return max(0.0, 1.0 - (distance - max_distance) / max_distance)
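    # Gating example (illustrative): with max_speed = 500 px/s and a 0.5 s gap
    # between sightings, any displacement up to 250 px scores 1.0; a 400 px
    # jump scores 1.0 - (400 - 250) / 250 = 0.4, and a jump of 500 px or more
    # is clamped to 0.0.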
def match_or_register(self, track: Track) -> Tuple[int, float]:
"""
Match track to existing dog or register new one
Returns:
(dog_id, confidence_score)
"""
# Get most recent detection with image
detection = None
for det in reversed(track.detections):
if det.image_crop is not None:
detection = det
break
if detection is None:
            return 0, 0.0  # 0 means no usable image crop was found for this track
# Extract body parts
crops = self.extract_body_parts(detection.image_crop, detection.bbox)
# Extract features
current_features = self.extract_all_features(crops)
# Get position
bbox = track.bbox
position = ((bbox[0] + bbox[2]) / 2, (bbox[1] + bbox[3]) / 2)
current_features.position = position
current_features.confidence = detection.confidence
# Find best match
best_dog_id = None
best_score = -1.0
for dog_id, feature_list in self.dog_database.items():
# Calculate temporal score
temporal_score = self.calculate_temporal_score(track, dog_id)
# Compare with most recent features
if feature_list:
latest_features = feature_list[-1]
similarity = self.compare_features(current_features, latest_features)
# Apply temporal boost
final_score = similarity * (0.8 + 0.2 * temporal_score)
if final_score > best_score:
best_score = final_score
best_dog_id = dog_id
# Decide if match or new dog
if best_dog_id is not None and best_score >= self.similarity_threshold:
# Update existing dog
self.dog_database[best_dog_id].append(current_features)
self.dog_images[best_dog_id].append(detection.image_crop)
self.dog_body_parts[best_dog_id].append(crops)
# Keep only recent data
if len(self.dog_database[best_dog_id]) > 20:
self.dog_database[best_dog_id] = self.dog_database[best_dog_id][-20:]
self.dog_images[best_dog_id] = self.dog_images[best_dog_id][-20:]
self.dog_body_parts[best_dog_id] = self.dog_body_parts[best_dog_id][-20:]
# Update position
self.last_positions[best_dog_id] = (position[0], position[1], time.time())
self.track_to_dog[track.track_id] = best_dog_id
return best_dog_id, best_score
else:
# Register new dog
new_dog_id = self.next_dog_id
self.next_dog_id += 1
self.dog_database[new_dog_id] = [current_features]
self.dog_images[new_dog_id] = [detection.image_crop]
self.dog_body_parts[new_dog_id] = [crops]
self.last_positions[new_dog_id] = (position[0], position[1], time.time())
self.track_to_dog[track.track_id] = new_dog_id
            return new_dog_id, best_score  # best_score is the closest non-match (-1.0 if the database was empty)
def get_body_parts_for_export(self, dog_id: int) -> List[BodyPartCrops]:
"""Get all body part crops for a specific dog"""
return self.dog_body_parts.get(dog_id, [])
def set_threshold(self, threshold: float):
"""Update similarity threshold"""
self.similarity_threshold = max(0.3, min(0.95, threshold))
def set_use_body_parts(self, use_parts: bool):
"""Enable/disable body part features"""
self.use_body_parts = use_parts
def reset(self):
"""Reset all stored data"""
self.dog_database.clear()
self.dog_images.clear()
self.dog_body_parts.clear()
self.track_to_dog.clear()
self.last_positions.clear()
self.next_dog_id = 1
# Backward compatibility
DogReID = BodyPartDogReID
SimplifiedDogReID = BodyPartDogReID
SimpleReID = BodyPartDogReID
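
# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative, not part of the pipeline). It builds a
# tiny stand-in for the Track/Detection objects from tracking.py, using only
# the attributes this module actually reads (track.track_id, track.bbox,
# track.detections, detection.image_crop / .bbox / .confidence), and runs one
# synthetic frame through the re-identification step on CPU.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from dataclasses import field

    @dataclass
    class _DemoDetection:
        image_crop: np.ndarray
        bbox: List[float]
        confidence: float = 0.9

    @dataclass
    class _DemoTrack:
        track_id: int
        bbox: List[float]
        detections: List[_DemoDetection] = field(default_factory=list)

    # Random BGR "dog" crop, just to exercise the code path.
    crop = np.random.randint(0, 255, (240, 160, 3), dtype=np.uint8)
    det = _DemoDetection(image_crop=crop, bbox=[10, 20, 170, 260])
    track = _DemoTrack(track_id=1, bbox=[10, 20, 170, 260], detections=[det])

    reid = BodyPartDogReID(similarity_threshold=0.7, device='cpu')

    # On the first pass the database is empty, so a new id is issued and the
    # returned score is -1.0.
    dog_id, score = reid.match_or_register(track)
    print(f"First pass:  dog_id={dog_id} (score={score:.3f})")

    # Feeding the same track again should match the stored dog with a high score.
    dog_id2, score2 = reid.match_or_register(track)
    print(f"Second pass: dog_id={dog_id2} (score={score2:.3f})")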