Spaces:
Sleeping
Sleeping

Integrate DeepFashion2 dataset: add evaluation module, utilities, and API endpoints for dataset management and analysis
f8b306b
""" | |
DeepFashion2 Evaluation Module | |
Provides evaluation capabilities using DeepFashion2 dataset as benchmark | |
for the Vestiq fashion analysis system. | |
""" | |
import json
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Tuple, Optional

import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import torch
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix
from tqdm import tqdm

from deepfashion2_utils import (
    DeepFashion2Config,
    DeepFashion2Dataset,
    DeepFashion2CategoryMapper,
    create_deepfashion2_dataloader
)
class DeepFashion2Evaluator:
    """Evaluate a fashion analyzer against the DeepFashion2 dataset.

    Each ``evaluate_*`` method stores its metrics in ``self.results`` under
    its own key (``'detection_accuracy'`` / ``'feature_extraction'``).
    ``generate_evaluation_report`` and ``plot_confusion_matrix`` read from
    that dictionary, so run the evaluations first.

    Attributes:
        config: DeepFashion2 configuration; ``dataset_root``, ``categories``
            and ``image_size`` are read from it.
        analyzer: Object providing ``process_image_from_bytes``,
            ``detect_fashion_objects`` and ``extract_fashion_features``.
        category_mapper: Maps DeepFashion2 categories into the label space
            returned by ``map_to_yainage90`` (presumably the labels the
            detection model emits -- confirm against the mapper).
        results: Metrics collected so far, keyed by evaluation name.
    """

    # Detections must have a confidence strictly above this value to count
    # as a predicted category.
    CONFIDENCE_THRESHOLD = 0.5

    def __init__(self, config: "DeepFashion2Config", analyzer=None):
        """
        Initialize evaluator

        Args:
            config: DeepFashion2 configuration
            analyzer: HuggingFaceFashionAnalyzer instance
        """
        self.config = config
        self.analyzer = analyzer
        self.category_mapper = DeepFashion2CategoryMapper()
        self.results = {}

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    def _load_dataset(self, split: str, max_samples: Optional[int]):
        """Open ``split`` with annotations and optionally truncate it.

        A falsy ``max_samples`` (``None`` or ``0``) means "no limit"; this
        keeps the behaviour existing callers may rely on.
        """
        dataset = DeepFashion2Dataset(
            root_dir=self.config.dataset_root,
            split=split,
            transform=None,
            load_annotations=True
        )
        if max_samples:
            dataset.image_files = dataset.image_files[:max_samples]
        return dataset

    def _ground_truth_categories(self, dataset, annotations) -> List:
        """Return the mapped ground-truth categories of one image, deduplicated.

        Order of first appearance is preserved so results are reproducible.
        """
        mapped = [
            self.category_mapper.map_to_yainage90(cat)
            for cat in dataset.get_categories_in_image(annotations)
        ]
        return list(dict.fromkeys(mapped))

    def _read_image(self, image_path):
        """Read an image file and hand its bytes to the analyzer's loader."""
        with open(image_path, 'rb') as f:
            return self.analyzer.process_image_from_bytes(f.read())

    @staticmethod
    def _category_overlap_score(predicted, ground_truth) -> float:
        """Jaccard index (intersection over union) of two category collections.

        Returns 0.0 when both collections are empty.
        """
        pred_set, gt_set = set(predicted), set(ground_truth)
        union = pred_set | gt_set
        return len(pred_set & gt_set) / len(union) if union else 0.0

    @staticmethod
    def _feature_separability_metrics(features_by_category: Dict) -> Dict:
        """Compute pairwise similarity/distance statistics between categories.

        Args:
            features_by_category: Mapping of category -> list of 1-D feature
                vectors (all vectors are assumed to share one length).

        Returns:
            Dict with ``mean_intra_similarity`` (mean dot product over all
            unordered pairs inside a category), ``mean_inter_distance``
            (mean Euclidean distance over all cross-category pairs) and
            ``feature_separability`` (their difference). Means default to 0
            when there are no pairs. Note the similarity is a raw dot
            product, not a cosine similarity, unless the vectors happen to
            be unit-normalised.
        """
        matrices = {
            cat: np.array(feats) for cat, feats in features_by_category.items()
        }
        categories = list(matrices)

        intra_sum, intra_count = 0.0, 0
        inter_sum, inter_count = 0.0, 0

        for idx, cat1 in enumerate(categories):
            feats1 = matrices[cat1]

            # One matrix-vector product per row covers all pairs (j, k > j)
            # without materialising the full n x n Gram matrix.
            for j in range(len(feats1) - 1):
                intra_sum += float(np.sum(feats1[j + 1:] @ feats1[j]))
                intra_count += len(feats1) - 1 - j

            for cat2 in categories[idx + 1:]:
                feats2 = matrices[cat2]
                for feat1 in feats1:
                    inter_sum += float(
                        np.sum(np.linalg.norm(feats2 - feat1, axis=1))
                    )
                inter_count += len(feats1) * len(feats2)

        mean_intra = intra_sum / intra_count if intra_count else 0
        mean_inter = inter_sum / inter_count if inter_count else 0
        return {
            'mean_intra_similarity': mean_intra,
            'mean_inter_distance': mean_inter,
            'feature_separability': mean_inter - mean_intra,
        }

    @staticmethod
    def _to_serializable(value):
        """``json.dump`` fallback for NumPy values, paths and sets.

        Raises:
            TypeError: If ``value`` is of a type this hook does not handle.
        """
        if isinstance(value, np.generic):
            return value.item()
        if isinstance(value, np.ndarray):
            return value.tolist()
        if isinstance(value, Path):
            return str(value)
        if isinstance(value, (set, frozenset)):
            return sorted(value, key=str)
        raise TypeError(
            f"Object of type {type(value).__name__} is not JSON serializable"
        )

    # ------------------------------------------------------------------
    # Evaluations
    # ------------------------------------------------------------------

    def evaluate_detection_accuracy(self, split: str = 'validation',
                                    max_samples: Optional[int] = None) -> Dict:
        """
        Evaluate fashion object detection accuracy on DeepFashion2

        For every image the set of predicted categories (detections with
        confidence above ``CONFIDENCE_THRESHOLD``) is compared with the set
        of mapped ground-truth categories. Images without ground-truth
        categories are skipped; images that fail to process are reported on
        stdout and skipped.

        For the classification metrics each ground-truth category yields one
        sample whose predicted label is that same category when it was
        detected and ``'none'`` otherwise, so spurious detections only
        influence ``detection_scores``.

        Args:
            split: Dataset split to evaluate on
            max_samples: Maximum number of samples to evaluate (None for all)

        Returns:
            Dictionary containing evaluation metrics: the classification
            metrics plus ``detection_scores`` (per-image Jaccard index),
            ``mean_detection_score``, ``num_samples`` (dataset size after
            truncation) and ``num_evaluated`` (images actually scored).

        Raises:
            ValueError: If no analyzer was provided.
        """
        if not self.analyzer:
            raise ValueError("Analyzer not provided")

        print(f"Evaluating detection accuracy on {split} split...")
        dataset = self._load_dataset(split, max_samples)

        true_categories = []
        predicted_categories = []
        detection_scores = []

        for i in tqdm(range(len(dataset)), desc="Evaluating detection"):
            try:
                item = dataset[i]
                gt_categories = self._ground_truth_categories(
                    dataset, item['annotations']
                )
                if not gt_categories:
                    continue

                detection_results = self.analyzer.detect_fashion_objects(
                    self._read_image(item['image_path'])
                )

                # A result without 'detected_items' counts as "nothing
                # detected"; never fall back to the previous image's
                # predictions.
                detected_items = (
                    detection_results['detected_items']
                    if 'detected_items' in detection_results else []
                )
                pred_categories = {
                    detected['category'] for detected in detected_items
                    if detected['confidence'] > self.CONFIDENCE_THRESHOLD
                }

                # Images with no confident prediction score 0 instead of
                # being dropped, otherwise the mean is biased upwards.
                detection_scores.append(
                    self._category_overlap_score(pred_categories, gt_categories)
                )

                for gt_cat in gt_categories:
                    true_categories.append(gt_cat)
                    predicted_categories.append(
                        gt_cat if gt_cat in pred_categories else 'none'
                    )
            except Exception as e:
                # Best effort: one unreadable image must not abort the run.
                print(f"Error processing image {i}: {e}")
                continue

        metrics = self._calculate_classification_metrics(
            true_categories, predicted_categories
        )
        metrics['detection_scores'] = detection_scores
        metrics['mean_detection_score'] = (
            float(np.mean(detection_scores)) if detection_scores else 0
        )
        metrics['num_samples'] = len(dataset)
        metrics['num_evaluated'] = len(detection_scores)

        self.results['detection_accuracy'] = metrics
        return metrics

    def evaluate_feature_extraction(self, split: str = 'validation',
                                    max_samples: Optional[int] = None) -> Dict:
        """
        Evaluate feature extraction quality using DeepFashion2

        One feature vector is extracted per image and filed under every
        (deduplicated) ground-truth category of that image. When at least
        two categories were seen, pairwise statistics are added (see
        ``_feature_separability_metrics``).

        Args:
            split: Dataset split to evaluate on
            max_samples: Maximum number of samples to evaluate

        Returns:
            Dictionary containing feature evaluation metrics:
            ``feature_dimension``, ``categories_found``,
            ``samples_per_category`` and, with two or more categories,
            ``mean_intra_similarity``, ``mean_inter_distance`` and
            ``feature_separability``.

        Raises:
            ValueError: If no analyzer was provided.
        """
        if not self.analyzer:
            raise ValueError("Analyzer not provided")

        print(f"Evaluating feature extraction on {split} split...")
        dataset = self._load_dataset(split, max_samples)

        features_by_category = defaultdict(list)
        feature_dimensions = []

        for i in tqdm(range(len(dataset)), desc="Extracting features"):
            try:
                item = dataset[i]
                gt_categories = self._ground_truth_categories(
                    dataset, item['annotations']
                )
                if not gt_categories:
                    continue

                feature_results = self.analyzer.extract_fashion_features(
                    self._read_image(item['image_path'])
                )
                if 'feature_vector' not in feature_results:
                    continue

                features = np.array(feature_results['feature_vector'])
                feature_dimensions.append(feature_results['feature_dimension'])

                for category in gt_categories:
                    features_by_category[category].append(features)
            except Exception as e:
                # Best effort: one unreadable image must not abort the run.
                print(f"Error processing image {i}: {e}")
                continue

        metrics = {
            'feature_dimension': (
                float(np.mean(feature_dimensions)) if feature_dimensions else 0
            ),
            'categories_found': list(features_by_category),
            'samples_per_category': {
                cat: len(feats) for cat, feats in features_by_category.items()
            }
        }

        if len(features_by_category) > 1:
            metrics.update(
                self._feature_separability_metrics(features_by_category)
            )

        self.results['feature_extraction'] = metrics
        return metrics

    def _calculate_classification_metrics(self, y_true: List[str],
                                          y_pred: List[str]) -> Dict:
        """Calculate classification metrics

        Args:
            y_true: Ground-truth labels.
            y_pred: Predicted labels, aligned with ``y_true``.

        Returns:
            Empty dict when either list is empty. Otherwise accuracy,
            weighted precision/recall/F1, per-class metrics, the confusion
            matrix (nested lists) and the label order used for it. All
            numbers are plain Python values so the result is JSON-safe.
        """
        if not y_true or not y_pred:
            return {}

        # Sorted so the confusion-matrix layout is stable between runs;
        # key=str tolerates a non-string label such as None.
        labels = sorted(set(y_true) | set(y_pred), key=str)

        accuracy = accuracy_score(y_true, y_pred)
        precision, recall, f1, _ = precision_recall_fscore_support(
            y_true, y_pred, labels=labels, average='weighted', zero_division=0
        )
        per_class = precision_recall_fscore_support(
            y_true, y_pred, labels=labels, average=None, zero_division=0
        )

        per_class_metrics = {
            label: {
                'precision': float(class_precision),
                'recall': float(class_recall),
                'f1': float(class_f1),
                'support': int(class_support)
            }
            for label, class_precision, class_recall, class_f1, class_support
            in zip(labels, *per_class)
        }

        return {
            'accuracy': float(accuracy),
            'precision': float(precision),
            'recall': float(recall),
            'f1': float(f1),
            'per_class_metrics': per_class_metrics,
            'confusion_matrix': confusion_matrix(y_true, y_pred, labels=labels).tolist(),
            'labels': labels
        }

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------

    def generate_evaluation_report(self, output_dir: str = "./evaluation_results") -> str:
        """Generate comprehensive evaluation report

        Writes ``deepfashion2_evaluation_report.json`` containing the
        relevant configuration values, all collected results and a short
        summary. Missing parent directories are created.

        Args:
            output_dir: Directory the report is written to.

        Returns:
            Path of the written report file, as a string.
        """
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        report_file = output_path / "deepfashion2_evaluation_report.json"

        full_report = {
            'config': {
                'dataset_root': self.config.dataset_root,
                'categories': self.config.categories,
                'image_size': self.config.image_size
            },
            'results': self.results,
            'summary': self._generate_summary()
        }

        # The default hook converts any NumPy scalars/arrays (or Path
        # objects) that ended up in the results.
        with open(report_file, 'w', encoding='utf-8') as f:
            json.dump(full_report, f, indent=2, default=self._to_serializable)

        print(f"Evaluation report saved to: {report_file}")
        return str(report_file)

    def _generate_summary(self) -> Dict:
        """Generate evaluation summary

        Returns:
            Dict with a ``'detection'`` and/or ``'features'`` entry for each
            evaluation present in ``self.results``; missing values
            default to 0.
        """
        summary = {}

        if 'detection_accuracy' in self.results:
            det_results = self.results['detection_accuracy']
            summary['detection'] = {
                'accuracy': det_results.get('accuracy', 0),
                'f1_score': det_results.get('f1', 0),
                'mean_detection_score': det_results.get('mean_detection_score', 0)
            }

        if 'feature_extraction' in self.results:
            feat_results = self.results['feature_extraction']
            summary['features'] = {
                'feature_dimension': feat_results.get('feature_dimension', 0),
                'categories_evaluated': len(feat_results.get('categories_found', [])),
                'feature_separability': feat_results.get('feature_separability', 0)
            }

        return summary

    def plot_confusion_matrix(self, output_dir: str = "./evaluation_results"):
        """Plot confusion matrix for detection results

        Saves ``confusion_matrix.png`` into ``output_dir`` (created if
        needed). Does nothing except print a notice when no detection
        results or no confusion matrix are available.

        Args:
            output_dir: Directory the image is written to.
        """
        if 'detection_accuracy' not in self.results:
            print("No detection results available for plotting")
            return

        results = self.results['detection_accuracy']
        if 'confusion_matrix' not in results:
            print("No confusion matrix available for plotting")
            return

        cm = np.array(results['confusion_matrix'])
        labels = results['labels']
        output_path = Path(output_dir)
        image_file = output_path / 'confusion_matrix.png'

        fig, ax = plt.subplots(figsize=(10, 8))
        try:
            sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                        xticklabels=labels, yticklabels=labels, ax=ax)
            ax.set_title('Fashion Object Detection Confusion Matrix')
            ax.set_xlabel('Predicted')
            ax.set_ylabel('Actual')

            output_path.mkdir(parents=True, exist_ok=True)
            fig.savefig(image_file, dpi=300, bbox_inches='tight')
        finally:
            # Always release the figure, even if drawing or saving failed.
            plt.close(fig)

        print(f"Confusion matrix saved to: {image_file}")
def run_full_evaluation(analyzer, config: Optional["DeepFashion2Config"] = None,
                        max_samples: int = 100) -> str:
    """
    Run full evaluation pipeline

    Runs the detection and feature-extraction evaluations, writes the JSON
    report and plots the confusion matrix. A failure in either evaluation or
    in the plot is printed and does not stop the pipeline; the report is
    generated from whatever results were collected. A failure while writing
    the report itself is not caught.

    Args:
        analyzer: HuggingFaceFashionAnalyzer instance
        config: DeepFashion2 configuration; a default ``DeepFashion2Config()``
            is created when omitted
        max_samples: Maximum samples to evaluate

    Returns:
        Path to evaluation report
    """
    if config is None:
        config = DeepFashion2Config()

    evaluator = DeepFashion2Evaluator(config, analyzer)
    print("Starting DeepFashion2 evaluation...")

    # Each stage is isolated so one failing stage does not prevent the
    # others (or the report) from being produced.
    try:
        evaluator.evaluate_detection_accuracy(max_samples=max_samples)
        print("✅ Detection evaluation completed")
    except Exception as e:
        print(f"❌ Detection evaluation failed: {e}")

    try:
        evaluator.evaluate_feature_extraction(max_samples=max_samples)
        print("✅ Feature extraction evaluation completed")
    except Exception as e:
        print(f"❌ Feature extraction evaluation failed: {e}")

    report_path = evaluator.generate_evaluation_report()

    try:
        evaluator.plot_confusion_matrix()
        print("✅ Confusion matrix plotted")
    except Exception as e:
        print(f"❌ Confusion matrix plotting failed: {e}")

    return report_path