|
import copy |
|
import numpy as np |
|
import random |
|
from collections import defaultdict |
|
from torch.utils.data.sampler import Sampler, RandomSampler, SequentialSampler |
|
|
|
|
|
class RandomDomainSampler(Sampler): |
|
"""Randomly samples N domains each with K images |
|
to form a minibatch of size N*K. |
|
|
|
Args: |
|
data_source (list): list of Datums. |
|
batch_size (int): batch size. |
|
n_domain (int): number of domains to sample in a minibatch. |
|
""" |
|
|
|
def __init__(self, data_source, batch_size, n_domain): |
|
self.data_source = data_source |
|
|
|
|
|
self.domain_dict = defaultdict(list) |
|
for i, item in enumerate(data_source): |
|
self.domain_dict[item.domain].append(i) |
|
self.domains = list(self.domain_dict.keys()) |
|
|
|
|
|
if n_domain is None or n_domain <= 0: |
|
n_domain = len(self.domains) |
|
assert batch_size % n_domain == 0 |
|
self.n_img_per_domain = batch_size // n_domain |
|
|
|
self.batch_size = batch_size |
|
|
|
self.n_domain = n_domain |
|
self.length = len(list(self.__iter__())) |
|
|
|
def __iter__(self): |
|
domain_dict = copy.deepcopy(self.domain_dict) |
|
final_idxs = [] |
|
stop_sampling = False |
|
|
|
while not stop_sampling: |
|
selected_domains = random.sample(self.domains, self.n_domain) |
|
|
|
for domain in selected_domains: |
|
idxs = domain_dict[domain] |
|
selected_idxs = random.sample(idxs, self.n_img_per_domain) |
|
final_idxs.extend(selected_idxs) |
|
|
|
for idx in selected_idxs: |
|
domain_dict[domain].remove(idx) |
|
|
|
remaining = len(domain_dict[domain]) |
|
if remaining < self.n_img_per_domain: |
|
stop_sampling = True |
|
|
|
return iter(final_idxs) |
|
|
|
def __len__(self): |
|
return self.length |
|
|
|
|
|
class SeqDomainSampler(Sampler): |
|
"""Sequential domain sampler, which randomly samples K |
|
images from each domain to form a minibatch. |
|
|
|
Args: |
|
data_source (list): list of Datums. |
|
batch_size (int): batch size. |
|
""" |
|
|
|
def __init__(self, data_source, batch_size): |
|
self.data_source = data_source |
|
|
|
|
|
self.domain_dict = defaultdict(list) |
|
for i, item in enumerate(data_source): |
|
self.domain_dict[item.domain].append(i) |
|
self.domains = list(self.domain_dict.keys()) |
|
self.domains.sort() |
|
|
|
|
|
n_domain = len(self.domains) |
|
assert batch_size % n_domain == 0 |
|
self.n_img_per_domain = batch_size // n_domain |
|
|
|
self.batch_size = batch_size |
|
|
|
self.n_domain = n_domain |
|
self.length = len(list(self.__iter__())) |
|
|
|
def __iter__(self): |
|
domain_dict = copy.deepcopy(self.domain_dict) |
|
final_idxs = [] |
|
stop_sampling = False |
|
|
|
while not stop_sampling: |
|
for domain in self.domains: |
|
idxs = domain_dict[domain] |
|
selected_idxs = random.sample(idxs, self.n_img_per_domain) |
|
final_idxs.extend(selected_idxs) |
|
|
|
for idx in selected_idxs: |
|
domain_dict[domain].remove(idx) |
|
|
|
remaining = len(domain_dict[domain]) |
|
if remaining < self.n_img_per_domain: |
|
stop_sampling = True |
|
|
|
return iter(final_idxs) |
|
|
|
def __len__(self): |
|
return self.length |
|
|
|
|
|
class RandomClassSampler(Sampler): |
|
"""Randomly samples N classes each with K instances to |
|
form a minibatch of size N*K. |
|
|
|
Modified from https://github.com/KaiyangZhou/deep-person-reid. |
|
|
|
Args: |
|
data_source (list): list of Datums. |
|
batch_size (int): batch size. |
|
n_ins (int): number of instances per class to sample in a minibatch. |
|
""" |
|
|
|
def __init__(self, data_source, batch_size, n_ins): |
|
if batch_size < n_ins: |
|
raise ValueError( |
|
"batch_size={} must be no less " |
|
"than n_ins={}".format(batch_size, n_ins) |
|
) |
|
|
|
self.data_source = data_source |
|
self.batch_size = batch_size |
|
self.n_ins = n_ins |
|
self.ncls_per_batch = self.batch_size // self.n_ins |
|
self.index_dic = defaultdict(list) |
|
for index, item in enumerate(data_source): |
|
self.index_dic[item.label].append(index) |
|
self.labels = list(self.index_dic.keys()) |
|
assert len(self.labels) >= self.ncls_per_batch |
|
|
|
|
|
self.length = len(list(self.__iter__())) |
|
|
|
def __iter__(self): |
|
batch_idxs_dict = defaultdict(list) |
|
|
|
for label in self.labels: |
|
idxs = copy.deepcopy(self.index_dic[label]) |
|
if len(idxs) < self.n_ins: |
|
idxs = np.random.choice(idxs, size=self.n_ins, replace=True) |
|
random.shuffle(idxs) |
|
batch_idxs = [] |
|
for idx in idxs: |
|
batch_idxs.append(idx) |
|
if len(batch_idxs) == self.n_ins: |
|
batch_idxs_dict[label].append(batch_idxs) |
|
batch_idxs = [] |
|
|
|
avai_labels = copy.deepcopy(self.labels) |
|
final_idxs = [] |
|
|
|
while len(avai_labels) >= self.ncls_per_batch: |
|
selected_labels = random.sample(avai_labels, self.ncls_per_batch) |
|
for label in selected_labels: |
|
batch_idxs = batch_idxs_dict[label].pop(0) |
|
final_idxs.extend(batch_idxs) |
|
if len(batch_idxs_dict[label]) == 0: |
|
avai_labels.remove(label) |
|
|
|
return iter(final_idxs) |
|
|
|
def __len__(self): |
|
return self.length |
|
|
|
|
|
def build_sampler( |
|
sampler_type, |
|
cfg=None, |
|
data_source=None, |
|
batch_size=32, |
|
n_domain=0, |
|
n_ins=16 |
|
): |
|
if sampler_type == "RandomSampler": |
|
return RandomSampler(data_source) |
|
|
|
elif sampler_type == "SequentialSampler": |
|
return SequentialSampler(data_source) |
|
|
|
elif sampler_type == "RandomDomainSampler": |
|
return RandomDomainSampler(data_source, batch_size, n_domain) |
|
|
|
elif sampler_type == "SeqDomainSampler": |
|
return SeqDomainSampler(data_source, batch_size) |
|
|
|
elif sampler_type == "RandomClassSampler": |
|
return RandomClassSampler(data_source, batch_size, n_ins) |
|
|
|
else: |
|
raise ValueError("Unknown sampler type: {}".format(sampler_type)) |
|
|