|
|
|
|
|
|
|
|
|
|
|
import math |
|
|
import random |
|
|
from typing import Iterator, TypeVar |
|
|
|
|
|
import numpy as np |
|
|
import torch |
|
|
from torch.utils.data import DataLoader, Dataset, Sampler |
|
|
|
|
|
from internlm.core.context import ParallelMode |
|
|
from internlm.core.context import global_context as gpc |
|
|
from internlm.utils.logger import get_logger |
|
|
|
|
|
logger = get_logger(__file__) |
|
|
|
|
|
T_co = TypeVar("T_co", covariant=True) |
|
|
|
|
|
|
|
|
class DataParallelSampler(Sampler): |
|
|
"""A data sampler for distributed data parallelism. |
|
|
|
|
|
Args: |
|
|
dataset (:class:`torch.utils.data.Dataset`): The Dataset for sampling. |
|
|
shuffle (bool, optional): Whether to shuffle data, defaults to False. |
|
|
seed (int, optional): The random seed used for sampling, defaults to 0. |
|
|
drop_last (bool, optional): Set to True to drop the last incomplete batch, if the dataset size |
|
|
is not divisible by the batch size. If False and the size of dataset is not divisible by |
|
|
the batch size, then the last batch will be smaller, defaults to False. |
|
|
""" |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
dataset: Dataset, |
|
|
shuffle: bool = False, |
|
|
seed: int = 0, |
|
|
drop_last: bool = False, |
|
|
) -> None: |
|
|
self.dataset = dataset |
|
|
self.num_replicas = gpc.get_world_size(ParallelMode.DATA) |
|
|
self.rank = gpc.get_local_rank(ParallelMode.DATA) |
|
|
self.epoch = 0 |
|
|
self.drop_last = drop_last |
|
|
|
|
|
|
|
|
|
|
|
if self.drop_last and len(self.dataset) % self.num_replicas != 0: |
|
|
|
|
|
|
|
|
|
|
|
self.num_samples = math.ceil( |
|
|
|
|
|
|
|
|
(len(self.dataset) - self.num_replicas) |
|
|
/ self.num_replicas |
|
|
) |
|
|
else: |
|
|
self.num_samples = math.ceil(len(self.dataset) / self.num_replicas) |
|
|
self.total_size = self.num_samples * self.num_replicas |
|
|
self.shuffle = shuffle |
|
|
self.seed = seed |
|
|
|
|
|
def __iter__(self) -> Iterator[T_co]: |
|
|
if self.shuffle: |
|
|
|
|
|
g = torch.Generator() |
|
|
g.manual_seed(self.seed + self.epoch) |
|
|
|
|
|
indices = torch.randperm(len(self.dataset), generator=g).tolist() |
|
|
|
|
|
|
|
|
|
|
|
self.epoch += 1 |
|
|
else: |
|
|
indices = list(range(len(self.dataset))) |
|
|
|
|
|
if not self.drop_last: |
|
|
|
|
|
padding_size = self.total_size - len(indices) |
|
|
if padding_size <= len(indices): |
|
|
indices += indices[:padding_size] |
|
|
else: |
|
|
indices += (indices * math.ceil(padding_size / len(indices)))[:padding_size] |
|
|
else: |
|
|
|
|
|
indices = indices[: self.total_size] |
|
|
assert len(indices) == self.total_size |
|
|
|
|
|
|
|
|
indices = indices[self.rank : self.total_size : self.num_replicas] |
|
|
assert len(indices) == self.num_samples |
|
|
|
|
|
return iter(indices) |
|
|
|
|
|
def __len__(self) -> int: |
|
|
return self.num_samples |
|
|
|
|
|
def set_epoch(self, epoch: int) -> None: |
|
|
r"""Sets the epoch for this sampler. When :attr:`shuffle=True`, this ensures all replicas |
|
|
use a different random ordering for each epoch. Otherwise, the next iteration of this |
|
|
sampler will yield the same ordering. |
|
|
|
|
|
Args: |
|
|
epoch (int): Epoch number. |
|
|
""" |
|
|
self.epoch = epoch |
|
|
|
|
|
|
|
|
def get_dpsampler_dataloader( |
|
|
dataset, |
|
|
shuffle=False, |
|
|
seed=1024, |
|
|
add_sampler=True, |
|
|
drop_last=False, |
|
|
pin_memory=False, |
|
|
num_workers=0, |
|
|
**kwargs, |
|
|
): |
|
|
r"""Set up a deterministic dataloader (also configure seed workers, samplers and whether shuffle or not) |
|
|
|
|
|
Note: |
|
|
When pipeline parallel is enabled, shuffle cannot be True as it will result in mismatch between input data |
|
|
on the 1st stage and label on the last stage. |
|
|
|
|
|
Args: |
|
|
dataset (:class:`torch.utils.data.Dataset`): The dataset to be loaded. |
|
|
shuffle (bool, optional): Whether to shuffle the dataset. Defaults to False. |
|
|
seed (int, optional): Random worker seed for sampling, defaults to 1024. |
|
|
add_sampler: Whether to add ``DistributedDataParallelSampler`` to the dataset. Defaults to True. |
|
|
drop_last (bool, optional): Set to True to drop the last incomplete batch, if the dataset size |
|
|
is not divisible by the batch size. If False and the size of dataset is not divisible by |
|
|
the batch size, then the last batch will be smaller, defaults to False. |
|
|
pin_memory (bool, optional): Whether to pin memory address in CPU memory. Defaults to False. |
|
|
num_workers (int, optional): Number of worker threads for this dataloader. Defaults to 0. |
|
|
kwargs (dict): optional parameters for ``torch.utils.data.DataLoader``, more details could be found in |
|
|
`DataLoader <https://pytorch.org/docs/stable/_modules/torch/utils/data/dataloader.html#DataLoader>`_. |
|
|
|
|
|
Returns: |
|
|
:class:`torch.utils.data.DataLoader`: A DataLoader used for training or testing. |
|
|
""" |
|
|
_kwargs = kwargs.copy() |
|
|
|
|
|
if add_sampler and gpc.is_initialized(ParallelMode.DATA) and gpc.get_world_size(ParallelMode.DATA) > 1: |
|
|
sampler = DataParallelSampler(dataset, shuffle=shuffle, drop_last=drop_last) |
|
|
else: |
|
|
sampler = None |
|
|
|
|
|
|
|
|
def seed_worker(): |
|
|
worker_seed = seed |
|
|
np.random.seed(worker_seed) |
|
|
torch.manual_seed(worker_seed) |
|
|
random.seed(worker_seed) |
|
|
|
|
|
if sampler is None: |
|
|
return DataLoader( |
|
|
dataset, |
|
|
worker_init_fn=seed_worker, |
|
|
shuffle=shuffle, |
|
|
drop_last=drop_last, |
|
|
pin_memory=pin_memory, |
|
|
num_workers=num_workers, |
|
|
**_kwargs, |
|
|
) |
|
|
else: |
|
|
return DataLoader( |
|
|
dataset, |
|
|
sampler=sampler, |
|
|
worker_init_fn=seed_worker, |
|
|
drop_last=drop_last, |
|
|
pin_memory=pin_memory, |
|
|
num_workers=num_workers, |
|
|
**_kwargs, |
|
|
) |
|
|
|
|
|
|
|
|
class StaticBatchSampler: |
|
|
""" |
|
|
A static batch sampler that generates batches with a fixed micro-batch size. |
|
|
|
|
|
Args: |
|
|
num_samples (int): The total number of samples in the dataset. |
|
|
batch_size (int): The batch size for the current rank. Defaults to 192. |
|
|
rampup_batch_size (str): A string with three space-separated integers representing the |
|
|
starting batch size, the increment, and the number of steps between |
|
|
each increment. For tools, "192 24 8" means that the batch size |
|
|
starts at 192 and increases by 24 every 8 steps. Defaults to |
|
|
"6 2 8", which corresponds to a batch size of 2 for the first 6 steps. |
|
|
micro_bsz (int): The micro-batch size. Defaults to 2. |
|
|
seed (int): The random seed for shuffling the indices. Defaults to 0. |
|
|
drop_last (bool): If True, drop the last incomplete batch. Currently only supports True. Defaults to True. |
|
|
data_rank (int): The rank of the current process in the data parallel group. Defaults to 0. |
|
|
data_world_size (int): The number of processes in the data parallel group. Defaults to 1. |
|
|
""" |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
datasets, |
|
|
batch_size=192, |
|
|
rampup_batch_size="6 2 8", |
|
|
micro_bsz=2, |
|
|
seed=0, |
|
|
drop_last=True, |
|
|
data_rank=0, |
|
|
data_world_size=1, |
|
|
): |
|
|
assert drop_last is True, "Currently only support drop last" |
|
|
if rampup_batch_size: |
|
|
|
|
|
start_bsz, bsz_incre, incre_every = map(int, rampup_batch_size.split()) |
|
|
else: |
|
|
start_bsz, bsz_incre, incre_every = batch_size, batch_size, 1 |
|
|
self.raw_rampup_batch_size = rampup_batch_size |
|
|
self.start_bsz = start_bsz |
|
|
self.bsz_incre = bsz_incre |
|
|
self.incre_every = incre_every |
|
|
if gpc.is_initialized(ParallelMode.PIPELINE): |
|
|
assert ( |
|
|
batch_size - self.start_bsz |
|
|
) % self.bsz_incre == 0, f"{batch_size} - {self.start_bsz} should be multiple of {self.bsz_incre}" |
|
|
assert batch_size % micro_bsz == 0, f"batch_size({batch_size}) should be multiple of micro_bsz({micro_bsz})" |
|
|
assert ( |
|
|
self.start_bsz % micro_bsz == 0 |
|
|
), f"start_bsz({self.start_bsz}) should be multiple of micro_bsz({micro_bsz})" |
|
|
assert ( |
|
|
self.bsz_incre % micro_bsz == 0 |
|
|
), f"bsz_incre({self.bsz_incre}) should be multiple of micro_bsz({micro_bsz})" |
|
|
|
|
|
self.batch_size = batch_size |
|
|
self.epoch = 0 |
|
|
self.seed = seed |
|
|
self.rng = np.random.RandomState(seed) |
|
|
self.batch_count = 0 |
|
|
self.micro_bsz = micro_bsz |
|
|
self.data_rank = data_rank |
|
|
self.data_world_size = data_world_size |
|
|
self.num_consumed_samples_in_epoch = 0 |
|
|
self.datasets = datasets |
|
|
self.num_samples = sum([len(ds) for ds in datasets]) |
|
|
|
|
|
self.get_indices() |
|
|
|
|
|
def get_indices(self, old_indices=None): |
|
|
if old_indices is not None: |
|
|
assert ( |
|
|
len(old_indices) <= self.num_samples |
|
|
), f"The checkpoint has {len(old_indices)} samples, \ |
|
|
while the new restart use less samples ({self.num_samples})" |
|
|
|
|
|
else: |
|
|
old_indices = np.array([]) |
|
|
|
|
|
|
|
|
indices = np.arange(len(old_indices), self.num_samples) |
|
|
self.rng_state = self.rng.get_state() |
|
|
self.rng.shuffle(indices) |
|
|
|
|
|
ramp_steps = (self.batch_size - self.start_bsz) // self.bsz_incre |
|
|
if self.batch_count < ramp_steps * self.incre_every: |
|
|
rampup_samples = 0 |
|
|
for i in range(ramp_steps): |
|
|
rampup_samples += (i * self.bsz_incre + self.start_bsz) * self.incre_every |
|
|
assert ( |
|
|
rampup_samples * self.data_world_size <= self.num_samples |
|
|
), f"Too much rampup samples: \ |
|
|
{rampup_samples*self.data_world_size} Vs. self.num_samples: {self.num_samples}" |
|
|
|
|
|
num_samples = (self.num_samples - rampup_samples * self.data_world_size) // ( |
|
|
self.batch_size * self.data_world_size |
|
|
) |
|
|
num_samples = num_samples * self.batch_size * self.data_world_size + rampup_samples * self.data_world_size |
|
|
else: |
|
|
num_samples = self.num_samples // (self.batch_size * self.data_world_size) |
|
|
num_samples = num_samples * self.batch_size * self.data_world_size |
|
|
indices = np.concatenate([old_indices, indices]).astype(int) |
|
|
indices = indices[:num_samples] |
|
|
self.indices = indices |
|
|
assert len(self.indices) >= self.batch_size, "The number of samples should be larger than batch_size" |
|
|
self.num_consumed_samples_in_epoch = 0 |
|
|
|
|
|
def set_epoch(self, epoch): |
|
|
self.epoch = epoch |
|
|
self.rng = np.random.RandomState(self.seed + self.epoch) |
|
|
|
|
|
def __len__(self): |
|
|
ramp_steps = (self.batch_size - self.start_bsz) // self.bsz_incre |
|
|
if self.batch_count < ramp_steps * self.incre_every: |
|
|
rampup_samples = 0 |
|
|
for i in range(ramp_steps): |
|
|
rampup_samples += (i * self.bsz_incre + self.start_bsz) * self.incre_every |
|
|
assert ( |
|
|
rampup_samples * self.data_world_size <= self.num_samples |
|
|
), f"Too much rampup samples: {rampup_samples*self.data_world_size} \ |
|
|
Vs. self.num_samples: {self.num_samples}" |
|
|
|
|
|
num_batches = (self.num_samples - rampup_samples * self.data_world_size) // self.batch_size |
|
|
num_batches = num_batches // self.data_world_size + self.incre_every * ramp_steps |
|
|
else: |
|
|
num_batches = self.num_samples // self.batch_size // self.data_world_size |
|
|
|
|
|
return num_batches |
|
|
|
|
|
def __iter__(self): |
|
|
indices = self.indices[self.data_rank :: self.data_world_size] |
|
|
while self.num_consumed_samples_in_epoch < len(indices): |
|
|
batch_rampup_idx = self.batch_count // self.incre_every |
|
|
cur_batch_size = batch_rampup_idx * self.bsz_incre + self.start_bsz |
|
|
cur_batch_size = min(cur_batch_size, self.batch_size) |
|
|
batch = indices[self.num_consumed_samples_in_epoch : self.num_consumed_samples_in_epoch + cur_batch_size] |
|
|
yield batch |
|
|
self.num_consumed_samples_in_epoch += len(batch) |
|
|
self.batch_count += 1 |
|
|
self.get_indices() |
|
|
|
|
|
def state_dict(self): |
|
|
states = { |
|
|
"batch_size": self.batch_size, |
|
|
"raw_rampup_batch_size": self.raw_rampup_batch_size, |
|
|
"rng_state": self.rng_state, |
|
|
"epoch": self.epoch, |
|
|
"seed": self.seed, |
|
|
"data_world_size": self.data_world_size, |
|
|
"num_consumed_samples_in_epoch": self.num_consumed_samples_in_epoch, |
|
|
"batch_count": self.batch_count, |
|
|
|
|
|
"indices": self.indices, |
|
|
} |
|
|
|
|
|
return states |
|
|
|
|
|
def load_state_dict(self, states): |
|
|
for name in ("data_world_size", "raw_rampup_batch_size", "seed"): |
|
|
assert states[name] == getattr(self, name), (name, states[name], getattr(self, name)) |
|
|
self.rng.set_state(states["rng_state"]) |
|
|
self.get_indices(old_indices=None) |
|
|
self.epoch = states["epoch"] |
|
|
self.batch_count = states["batch_count"] |
|
|
self.num_consumed_samples_in_epoch = states["num_consumed_samples_in_epoch"] |
|
|
|
|
|
def copy(self): |
|
|
copy_sampler = StaticBatchSampler( |
|
|
self.datasets, |
|
|
self.batch_size, |
|
|
self.raw_rampup_batch_size, |
|
|
self.micro_bsz, |
|
|
self.seed, |
|
|
drop_last=True, |
|
|
data_rank=self.data_rank, |
|
|
data_world_size=self.data_world_size, |
|
|
) |
|
|
|
|
|
copy_sampler.load_state_dict(self.state_dict()) |
|
|
return copy_sampler |
|
|
|