# CustomPairBatchSampler: yields mini-batches built from pairs of same-breed dog images.
import numpy as np | |
import random | |
import copy | |
import time | |
import warnings | |
from torch.utils.data import Sampler | |
from torch._six import int_classes as _int_classes | |
class CustomPairBatchSampler(Sampler):
    """Wraps another sampler to yield a mini-batch of indices.

    Each batch consists of pairs of images showing dogs of the same breed,
    which is why the batch size must be even. The structure of this sampler is
    way too complicated because it is a shorter/simplified version of
    CustomBatchSampler. The relations between breeds are not relevant for the
    cvpr 2022 paper, but we kept this structure which we were using for the
    experiments with clade related losses. ToDo: restructure this sampler.

    Args:
        data_sampler_info (dict): a dictionary containing information about the
            dataset and breeds. Keys read below: 'name_list', 'breeds_summary',
            'breeds_sim_martix_raw' (typo in the key name kept for
            compatibility), 'breeds_sim_abbrev_inds', 'breeds_abbrev_dict'.
        batch_size (int): Size of mini-batch; must be a positive even integer.
    """

    def __init__(self, data_sampler_info, batch_size):
        # bool is a subclass of int, so it must be excluded explicitly
        if not isinstance(batch_size, int) or isinstance(batch_size, bool) or \
                batch_size <= 0:
            raise ValueError("batch_size should be a positive integer value, "
                             "but got batch_size={}".format(batch_size))
        # pairs of same-breed dogs are sampled, so an odd batch can never be
        # filled (was a bare `assert`, which disappears under `python -O`)
        if batch_size % 2 != 0:
            raise ValueError("batch_size should be even, "
                             "but got batch_size={}".format(batch_size))
        self.data_sampler_info = data_sampler_info
        self.batch_size = batch_size
        # same number of batches a standard sampler with drop_last would yield
        self.n_desired_batches = len(self.data_sampler_info['name_list']) // batch_size

    def get_description(self):
        """Return a short human-readable description of this sampler."""
        description = "\
This sampler works only for even batch sizes. \n\
It returns pairs of dogs of the same breed"
        return description

    def __iter__(self):
        """Yield `n_desired_batches` lists of dataset indices.

        Every batch contains batch_size/2 pairs, each pair showing two dogs of
        the same breed. If fewer distinct pairs exist than needed, already
        used pairs are re-sampled at random.
        """
        breeds_summary = self.data_sampler_info['breeds_summary']

        # group image names by breed; entries of 'name_list' look like
        # 'n02093859-Kerry_blue_terrier/n02093859_913.jpg'
        breed_image_dict_orig = {}
        for img_name in self.data_sampler_info['name_list']:
            folder_name = img_name.split('/')[0]
            # everything after the first '-' is the breed name
            breed_name = folder_name.split(folder_name.split('-')[0] + '-')[1]
            if breed_name not in breed_image_dict_orig:
                breed_image_dict_orig[breed_name] = [img_name]
            else:
                breed_image_dict_orig[breed_name].append(img_name)

        sim_matrix_raw = self.data_sampler_info['breeds_sim_martix_raw']

        # map from index in the similarity matrix to breed name / abbreviation
        inverse_sim_dict = {}
        for abbrev, ind in self.data_sampler_info['breeds_sim_abbrev_inds'].items():
            breed = breeds_summary[abbrev]
            # breed._name_stanext might be None
            inverse_sim_dict[ind] = {'abbrev': abbrev,
                                     'breed_name': breed._name_stanext}

        # similarity restricted to breeds that are present in this dataset
        related_breeds_top_orig = {}
        all_sim_inds = np.arange(sim_matrix_raw.shape[0])
        for breed_name, breed_images in breed_image_dict_orig.items():
            abbrev = self.data_sampler_info['breeds_abbrev_dict'][breed_name]
            related_breeds = {}
            if abbrev in self.data_sampler_info['breeds_sim_abbrev_inds']:
                ind_in_sim_mat = self.data_sampler_info['breeds_sim_abbrev_inds'][abbrev]
                row = sim_matrix_raw[ind_in_sim_mat, :]
                for ind in all_sim_inds[row > 0]:
                    rel_breed_name = inverse_sim_dict[ind]['breed_name']
                    # keep only breeds that exist in this dataset; skip self-relations
                    if (rel_breed_name is not None) \
                            and (rel_breed_name in breed_image_dict_orig) \
                            and not (rel_breed_name == breed_name):
                        related_breeds[rel_breed_name] = row[ind]
            related_breeds_top_orig[breed_name] = related_breeds

        # work on copies: building pairs below consumes the image lists
        breed_image_dict = copy.deepcopy(breed_image_dict_orig)
        related_breeds_top = copy.deepcopy(related_breeds_top_orig)

        # clean related_breeds_top such that it only contains available breeds
        for breed_name in breed_image_dict:
            if len(breed_image_dict[breed_name]) < 1:
                for breed_name_rel in list(related_breeds_top[breed_name].keys()):
                    related_breeds_top[breed_name_rel].pop(breed_name, None)
                    related_breeds_top[breed_name].pop(breed_name_rel, None)

        # 1) build pairs of dogs of the same breed
        all_dog_duos = []
        n_new_duos = 1
        while n_new_duos > 0:
            # shuffle the image list of each breed (in-place, changes the dict)
            for breed_images in breed_image_dict.values():
                random.shuffle(breed_images)
            breed_list = list(related_breeds_top.keys())
            random.shuffle(breed_list)
            n_new_duos = 0
            for breed_name in breed_list:
                if len(breed_image_dict[breed_name]) >= 2:
                    dog_a = breed_image_dict[breed_name].pop()
                    dog_b = breed_image_dict[breed_name].pop()
                    all_dog_duos.append({'image_names': [dog_a, dog_b]})
                    # drop breeds that ran out of images from the relation dict
                    if len(breed_image_dict[breed_name]) < 1:
                        for breed_name_rel in list(related_breeds_top[breed_name].keys()):
                            related_breeds_top[breed_name_rel].pop(breed_name, None)
                            related_breeds_top[breed_name].pop(breed_name_rel, None)
                    n_new_duos += 1

        image_name_to_ind = {img_name: ind for ind, img_name
                             in enumerate(self.data_sampler_info['name_list'])}

        # 2) assemble the batches from the duos
        n_avail_duos = len(all_dog_duos)
        all_batches = []
        ind_in_duos = 0
        for _ind_b in range(self.n_desired_batches):
            batch_with_image_names = []
            for _ in range(self.batch_size // 2):
                if ind_in_duos >= n_avail_duos:
                    # all duos used up: reuse a random, already used duo
                    ind_rand = random.randint(0, n_avail_duos - 1)
                    batch_with_image_names.extend(all_dog_duos[ind_rand]['image_names'])
                else:
                    batch_with_image_names.extend(all_dog_duos[ind_in_duos]['image_names'])
                    ind_in_duos += 1
            all_batches.append([image_name_to_ind[name] for name in batch_with_image_names])

        for batch in all_batches:
            yield batch

    def __len__(self):
        """Return the number of batches per epoch.

        Since we are sampling pairs of dogs and not each breed has an even
        number of dogs, we can not guarantee to show each dog exactly once.
        Instead, we return the same amount of batches as a standard sampler
        which is not based on dog pairs would return.
        """
        return self.n_desired_batches