"""Distributed helpers.""" |
|
import torch
import torch.distributed as dist

# Process group containing only the processes running on the local machine
# (shard); set by init_distributed_training().
_LOCAL_PROCESS_GROUP = None
|
|
def all_reduce(tensors, average=True):
    """
    All-reduce the provided tensors across all processes on all machines.
    Args:
        tensors (list): tensors to all-reduce; each tensor is reduced in
            place across every process.
        average (bool): if True, divide each reduced tensor by the total
            number of processes, i.e. average instead of sum.
    """
    for tensor in tensors:
        dist.all_reduce(tensor, async_op=False)
    if average:
        world_size = dist.get_world_size()
        for tensor in tensors:
            tensor.mul_(1.0 / world_size)
    return tensors
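
# Usage sketch (illustrative only; assumes the default process group has
# already been set up, e.g. via dist.init_process_group, and that `loss` is
# a hypothetical per-process scalar tensor):
#
#     [loss] = all_reduce([loss], average=True)
#     # `loss` now holds the mean of the per-process values.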
|
|
def is_master_proc(num_gpus=8):
    """
    Determine whether the current process is a master process. If
    torch.distributed is not initialized, the process is always treated as
    the master.
    Args:
        num_gpus (int): a process is treated as a master if its rank is a
            multiple of this value.
    """
    if torch.distributed.is_initialized():
        return dist.get_rank() % num_gpus == 0
    else:
        return True
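
# Usage sketch (illustrative only): guard side effects such as logging or
# checkpoint writing so that only a master process performs them.
#
#     if is_master_proc():
#         print("only a master process reaches this branch")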
|
|
def get_world_size():
    """
    Return the number of processes in the default process group, or 1 if
    torch.distributed is unavailable or not initialized.
    """
    if not dist.is_available():
        return 1
    if not dist.is_initialized():
        return 1
    return dist.get_world_size()
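
# Usage sketch (illustrative only; `per_proc_batch_size` is a hypothetical
# per-process value): derive a global quantity from the world size.
#
#     global_batch_size = per_proc_batch_size * get_world_size()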
|
|
def init_distributed_training(cfg):
    """
    Initialize variables needed for distributed training: build one process
    group per machine (shard) and store the group for the local machine in
    _LOCAL_PROCESS_GROUP. Assumes the default process group has already been
    initialized.
    Args:
        cfg: config with NUM_GPUS (number of GPUs per machine) and SHARD_ID
            (index of the current machine).
    """
    if cfg.NUM_GPUS <= 1:
        return
    num_gpus_per_machine = cfg.NUM_GPUS
    num_machines = dist.get_world_size() // num_gpus_per_machine
    for i in range(num_machines):
        ranks_on_i = list(
            range(i * num_gpus_per_machine, (i + 1) * num_gpus_per_machine)
        )
        # new_group is a collective call: every process must create every
        # group, but only the group matching this machine's SHARD_ID is kept.
        pg = dist.new_group(ranks_on_i)
        if i == cfg.SHARD_ID:
            global _LOCAL_PROCESS_GROUP
            _LOCAL_PROCESS_GROUP = pg
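
# Usage sketch (illustrative only; the init_process_group arguments are
# placeholders): the default process group must exist on every process
# before the machine-local groups can be built.
#
#     dist.init_process_group(
#         backend="nccl", init_method=..., world_size=..., rank=...
#     )
#     init_distributed_training(cfg)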
|
|