File size: 5,540 Bytes
786f6a6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
import math
import torch
from torch.utils.data import Sampler
import torch.distributed as dist
class OrderedDistributedSampler(Sampler):
"""Sampler that restricts data loading to a subset of the dataset.
It is especially useful in conjunction with
:class:`torch.nn.parallel.DistributedDataParallel`. In such case, each
process can pass a DistributedSampler instance as a DataLoader sampler,
and load a subset of the original dataset that is exclusive to it.
.. note::
Dataset is assumed to be of constant size.
Arguments:
dataset: Dataset used for sampling.
num_replicas (optional): Number of processes participating in
distributed training.
rank (optional): Rank of the current process within num_replicas.
"""
def __init__(self, dataset, num_replicas=None, rank=None):
if num_replicas is None:
if not dist.is_available():
raise RuntimeError("Requires distributed package to be available")
num_replicas = dist.get_world_size()
if rank is None:
if not dist.is_available():
raise RuntimeError("Requires distributed package to be available")
rank = dist.get_rank()
self.dataset = dataset
self.num_replicas = num_replicas
self.rank = rank
self.num_samples = int(math.ceil(len(self.dataset) * 1.0 / self.num_replicas))
self.total_size = self.num_samples * self.num_replicas
def __iter__(self):
indices = list(range(len(self.dataset)))
# add extra samples to make it evenly divisible
indices += indices[:(self.total_size - len(indices))]
assert len(indices) == self.total_size
# subsample
indices = indices[self.rank:self.total_size:self.num_replicas]
assert len(indices) == self.num_samples
return iter(indices)
def __len__(self):
return self.num_samples
class RepeatAugSampler(Sampler):
"""Sampler that restricts data loading to a subset of the dataset for distributed,
with repeated augmentation.
It ensures that different each augmented version of a sample will be visible to a
different process (GPU). Heavily based on torch.utils.data.DistributedSampler
This sampler was taken from https://github.com/facebookresearch/deit/blob/0c4b8f60/samplers.py
Used in
Copyright (c) 2015-present, Facebook, Inc.
"""
def __init__(
self,
dataset,
num_replicas=None,
rank=None,
shuffle=True,
num_repeats=3,
selected_round=256,
selected_ratio=0,
):
if num_replicas is None:
if not dist.is_available():
raise RuntimeError("Requires distributed package to be available")
num_replicas = dist.get_world_size()
if rank is None:
if not dist.is_available():
raise RuntimeError("Requires distributed package to be available")
rank = dist.get_rank()
self.dataset = dataset
self.num_replicas = num_replicas
self.rank = rank
self.shuffle = shuffle
self.num_repeats = num_repeats
self.epoch = 0
self.num_samples = int(math.ceil(len(self.dataset) * num_repeats / self.num_replicas))
self.total_size = self.num_samples * self.num_replicas
# Determine the number of samples to select per epoch for each rank.
# num_selected logic defaults to be the same as original RASampler impl, but this one can be tweaked
# via selected_ratio and selected_round args.
selected_ratio = selected_ratio or num_replicas # ratio to reduce selected samples by, num_replicas if 0
if selected_round:
self.num_selected_samples = int(math.floor(
len(self.dataset) // selected_round * selected_round / selected_ratio))
else:
self.num_selected_samples = int(math.ceil(len(self.dataset) / selected_ratio))
def __iter__(self):
# deterministically shuffle based on epoch
g = torch.Generator()
g.manual_seed(self.epoch)
if self.shuffle:
indices = torch.randperm(len(self.dataset), generator=g)
else:
indices = torch.arange(start=0, end=len(self.dataset))
# produce repeats e.g. [0, 0, 0, 1, 1, 1, 2, 2, 2....]
if isinstance(self.num_repeats, float) and not self.num_repeats.is_integer():
# resample for repeats w/ non-integer ratio
repeat_size = math.ceil(self.num_repeats * len(self.dataset))
indices = indices[torch.tensor([int(i // self.num_repeats) for i in range(repeat_size)])]
else:
indices = torch.repeat_interleave(indices, repeats=int(self.num_repeats), dim=0)
indices = indices.tolist() # leaving as tensor thrashes dataloader memory
# add extra samples to make it evenly divisible
padding_size = self.total_size - len(indices)
if padding_size > 0:
indices += indices[:padding_size]
assert len(indices) == self.total_size
# subsample per rank
indices = indices[self.rank:self.total_size:self.num_replicas]
assert len(indices) == self.num_samples
# return up to num selected samples
return iter(indices[:self.num_selected_samples])
def __len__(self):
return self.num_selected_samples
def set_epoch(self, epoch):
self.epoch = epoch
|