# NOTE: the three lines below are Hugging Face upload-page artifacts, not code;
# commented out so this file remains valid Python.
# subhankarg's picture
# Upload folder using huggingface_hub
# 0558aa4 verified
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass
from threading import Lock
from typing import Optional
from nemo.utils.metaclasses import Singleton
@dataclass
class ModelMetadataRegistry:
    """Registry entry describing one model's metadata.

    Attributes:
        guid: Unique identifier of the model instance.
        gidx: Global index assigned to the model at registration time.
        restoration_path: Path the model was restored from, if any.
    """

    guid: str
    gidx: int
    restoration_path: Optional[str] = None
class AppState(metaclass=Singleton):
    """Process-wide singleton holding NeMo application state.

    Stores distributed "world" information (ranks, parallel group sizes and
    process-group handles), experiment-manager logging/checkpoint settings,
    and bookkeeping used while saving and restoring ``.nemo`` archives.
    """

    def __init__(self):
        # Method-call lock; guards the restore-path list and the guid map.
        self.__lock = Lock()

        # TODO: should we store global config in hydra_runner?
        self._app_cfg = None

        # World info
        self._device_id = None
        self._local_rank = None
        self._global_rank = None
        self._tensor_model_parallel_rank = None
        self._expert_model_parallel_rank = None
        self._expert_tensor_parallel_rank = None
        self._pipeline_model_parallel_rank = None
        # BUGFIX: the three ranks below were previously never initialized, so
        # reading their properties before the corresponding setter ran raised
        # AttributeError instead of returning None like every other rank.
        self._virtual_pipeline_model_parallel_rank = None
        self._encoder_tensor_model_parallel_rank = None
        self._encoder_pipeline_model_parallel_rank = None
        self._data_parallel_rank = None
        self._world_size = None
        self._model_parallel_size = None
        self._tensor_model_parallel_size = None
        self._tensor_model_parallel_group = None
        self._expert_model_parallel_size = None
        self._expert_tensor_parallel_size = None
        self._pipeline_model_parallel_size = None
        self._virtual_pipeline_model_parallel_size = None
        self._encoder_tensor_model_parallel_size = None
        self._encoder_pipeline_model_parallel_size = None
        self._pipeline_model_parallel_group = None
        self._pipeline_model_parallel_split_rank = None
        self._pipeline_model_parallel_comm_backend = None
        self._is_megatron_initialized = False
        self._data_parallel_size = None
        self._data_parallel_group = None
        self._use_tp_pp_dp_mapping = False
        self._num_distributed_optimizer_instances = 1
        self._megatron_checkpoint_version = None
        self._use_fp8 = False
        self._context_parallel_size = None
        # BUGFIX: this was misspelled `_init_mpi_proc_gruop`, while the
        # `init_mpi_proc_group` property getter reads `_init_mpi_proc_group`,
        # raising AttributeError on the first read of the property.
        self._init_mpi_proc_group = False
        self._nccl_communicator_config_path = None
        self._use_sharp = False
        self._use_gloo_process_groups = True
        self._random_seed = None

        # Logging info
        self._log_dir = None
        self._exp_dir = None
        self._name = None
        self._checkpoint_name = None
        self._version = None
        self._create_checkpoint_callback = None
        self._checkpoint_callback_params = None

        # Save and Restore (.nemo)
        self._tmpdir_name = None
        self._is_model_being_restored = False
        self._nemo_file_folder = None
        self._model_restore_path = None
        self._all_model_restore_paths = []
        self._model_guid_map = {}  # type: Dict[str, ModelMetadataRegistry]
        self._restore = False  # TODO: are this and _is_model_being_restored both needed?

        # Files from a previous run to move into a new directory.
        self._files_to_move = []
        # Files to copy into the log dir.
        self._files_to_copy = []
        # Command-line arguments for the run.
        self._cmd_args = None
        # Insert NVTX ranges to categorize execution.
        self._nvtx_ranges = False

    @property
    def device_id(self):
        """Returns the device id (None if unset)."""
        return self._device_id

    @device_id.setter
    def device_id(self, id):
        """Sets the device id.

        Args:
            id (int): The device id.
        """
        self._device_id = id

    @property
    def world_size(self):
        """Returns the total number of GPUs (None if unset)."""
        return self._world_size

    @world_size.setter
    def world_size(self, size):
        """Sets the total number of GPUs.

        Args:
            size (int): Total number of GPUs.
        """
        self._world_size = size

    @property
    def model_parallel_size(self):
        """Returns the number of GPUs in each model parallel group."""
        return self._model_parallel_size

    @model_parallel_size.setter
    def model_parallel_size(self, size):
        """Sets the number of GPUs in each model parallel group.

        Args:
            size (int): Number of GPUs in each model parallel group.
        """
        self._model_parallel_size = size

    @property
    def tensor_model_parallel_size(self):
        """Returns the number of GPUs in each tensor model parallel group."""
        return self._tensor_model_parallel_size

    @tensor_model_parallel_size.setter
    def tensor_model_parallel_size(self, size):
        """Sets the number of GPUs in each tensor model parallel group.

        Args:
            size (int): Number of GPUs in each tensor model parallel group.
        """
        self._tensor_model_parallel_size = size

    @property
    def expert_model_parallel_rank(self):
        """Returns the expert model parallel rank."""
        return self._expert_model_parallel_rank

    @expert_model_parallel_rank.setter
    def expert_model_parallel_rank(self, rank):
        """Sets the expert model parallel rank.

        Args:
            rank (int): Expert model parallel rank.
        """
        self._expert_model_parallel_rank = rank

    @property
    def expert_model_parallel_size(self):
        """Returns the number of GPUs in each expert parallel group."""
        return self._expert_model_parallel_size

    @expert_model_parallel_size.setter
    def expert_model_parallel_size(self, size):
        """Sets the number of GPUs in each expert parallel group.

        Args:
            size (int): Number of GPUs in each expert parallel group.
        """
        self._expert_model_parallel_size = size

    @property
    def expert_tensor_parallel_size(self):
        """Returns the number of GPUs in each expert tensor parallel group."""
        return self._expert_tensor_parallel_size

    @expert_tensor_parallel_size.setter
    def expert_tensor_parallel_size(self, size):
        """Sets the number of GPUs in each expert tensor parallel group.

        Args:
            size (int): Number of GPUs in each expert tensor parallel group.
        """
        self._expert_tensor_parallel_size = size

    @property
    def expert_tensor_parallel_rank(self):
        """Returns the expert tensor parallel rank."""
        return self._expert_tensor_parallel_rank

    @expert_tensor_parallel_rank.setter
    def expert_tensor_parallel_rank(self, rank):
        """Sets the expert tensor parallel rank.

        Args:
            rank (int): Expert tensor parallel rank.
        """
        self._expert_tensor_parallel_rank = rank

    @property
    def pipeline_model_parallel_size(self):
        """Returns the number of GPUs in each pipeline model parallel group."""
        return self._pipeline_model_parallel_size

    @pipeline_model_parallel_size.setter
    def pipeline_model_parallel_size(self, size):
        """Sets the number of GPUs in each pipeline model parallel group.

        Args:
            size (int): Number of GPUs in each pipeline model parallel group.
        """
        self._pipeline_model_parallel_size = size

    @property
    def pipeline_model_parallel_comm_backend(self):
        """Returns the backend communication library for pipeline communication."""
        return self._pipeline_model_parallel_comm_backend

    @pipeline_model_parallel_comm_backend.setter
    def pipeline_model_parallel_comm_backend(self, backend):
        """Sets the backend communication library for pipeline communication.

        Args:
            backend (str): Backend communication library for pipeline communication.
        """
        self._pipeline_model_parallel_comm_backend = backend

    @property
    def encoder_tensor_model_parallel_size(self):
        """Returns the number of GPUs in each encoder tensor model parallel group."""
        return self._encoder_tensor_model_parallel_size

    @encoder_tensor_model_parallel_size.setter
    def encoder_tensor_model_parallel_size(self, size):
        """Sets the number of GPUs in each encoder tensor model parallel group.

        Args:
            size (int): Number of GPUs in each encoder tensor model parallel group.
        """
        self._encoder_tensor_model_parallel_size = size

    @property
    def encoder_pipeline_model_parallel_size(self):
        """Returns the number of GPUs in each encoder pipeline model parallel group."""
        return self._encoder_pipeline_model_parallel_size

    @encoder_pipeline_model_parallel_size.setter
    def encoder_pipeline_model_parallel_size(self, size):
        """Sets the number of GPUs in each encoder pipeline model parallel group.

        Args:
            size (int): Number of GPUs in each encoder pipeline model parallel group.
        """
        self._encoder_pipeline_model_parallel_size = size

    @property
    def use_tp_pp_dp_mapping(self):
        """Returns whether to use the TP-PP-DP rank mapping."""
        return self._use_tp_pp_dp_mapping

    @use_tp_pp_dp_mapping.setter
    def use_tp_pp_dp_mapping(self, use_new_mapping):
        """Sets whether to use the TP-PP-DP rank mapping.

        Args:
            use_new_mapping (bool): Whether to use TP-PP-DP mapping.
        """
        self._use_tp_pp_dp_mapping = use_new_mapping

    @property
    def num_distributed_optimizer_instances(self):
        """Returns the factor by which the partial DistOpt is sharded."""
        return self._num_distributed_optimizer_instances

    @num_distributed_optimizer_instances.setter
    def num_distributed_optimizer_instances(self, shard_factor):
        """Sets the factor by which the partial DistOpt is sharded.

        Args:
            shard_factor (int): The partial DistOpt shard factor.
        """
        self._num_distributed_optimizer_instances = shard_factor

    @property
    def virtual_pipeline_model_parallel_size(self):
        """Returns the virtual pipeline model parallel size."""
        return self._virtual_pipeline_model_parallel_size

    @virtual_pipeline_model_parallel_size.setter
    def virtual_pipeline_model_parallel_size(self, size):
        """Sets the virtual pipeline model parallel size.

        Args:
            size (int): Number of module chunks in each pipeline parallel model.
        """
        self._virtual_pipeline_model_parallel_size = size

    @property
    def data_parallel_size(self):
        """Returns the number of GPUs in each data parallel group."""
        return self._data_parallel_size

    @data_parallel_size.setter
    def data_parallel_size(self, size):
        """Sets the number of GPUs in each data parallel group.

        Args:
            size (int): Number of GPUs in each data parallel group.
        """
        self._data_parallel_size = size

    @property
    def local_rank(self):
        """Returns the local rank."""
        return self._local_rank

    @local_rank.setter
    def local_rank(self, rank):
        """Sets the local rank.

        Args:
            rank (int): Local rank.
        """
        self._local_rank = rank

    @property
    def global_rank(self):
        """Returns the global rank."""
        return self._global_rank

    @global_rank.setter
    def global_rank(self, rank):
        """Sets the global rank.

        Args:
            rank (int): Global rank.
        """
        self._global_rank = rank

    @property
    def tensor_model_parallel_rank(self):
        """Returns the tensor model parallel rank."""
        return self._tensor_model_parallel_rank

    @tensor_model_parallel_rank.setter
    def tensor_model_parallel_rank(self, rank):
        """Sets the tensor model parallel rank.

        Args:
            rank (int): Tensor model parallel rank.
        """
        self._tensor_model_parallel_rank = rank

    @property
    def tensor_model_parallel_group(self):
        """Returns the tensor model parallel group handle."""
        return self._tensor_model_parallel_group

    @tensor_model_parallel_group.setter
    def tensor_model_parallel_group(self, group):
        """Sets the tensor model parallel group handle.

        Args:
            group: Tensor model parallel group.
        """
        self._tensor_model_parallel_group = group

    @property
    def pipeline_model_parallel_rank(self):
        """Returns the pipeline model parallel rank."""
        return self._pipeline_model_parallel_rank

    @pipeline_model_parallel_rank.setter
    def pipeline_model_parallel_rank(self, rank):
        """Sets the pipeline model parallel rank.

        Args:
            rank (int): Pipeline model parallel rank.
        """
        self._pipeline_model_parallel_rank = rank

    @property
    def virtual_pipeline_model_parallel_rank(self):
        """Returns the virtual pipeline parallel rank."""
        return self._virtual_pipeline_model_parallel_rank

    @virtual_pipeline_model_parallel_rank.setter
    def virtual_pipeline_model_parallel_rank(self, rank):
        """Sets the virtual pipeline parallel rank.

        Args:
            rank (int): Virtual pipeline parallel rank.
        """
        self._virtual_pipeline_model_parallel_rank = rank

    @property
    def encoder_tensor_model_parallel_rank(self):
        """Returns the encoder tensor model parallel rank."""
        return self._encoder_tensor_model_parallel_rank

    @encoder_tensor_model_parallel_rank.setter
    def encoder_tensor_model_parallel_rank(self, rank):
        """Sets the encoder tensor model parallel rank.

        Args:
            rank (int): Encoder tensor model parallel rank.
        """
        self._encoder_tensor_model_parallel_rank = rank

    @property
    def encoder_pipeline_model_parallel_rank(self):
        """Returns the encoder pipeline model parallel rank."""
        return self._encoder_pipeline_model_parallel_rank

    @encoder_pipeline_model_parallel_rank.setter
    def encoder_pipeline_model_parallel_rank(self, rank):
        """Sets the encoder pipeline model parallel rank.

        Args:
            rank (int): Encoder pipeline model parallel rank.
        """
        self._encoder_pipeline_model_parallel_rank = rank

    @property
    def pipeline_model_parallel_split_rank(self):
        """Returns the rank at which Encoder and Decoder are split into different
        pipelines for Megatron Encoder-Decoder models."""
        return self._pipeline_model_parallel_split_rank

    @pipeline_model_parallel_split_rank.setter
    def pipeline_model_parallel_split_rank(self, rank):
        """Sets the rank at which Encoder and Decoder are split into different
        pipelines for Megatron Encoder-Decoder models.

        Args:
            rank (int): Model parallel split rank.
        """
        self._pipeline_model_parallel_split_rank = rank

    @property
    def pipeline_model_parallel_group(self):
        """Returns the pipeline model parallel group handle."""
        return self._pipeline_model_parallel_group

    @pipeline_model_parallel_group.setter
    def pipeline_model_parallel_group(self, group):
        """Sets the pipeline model parallel group handle.

        Args:
            group: Pipeline model parallel group.
        """
        self._pipeline_model_parallel_group = group

    @property
    def data_parallel_rank(self):
        """Returns the data parallel rank."""
        return self._data_parallel_rank

    @data_parallel_rank.setter
    def data_parallel_rank(self, rank):
        """Sets the data parallel rank.

        Args:
            rank (int): Data parallel rank.
        """
        self._data_parallel_rank = rank

    @property
    def data_parallel_group(self):
        """Returns the data parallel group handle."""
        return self._data_parallel_group

    @data_parallel_group.setter
    def data_parallel_group(self, group):
        """Sets the data parallel group handle.

        Args:
            group: Data parallel group.
        """
        self._data_parallel_group = group

    @property
    def use_fp8(self):
        """Returns whether FP8 precision is used."""
        return self._use_fp8

    @use_fp8.setter
    def use_fp8(self, use_fp8):
        """Sets whether FP8 precision is used.

        Args:
            use_fp8 (bool): Use of FP8.
        """
        self._use_fp8 = use_fp8

    @property
    def use_sharp(self):
        """Returns whether to use SHARP for all-reduce operations."""
        return self._use_sharp

    @use_sharp.setter
    def use_sharp(self, use_sharp):
        """Sets whether to use SHARP for all-reduce operations.

        Args:
            use_sharp (bool): Whether to use SHARP.
        """
        self._use_sharp = use_sharp

    @property
    def use_gloo_process_groups(self):
        """Returns whether to use Gloo process groups."""
        return self._use_gloo_process_groups

    @use_gloo_process_groups.setter
    def use_gloo_process_groups(self, use_gloo_process_groups):
        """Sets whether to use Gloo process groups.

        Args:
            use_gloo_process_groups (bool): Whether to use Gloo process groups.
        """
        self._use_gloo_process_groups = use_gloo_process_groups

    @property
    def context_parallel_size(self):
        """Returns the number of GPUs in each context parallel group."""
        return self._context_parallel_size

    @context_parallel_size.setter
    def context_parallel_size(self, size):
        """Sets the number of GPUs in each context parallel group.

        Args:
            size (int): Number of GPUs in each context parallel group.
        """
        self._context_parallel_size = size

    @property
    def init_mpi_proc_group(self):
        """Returns whether to initialize the MPI process group."""
        return self._init_mpi_proc_group

    @init_mpi_proc_group.setter
    def init_mpi_proc_group(self, init_mpi_proc_group):
        """Sets whether to initialize the MPI process group.

        Args:
            init_mpi_proc_group (bool): Initialize MPI process group.
        """
        self._init_mpi_proc_group = init_mpi_proc_group

    @property
    def nccl_communicator_config_path(self):
        """Returns the path to the NCCL communicator config."""
        return self._nccl_communicator_config_path

    @nccl_communicator_config_path.setter
    def nccl_communicator_config_path(self, path):
        """Sets the path to the NCCL communicator config.

        Args:
            path (str): Path to the NCCL communicator config.
        """
        self._nccl_communicator_config_path = path

    @property
    def random_seed(self):
        """Returns the random seed."""
        return self._random_seed

    @random_seed.setter
    def random_seed(self, seed):
        """Sets the random seed.

        Args:
            seed (int): Random seed.
        """
        self._random_seed = seed

    @property
    def log_dir(self):
        """Returns the log_dir set by exp_manager."""
        return self._log_dir

    @log_dir.setter
    def log_dir(self, dir):
        """Sets the log_dir property.

        Args:
            dir (str): log_dir set by exp_manager.
        """
        self._log_dir = dir

    @property
    def exp_dir(self):
        """Returns the exp_dir set by exp_manager."""
        return self._exp_dir

    @exp_dir.setter
    def exp_dir(self, dir):
        """Sets the exp_dir property.

        Args:
            dir (str): exp_dir set by exp_manager.
        """
        self._exp_dir = dir

    @property
    def name(self):
        """Returns the experiment name set by exp_manager."""
        return self._name

    @name.setter
    def name(self, name):
        """Sets the experiment name.

        Args:
            name (str): Name set by exp_manager.
        """
        self._name = name

    @property
    def checkpoint_name(self):
        """Returns the checkpoint name set by exp_manager."""
        return self._checkpoint_name

    @checkpoint_name.setter
    def checkpoint_name(self, name):
        """Sets the checkpoint name.

        Args:
            name (str): Checkpoint name set by exp_manager.
        """
        self._checkpoint_name = name

    @property
    def version(self):
        """Returns the version set by exp_manager."""
        return self._version

    @version.setter
    def version(self, version):
        """Sets the version property.

        Args:
            version (str): Version set by exp_manager.
        """
        self._version = version

    @property
    def create_checkpoint_callback(self):
        """Returns the create_checkpoint_callback set by exp_manager."""
        return self._create_checkpoint_callback

    @create_checkpoint_callback.setter
    def create_checkpoint_callback(self, create_checkpoint_callback):
        """Sets the create_checkpoint_callback property.

        Args:
            create_checkpoint_callback (bool): create_checkpoint_callback set by exp_manager.
        """
        self._create_checkpoint_callback = create_checkpoint_callback

    @property
    def checkpoint_callback_params(self):
        """Returns the checkpoint_callback_params set by exp_manager."""
        return self._checkpoint_callback_params

    @checkpoint_callback_params.setter
    def checkpoint_callback_params(self, params):
        """Sets the checkpoint_callback_params property.

        Args:
            params (dict): checkpoint_callback_params set by exp_manager.
        """
        self._checkpoint_callback_params = params

    @property
    def files_to_move(self):
        """Returns the list of files to move into a separate directory."""
        return self._files_to_move

    @files_to_move.setter
    def files_to_move(self, files):
        """Sets the files_to_move property.

        Args:
            files (list[str]): List of filenames to move.
        """
        self._files_to_move = files

    @property
    def files_to_copy(self):
        """Returns the list of files to copy into the log dir."""
        return self._files_to_copy

    @files_to_copy.setter
    def files_to_copy(self, files):
        """Sets the files_to_copy property.

        Args:
            files (list[str]): List of filenames to copy.
        """
        self._files_to_copy = files

    @property
    def cmd_args(self):
        """Returns the command line arguments for the current run."""
        return self._cmd_args

    @cmd_args.setter
    def cmd_args(self, args):
        """Sets the cmd_args property.

        Args:
            args (list[str]): List of the command line arguments used to run
                the experiment.
        """
        self._cmd_args = args

    @property
    def model_restore_path(self):
        """Returns the most recently registered model restore path, or None."""
        # The last appended path wins; `_model_restore_path` itself is only a
        # mirror of the last value passed to the setter.
        restore_path = self._all_model_restore_paths[-1] if len(self._all_model_restore_paths) > 0 else None
        return restore_path

    @model_restore_path.setter
    def model_restore_path(self, path):
        """Sets the model restore path (also appended to the history list).

        Args:
            path (str): Model restore path.
        """
        with self.__lock:
            self._model_restore_path = path
            self._all_model_restore_paths.append(path)

    def register_model_guid(self, guid: str, restoration_path: Optional[str] = None):
        """Maps a guid to its restore path (None or last absolute path).

        Re-registering an existing guid keeps its original index but replaces
        its restoration path.

        Args:
            guid (str): Guid.
            restoration_path (Optional[str]): Restore path.
        """
        with self.__lock:
            if guid in self._model_guid_map:
                idx = self._model_guid_map[guid].gidx
            else:
                idx = len(self._model_guid_map)
            self._model_guid_map[guid] = ModelMetadataRegistry(guid, idx, restoration_path=restoration_path)

    def reset_model_guid_registry(self):
        """Resets the guid mapping."""
        with self.__lock:
            self._model_guid_map.clear()

    def get_model_metadata_from_guid(self, guid) -> ModelMetadataRegistry:
        """Returns the registry entry (global model idx and restoration path) for a guid.

        Args:
            guid (str): Guid.

        Returns:
            ModelMetadataRegistry: Model metadata registry entry.

        Raises:
            KeyError: If the guid was never registered.
        """
        metadata = self._model_guid_map[guid]
        return metadata

    @property
    def is_model_being_restored(self) -> bool:
        """Returns whether a model is currently being restored."""
        return self._is_model_being_restored

    @is_model_being_restored.setter
    def is_model_being_restored(self, is_restored: bool):
        """Sets whether a model is currently being restored.

        Args:
            is_restored (bool): Whether the model is being restored.
        """
        self._is_model_being_restored = is_restored

    @property
    def nemo_file_folder(self) -> str:
        """Returns the nemo file folder."""
        return self._nemo_file_folder

    @nemo_file_folder.setter
    def nemo_file_folder(self, path: str):
        """Sets the nemo file folder.

        Args:
            path (str): Nemo file folder.
        """
        self._nemo_file_folder = path

    @property
    def restore(self) -> bool:
        """Returns whether to restore the model."""
        return self._restore

    @restore.setter
    def restore(self, restore: bool):
        """Sets whether to restore the model.

        Args:
            restore (bool): Whether to restore the model.
        """
        self._restore = restore