bytetrack / yolox /core /launch.py
AK391
all files
7734d5b
raw
history blame
7.04 kB
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Code are based on
# https://github.com/facebookresearch/detectron2/blob/master/detectron2/engine/launch.py
# Copyright (c) Facebook, Inc. and its affiliates.
# Copyright (c) Megvii, Inc. and its affiliates.
from loguru import logger
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import yolox.utils.dist as comm
from yolox.utils import configure_nccl
import os
import subprocess
import sys
import time
__all__ = ["launch"]
def _find_free_port():
"""
Find an available port of current machine / node.
"""
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Binding to port 0 will cause the OS to find an available port for us
sock.bind(("", 0))
port = sock.getsockname()[1]
sock.close()
# NOTE: there is still a chance the port could be taken by other processes.
return port
def launch(
main_func,
num_gpus_per_machine,
num_machines=1,
machine_rank=0,
backend="nccl",
dist_url=None,
args=(),
):
"""
Args:
main_func: a function that will be called by `main_func(*args)`
num_machines (int): the total number of machines
machine_rank (int): the rank of this machine (one per machine)
dist_url (str): url to connect to for distributed training, including protocol
e.g. "tcp://127.0.0.1:8686".
Can be set to auto to automatically select a free port on localhost
args (tuple): arguments passed to main_func
"""
world_size = num_machines * num_gpus_per_machine
if world_size > 1:
if int(os.environ.get("WORLD_SIZE", "1")) > 1:
dist_url = "{}:{}".format(
os.environ.get("MASTER_ADDR", None),
os.environ.get("MASTER_PORT", "None"),
)
local_rank = int(os.environ.get("LOCAL_RANK", "0"))
world_size = int(os.environ.get("WORLD_SIZE", "1"))
_distributed_worker(
local_rank,
main_func,
world_size,
num_gpus_per_machine,
num_machines,
machine_rank,
backend,
dist_url,
args,
)
exit()
launch_by_subprocess(
sys.argv,
world_size,
num_machines,
machine_rank,
num_gpus_per_machine,
dist_url,
args,
)
else:
main_func(*args)
def launch_by_subprocess(
raw_argv,
world_size,
num_machines,
machine_rank,
num_gpus_per_machine,
dist_url,
args,
):
assert (
world_size > 1
), "subprocess mode doesn't support single GPU, use spawn mode instead"
if dist_url is None:
# ------------------------hack for multi-machine training -------------------- #
if num_machines > 1:
master_ip = subprocess.check_output(["hostname", "--fqdn"]).decode("utf-8")
master_ip = str(master_ip).strip()
dist_url = "tcp://{}".format(master_ip)
ip_add_file = "./" + args[1].experiment_name + "_ip_add.txt"
if machine_rank == 0:
port = _find_free_port()
with open(ip_add_file, "w") as ip_add:
ip_add.write(dist_url+'\n')
ip_add.write(str(port))
else:
while not os.path.exists(ip_add_file):
time.sleep(0.5)
with open(ip_add_file, "r") as ip_add:
dist_url = ip_add.readline().strip()
port = ip_add.readline()
else:
dist_url = "tcp://127.0.0.1"
port = _find_free_port()
# set PyTorch distributed related environmental variables
current_env = os.environ.copy()
current_env["MASTER_ADDR"] = dist_url
current_env["MASTER_PORT"] = str(port)
current_env["WORLD_SIZE"] = str(world_size)
assert num_gpus_per_machine <= torch.cuda.device_count()
if "OMP_NUM_THREADS" not in os.environ and num_gpus_per_machine > 1:
current_env["OMP_NUM_THREADS"] = str(1)
logger.info(
"\n*****************************************\n"
"Setting OMP_NUM_THREADS environment variable for each process "
"to be {} in default, to avoid your system being overloaded, "
"please further tune the variable for optimal performance in "
"your application as needed. \n"
"*****************************************".format(
current_env["OMP_NUM_THREADS"]
)
)
processes = []
for local_rank in range(0, num_gpus_per_machine):
# each process's rank
dist_rank = machine_rank * num_gpus_per_machine + local_rank
current_env["RANK"] = str(dist_rank)
current_env["LOCAL_RANK"] = str(local_rank)
# spawn the processes
cmd = ["python3", *raw_argv]
process = subprocess.Popen(cmd, env=current_env)
processes.append(process)
for process in processes:
process.wait()
if process.returncode != 0:
raise subprocess.CalledProcessError(returncode=process.returncode, cmd=cmd)
def _distributed_worker(
local_rank,
main_func,
world_size,
num_gpus_per_machine,
num_machines,
machine_rank,
backend,
dist_url,
args,
):
assert (
torch.cuda.is_available()
), "cuda is not available. Please check your installation."
configure_nccl()
global_rank = machine_rank * num_gpus_per_machine + local_rank
logger.info("Rank {} initialization finished.".format(global_rank))
try:
dist.init_process_group(
backend=backend,
init_method=dist_url,
world_size=world_size,
rank=global_rank,
)
except Exception:
logger.error("Process group URL: {}".format(dist_url))
raise
# synchronize is needed here to prevent a possible timeout after calling init_process_group
# See: https://github.com/facebookresearch/maskrcnn-benchmark/issues/172
comm.synchronize()
if global_rank == 0 and os.path.exists(
"./" + args[1].experiment_name + "_ip_add.txt"
):
os.remove("./" + args[1].experiment_name + "_ip_add.txt")
assert num_gpus_per_machine <= torch.cuda.device_count()
torch.cuda.set_device(local_rank)
args[1].local_rank = local_rank
args[1].num_machines = num_machines
# Setup the local process group (which contains ranks within the same machine)
# assert comm._LOCAL_PROCESS_GROUP is None
# num_machines = world_size // num_gpus_per_machine
# for i in range(num_machines):
# ranks_on_i = list(range(i * num_gpus_per_machine, (i + 1) * num_gpus_per_machine))
# pg = dist.new_group(ranks_on_i)
# if i == machine_rank:
# comm._LOCAL_PROCESS_GROUP = pg
main_func(*args)