JustinLin610
update
8437114
raw history blame
No virus
30.6 kB
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import io
import logging
import os
import pickle
import random
import socket
import struct
import subprocess
import warnings
from argparse import Namespace
from collections import OrderedDict
from dataclasses import dataclass
from typing import Any, Dict, List, Mapping, Optional
import sys
import time
import torch
import torch.distributed as dist
from fairseq.dataclass.configs import DistributedTrainingConfig, FairseqConfig
from omegaconf import open_dict
try:
import torch_xla.core.xla_model as xm
except ImportError:
xm = None
# Flag to indicate if we're using Megatron
# NOTE: this is a temporary hack until we move away from Megatron's model parallel init
_USE_MEGATRON = False
# Whether to use XLA ops (e.g., on TPUs) instead of CUDA ops.
_USE_XLA = False
logger = logging.getLogger(__name__)
def is_master(cfg: DistributedTrainingConfig):
return cfg.distributed_rank == 0
def infer_init_method(cfg: DistributedTrainingConfig, force_distributed=False):
if cfg.distributed_init_method is not None or cfg.tpu:
return
num_pipelines_per_node = None
if cfg.pipeline_model_parallel:
num_pipeline_devices, num_pipelines_per_node = _pipeline_parallel_pre_init(cfg)
if all(
key in os.environ
for key in ["MASTER_ADDR", "MASTER_PORT", "WORLD_SIZE", "RANK"]
):
# support torch.distributed.launch
_infer_torch_distributed_launch_init(cfg)
elif cfg.distributed_port > 0:
# we can determine the init method automatically for Slurm
_infer_slurm_init(cfg, num_pipelines_per_node)
elif cfg.distributed_world_size > 1 or force_distributed:
# fallback for single node with multiple GPUs
_infer_single_node_init(cfg)
if cfg.pipeline_model_parallel:
_pipeline_parallel_post_init(cfg, num_pipeline_devices, num_pipelines_per_node)
elif not cfg.distributed_no_spawn:
with open_dict(cfg):
cfg.distributed_num_procs = min(
torch.cuda.device_count(), cfg.distributed_world_size
)
def _infer_torch_distributed_launch_init(cfg: DistributedTrainingConfig):
cfg.distributed_init_method = "env://"
cfg.distributed_world_size = int(os.environ["WORLD_SIZE"])
cfg.distributed_rank = int(os.environ["RANK"])
# processes are created by torch.distributed.launch
cfg.distributed_no_spawn = True
def _infer_slurm_init(cfg: DistributedTrainingConfig, num_pipelines_per_node):
node_list = os.environ.get("SLURM_STEP_NODELIST")
if node_list is None:
node_list = os.environ.get("SLURM_JOB_NODELIST")
if node_list is not None:
try:
hostnames = subprocess.check_output(
["scontrol", "show", "hostnames", node_list]
)
cfg.distributed_init_method = "tcp://{host}:{port}".format(
host=hostnames.split()[0].decode("utf-8"),
port=cfg.distributed_port,
)
nnodes = int(os.environ.get("SLURM_NNODES"))
ntasks_per_node = os.environ.get("SLURM_NTASKS_PER_NODE")
if ntasks_per_node is not None:
ntasks_per_node = int(ntasks_per_node)
else:
ntasks = int(os.environ.get("SLURM_NTASKS"))
nnodes = int(os.environ.get("SLURM_NNODES"))
assert ntasks % nnodes == 0
ntasks_per_node = int(ntasks / nnodes)
if ntasks_per_node == 1:
gpus_per_node = torch.cuda.device_count()
node_id = int(os.environ.get("SLURM_NODEID"))
cfg.distributed_rank = node_id * gpus_per_node
cfg.distributed_world_size = nnodes * gpus_per_node
elif cfg.pipeline_model_parallel:
assert ntasks_per_node == num_pipelines_per_node, (
"SLURM --ntasks-per-node must match number of pipelines per "
"node (={})".format(num_pipelines_per_node)
)
cfg.distributed_no_spawn = True
# For 4-way MP on nodes with 8 GPUs, ranks will be [0, 1] on
# the first node, [1, 2] on the second node, etc. This
# matches torch.distributed.launch.
node_id = int(os.environ.get("SLURM_NODEID"))
local_id = int(os.environ.get("SLURM_LOCALID"))
cfg.distributed_rank = node_id * num_pipelines_per_node + local_id
# In the above example, device_id will always be in [0, 1],
# which also matches torch.distributed.launch.
cfg.device_id = local_id
# We also want to set distributed_world_size to be the total
# number of pipelines across all nodes.
cfg.distributed_world_size = nnodes * num_pipelines_per_node
else:
assert ntasks_per_node == cfg.distributed_world_size // nnodes
cfg.distributed_no_spawn = True
cfg.distributed_rank = int(os.environ.get("SLURM_PROCID"))
cfg.device_id = int(os.environ.get("SLURM_LOCALID"))
except subprocess.CalledProcessError as e: # scontrol failed
raise e
except FileNotFoundError: # Slurm is not installed
pass
def _infer_single_node_init(cfg: DistributedTrainingConfig):
assert (
cfg.distributed_world_size <= torch.cuda.device_count()
), f"world size is {cfg.distributed_world_size} but have {torch.cuda.device_count()} available devices"
port = random.randint(10000, 20000)
cfg.distributed_init_method = "tcp://localhost:{port}".format(port=port)
def _pipeline_parallel_pre_init(cfg: DistributedTrainingConfig):
from fairseq import utils
balance_exists = (
cfg.pipeline_balance is not None
or cfg.pipeline_encoder_balance is not None
or cfg.pipeline_decoder_balance is not None
)
devices_exist = (
cfg.pipeline_devices is not None
or cfg.pipeline_encoder_devices is not None
or cfg.pipeline_decoder_devices is not None
)
if not balance_exists:
raise ValueError(
"--pipeline-balance is currently required for pipeline model parallelism"
)
if not devices_exist:
raise ValueError(
"--pipeline-devices is currently required for pipeline model parallelism"
)
cfg.pipeline_balance = utils.eval_str_list(cfg.pipeline_balance, type=int)
if cfg.pipeline_devices is not None:
cfg.pipeline_devices = utils.eval_str_list(cfg.pipeline_devices, type=int)
num_pipeline_devices = len(set(cfg.pipeline_devices))
else:
cfg.pipeline_encoder_devices = utils.eval_str_list(
cfg.pipeline_encoder_devices, type=int
)
cfg.pipeline_decoder_devices = utils.eval_str_list(
cfg.pipeline_decoder_devices, type=int
)
num_pipeline_devices = len(
set(cfg.pipeline_encoder_devices + cfg.pipeline_decoder_devices)
)
gpus_per_node = torch.cuda.device_count()
assert (
gpus_per_node >= num_pipeline_devices
and gpus_per_node % num_pipeline_devices == 0
), (
"the number of unique device IDs in --pipeline-devices must evenly divide "
"the number of GPUs per node (multi-node pipelining is not yet supported)"
)
num_pipelines_per_node = gpus_per_node // num_pipeline_devices
return num_pipeline_devices, num_pipelines_per_node
def _pipeline_parallel_post_init(
cfg: DistributedTrainingConfig, num_pipeline_devices, num_pipelines_per_node
):
if not cfg.distributed_no_spawn:
# When distributed_no_spawn is False, we expect distributed_rank and
# distributed_world_size to be based on the total number of GPUs, so
# we need to correct them to be based on the number of pipelines.
assert cfg.distributed_world_size % num_pipeline_devices == 0
cfg.distributed_world_size = (
cfg.distributed_world_size // num_pipeline_devices
)
# In the case of 4-way MP on nodes with 8 GPUs, we want
# distributed_rank to be the starting GPU index for each pipeline
# i.e., 0, 2, ...
gpus_per_node = torch.cuda.device_count()
assert cfg.distributed_rank % gpus_per_node == 0
assert cfg.distributed_rank % num_pipeline_devices == 0
with open_dict(cfg):
cfg.distributed_rank = cfg.distributed_rank // num_pipeline_devices
# launch one process per pipeline
cfg.distributed_num_procs = num_pipelines_per_node
# if we have 4-way MP on a node with 8 GPUs, we want device_ids to be 0
# and 4, indicating the starting device IDs for each pipeline
cfg.device_id *= num_pipeline_devices
if cfg.device_id > 0:
# if there's multiple pipelines on a node (e.g., 4-way MP on an 8
# GPU node), we need to adjust pipeline_devices accordingly
logger.debug(
"setting CUDA device={} on rank {}".format(
cfg.device_id, cfg.distributed_rank
)
)
torch.cuda.set_device(cfg.device_id)
with open_dict(cfg):
cfg.pipeline_devices = [cfg.device_id + d for d in cfg.pipeline_devices]
logger.info(
"setting pipeline_devices={} on rank {}".format(
cfg.pipeline_devices, cfg.distributed_rank
)
)
def distributed_init(cfg: FairseqConfig):
if isinstance(cfg, Namespace):
from fairseq.dataclass.utils import convert_namespace_to_omegaconf
cfg = convert_namespace_to_omegaconf(cfg)
if not cfg.common.tpu:
if torch.distributed.is_available() and torch.distributed.is_initialized():
warnings.warn(
"Distributed is already initialized, cannot initialize twice!"
)
else:
logger.info(
"distributed init (rank {}): {}".format(
cfg.distributed_training.distributed_rank,
cfg.distributed_training.distributed_init_method,
)
)
logger.info('Start init')
max_time_wait = 600
for i in range(max_time_wait):
try:
dist.init_process_group(
backend=cfg.distributed_training.distributed_backend,
init_method=cfg.distributed_training.distributed_init_method,
world_size=cfg.distributed_training.distributed_world_size,
rank=cfg.distributed_training.distributed_rank,
)
logger.info(
"initialized host {} as rank {}".format(
socket.gethostname(),
cfg.distributed_training.distributed_rank,
)
)
if torch.distributed.is_initialized():
print("single-machine distributed training is initialized.")
break
except ValueError:
# This is caused by TCPStore failure.
print('Retry: {}, with value error {}'.format(
i + 1, sys.exc_info()[0]))
time.sleep(5)
if i == max_time_wait - 1:
print('k8s resource wait too long time')
exit(-1)
except Exception:
print('Retry: {}, with value error {}'.format(
i + 1, sys.exc_info()[0]))
exit(-1)
# perform a dummy all-reduce to initialize the NCCL communicator
if torch.cuda.is_available():
dist.all_reduce(torch.zeros(1).cuda())
cfg.distributed_training.distributed_rank = torch.distributed.get_rank()
else:
assert xm.xrt_world_size() == cfg.distributed_training.distributed_world_size
global _USE_XLA
_USE_XLA = True
cfg.distributed_training.device_id = xm.get_local_ordinal()
cfg.distributed_training.distributed_rank = xm.get_ordinal()
xm.rendezvous("distributed_init") # wait for all workers
if is_master(cfg.distributed_training):
logging.getLogger().setLevel(logging.INFO)
else:
logging.getLogger().setLevel(logging.WARNING)
if cfg.common.model_parallel_size > 1:
try:
from fairseq.model_parallel.megatron.mpu import (
initialize_model_parallel,
model_parallel_cuda_manual_seed,
)
except ImportError:
raise ImportError(
"\n\nPlease install the megatron submodule:"
"\n\n git submodule update --init "
"fairseq/model_parallel/megatron"
)
global _USE_MEGATRON
_USE_MEGATRON = True
initialize_model_parallel(cfg.common.model_parallel_size)
model_parallel_cuda_manual_seed(cfg.common.seed)
model_part_number = get_model_parallel_rank()
cfg.checkpoint.checkpoint_suffix += "-model_part-{0}".format(model_part_number)
if hasattr(cfg, "model") and getattr(cfg.model, "base_layers", 0) > 0:
cfg.checkpoint.checkpoint_suffix = f"-rank-{cfg.distributed_training.distributed_rank}"
return cfg.distributed_training.distributed_rank
def distributed_main(i, main, cfg: FairseqConfig, kwargs):
cfg.distributed_training.device_id = i
if torch.cuda.is_available() and not cfg.common.cpu and not cfg.common.tpu:
torch.cuda.set_device(cfg.distributed_training.device_id)
if cfg.distributed_training.distributed_rank is None: # torch.multiprocessing.spawn
cfg.distributed_training.distributed_rank = kwargs.pop("start_rank", 0) + i
cfg.distributed_training.distributed_rank = distributed_init(cfg)
after_distributed_init_fn = kwargs.pop("after_distributed_init_fn", None)
if after_distributed_init_fn:
cfg = after_distributed_init_fn(cfg)
main(cfg, **kwargs)
if torch.distributed.is_initialized():
torch.distributed.barrier(get_global_group())
def call_main(cfg: FairseqConfig, main, **kwargs):
if cfg.distributed_training.distributed_init_method is None:
infer_init_method(cfg.distributed_training)
if cfg.distributed_training.distributed_init_method is not None:
# distributed training
if not cfg.distributed_training.distributed_no_spawn:
start_rank = cfg.distributed_training.distributed_rank
cfg.distributed_training.distributed_rank = None # assign automatically
kwargs["start_rank"] = start_rank
torch.multiprocessing.spawn(
fn=distributed_main,
args=(main, cfg, kwargs),
nprocs=min(
torch.cuda.device_count(),
cfg.distributed_training.distributed_world_size,
),
join=True,
)
else:
distributed_main(cfg.distributed_training.device_id, main, cfg, kwargs)
elif cfg.common.tpu and cfg.distributed_training.distributed_world_size > 1:
import torch_xla.distributed.xla_multiprocessing as xmp
torch.multiprocessing.set_sharing_strategy("file_system")
xmp.spawn(
fn=distributed_main,
args=(main, cfg, kwargs),
# tpu-comment:
# 8 devices in one TPU VM, is the max processes to be spawned.
# The rest is driven by xm.distributed.xla_dist
nprocs=min(cfg.distributed_training.distributed_world_size, 8),
)
else:
# single GPU main
main(cfg, **kwargs)
def use_xla():
global _USE_XLA
return _USE_XLA
def new_groups(grouped_ranks: List[List[int]]):
if use_xla():
return ("tpu", grouped_ranks)
else:
groups = [dist.new_group(g) for g in grouped_ranks]
my_group_idx = _find_my_group_index(grouped_ranks)
return groups[my_group_idx]
def _find_my_group_index(grouped_ranks):
my_rank = get_global_rank()
for i, group in enumerate(grouped_ranks):
if my_rank in group:
return i
raise RuntimeError
def _find_my_group(grouped_ranks):
index = _find_my_group_index(grouped_ranks)
return grouped_ranks[index]
def get_rank(group):
if use_xla():
assert group[0] == "tpu"
my_group = _find_my_group(group[1])
return my_group.index(get_global_rank())
else:
return dist.get_rank(group=group)
def get_world_size(group):
if use_xla():
assert group[0] == "tpu"
my_group = _find_my_group(group[1])
return len(my_group)
elif torch.distributed.is_initialized():
return dist.get_world_size(group=group)
else:
return 1
def get_global_group():
if use_xla():
return new_groups([list(range(get_global_world_size()))])
elif torch.distributed.is_initialized():
if not hasattr(get_global_group, "_global_group"):
# ideally we could use torch.distributed.group.WORLD, but it seems
# to cause random NCCL hangs in some cases
get_global_group._global_group = dist.new_group()
return get_global_group._global_group
else:
return None
def get_global_rank():
if use_xla():
return xm.get_ordinal()
elif torch.distributed.is_initialized():
return torch.distributed.get_rank()
else:
return 0
def get_global_world_size():
if use_xla():
return xm.xrt_world_size()
elif torch.distributed.is_initialized():
return torch.distributed.get_world_size()
else:
return 1
def get_data_parallel_group():
"""Get the data parallel group the caller rank belongs to."""
global _USE_MEGATRON
if _USE_MEGATRON:
from fairseq.model_parallel.megatron import mpu
return mpu.get_data_parallel_group()
else:
return get_global_group()
def get_data_parallel_rank():
"""Return my rank for the data parallel group."""
return get_rank(get_data_parallel_group())
def get_data_parallel_world_size():
"""Return world size for the data parallel group."""
return get_world_size(get_data_parallel_group())
def get_model_parallel_group():
global _USE_MEGATRON
if _USE_MEGATRON:
from fairseq.model_parallel.megatron import mpu
return mpu.get_model_parallel_group()
else:
return None
def get_model_parallel_rank():
"""Return my rank for the model parallel group."""
return get_rank(get_model_parallel_group())
def get_model_parallel_world_size():
"""Return world size for the model parallel group."""
return get_world_size(get_model_parallel_group())
def all_reduce(tensor, group, op="sum"):
if use_xla():
assert isinstance(group, tuple) and group[0] == "tpu"
tensor = [tensor] # wrap in a list to make xm.all_reduce in-place
return xm.all_reduce(op, tensor, groups=group[1])[0]
else:
if op == "sum":
op = dist.ReduceOp.SUM
elif op == "max":
op = dist.ReduceOp.MAX
else:
raise NotImplementedError
dist.all_reduce(tensor, op=op, group=group)
return tensor
def broadcast(tensor, src, group):
if use_xla():
# XLA doesn't support broadcast, hack it with all_reduce
if get_rank(group) != src:
tensor.zero_()
all_reduce(tensor, group)
else:
dist.broadcast(tensor, src=src, group=group)
def all_to_all(tensor, group):
"""Perform an all-to-all operation on a 1D Tensor."""
assert tensor.dim() == 1
split_count = get_world_size(group=group)
assert tensor.numel() % split_count == 0
if use_xla():
assert isinstance(group, tuple) and group[0] == "tpu"
return xm.all_to_all(
tensor,
split_dimension=0,
concat_dimension=0,
split_count=split_count,
groups=group[1],
)
else:
output = torch.zeros_like(tensor)
dist.all_to_all_single(output, tensor, group=group)
return output
def all_gather(tensor, group, return_tensor=False):
"""Perform an all-gather operation."""
if use_xla():
result = xm.all_gather(tensor, groups=group[1])
world_size = get_world_size(group=group)
result = result.view(world_size, *tensor.size())
if return_tensor:
return result
else:
return [result[i] for i in range(world_size)]
else:
world_size = get_world_size(group=group)
rank = get_rank(group=group)
tensor_list = [
tensor if i == rank else torch.empty_like(tensor) for i in range(world_size)
]
dist.all_gather(tensor_list, tensor, group=group)
if return_tensor:
return torch.stack(tensor_list, dim=0)
else:
return tensor_list
def all_gather_list(data, group=None, max_size=16384):
"""Gathers arbitrary data from all nodes into a list.
Similar to :func:`~torch.distributed.all_gather` but for arbitrary Python
data. Note that *data* must be picklable and any CUDA tensors will be moved
to CPU and returned on CPU as well.
Args:
data (Any): data from the local worker to be gathered on other workers
group: group of the collective
max_size (int, optional): maximum size of the data to be gathered
across workers
"""
from fairseq import utils
if group is None:
group = get_global_group()
torch.distributed.barrier(group=group)
rank = get_rank(group=group)
world_size = get_world_size(group=group)
buffer_size = max_size * world_size
if (
not hasattr(all_gather_list, "_buffer")
or all_gather_list._buffer.numel() < buffer_size
):
all_gather_list._buffer = torch.cuda.ByteTensor(buffer_size)
all_gather_list._cpu_buffer = torch.ByteTensor(max_size).pin_memory()
buffer = all_gather_list._buffer
buffer.zero_()
cpu_buffer = all_gather_list._cpu_buffer
data = utils.move_to_cpu(data)
enc = pickle.dumps(data)
enc_size = len(enc)
header_size = 4 # size of header that contains the length of the encoded data
size = header_size + enc_size
if size > max_size:
raise ValueError(
"encoded data size ({}) exceeds max_size ({})".format(size, max_size)
)
header = struct.pack(">I", enc_size)
cpu_buffer[:size] = torch.ByteTensor(list(header + enc))
start = rank * max_size
buffer[start : start + size].copy_(cpu_buffer[:size])
all_reduce(buffer, group=group)
buffer = buffer.cpu()
try:
result = []
for i in range(world_size):
out_buffer = buffer[i * max_size : (i + 1) * max_size]
(enc_size,) = struct.unpack(">I", bytes(out_buffer[:header_size].tolist()))
if enc_size > 0:
result.append(
pickle.loads(
bytes(out_buffer[header_size : header_size + enc_size].tolist())
)
)
return result
except pickle.UnpicklingError:
raise Exception(
"Unable to unpickle data from other workers. all_gather_list requires all "
"workers to enter the function together, so this error usually indicates "
"that the workers have fallen out of sync somehow. Workers can fall out of "
"sync if one of them runs out of memory, or if there are other conditions "
"in your training script that can cause one worker to finish an epoch "
"while other workers are still iterating over their portions of the data. "
"Try rerunning with --ddp-backend=legacy_ddp and see if that helps."
)
def all_reduce_dict(data: Mapping[str, Any], device, group) -> Dict[str, Any]:
"""
AllReduce a dictionary of values across workers. We separately
reduce items that are already on the device and items on CPU for
better performance.
Args:
data (Mapping[str, Any]): dictionary of data to all-reduce, but
cannot be a nested dictionary
device (torch.device): device for the reduction
group: group of the collective
"""
data_keys = list(data.keys())
# We want to separately reduce items that are already on the
# device and items on CPU for performance reasons.
cpu_data = OrderedDict()
device_data = OrderedDict()
for k in data_keys:
t = data[k]
if not torch.is_tensor(t):
cpu_data[k] = torch.tensor(t, dtype=torch.double)
elif t.device.type != device.type:
cpu_data[k] = t.to(dtype=torch.double)
else:
device_data[k] = t.to(dtype=torch.double)
def _all_reduce_dict(data: OrderedDict):
if len(data) == 0:
return data
buf = torch.cat([t.view(-1) for t in data.values()]).to(device=device)
all_reduce(buf, group=group)
split_buf = torch.split(buf, [t.numel() for t in data.values()])
reduced_data = [t.view_as(orig) for t, orig in zip(split_buf, data.values())]
return OrderedDict(zip(data.keys(), reduced_data))
cpu_data = _all_reduce_dict(cpu_data)
device_data = _all_reduce_dict(device_data)
def get_from_stack(key):
if key in cpu_data:
return cpu_data[key]
elif key in device_data:
return device_data[key]
raise KeyError
return OrderedDict([(key, get_from_stack(key)) for key in data_keys])
def broadcast_tensors(
tensors: Optional[List[torch.Tensor]],
src_rank: int,
group: object,
dist_device: Optional[torch.device] = None,
) -> List[torch.Tensor]:
"""
Broadcasts a list of tensors without other (non-src) ranks needing to know
the dtypes/shapes of the tensors.
"""
if dist_device is None:
if torch.distributed.get_backend(group) == "nccl":
dist_device = torch.device("cuda")
else:
dist_device = torch.device("cpu")
# share metadata first to simplify transfer
is_src_rank = (get_rank(group) == src_rank)
if is_src_rank:
metadata = [
{"size": t.size(), "dtype": t.dtype, "device": t.device} for t in tensors
]
metadata = _broadcast_object_slow(metadata, src_rank, group, dist_device)
else:
metadata = _broadcast_object_slow(None, src_rank, group, dist_device)
out_tensors = []
for i, meta in enumerate(metadata):
if is_src_rank:
tensor = tensors[i]
broadcast(tensors[i].to(dist_device), src=src_rank, group=group)
else:
tensor = torch.zeros(
[meta["size"].numel()], dtype=meta["dtype"], device=dist_device
)
broadcast(tensor, src=src_rank, group=group)
tensor = tensor.view(meta["size"]).to(meta["device"])
out_tensors.append(tensor)
return out_tensors
def broadcast_object(
obj: Any,
src_rank: int,
group: object,
dist_device: Optional[torch.device] = None,
) -> Any:
"""Broadcast an arbitrary Python object to other workers."""
if dist_device is None:
if torch.distributed.get_backend(group) == "nccl":
dist_device = torch.device("cuda")
else:
dist_device = torch.device("cpu")
if get_rank(group) == src_rank:
# split the tensors from the non-tensors so we can broadcast them
# directly, avoiding unnecessary serialization/deserialization
tensors = []
obj = _split_tensors_from_obj(obj, tensors)
obj = _broadcast_object_slow(obj, src_rank, group, dist_device)
tensors = broadcast_tensors(tensors, src_rank, group, dist_device)
else:
obj = _broadcast_object_slow(None, src_rank, group, dist_device)
tensors = broadcast_tensors(None, src_rank, group, dist_device)
return _put_tensors_in_obj(obj, tensors)
def _broadcast_object_slow(
obj: Any, src_rank: int, group: object, dist_device: torch.device,
) -> Any:
if get_rank(group) == src_rank:
# Emit data
buffer = io.BytesIO()
torch.save(obj, buffer)
buffer = torch.ByteTensor(buffer.getbuffer()).to(dist_device)
length = torch.LongTensor([len(buffer)]).to(dist_device)
broadcast(length, src=src_rank, group=group)
broadcast(buffer, src=src_rank, group=group)
else:
# Fetch from the source
length = torch.LongTensor([0]).to(dist_device)
broadcast(length, src=src_rank, group=group)
buffer = torch.ByteTensor(int(length.item())).to(dist_device)
broadcast(buffer, src=src_rank, group=group)
buffer = io.BytesIO(buffer.cpu().numpy())
obj = torch.load(buffer, map_location="cpu")
return obj
@dataclass(frozen=True)
class _TensorPlaceholder:
index: int
def _split_tensors_from_obj(obj: Any, tensors: List[torch.Tensor]) -> Any:
if torch.is_tensor(obj):
placeholder = _TensorPlaceholder(index=len(tensors))
tensors.append(obj)
return placeholder
elif isinstance(obj, dict):
return {k: _split_tensors_from_obj(v, tensors) for k, v in obj.items()}
elif isinstance(obj, list):
return [_split_tensors_from_obj(v, tensors) for v in obj]
elif isinstance(obj, tuple):
return tuple(_split_tensors_from_obj(v, tensors) for v in obj)
elif isinstance(obj, set):
return {_split_tensors_from_obj(v, tensors) for v in obj}
else:
return obj
def _put_tensors_in_obj(obj: Any, tensors: List[torch.Tensor]) -> Any:
if isinstance(obj, _TensorPlaceholder):
return tensors[obj.index]
elif isinstance(obj, dict):
return {k: _put_tensors_in_obj(v, tensors) for k, v in obj.items()}
elif isinstance(obj, list):
return [_put_tensors_in_obj(v, tensors) for v in obj]
elif isinstance(obj, tuple):
return tuple(_put_tensors_in_obj(v, tensors) for v in obj)
elif isinstance(obj, set):
return {_put_tensors_in_obj(v, tensors) for v in obj}
else:
return obj