import logging
import os
from datetime import timedelta

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from detectron2.utils import comm
import deepspeed

__all__ = ["DEFAULT_TIMEOUT", "launch_deepspeed", "launch_deepspeed_multinodes"]

DEFAULT_TIMEOUT = timedelta(minutes=30)


def _find_free_port():
    import socket

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Binding to port 0 will cause the OS to find an available port for us
    sock.bind(("", 0))
    port = sock.getsockname()[1]
    sock.close()
    # NOTE: there is still a chance the port could be taken by other processes.
    return port


def launch_deepspeed(
    main_func,
    num_gpus_per_machine,
    num_machines=1,
    machine_rank=0,
    dist_url=None,
    args=(),
    timeout=DEFAULT_TIMEOUT,
):
    """
    Modified by Jialian Wu from
    https://github.com/facebookresearch/detectron2/blob/main/detectron2/engine/launch.py

    Launch multi-gpu or distributed training.
    This function must be called on all machines involved in the training.
    It will spawn child processes (defined by ``num_gpus_per_machine``) on each machine.

    Args:
        main_func: a function that will be called by `main_func(*args)`
        num_gpus_per_machine (int): number of GPUs per machine
        num_machines (int): the total number of machines
        machine_rank (int): the rank of this machine
        dist_url (str): url to connect to for distributed jobs, including protocol,
            e.g. "tcp://127.0.0.1:8686".
            Can be set to "auto" to automatically select a free port on localhost
        timeout (timedelta): timeout of the distributed workers
        args (tuple): arguments passed to main_func
    """
    world_size = num_machines * num_gpus_per_machine
    if world_size > 1:
        if dist_url == "auto":
            assert num_machines == 1, "dist_url=auto not supported in multi-machine jobs."
            port = _find_free_port()
            dist_url = f"tcp://127.0.0.1:{port}"
        if num_machines > 1 and dist_url.startswith("file://"):
            logger = logging.getLogger(__name__)
            logger.warning(
                "file:// is not a reliable init_method in multi-machine jobs. Prefer tcp://"
            )

        mp.spawn(
            _distributed_worker,
            nprocs=num_gpus_per_machine,
            args=(
                main_func,
                world_size,
                num_gpus_per_machine,
                machine_rank,
                dist_url,
                args,
                timeout,
            ),
            daemon=False,
        )
    else:
        main_func(*args)
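

# Usage sketch (illustrative, not part of the original module): a typical
# single-machine call spawns one worker per GPU and forwards the parsed
# arguments to ``main_func``. ``_train`` and ``parsed_args`` are hypothetical
# placeholders for the caller's entry point and argparse namespace.
#
#   def _train(cfg_args):
#       ...  # build the model, call deepspeed.initialize(...), run training
#
#   launch_deepspeed(
#       _train,
#       num_gpus_per_machine=8,
#       num_machines=1,
#       machine_rank=0,
#       dist_url="auto",  # resolves to a free localhost port via _find_free_port()
#       args=(parsed_args,),
#   )
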

def _distributed_worker(
    local_rank,
    main_func,
    world_size,
    num_gpus_per_machine,
    machine_rank,
    dist_url,
    args,
    timeout=DEFAULT_TIMEOUT,
):
    """
    Modified by Jialian Wu from
    https://github.com/facebookresearch/detectron2/blob/main/detectron2/engine/launch.py

    Adaptation for deepspeed.
    """
    assert torch.cuda.is_available(), "cuda is not available. Please check your installation."
    global_rank = machine_rank * num_gpus_per_machine + local_rank
    assert dist_url.startswith('tcp://')
    master_address = dist_url.split('tcp://')[1].split(':')[0]
    master_port = dist_url.split('tcp://')[1].split(':')[1]
    os.environ['RANK'] = str(global_rank)
    os.environ['LOCAL_RANK'] = str(local_rank)
    os.environ['WORLD_SIZE'] = str(world_size)
    os.environ['MASTER_ADDR'] = master_address
    os.environ['MASTER_PORT'] = master_port
    try:
        deepspeed.init_distributed()
    except Exception as e:
        logger = logging.getLogger(__name__)
        logger.error("Process group URL: {}".format(dist_url))
        raise e

    # Setup the local process group (which contains ranks within the same machine)
    assert comm._LOCAL_PROCESS_GROUP is None
    num_machines = world_size // num_gpus_per_machine
    for i in range(num_machines):
        ranks_on_i = list(range(i * num_gpus_per_machine, (i + 1) * num_gpus_per_machine))
        pg = dist.new_group(ranks_on_i)
        if i == machine_rank:
            comm._LOCAL_PROCESS_GROUP = pg

    assert num_gpus_per_machine <= torch.cuda.device_count()
    torch.cuda.set_device(local_rank)

    # synchronize is needed here to prevent a possible timeout after calling init_process_group
    # See: https://github.com/facebookresearch/maskrcnn-benchmark/issues/172
    comm.synchronize()

    main_func(*args)


def get_mpi_rank():
    if 'RANK' in os.environ:
        return int(os.environ['RANK'])
    return int(os.environ.get('OMPI_COMM_WORLD_RANK', '0'))


def get_mpi_size():
    if 'WORLD_SIZE' in os.environ:
        return int(os.environ['WORLD_SIZE'])
    return int(os.environ.get('OMPI_COMM_WORLD_SIZE', '1'))


def get_mpi_local_rank():
    if 'LOCAL_RANK' in os.environ:
        return int(os.environ['LOCAL_RANK'])
    return int(os.environ.get('OMPI_COMM_WORLD_LOCAL_RANK', '0'))


def launch_deepspeed_multinodes(
    main_func,
    dist_url=None,
    args=(),
):
    """
    Launch multi-node training via deepspeed.
    """
    assert torch.cuda.is_available(), "cuda is not available. Please check your installation."
    assert dist_url.startswith('tcp://')
    master_address = dist_url.split('tcp://')[1].split(':')[0]
    master_port = dist_url.split('tcp://')[1].split(':')[1]
    os.environ['RANK'] = str(get_mpi_rank())
    os.environ['LOCAL_RANK'] = str(get_mpi_local_rank())
    os.environ['WORLD_SIZE'] = str(get_mpi_size())
    os.environ['MASTER_ADDR'] = master_address
    os.environ['MASTER_PORT'] = master_port
    try:
        deepspeed.init_distributed()
    except Exception as e:
        logger = logging.getLogger(__name__)
        logger.error("Process group URL: {}".format(dist_url))
        raise e

    torch.cuda.set_device(get_mpi_local_rank())
    comm.synchronize()
    main_func(*args)
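

# Usage sketch for the multi-node path (illustrative, not part of the original
# module). Unlike ``launch_deepspeed``, ``launch_deepspeed_multinodes`` does not
# spawn worker processes itself: it assumes one process per GPU already exists
# (e.g. started by mpirun) and derives rank/world size from RANK/WORLD_SIZE or
# the OMPI_COMM_WORLD_* variables via the get_mpi_* helpers above. The host
# name, port, script name, and ``_train``/``parsed_args`` below are hypothetical.
#
#   # mpirun -np 16 --hostfile hosts python train_net.py ...
#   launch_deepspeed_multinodes(
#       _train,
#       dist_url="tcp://node0:29500",  # must point at the rank-0 (master) machine
#       args=(parsed_args,),
#   )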