Spaces:
Sleeping
Sleeping
| """ | |
| reid.py - Simplified Dog Re-Identification with Body Part Crops | |
| Uses ResNet50 features on full image + body part crops | |
| """ | |
| from dataclasses import dataclass | |
| import numpy as np | |
| import cv2 | |
| import torch | |
| import torch.nn as nn | |
| import torchvision.models as models | |
| import torchvision.transforms as transforms | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| from typing import Dict, List, Optional, Tuple | |
| import time | |
| from dataclasses import dataclass | |
| from tracking import Track | |
@dataclass
class DogFeatures:
    """Container for dog features including body parts"""
    # Embedding of the full dog crop (always present).
    full_features: np.ndarray
    # Embeddings of the individual body-part crops; None when a part was not
    # extracted or its feature extraction failed.
    head_features: Optional[np.ndarray] = None
    torso_features: Optional[np.ndarray] = None
    rear_features: Optional[np.ndarray] = None
    # Flattened color histogram of the full crop.
    color_histogram: Optional[np.ndarray] = None
    # (x, y) position associated with the observation.
    position: Tuple[float, float] = (0, 0)
    # Time at which the features were extracted.
    timestamp: float = 0
    # Confidence of the detection the features were taken from.
    confidence: float = 0
@dataclass
class BodyPartCrops:
    """Container for body part image crops"""
    # The full dog crop the parts were cut from (always present).
    full_image: np.ndarray
    # Individual body-part crops; None when a part could not be extracted.
    head_crop: Optional[np.ndarray] = None
    torso_crop: Optional[np.ndarray] = None
    rear_crop: Optional[np.ndarray] = None
    # Quality score per extracted part, keyed by part name; None until the
    # crops have been scored.
    crop_confidences: Optional[Dict[str, float]] = None
class BodyPartDogReID:
    """
    Dog ReID using body part crops + ResNet features

    Every dog crop is described by ResNet50 embeddings of the full image and
    of three horizontal bands (head / torso / rear) plus an HSV color
    histogram. Tracks are compared against an in-memory database of dogs seen
    so far; a track that does not reach the similarity threshold is
    registered as a new dog.
    """

    # Maximum number of recent observations kept per dog.
    MAX_HISTORY = 20
    # Largest plausible speed for the temporal check, in bbox-coordinate
    # units per second (presumably pixels -- TODO confirm against the tracker).
    MAX_SPEED = 500
    # Weight of the color-histogram term in the combined similarity.
    COLOR_WEIGHT = 0.1

    def __init__(self,
                 similarity_threshold: float = 0.7,
                 device: str = 'cuda',
                 use_body_parts: bool = True):
        """
        Initialize ReID system with body part support

        Args:
            similarity_threshold: Minimum combined score for a match
            device: 'cuda' or 'cpu'; 'cpu' is used whenever CUDA is unavailable
            use_body_parts: Whether to use body part crops
        """
        self.device = device if torch.cuda.is_available() else 'cpu'
        self.similarity_threshold = similarity_threshold
        self.use_body_parts = use_body_parts

        # ResNet50 backbone with its last child module (the classifier head)
        # removed, so the network outputs pooled features instead of logits.
        self.resnet = models.resnet50(weights='IMAGENET1K_V1')
        self.resnet = nn.Sequential(*list(self.resnet.children())[:-1])
        self.resnet.to(self.device).eval()

        # Image preprocessing
        self.transform = transforms.Compose([
            transforms.ToPILImage(),
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(
                mean=[0.485, 0.456, 0.406],
                std=[0.229, 0.224, 0.225]
            )
        ])

        # Feature weights for different parts
        self.part_weights = {
            'full': 0.4,
            'head': 0.3,
            'torso': 0.2,
            'rear': 0.1
        }

        # Dog database: per dog id, the most recent features / images / crops
        self.dog_database: Dict[int, List["DogFeatures"]] = {}
        self.dog_images: Dict[int, List[np.ndarray]] = {}
        self.dog_body_parts: Dict[int, List["BodyPartCrops"]] = {}

        # Tracking
        self.track_to_dog: Dict[int, int] = {}
        self.next_dog_id = 1
        # dog id -> (center x, center y, wall-clock time of last match)
        self.last_positions: Dict[int, Tuple[float, float, float]] = {}

    def extract_body_parts(self, image: np.ndarray, bbox: List[float]) -> "BodyPartCrops":
        """
        Extract body part crops from dog image
        Simple geometric division - can be improved with detection

        Args:
            image: Full dog image crop
            bbox: Bounding box of dog (currently unused by the geometric split)

        Returns:
            BodyPartCrops object; ``crop_confidences`` always holds a dict
            with one entry per part that was successfully extracted
        """
        height = image.shape[0]
        crops = BodyPartCrops(full_image=image)
        confidences: Dict[str, float] = {}

        # Overlapping horizontal bands as (part, start fraction, end fraction)
        # of the image height: head = top 35%, torso = middle 40%,
        # rear = bottom 40%.
        bands = (
            ('head', 0.0, 0.35),
            ('torso', 0.25, 0.65),
            ('rear', 0.60, 1.0),
        )
        try:
            for part, start_frac, end_frac in bands:
                part_crop = image[int(height * start_frac):int(height * end_frac), :]
                if part_crop.size > 0:
                    setattr(crops, f'{part}_crop', part_crop)
                    confidences[part] = self._validate_crop(part_crop)
        except Exception as e:
            # Best effort: keep whatever parts were extracted before the error.
            print(f"Error extracting body parts: {e}")

        # Assigned outside the try so a failure part-way through still leaves
        # a dict (with the parts scored so far) rather than None.
        crops.crop_confidences = confidences
        return crops

    def _validate_crop(self, crop: np.ndarray) -> float:
        """
        Validate crop quality (simple version)

        Uses the grayscale standard deviation as a proxy for texture: a
        uniform crop scores 0, a standard deviation of 50 or more scores 1.

        Returns:
            Confidence score 0-1
        """
        if crop.size == 0:
            return 0.0
        gray = cv2.cvtColor(crop, cv2.COLOR_BGR2GRAY) if crop.ndim == 3 else crop
        std_dev = np.std(gray)
        # Higher standard deviation = more texture/content
        return float(min(1.0, std_dev / 50.0))

    def extract_resnet_features(self, image: np.ndarray) -> Optional[np.ndarray]:
        """
        Extract ResNet50 features from image

        Args:
            image: BGR image (converted to RGB before preprocessing)

        Returns:
            L2-normalized feature vector, or None when the image is missing,
            empty, or feature extraction fails
        """
        if image is None or image.size == 0:
            return None
        try:
            img_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            img_tensor = self.transform(img_rgb).unsqueeze(0).to(self.device)
            with torch.no_grad():
                features = self.resnet(img_tensor)
            features = features.squeeze().cpu().numpy()
            # L2 normalize; the epsilon avoids dividing by zero
            return features / (np.linalg.norm(features) + 1e-7)
        except Exception as e:
            print(f"Feature extraction error: {e}")
            return None

    def extract_all_features(self, crops: "BodyPartCrops") -> "DogFeatures":
        """
        Extract features from full image and all body parts

        Args:
            crops: Body part crops

        Returns:
            DogFeatures object stamped with the current time
        """
        full_features = self.extract_resnet_features(crops.full_image)
        if full_features is None:
            # Full-image features are required; fall back to a zero vector
            # so later comparisons still have an array to work with.
            full_features = np.zeros(2048)

        # Body-part features stay None when parts are disabled, the crop is
        # missing, or extraction fails.
        part_features: Dict[str, Optional[np.ndarray]] = {
            'head': None, 'torso': None, 'rear': None
        }
        if self.use_body_parts:
            for part in part_features:
                part_crop = getattr(crops, f'{part}_crop')
                if part_crop is not None:
                    part_features[part] = self.extract_resnet_features(part_crop)

        color_hist = self.extract_color_histogram(crops.full_image)

        return DogFeatures(
            full_features=full_features,
            head_features=part_features['head'],
            torso_features=part_features['torso'],
            rear_features=part_features['rear'],
            color_histogram=color_hist,
            timestamp=time.time()
        )

    def extract_color_histogram(self, image: np.ndarray) -> np.ndarray:
        """
        Extract color histogram features

        Returns:
            Flattened, normalized HSV histogram with 50 hue + 60 saturation
            + 60 value bins (170 values); all zeros when extraction fails
        """
        try:
            hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
            hist_h = cv2.calcHist([hsv], [0], None, [50], [0, 180])
            hist_s = cv2.calcHist([hsv], [1], None, [60], [0, 256])
            hist_v = cv2.calcHist([hsv], [2], None, [60], [0, 256])
            hist = np.concatenate([hist_h, hist_s, hist_v])
            return cv2.normalize(hist, hist).flatten()
        except Exception as e:
            print(f"Color extraction error: {e}")
            return np.zeros(170)

    def compare_features(self, features1: "DogFeatures", features2: "DogFeatures") -> float:
        """
        Compare two feature sets including body parts

        The result is the weighted average of the cosine similarities of the
        embeddings available in BOTH feature sets and, when both have one,
        the correlation of the color histograms.

        Args:
            features1: First dog's features
            features2: Second dog's features

        Returns:
            Similarity score 0-1
        """
        scores: Dict[str, float] = {}
        weights: Dict[str, float] = {}

        # Full-image features are always compared; body parts only when enabled.
        parts = ('full', 'head', 'torso', 'rear') if self.use_body_parts else ('full',)
        for part in parts:
            vec1 = getattr(features1, f'{part}_features')
            vec2 = getattr(features2, f'{part}_features')
            if vec1 is None or vec2 is None:
                continue
            scores[part] = cosine_similarity(
                vec1.reshape(1, -1),
                vec2.reshape(1, -1)
            )[0, 0]
            weights[part] = self.part_weights[part]

        hist1 = features1.color_histogram
        hist2 = features2.color_histogram
        if hist1 is not None and hist2 is not None:
            scores['color'] = cv2.compareHist(
                hist1.astype(np.float32),
                hist2.astype(np.float32),
                cv2.HISTCMP_CORREL
            )
            weights['color'] = self.COLOR_WEIGHT

        total_weight = sum(weights.values())
        if total_weight <= 0:
            return 0.0
        combined = sum(scores[name] * weights[name] for name in scores) / total_weight
        # Correlation and cosine similarity can be negative; clamp so the
        # result honours the documented 0-1 range.
        return float(min(1.0, max(0.0, combined)))

    def calculate_temporal_score(self, track: "Track", dog_id: int) -> float:
        """
        Calculate temporal/position coherence score

        Checks whether the track's current center could have been reached
        from the dog's last known position at no more than ``MAX_SPEED``.

        Args:
            track: Track whose ``bbox`` center is used as the current position
                (the center math assumes [x1, y1, x2, y2] -- TODO confirm)
            dog_id: Candidate dog

        Returns:
            1.0 when the dog has no recorded position or the move is
            plausible, decreasing linearly to 0.0 as the distance grows to
            twice the plausible maximum
        """
        if dog_id not in self.last_positions:
            return 1.0

        last_x, last_y, last_time = self.last_positions[dog_id]
        bbox = track.bbox
        curr_x = (bbox[0] + bbox[2]) / 2
        curr_y = (bbox[1] + bbox[3]) / 2

        # time.time() is not monotonic; clamp so a clock step backwards
        # cannot produce a negative travel budget (and a score above 1).
        elapsed = max(0.0, time.time() - last_time)
        distance = np.sqrt((curr_x - last_x) ** 2 + (curr_y - last_y) ** 2)
        max_distance = self.MAX_SPEED * elapsed

        if distance <= max_distance:
            return 1.0
        if max_distance <= 0:
            # No time has passed but the position changed: implausible, and
            # the ratio below would divide by zero.
            return 0.0
        return max(0.0, float(1.0 - (distance - max_distance) / max_distance))

    def match_or_register(self, track: "Track") -> Tuple[int, float]:
        """
        Match track to existing dog or register new one

        Uses the track's most recent detection that carries an image crop.
        Each known dog is scored as appearance similarity (against its latest
        stored features) scaled by ``0.8 + 0.2 * temporal_score``.

        Returns:
            (dog_id, confidence_score). ``(0, 0.0)`` when no detection has an
            image crop. For a newly registered dog the score is the best
            (below-threshold) score found, or -1.0 when no dog was compared.
        """
        # Get most recent detection with image
        detection = next(
            (det for det in reversed(track.detections) if det.image_crop is not None),
            None
        )
        if detection is None:
            return 0, 0.0

        crops = self.extract_body_parts(detection.image_crop, detection.bbox)
        current_features = self.extract_all_features(crops)

        bbox = track.bbox
        position = ((bbox[0] + bbox[2]) / 2, (bbox[1] + bbox[3]) / 2)
        current_features.position = position
        current_features.confidence = detection.confidence

        # Find best match
        best_dog_id = None
        best_score = -1.0
        for dog_id, feature_list in self.dog_database.items():
            if not feature_list:
                continue
            similarity = self.compare_features(current_features, feature_list[-1])
            temporal_score = self.calculate_temporal_score(track, dog_id)
            # Temporal coherence can only scale the appearance score by 0.8-1.0
            final_score = similarity * (0.8 + 0.2 * temporal_score)
            if final_score > best_score:
                best_score = final_score
                best_dog_id = dog_id

        if best_dog_id is not None and best_score >= self.similarity_threshold:
            # Update existing dog
            matched_id = best_dog_id
            self.dog_database[matched_id].append(current_features)
            self.dog_images[matched_id].append(detection.image_crop)
            self.dog_body_parts[matched_id].append(crops)

            # Keep only recent data. The lists are rebound (not trimmed in
            # place) so lists already handed out to callers are not mutated.
            if len(self.dog_database[matched_id]) > self.MAX_HISTORY:
                keep = self.MAX_HISTORY
                self.dog_database[matched_id] = self.dog_database[matched_id][-keep:]
                self.dog_images[matched_id] = self.dog_images[matched_id][-keep:]
                self.dog_body_parts[matched_id] = self.dog_body_parts[matched_id][-keep:]
        else:
            # Register new dog
            matched_id = self.next_dog_id
            self.next_dog_id += 1
            self.dog_database[matched_id] = [current_features]
            self.dog_images[matched_id] = [detection.image_crop]
            self.dog_body_parts[matched_id] = [crops]

        self.last_positions[matched_id] = (position[0], position[1], time.time())
        self.track_to_dog[track.track_id] = matched_id
        return matched_id, best_score

    def get_body_parts_for_export(self, dog_id: int) -> List["BodyPartCrops"]:
        """Get all body part crops for a specific dog (empty list if unknown)"""
        return self.dog_body_parts.get(dog_id, [])

    def set_threshold(self, threshold: float):
        """Update similarity threshold, clamped to the range 0.3-0.95"""
        self.similarity_threshold = max(0.3, min(0.95, threshold))

    def set_use_body_parts(self, use_parts: bool):
        """Enable/disable body part features"""
        self.use_body_parts = use_parts

    def reset(self):
        """Reset all stored data and restart dog ids at 1"""
        self.dog_database.clear()
        self.dog_images.clear()
        self.dog_body_parts.clear()
        self.track_to_dog.clear()
        self.last_positions.clear()
        self.next_dog_id = 1
# Backward compatibility: legacy class names kept as aliases of BodyPartDogReID
DogReID = SimplifiedDogReID = SimpleReID = BodyPartDogReID