File size: 1,695 Bytes
2571f24 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
#!/usr/bin/env python3
"""Distributed helpers."""
import torch
import torch.distributed as dist
_LOCAL_PROCESS_GROUP = None
def all_reduce(tensors, average=True):
"""
All reduce the provided tensors from all processes across machines.
Args:
tensors (list): tensors to perform all reduce across all processes in
all machines.
average (bool): scales the reduced tensor by the number of overall
processes across all machines.
"""
for tensor in tensors:
dist.all_reduce(tensor, async_op=False)
if average:
world_size = dist.get_world_size()
for tensor in tensors:
tensor.mul_(1.0 / world_size)
return tensors
def is_master_proc(num_gpus=8):
"""
Determines if the current process is the master process.
"""
if torch.distributed.is_initialized():
return dist.get_rank() % num_gpus == 0
else:
return True
def get_world_size():
"""
Get the size of the world.
"""
if not dist.is_available():
return 1
if not dist.is_initialized():
return 1
return dist.get_world_size()
def init_distributed_training(cfg):
"""
Initialize variables needed for distributed training.
"""
if cfg.NUM_GPUS <= 1:
return
num_gpus_per_machine = cfg.NUM_GPUS
num_machines = dist.get_world_size() // num_gpus_per_machine
for i in range(num_machines):
ranks_on_i = list(
range(i * num_gpus_per_machine, (i + 1) * num_gpus_per_machine)
)
pg = dist.new_group(ranks_on_i)
if i == cfg.SHARD_ID:
global _LOCAL_PROCESS_GROUP
_LOCAL_PROCESS_GROUP = pg
|