"""Helpers for reading distributed-training settings from environment variables."""
import os
from typing import Optional, Tuple

import torch
import torch.distributed as dist
from transformers.utils import strtobool

from .logger import get_logger

logger = get_logger()
|
|
def use_hf_hub() -> bool:
    """Whether USE_HF requests downloading from the HuggingFace Hub."""
    return bool(strtobool(os.environ.get('USE_HF', '0')))
|
|
def is_deepspeed_enabled() -> bool:
    """Whether Accelerate has DeepSpeed enabled (ACCELERATE_USE_DEEPSPEED)."""
    return bool(strtobool(os.environ.get('ACCELERATE_USE_DEEPSPEED', '0')))
|
|
def use_torchacc() -> bool:
    """Whether USE_TORCHACC requests training with TorchAcc."""
    return bool(strtobool(os.getenv('USE_TORCHACC', '0')))
|
|
def get_dist_setting() -> Tuple[int, int, int, int]:
    """Return (rank, local_rank, world_size, local_world_size).

    rank and local_rank default to -1, the sizes to 1, when the env vars are unset.
    """
    rank = int(os.getenv('RANK', -1))
    local_rank = int(os.getenv('LOCAL_RANK', -1))
    world_size = int(os.getenv('WORLD_SIZE') or os.getenv('_PATCH_WORLD_SIZE') or 1)
    local_world_size = int(os.getenv('LOCAL_WORLD_SIZE') or os.getenv('LOCAL_SIZE') or 1)
    return rank, local_rank, world_size, local_world_size
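
# Illustrative values only: under a hypothetical single-node
# `torchrun --nproc_per_node=4` launch, torchrun exports RANK, LOCAL_RANK,
# WORLD_SIZE and LOCAL_WORLD_SIZE, so worker 2 sees
# get_dist_setting() == (2, 2, 4, 4); with no launcher at all the defaults
# give (-1, -1, 1, 1).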
|
|
def get_node_setting() -> Tuple[int, int]:
    """Return (node_rank, nnodes); defaults to (0, 1) on a single node."""
    node_rank = int(os.getenv('NODE_RANK', 0))
    nnodes = int(os.getenv('NNODES', 1))
    return node_rank, nnodes
|
|
def is_local_master() -> bool:
    """True on the rank-0 process of each node, or when not distributed."""
    local_rank = get_dist_setting()[1]
    return local_rank in {-1, 0}
|
|
def is_master() -> bool:
    """True on the global rank-0 process, or when not distributed."""
    rank = get_dist_setting()[0]
    return rank in {-1, 0}
|
|
def torchacc_trim_graph() -> bool:
    """Whether TORCHACC_TRIM_GRAPH requests graph trimming under TorchAcc."""
    return bool(strtobool(os.getenv('TORCHACC_TRIM_GRAPH', '0')))
|
|
def is_dist() -> bool:
    """Determine whether the training is distributed (DDP-style launch)."""
    if use_torchacc():
        return False
    rank, local_rank, _, _ = get_dist_setting()
    return rank >= 0 and local_rank >= 0
|
|
def is_mp() -> bool:
    """Determine whether model parallelism (device_map sharding) is in use:
    True when each local process has two or more visible devices."""
    if use_torchacc():
        return False
    if strtobool(os.environ.get('USE_FAST_INFERENCE', 'false')):
        return False
    from swift.utils import get_device_count
    n_gpu = get_device_count()
    local_world_size = get_dist_setting()[3]
    assert n_gpu % local_world_size == 0, f'n_gpu: {n_gpu}, local_world_size: {local_world_size}'
    return n_gpu // local_world_size >= 2
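
# Worked example with assumed numbers: 8 visible GPUs and a local world size
# of 4 leave each process 8 // 4 = 2 devices, so is_mp() is True; 4 GPUs
# across 4 processes leave 4 // 4 = 1 device, so it is False.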
|
|
def is_mp_ddp() -> bool:
    """Determine whether MP (device_map) and DDP are combined."""
    if is_dist() and is_mp():
        logger.info('Using MP(device_map) + DDP')
        return True
    return False
|
|
def is_dist_ta() -> bool:
    """Determine whether the TorchAcc training is distributed."""
    _, _, world_size, _ = get_dist_setting()
    if use_torchacc() and world_size > 1:
        if not dist.is_initialized():
            import torchacc as ta
            # Lazily create the process group with TorchAcc's backend.
            dist.init_process_group(backend=ta.dist.BACKEND_NAME)
        return True
    return False
|
|
def is_pai_training_job() -> bool:
    """Whether running inside a PAI training job (PAI_TRAINING_JOB_ID set)."""
    return 'PAI_TRAINING_JOB_ID' in os.environ
|
|
def get_pai_tensorboard_dir() -> Optional[str]:
    """Return the TensorBoard output dir provided by PAI, if any."""
    return os.environ.get('PAI_OUTPUT_TENSORBOARD')
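
# Minimal smoke test; a sketch assuming this file sits inside the `swift`
# package (the relative `.logger` import means it must be run as a module,
# e.g. `python -m swift.utils.env` if that is its path, which is an assumption).
if __name__ == '__main__':
    print('dist setting:', get_dist_setting())  # (-1, -1, 1, 1) with no launcher
    print('node setting:', get_node_setting())  # (0, 1) on a single node
    print('is_master:', is_master(), 'is_local_master:', is_local_master())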
|
|
|