# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
import os.path as osp
import pickle
import shutil
import tempfile
from collections import OrderedDict
from typing import Any, Dict, Generator, List, Optional, Tuple, Union

import numpy as np
import torch
from torch import Tensor
from torch import distributed as torch_dist
from torch._utils import (_flatten_dense_tensors, _take_tensors,
                          _unflatten_dense_tensors)
from torch.distributed import ProcessGroup

import mmengine
from .utils import (get_world_size, get_rank, get_backend, get_dist_info,
                    get_default_group, barrier, get_data_device,
                    get_comm_device, cast_data_device)
from mmengine.utils import digit_version
from mmengine.utils.dl_utils import TORCH_VERSION
from mmengine.device import is_npu_available


def _get_reduce_op(name: str) -> torch_dist.ReduceOp:
    op_mappings = {
        'sum': torch_dist.ReduceOp.SUM,
        'product': torch_dist.ReduceOp.PRODUCT,
        'min': torch_dist.ReduceOp.MIN,
        'max': torch_dist.ReduceOp.MAX,
        'band': torch_dist.ReduceOp.BAND,
        'bor': torch_dist.ReduceOp.BOR,
        'bxor': torch_dist.ReduceOp.BXOR,
    }

    if name.lower() not in op_mappings:
        raise ValueError(
            f'reduce op should be one of {op_mappings.keys()}, but got {name}')

    return op_mappings[name.lower()]
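
# Illustrative sketch (kept as comments so nothing runs at import time):
# ``_get_reduce_op`` is a thin, case-insensitive lookup, so both calls below
# resolve to the same ReduceOp. Note that 'mean' is *not* a backend op; it is
# emulated in ``all_reduce`` below, so asking this helper for it raises.
#
#   >>> _get_reduce_op('sum') == _get_reduce_op('SUM')
#   True
#   >>> _get_reduce_op('mean')
#   ValueError: reduce op should be one of dict_keys([...]), but got mean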


def all_reduce(data: Tensor,
               op: str = 'sum',
               group: Optional[ProcessGroup] = None) -> None:
    """Reduces the tensor data across all machines in such a way that all get
    the final result.

    After the call ``data`` is going to be bitwise identical in all
    processes.

    Note:
        Calling ``all_reduce`` in non-distributed environment does nothing.

    Args:
        data (Tensor): Input and output of the collective. The function
            operates in-place.
        op (str): Operation to reduce data. Defaults to 'sum'. Optional values
            are 'sum', 'mean', 'product', 'min', 'max', 'band', 'bor' and
            'bxor'.
        group (ProcessGroup, optional): The process group to work on. If None,
            the default process group will be used. Defaults to None.

    Examples:
        >>> import torch
        >>> import mmengine.dist as dist

        >>> # non-distributed environment
        >>> data = torch.arange(2, dtype=torch.int64)
        >>> dist.all_reduce(data)
        >>> data
        tensor([0, 1])

        >>> # distributed environment
        >>> # We have 2 process groups, 2 ranks.
        >>> data = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
        >>> data
        tensor([1, 2])  # Rank 0
        tensor([3, 4])  # Rank 1
        >>> dist.all_reduce(data, op=dist.ReduceOp.SUM)
        >>> data
        tensor([4, 6])  # Rank 0
        tensor([4, 6])  # Rank 1
    """
    world_size = get_world_size(group)
    if world_size > 1:
        if group is None:
            group = get_default_group()

        input_device = get_data_device(data)
        backend_device = get_comm_device(group)
        data_on_device = cast_data_device(data, backend_device)

        # pytorch does not support the 'mean' operation so we fall back to
        # implementing it with the 'sum' operation.
        if op.lower() == 'mean':
            torch_dist.all_reduce(data_on_device, _get_reduce_op('sum'), group)

            # use true_divide to handle torch 1.6.0 throwing a RuntimeError
            # when the type of `data_on_device` is int64
            data_on_device = torch.true_divide(data_on_device, world_size)
        else:
            torch_dist.all_reduce(data_on_device, _get_reduce_op(op), group)

        cast_data_device(data_on_device, input_device, out=data)
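
# Hedged sketch of the 'mean' fallback above: since the backend only provides
# SUM, averaging is emulated as an all-reduce sum followed by division by
# ``world_size``. With 2 ranks holding tensor([1., 2.]) and tensor([3., 4.]),
# every rank would end up with:
#
#   >>> dist.all_reduce(data, op='mean')
#   >>> data
#   tensor([2., 3.])  # (1+3)/2, (2+4)/2 on both ranks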


def all_gather(data: Tensor,
               group: Optional[ProcessGroup] = None) -> List[Tensor]:
    """Gather data from the whole group in a list.

    Note:
        Calling ``all_gather`` in non-distributed environment does nothing
        and just returns a list containing :attr:`data` itself.

    Note:
        Unlike PyTorch ``torch.distributed.all_gather``, :meth:`all_gather` in
        MMEngine does not pass in an empty list ``gather_list`` and returns
        the ``gather_list`` directly, which is more convenient. The difference
        between their interfaces is as below:

        - MMEngine: all_gather(data, group) -> gather_list
        - PyTorch: all_gather(gather_list, data, group) -> None

    Args:
        data (Tensor): Tensor to be gathered.
        group (ProcessGroup, optional): The process group to work on. If None,
            the default process group will be used. Defaults to None.

    Returns:
        list[Tensor]: Return a list containing data from the whole group if
        in distributed environment, otherwise a list only containing
        :attr:`data` itself.

    Examples:
        >>> import torch
        >>> import mmengine.dist as dist

        >>> # non-distributed environment
        >>> data = torch.arange(2, dtype=torch.int64)
        >>> data
        tensor([0, 1])
        >>> output = dist.all_gather(data)
        >>> output
        [tensor([0, 1])]

        >>> # distributed environment
        >>> # We have 2 process groups, 2 ranks.
        >>> data = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
        >>> data
        tensor([1, 2])  # Rank 0
        tensor([3, 4])  # Rank 1
        >>> output = dist.all_gather(data)
        >>> output
        [tensor([1, 2]), tensor([3, 4])]  # Rank 0
        [tensor([1, 2]), tensor([3, 4])]  # Rank 1
    """
    world_size = get_world_size(group)
    if world_size == 1:
        return [data]

    if group is None:
        group = get_default_group()

    input_device = get_data_device(data)
    backend_device = get_comm_device(group)
    data_on_device = cast_data_device(data, backend_device)

    gather_list = [
        torch.empty_like(data, device=backend_device)
        for _ in range(world_size)
    ]

    torch_dist.all_gather(gather_list, data_on_device, group)

    return cast_data_device(gather_list, input_device)  # type: ignore


def gather(data: Tensor,
           dst: int = 0,
           group: Optional[ProcessGroup] = None) -> List[Optional[Tensor]]:
    """Gather data from the whole group to ``dst`` process.

    Note:
        Calling ``gather`` in non-distributed environment does nothing
        and just returns a list containing :attr:`data` itself.

    Note:
        ``NCCL`` backend does not support ``gather``.

    Note:
        Unlike PyTorch ``torch.distributed.gather``, :meth:`gather` in
        MMEngine does not pass in an empty list ``gather_list`` and returns
        the ``gather_list`` directly, which is more convenient. The difference
        between their interfaces is as below:

        - MMEngine: gather(data, dst, group) -> gather_list
        - PyTorch: gather(data, gather_list, dst, group) -> None

    Args:
        data (Tensor): Tensor to be gathered. CUDA tensor is not supported.
        dst (int): Destination rank. Defaults to 0.
        group (ProcessGroup, optional): The process group to work on. If None,
            the default process group will be used. Defaults to None.

    Returns:
        list[Tensor]: The ``dst`` process will get a list of tensors gathered
        from the whole group. Other processes will get an empty list. In a
        non-distributed environment, just return a list containing
        :attr:`data` itself.

    Examples:
        >>> import torch
        >>> import mmengine.dist as dist

        >>> # non-distributed environment
        >>> data = torch.arange(2, dtype=torch.int64)
        >>> data
        tensor([0, 1])
        >>> output = dist.gather(data)
        >>> output
        [tensor([0, 1])]

        >>> # distributed environment
        >>> # We have 2 process groups, 2 ranks.
        >>> data = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
        >>> data
        tensor([1, 2])  # Rank 0
        tensor([3, 4])  # Rank 1
        >>> output = dist.gather(data)
        >>> output
        [tensor([1, 2]), tensor([3, 4])]  # Rank 0
        []  # Rank 1
    """
    world_size = get_world_size(group)
    if world_size == 1:
        return [data]

    if group is None:
        group = get_default_group()

    input_device = get_data_device(data)
    backend_device = get_comm_device(group)

    if get_rank(group) == dst:
        gather_list = [
            torch.empty_like(data, device=backend_device)
            for _ in range(world_size)
        ]
    else:
        gather_list = []

    torch_dist.gather(data, gather_list, dst, group)

    if get_rank(group) == dst:
        return cast_data_device(gather_list, input_device)  # type: ignore
    else:
        return gather_list


def broadcast(data: Tensor,
              src: int = 0,
              group: Optional[ProcessGroup] = None) -> None:
    """Broadcast the data from ``src`` process to the whole group.

    ``data`` must have the same number of elements in all processes
    participating in the collective.

    Note:
        Calling ``broadcast`` in non-distributed environment does nothing.

    Args:
        data (Tensor): Data to be sent if ``src`` is the rank of current
            process, and data to be used to save received data otherwise.
        src (int): Source rank. Defaults to 0.
        group (ProcessGroup, optional): The process group to work on. If None,
            the default process group will be used. Defaults to None.

    Examples:
        >>> import torch
        >>> import mmengine.dist as dist

        >>> # non-distributed environment
        >>> data = torch.arange(2, dtype=torch.int64)
        >>> data
        tensor([0, 1])
        >>> dist.broadcast(data)
        >>> data
        tensor([0, 1])

        >>> # distributed environment
        >>> # We have 2 process groups, 2 ranks.
        >>> data = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
        >>> data
        tensor([1, 2])  # Rank 0
        tensor([3, 4])  # Rank 1
        >>> dist.broadcast(data)
        >>> data
        tensor([1, 2])  # Rank 0
        tensor([1, 2])  # Rank 1
    """
    if get_world_size(group) > 1:
        if group is None:
            group = get_default_group()

        input_device = get_data_device(data)
        backend_device = get_comm_device(group)
        data_on_device = cast_data_device(data, backend_device)
        # broadcast requires tensor is contiguous
        data_on_device = data_on_device.contiguous()  # type: ignore
        torch_dist.broadcast(data_on_device, src, group)

        if get_rank(group) != src:
            cast_data_device(data_on_device, input_device, data)


def sync_random_seed(group: Optional[ProcessGroup] = None) -> int:
    """Synchronize a random seed to all processes.

    In distributed sampling, different ranks should sample non-overlapped
    data in the dataset. Therefore, this function is used to make sure that
    each rank shuffles the data indices in the same order based
    on the same seed. Then different ranks could use different indices
    to select non-overlapped data from the same data list.

    Args:
        group (ProcessGroup, optional): The process group to work on. If None,
            the default process group will be used. Defaults to None.

    Returns:
        int: Random seed.

    Examples:
        >>> import torch
        >>> import mmengine.dist as dist

        >>> # non-distributed environment
        >>> seed = dist.sync_random_seed()
        >>> seed  # which is a random number
        587791752

        >>> # distributed environment
        >>> # We have 2 process groups, 2 ranks.
        >>> seed = dist.sync_random_seed()
        >>> seed
        587791752  # Rank 0
        587791752  # Rank 1
    """
    seed = np.random.randint(2**31)
    if get_world_size(group) == 1:
        return seed

    if group is None:
        group = get_default_group()

    backend_device = get_comm_device(group)

    if get_rank(group) == 0:
        random_num = torch.tensor(seed, dtype=torch.int32).to(backend_device)
    else:
        random_num = torch.tensor(0, dtype=torch.int32).to(backend_device)

    torch_dist.broadcast(random_num, src=0, group=group)

    return random_num.item()
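
# Hedged usage sketch: a typical consumer of ``sync_random_seed`` is a
# distributed sampler that must shuffle indices identically on every rank
# before splitting them. ``num_samples`` below is illustrative, not part of
# this module's API.
#
#   >>> g = torch.Generator()
#   >>> g.manual_seed(sync_random_seed())
#   >>> indices = torch.randperm(num_samples, generator=g)  # same on all ranks
#   >>> indices = indices[get_rank()::get_world_size()]     # rank-local shard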


def _object_to_tensor(obj: Any) -> Tuple[Tensor, Tensor]:
    """Serialize picklable python object to tensor."""
    byte_storage = torch.ByteStorage.from_buffer(pickle.dumps(obj))
    # Do not replace `torch.ByteTensor` or `torch.LongTensor` with torch.tensor
    # and specifying dtype. Otherwise, it will cause 100X slowdown.
    # See: https://github.com/pytorch/pytorch/issues/65696
    byte_tensor = torch.ByteTensor(byte_storage)
    local_size = torch.LongTensor([byte_tensor.numel()])
    return byte_tensor, local_size


def _tensor_to_object(tensor: Tensor, tensor_size: int) -> Any:
    """Deserialize tensor to picklable python object."""
    buf = tensor.cpu().numpy().tobytes()[:tensor_size]
    return pickle.loads(buf)
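
# Hedged sketch: the two helpers above are inverses, so a picklable object
# should survive a serialize/deserialize round trip. ``local_size`` carries
# the byte length that the receiving side needs for slicing off padding.
#
#   >>> byte_tensor, local_size = _object_to_tensor({'acc': 0.9})
#   >>> _tensor_to_object(byte_tensor, local_size.item())
#   {'acc': 0.9}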


def _broadcast_object_list(object_list: List[Any],
                           src: int = 0,
                           group: Optional[ProcessGroup] = None) -> None:
    """Broadcast picklable objects in ``object_list`` to the whole group.

    Similar to :func:`broadcast`, but Python objects can be passed in. Note
    that all objects in ``object_list`` must be picklable in order to be
    broadcasted.
    """
    if torch_dist.distributed_c10d._rank_not_in_group(group):
        return

    my_rank = get_rank()
    # Serialize object_list elements to tensors on src rank.
    if my_rank == src:
        tensor_list, size_list = zip(
            *[_object_to_tensor(obj) for obj in object_list])
        object_sizes_tensor = torch.cat(size_list)
    else:
        object_sizes_tensor = torch.empty(len(object_list), dtype=torch.long)

    # Current device selection.
    # To preserve backwards compatibility, ``device`` is ``None`` by default,
    # in which case we run the current logic of device selection, i.e.
    # ``current_device`` is CUDA if the backend is NCCL, otherwise the CPU
    # device. In the case it is not ``None``, we move the size and object
    # tensors to be broadcasted to this device.
    group_backend = get_backend(group)
    is_nccl_backend = group_backend == torch_dist.Backend.NCCL
    current_device = torch.device('cpu')
    is_hccl_backend = group_backend == 'hccl'
    is_cncl_backend = group_backend == 'cncl'
    if is_hccl_backend:
        current_device = torch.device('npu', torch.npu.current_device())
        object_sizes_tensor = object_sizes_tensor.to(current_device)
    elif is_cncl_backend:
        current_device = torch.device('mlu', torch.mlu.current_device())
        object_sizes_tensor = object_sizes_tensor.to(current_device)
    elif is_nccl_backend:
        # See note about using torch.cuda.current_device() here in
        # docstring. We cannot simply use my_rank since rank == device is
        # not necessarily true.
        current_device = torch.device('cuda', torch.cuda.current_device())
        object_sizes_tensor = object_sizes_tensor.to(current_device)

    # Broadcast object sizes
    torch_dist.broadcast(object_sizes_tensor, src=src, group=group)

    # Concatenate and broadcast serialized object tensors
    if my_rank == src:
        object_tensor = torch.cat(tensor_list)
    else:
        object_tensor = torch.empty(
            torch.sum(object_sizes_tensor).int().item(),
            dtype=torch.uint8,
        )

    if is_nccl_backend or is_hccl_backend or is_cncl_backend:
        object_tensor = object_tensor.to(current_device)
    torch_dist.broadcast(object_tensor, src=src, group=group)

    # Deserialize objects using their stored sizes.
    offset = 0
    if my_rank != src:
        for i, obj_size in enumerate(object_sizes_tensor):
            obj_view = object_tensor[offset:offset + obj_size]
            obj_view = obj_view.type(torch.uint8)
            if obj_view.device != torch.device('cpu'):
                obj_view = obj_view.cpu()
            offset += obj_size
            object_list[i] = _tensor_to_object(obj_view, obj_size)


def broadcast_object_list(data: List[Any],
                          src: int = 0,
                          group: Optional[ProcessGroup] = None) -> None:
    """Broadcasts picklable objects in ``data`` to the whole group.

    Similar to :func:`broadcast`, but Python objects can be passed in. Note
    that all objects in ``data`` must be picklable in order to be broadcasted.

    Note:
        Calling ``broadcast_object_list`` in non-distributed environment does
        nothing.

    Args:
        data (List[Any]): List of input objects to broadcast.
            Each object must be picklable. Only objects on the ``src`` rank
            will be broadcast, but each rank must provide lists of equal
            sizes.
        src (int): Source rank from which to broadcast ``data``.
        group (ProcessGroup, optional): The process group to work on. If None,
            the default process group will be used. Default is ``None``.

    Note:
        For NCCL-based process groups, internal tensor representations of
        objects must be moved to the GPU device before communication starts.
        In this case, the used device is given by
        ``torch.cuda.current_device()`` and it is the user's responsibility to
        ensure that this is correctly set so that each rank has an individual
        GPU, via ``torch.cuda.set_device()``.

    Examples:
        >>> import torch
        >>> import mmengine.dist as dist

        >>> # non-distributed environment
        >>> data = ['foo', 12, {1: 2}]
        >>> dist.broadcast_object_list(data)
        >>> data
        ['foo', 12, {1: 2}]

        >>> # distributed environment
        >>> # We have 2 process groups, 2 ranks.
        >>> if dist.get_rank() == 0:
        ...     data = ['foo', 12, {1: 2}]  # any picklable object
        ... else:
        ...     data = [None, None, None]
        >>> dist.broadcast_object_list(data)
        >>> data
        ['foo', 12, {1: 2}]  # Rank 0
        ['foo', 12, {1: 2}]  # Rank 1
    """
    assert isinstance(data, list)

    if get_world_size(group) > 1:
        if group is None:
            group = get_default_group()

        if digit_version(TORCH_VERSION) >= digit_version(
                '1.8.0') and not is_npu_available():
            torch_dist.broadcast_object_list(data, src, group)
        else:
            _broadcast_object_list(data, src, group)


def all_reduce_dict(data: Dict[str, Tensor],
                    op: str = 'sum',
                    group: Optional[ProcessGroup] = None) -> None:
    """Reduces the dict across all machines in such a way that all get the
    final result.

    The code is modified from
    https://github.com/Megvii-BaseDetection/YOLOX/blob/main/yolox/utils/allreduce_norm.py.

    Args:
        data (dict[str, Tensor]): Data to be reduced.
        op (str): Operation to reduce data. Defaults to 'sum'. Optional values
            are 'sum', 'mean', 'product', 'min', 'max', 'band', 'bor' and
            'bxor'.
        group (ProcessGroup, optional): The process group to work on. If None,
            the default process group will be used. Defaults to None.

    Examples:
        >>> import torch
        >>> import mmengine.dist as dist

        >>> # non-distributed environment
        >>> data = {
        ...     'key1': torch.arange(2, dtype=torch.int64),
        ...     'key2': torch.arange(3, dtype=torch.int64)
        ... }
        >>> dist.all_reduce_dict(data)
        >>> data
        {'key1': tensor([0, 1]), 'key2': tensor([0, 1, 2])}

        >>> # distributed environment
        >>> # We have 2 process groups, 2 ranks.
        >>> data = {
        ...     'key1': torch.arange(2, dtype=torch.int64),
        ...     'key2': torch.arange(3, dtype=torch.int64)
        ... }
        >>> dist.all_reduce_dict(data)
        >>> data
        {'key1': tensor([0, 2]), 'key2': tensor([0, 2, 4])}  # Rank 0
        {'key1': tensor([0, 2]), 'key2': tensor([0, 2, 4])}  # Rank 1
    """
    assert isinstance(data, dict)

    world_size = get_world_size(group)
    if world_size > 1:
        if group is None:
            group = get_default_group()

        # ensure keys are consistent across processes
        keys = sorted(data.keys())
        tensor_shapes = [data[k].shape for k in keys]
        tensor_sizes = [data[k].numel() for k in keys]

        if digit_version(TORCH_VERSION) == digit_version('1.5.0'):
            # `torch.cat` in torch 1.5 cannot concatenate different types so
            # we fall back to converting them all to float type.
            flatten_tensor = torch.cat(
                [data[k].flatten().float() for k in keys])
        else:
            flatten_tensor = torch.cat([data[k].flatten() for k in keys])

        all_reduce(flatten_tensor, op=op, group=group)

        split_tensors = [
            x.reshape(shape) for x, shape in zip(
                torch.split(flatten_tensor, tensor_sizes), tensor_shapes)
        ]
        for k, v in zip(keys, split_tensors):
            data[k] = v


def _all_gather_object(object_list: List[Any],
                       obj: Any,
                       group: Optional[ProcessGroup] = None) -> None:
    """Gather picklable objects from the whole group into a list.

    Similar to :func:`all_gather`, but Python objects can be passed in.
    Note that the object must be picklable in order to be gathered.

    Args:
        object_list (list[Any]): Output list. It should be correctly sized as
            the size of the group for this collective and will contain the
            output.
        obj (Any): Picklable Python object to be broadcast from current
            process.
        group (ProcessGroup, optional): The process group to work on. If None,
            the default process group will be used. Defaults to None.

    Returns:
        None. If the calling rank is part of this group, the output of the
        collective will be populated into the input ``object_list``. If the
        calling rank is not part of the group, the passed in ``object_list``
        will be unmodified.
    """
    if torch_dist.distributed_c10d._rank_not_in_group(group):
        return

    input_tensor, local_size = _object_to_tensor(obj)
    group_backend = get_backend(group)
    current_device = torch.device('cpu')
    is_nccl_backend = group_backend == torch_dist.Backend.NCCL
    if is_nccl_backend:
        # See note about using torch.cuda.current_device() here in docstring.
        # We cannot simply use my_rank since rank == device is not necessarily
        # true.
        current_device = torch.device('cuda', torch.cuda.current_device())
        input_tensor = input_tensor.to(current_device)
        local_size = local_size.to(current_device)
    # Gather all local sizes. This is so that we can find the max size, and
    # index until the correct size when deserializing the tensors.
    group_size = get_world_size(group=group)
    object_sizes_tensor = torch.zeros(
        group_size, dtype=torch.long, device=current_device)
    object_size_list = [
        object_sizes_tensor[i].unsqueeze(dim=0) for i in range(group_size)
    ]
    # Allgather tensor sizes
    torch_dist.all_gather(object_size_list, local_size, group=group)
    max_object_size = int(max(object_size_list).item())
    # Resize tensor to max size across all ranks.
    input_tensor.resize_(max_object_size)
    coalesced_output_tensor = torch.empty(
        max_object_size * group_size, dtype=torch.uint8, device=current_device)
    # Output tensors are nonoverlapping views of coalesced_output_tensor
    output_tensors = [
        coalesced_output_tensor[max_object_size * i:max_object_size * (i + 1)]
        for i in range(group_size)
    ]
    torch_dist.all_gather(output_tensors, input_tensor, group=group)
    # Deserialize outputs back to object.
    for i, tensor in enumerate(output_tensors):
        tensor = tensor.type(torch.uint8)
        if tensor.device != torch.device('cpu'):
            tensor = tensor.cpu()
        tensor_size = object_size_list[i]
        object_list[i] = _tensor_to_object(tensor, tensor_size)


def all_gather_object(data: Any,
                      group: Optional[ProcessGroup] = None) -> List[Any]:
    """Gather picklable objects from the whole group into a list.

    Similar to :func:`all_gather`, but Python objects can be passed in. Note
    that the object must be picklable in order to be gathered.

    Note:
        Calling ``all_gather_object`` in non-distributed environment does
        nothing and just returns a list containing :attr:`data` itself.

    Note:
        Unlike PyTorch ``torch.distributed.all_gather_object``,
        :meth:`all_gather_object` in MMEngine does not pass in an empty list
        ``gather_list`` and returns the ``gather_list`` directly, which is
        more convenient. The difference between their interfaces is as below:

        - MMEngine: all_gather_object(data, group) -> gather_list
        - PyTorch: all_gather_object(gather_list, data, group) -> None

    Args:
        data (Any): Picklable Python object to be gathered from the current
            process.
        group (ProcessGroup, optional): The process group to work on. If None,
            the default process group will be used. Defaults to None.

    Returns:
        list[Any]: Return a list containing data from the whole group if
        in distributed environment, otherwise a list only containing
        :attr:`data` itself.

    Note:
        For NCCL-based process groups, internal tensor representations
        of objects must be moved to the GPU device before communication
        starts. In this case, the used device is given by
        ``torch.cuda.current_device()`` and it is the user's responsibility to
        ensure that this is correctly set so that each rank has an individual
        GPU, via ``torch.cuda.set_device()``.

    Examples:
        >>> import torch
        >>> import mmengine.dist as dist

        >>> # non-distributed environment
        >>> data = ['foo', 12, {1: 2}]  # any picklable object
        >>> output = dist.all_gather_object(data[dist.get_rank()])
        >>> output
        ['foo']

        >>> # distributed environment
        >>> # We have 3 process groups, 3 ranks.
        >>> output = dist.all_gather_object(data[dist.get_rank()])
        >>> output
        ['foo', 12, {1: 2}]  # Rank 0
        ['foo', 12, {1: 2}]  # Rank 1
        ['foo', 12, {1: 2}]  # Rank 2
    """
    world_size = get_world_size(group)
    if world_size == 1:
        return [data]

    if group is None:
        group = get_default_group()

    gather_list = [None] * world_size

    if digit_version(TORCH_VERSION) >= digit_version('1.8.0'):
        torch_dist.all_gather_object(gather_list, data, group)
    else:
        _all_gather_object(gather_list, data, group)

    return gather_list


def _validate_output_list_for_rank(my_rank: int, dst: int,
                                   gather_list: Optional[list]) -> None:
    """Validate whether ``gather_list`` is None in non-dst ranks."""
    if dst == my_rank:
        if not gather_list:
            raise ValueError(
                'Argument ``gather_list`` must be specified on destination '
                'rank.')
    elif gather_list:
        raise ValueError('Argument ``gather_list`` must NOT be specified '
                         'on non-destination ranks.')
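
# Hedged sketch: the check above mirrors ``torch.distributed`` semantics --
# only the destination rank may (and must) supply a gather list.
#
#   >>> _validate_output_list_for_rank(my_rank=1, dst=0, gather_list=[None])
#   ValueError: Argument ``gather_list`` must NOT be specified on non-destination ranks.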


def _gather_object(obj: Any,
                   object_gather_list=None,
                   dst: int = 0,
                   group: Optional[ProcessGroup] = None) -> None:
    """Gathers picklable objects from the whole group in a single process.

    Similar to :func:`gather`, but Python objects can be passed in. Note that
    the object must be picklable in order to be gathered.

    Args:
        obj (Any): Input object. Must be picklable.
        object_gather_list (list[Any], optional): Output list. On the ``dst``
            rank, it should be correctly sized as the size of the group for
            this collective and will contain the output. Must be ``None`` on
            non-dst ranks. Defaults to None.
        dst (int): Destination rank. Defaults to 0.
        group (ProcessGroup, optional): The process group to work on. If None,
            the default process group will be used. Defaults to None.
    """
    if torch_dist.distributed_c10d._rank_not_in_group(group):
        return

    # Ensure object_gather_list is specified appropriately.
    my_rank = get_rank()
    _validate_output_list_for_rank(my_rank, dst, object_gather_list)
    input_tensor, local_size = _object_to_tensor(obj)
    group_backend = get_backend(group)
    current_device = torch.device('cpu')
    is_nccl_backend = group_backend == torch_dist.Backend.NCCL
    if is_nccl_backend:
        current_device = torch.device('cuda', torch.cuda.current_device())
        input_tensor = input_tensor.to(current_device)
        local_size = local_size.to(current_device)
    # Gather all local sizes. This is so that we can find the max size, and
    # index until the correct size when deserializing the tensors.
    group_size = get_world_size(group=group)
    object_sizes_tensor = torch.zeros(
        group_size, dtype=torch.long, device=current_device)
    object_size_list = [
        object_sizes_tensor[i].unsqueeze(dim=0) for i in range(group_size)
    ]
    # Allgather tensor sizes. An all-gather is needed here despite this being
    # a gather, since each rank needs to broadcast a tensor of the same
    # (maximal) size.
    torch_dist.all_gather(object_size_list, local_size, group=group)
    max_object_size = int(max(object_size_list).item())
    # Resize tensor to max size across all ranks.
    input_tensor.resize_(max_object_size)
    # Avoid populating output tensors if the result won't be gathered on this
    # rank.
    if my_rank == dst:
        coalesced_output_tensor = torch.empty(
            max_object_size * group_size,
            dtype=torch.uint8,
            device=current_device)
        # Output tensors are nonoverlapping views of coalesced_output_tensor
        output_tensors = [
            coalesced_output_tensor[max_object_size * i:max_object_size *
                                    (i + 1)] for i in range(group_size)
        ]
    # All ranks call gather with equal-sized tensors.
    torch_dist.gather(
        input_tensor,
        gather_list=output_tensors if my_rank == dst else None,
        dst=dst,
        group=group,
    )
    if my_rank != dst:
        return
    for i, tensor in enumerate(output_tensors):
        tensor = tensor.type(torch.uint8)
        tensor_size = object_size_list[i]
        object_gather_list[i] = _tensor_to_object(tensor, tensor_size)


def gather_object(data: Any,
                  dst: int = 0,
                  group: Optional[ProcessGroup] = None) -> Optional[List[Any]]:
    """Gathers picklable objects from the whole group in a single process.

    Similar to :func:`gather`, but Python objects can be passed in. Note that
    the object must be picklable in order to be gathered.

    Note:
        ``NCCL`` backend does not support ``gather_object``.

    Note:
        Unlike PyTorch ``torch.distributed.gather_object``,
        :meth:`gather_object` in MMEngine does not pass in an empty list
        ``gather_list`` and returns the ``gather_list`` directly, which is
        more convenient. The difference between their interfaces is as below:

        - MMEngine: gather_object(data, dst, group) -> gather_list
        - PyTorch: gather_object(data, gather_list, dst, group) -> None

    Args:
        data (Any): Input object. Must be picklable.
        dst (int): Destination rank. Defaults to 0.
        group (ProcessGroup, optional): The process group to work on. If None,
            the default process group will be used. Defaults to None.

    Returns:
        list[Any] or None: On the ``dst`` rank, return ``gather_list`` which
        contains the output of the collective. Other ranks get ``None``. In a
        non-distributed environment, a list containing :attr:`data` itself is
        returned.

    Examples:
        >>> import torch
        >>> import mmengine.dist as dist

        >>> # non-distributed environment
        >>> data = ['foo', 12, {1: 2}]  # any picklable object
        >>> output = dist.gather_object(data[dist.get_rank()])
        >>> output
        ['foo']

        >>> # distributed environment
        >>> # We have 3 process groups, 3 ranks.
        >>> output = dist.gather_object(data[dist.get_rank()], dst=0)
        >>> output
        ['foo', 12, {1: 2}]  # Rank 0
        None  # Rank 1
        None  # Rank 2
    """
    world_size = get_world_size(group)
    if world_size == 1:
        return [data]

    if group is None:
        group = get_default_group()

    gather_list = [None] * world_size if get_rank(group) == dst else None

    if digit_version(TORCH_VERSION) >= digit_version('1.8.0'):
        torch_dist.gather_object(data, gather_list, dst, group)
    else:
        _gather_object(data, gather_list, dst, group)

    return gather_list


def collect_results(results: list,
                    size: int,
                    device: str = 'cpu',
                    tmpdir: Optional[str] = None) -> Optional[list]:
    """Collect results in distributed environments.

    Args:
        results (list[object]): Result list containing result parts to be
            collected. Each item of ``results`` should be a picklable object.
        size (int): Size of the results, commonly equal to length of
            the results.
        device (str): Device name. Optional values are 'cpu', 'gpu' or 'npu'.
        tmpdir (str | None): Temporary directory for collected results to
            store. If set to None, it will create a temporary directory for
            it. ``tmpdir`` should be None when device is 'gpu' or 'npu'.
            Defaults to None.

    Returns:
        list or None: The collected results.

    Examples:
        >>> # distributed environment
        >>> # We have 2 process groups, 2 ranks.
        >>> import mmengine.dist as dist
        >>> if dist.get_rank() == 0:
        ...     data = ['foo', {1: 2}]
        ... else:
        ...     data = [24, {'a': 'b'}]
        >>> size = 4
        >>> output = dist.collect_results(data, size, device='cpu')
        >>> output
        ['foo', 24, {1: 2}, {'a': 'b'}]  # rank 0
        None  # rank 1
    """
    if device not in ['gpu', 'cpu', 'npu']:
        raise NotImplementedError(
            f"device must be 'cpu', 'gpu' or 'npu', but got {device}")

    if device == 'gpu' or device == 'npu':
        assert tmpdir is None, f'tmpdir should be None when device is {device}'
        return _collect_results_device(results, size)
    else:
        return collect_results_cpu(results, size, tmpdir)


def collect_results_cpu(result_part: list,
                        size: int,
                        tmpdir: Optional[str] = None) -> Optional[list]:
    """Collect results under cpu mode.

    On cpu mode, this function will save the results on different gpus to
    ``tmpdir`` and collect them by the rank 0 worker.

    Args:
        result_part (list): Result list containing result parts
            to be collected. Each item of ``result_part`` should be a
            picklable object.
        size (int): Size of the results, commonly equal to length of
            the results.
        tmpdir (str | None): Temporary directory for collected results to
            store. If set to None, it will create a random temporary
            directory for it. Defaults to None.

    Returns:
        list or None: The collected results.

    Examples:
        >>> # distributed environment
        >>> # We have 2 process groups, 2 ranks.
        >>> import mmengine.dist as dist
        >>> if dist.get_rank() == 0:
        ...     data = ['foo', {1: 2}]
        ... else:
        ...     data = [24, {'a': 'b'}]
        >>> size = 4
        >>> output = dist.collect_results_cpu(data, size)
        >>> output
        ['foo', 24, {1: 2}, {'a': 'b'}]  # rank 0
        None  # rank 1
    """
    rank, world_size = get_dist_info()
    if world_size == 1:
        return result_part[:size]

    # create a tmp dir if it is not specified
    if tmpdir is None:
        MAX_LEN = 512
        # 32 is whitespace
        dir_tensor = torch.full((MAX_LEN, ), 32, dtype=torch.uint8)
        if rank == 0:
            mmengine.mkdir_or_exist('.dist_test')
            tmpdir = tempfile.mkdtemp(dir='.dist_test')
            tmpdir = torch.tensor(
                bytearray(tmpdir.encode()), dtype=torch.uint8)
            dir_tensor[:len(tmpdir)] = tmpdir
        broadcast(dir_tensor, 0)
        tmpdir = dir_tensor.numpy().tobytes().decode().rstrip()
    else:
        mmengine.mkdir_or_exist(tmpdir)

    # dump the part result to the dir
    with open(osp.join(tmpdir, f'part_{rank}.pkl'), 'wb') as f:  # type: ignore
        pickle.dump(result_part, f, protocol=2)

    barrier()

    # collect all parts
    if rank != 0:
        return None
    else:
        # load results of all parts from tmp dir
        part_list = []
        for i in range(world_size):
            path = osp.join(tmpdir, f'part_{i}.pkl')  # type: ignore
            if not osp.exists(path):
                raise FileNotFoundError(
                    f'{tmpdir} is not a shared directory for '
                    f'rank {i}, please make sure {tmpdir} is a shared '
                    'directory for all ranks!')
            with open(path, 'rb') as f:
                part_list.append(pickle.load(f))
        # sort the results
        ordered_results = []
        for res in zip(*part_list):
            ordered_results.extend(list(res))
        # the dataloader may pad some samples
        ordered_results = ordered_results[:size]
        # remove tmp dir
        shutil.rmtree(tmpdir)  # type: ignore
        return ordered_results


def _collect_results_device(result_part: list, size: int) -> Optional[list]:
    """Collect results under gpu or npu mode."""
    rank, world_size = get_dist_info()
    if world_size == 1:
        return result_part[:size]

    # gather all result part. Note that NCCL does not support gather so use
    # all_gather_object instead.
    part_list = all_gather_object(result_part)

    if rank == 0:
        # sort the results
        ordered_results = []
        for res in zip(*part_list):
            ordered_results.extend(list(res))
        # the dataloader may pad some samples
        ordered_results = ordered_results[:size]
        return ordered_results
    else:
        return None


def collect_results_gpu(result_part: list, size: int) -> Optional[list]:
    """Collect results under gpu mode.

    Under gpu mode, this function will encode results to gpu tensors and use
    gpu communication for results collection.

    Args:
        result_part (list[object]): Result list containing result parts
            to be collected. Each item of ``result_part`` should be a
            picklable object.
        size (int): Size of the results, commonly equal to length of
            the results.

    Returns:
        list or None: The collected results.

    Examples:
        >>> # distributed environment
        >>> # We have 2 process groups, 2 ranks.
        >>> import mmengine.dist as dist
        >>> if dist.get_rank() == 0:
        ...     data = ['foo', {1: 2}]
        ... else:
        ...     data = [24, {'a': 'b'}]
        >>> size = 4
        >>> output = dist.collect_results_gpu(data, size)
        >>> output
        ['foo', 24, {1: 2}, {'a': 'b'}]  # rank 0
        None  # rank 1
    """
    return _collect_results_device(result_part, size)


def _all_reduce_coalesced(tensors: List[torch.Tensor],
                          bucket_size_mb: int = -1,
                          op: str = 'sum',
                          group: Optional[ProcessGroup] = None) -> None:
    """All-reduce a sequence of tensors as a whole.

    Args:
        tensors (List[torch.Tensor]): A sequence of tensors to be
            all-reduced.
        bucket_size_mb (int): The limit of each chunk in megabytes
            for grouping tensors into chunks. Defaults to -1.
        op (str): Operation to reduce data. Defaults to 'sum'. Optional values
            are 'sum', 'mean', 'product', 'min', 'max', 'band', 'bor' and
            'bxor'.
        group (ProcessGroup, optional): The process group to work on. If None,
            the default process group will be used. Defaults to None.
    """
    if bucket_size_mb > 0:
        bucket_size_bytes = bucket_size_mb * 1024 * 1024
        buckets = _take_tensors(tensors, bucket_size_bytes)
    else:
        buckets = OrderedDict()
        for tensor in tensors:
            tp = tensor.type()
            if tp not in buckets:
                buckets[tp] = []
            buckets[tp].append(tensor)
        buckets = buckets.values()

    for bucket in buckets:
        flat_tensors = _flatten_dense_tensors(bucket)
        all_reduce(flat_tensors, op=op, group=group)
        for tensor, synced in zip(
                bucket, _unflatten_dense_tensors(flat_tensors, bucket)):
            tensor.copy_(synced)
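
# Hedged sketch of the bucketing above: with ``bucket_size_mb <= 0`` tensors
# are grouped by dtype, so mixed float32/float64 inputs end up in two buckets
# and trigger two flattened all-reduce calls instead of one per tensor. The
# shapes below are purely illustrative.
#
#   >>> tensors = [torch.ones(4), torch.ones(2), torch.ones(3).double()]
#   >>> _all_reduce_coalesced(tensors, bucket_size_mb=-1, op='sum')
#   # bucket 1: the two float32 tensors flattened into one length-6 tensor
#   # bucket 2: the single float64 tensor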


def all_reduce_params(params: Union[List, Generator[torch.Tensor, None, None]],
                      coalesce: bool = True,
                      bucket_size_mb: int = -1,
                      op: str = 'sum',
                      group: Optional[ProcessGroup] = None) -> None:
    """All-reduce parameters.

    Args:
        params (List or Generator[torch.Tensor, None, None]): List of
            parameters or buffers of a model.
        coalesce (bool, optional): Whether to reduce parameters as a whole.
            Defaults to True.
        bucket_size_mb (int, optional): Size of bucket, the unit is MB.
            Defaults to -1.
        op (str): Operation to reduce data. Defaults to 'sum'. Optional values
            are 'sum', 'mean', 'product', 'min', 'max', 'band', 'bor' and
            'bxor'.
        group (ProcessGroup, optional): The process group to work on. If None,
            the default process group will be used. Defaults to None.

    Examples:
        >>> import torch
        >>> import mmengine.dist as dist

        >>> # non-distributed environment
        >>> data = [torch.arange(2), torch.arange(3)]
        >>> dist.all_reduce_params(data)
        >>> data
        [tensor([0, 1]), tensor([0, 1, 2])]

        >>> # distributed environment
        >>> # We have 2 process groups, 2 ranks.
        >>> if dist.get_rank() == 0:
        ...     data = [torch.tensor([1, 2]), torch.tensor([3, 4])]
        ... else:
        ...     data = [torch.tensor([2, 3]), torch.tensor([4, 5])]
        >>> dist.all_reduce_params(data)
        >>> data
        [tensor([3, 5]), tensor([7, 9])]
    """
    world_size = get_world_size(group)
    if world_size == 1:
        return

    params_data = [param.data for param in params]
    if coalesce:
        _all_reduce_coalesced(params_data, bucket_size_mb, op=op, group=group)
    else:
        for tensor in params_data:
            all_reduce(tensor, op=op, group=group)
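
# Hedged usage sketch: a common reason to call ``all_reduce_params`` is to
# average BatchNorm running statistics (model buffers) across ranks after an
# epoch. ``model`` here stands for any ``torch.nn.Module`` and is
# illustrative only, not part of this module's API.
#
#   >>> all_reduce_params(model.buffers(), coalesce=True, op='mean')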