import contextlib
import copy
import logging
import math
import os
import re
import tempfile
from pathlib import Path
from typing import Any, Dict, Optional, Sequence, Union
import torch
from composer.core import Callback, Event, State, Time, TimeUnit
from composer.core.state import fsdp_state_dict_type_context
from composer.loggers import Logger, MLFlowLogger
from composer.models import HuggingFaceModel
from composer.utils import (dist, format_name_with_dist_and_time,
                            maybe_create_remote_uploader_downloader_from_uri,
                            parse_uri)
from composer.utils.misc import create_interval_scheduler
from mlflow.transformers import _fetch_model_card, _write_license_information
from transformers import PreTrainedModel, PreTrainedTokenizerBase
from .mpt import MPTConfig, MPTForCausalLM
from .utils import init_empty_weights
from .huggingface_hub_utils import edit_files_for_hf_compatibility

log = logging.getLogger(__name__)

_LICENSE_FILE_PATTERN = re.compile(r'license(\.[a-z]+|$)', re.IGNORECASE)


def _maybe_get_license_filename(
        local_dir: str,
        pretrained_model_name: Optional[str] = None) -> Optional[str]:
"""Returns the name of the license file if it exists in the local_dir.
Note: This is intended to be consistent with the code in MLflow.
https://github.com/mlflow/mlflow/blob/5d13d6ec620a02de9a5e31201bf1becdb9722ea5/mlflow/transformers/__init__.py#L1152
Since LLM Foundry supports local model files being used rather than fetching the files from the Hugging Face Hub,
MLflow's logic to fetch and write the license information on model save is not applicable; it will try to search for
a Hugging Face repo named after the local path. However, the user can provide the original pretrained model name,
in which case this function will use that to fetch the correct license information.
If the license file does not exist, returns None.
"""
try:
license_filename = next((file for file in os.listdir(local_dir) if _LICENSE_FILE_PATTERN.search(file)))
if pretrained_model_name is not None:
log.info(f'Overwriting license file {license_filename} with license info for model {pretrained_model_name} from Hugging Face Hub')
os.remove(os.path.join(local_dir, license_filename))
model_card = _fetch_model_card(pretrained_model_name)
local_dir_path = Path(local_dir).absolute()
_write_license_information(pretrained_model_name, model_card, local_dir_path)
license_filename = next((file for file in os.listdir(local_dir) if _LICENSE_FILE_PATTERN.search(file)))
return license_filename
except StopIteration:
return None


class HuggingFaceCheckpointer(Callback):
"""Save a huggingface formatted checkpoint during training.
Args:
        save_folder (str): Top level folder to save checkpoints to (can be a
            URI). It is likely that this will be the same as your Trainer's
            ``save_folder``.
        save_interval (Union[str, int, Time]): The interval describing how often
checkpoints should be saved. If an integer, it will be assumed to be
in :attr:`.TimeUnit.EPOCH`. Otherwise, the unit must be either
:attr:`.TimeUnit.EPOCH`, :attr:`.TimeUnit.BATCH`,
:attr:`.TimeUnit.TOKEN`, or :attr:`.TimeUnit.SAMPLE`.
huggingface_folder_name (str): Folder to save each checkpoint under (can
be a format string). Default is ``ba{batch}``.
        precision (str): The precision to save the model in. Default is
            ``float32``. Options are ``bfloat16``, ``float16``, or
            ``float32``.
        overwrite (bool): Whether to overwrite previous checkpoints. Default
            is ``True``.
mlflow_registered_model_name (Optional[str]): The name to register the
model under in the MLflow model registry. If ``None``, the model
will not be registered. Default is ``None``.
        mlflow_logging_config (Optional[dict]): A dictionary of config
            arguments that will get passed along to the MLflow ``save_model``
            call. May contain ``metadata`` and ``task`` keys. If ``task`` is
            unspecified, it defaults to ``'llm/v1/completions'``; ``metadata``
            defaults to an empty dict. A default ``input_example`` suited to
            text generation (or chat, when the task ends with ``chat``) is
            also filled in if not provided.
flatten_imports (Sequence[str]): A sequence of import prefixes that will
be flattened when editing MPT files.
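
    Example:
        A minimal, illustrative sketch of attaching this callback to a
        Composer ``Trainer`` (not taken from the original source); the
        ``composer_model`` and ``train_dataloader`` objects are assumed to be
        defined elsewhere:

        .. code-block:: python

            from composer import Trainer

            checkpointer = HuggingFaceCheckpointer(
                save_folder='s3://my-bucket/checkpoints',
                save_interval='1ep',
                precision='bfloat16',
            )
            trainer = Trainer(
                model=composer_model,
                train_dataloader=train_dataloader,
                max_duration='1ep',
                callbacks=[checkpointer],
            )
            trainer.fit()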
"""

    def __init__(
        self,
        save_folder: str,
        save_interval: Union[str, int, Time],
        huggingface_folder_name: str = 'ba{batch}',
        precision: str = 'float32',
        overwrite: bool = True,
        mlflow_registered_model_name: Optional[str] = None,
        mlflow_logging_config: Optional[dict] = None,
        flatten_imports: Sequence[str] = ('llmfoundry',),
    ):
_, _, self.save_dir_format_str = parse_uri(save_folder)
self.overwrite = overwrite
self.precision = precision
self.dtype = {'float32': torch.float32, 'float16': torch.float16, 'bfloat16': torch.bfloat16}[precision]
self.flatten_imports = flatten_imports
self.mlflow_registered_model_name = mlflow_registered_model_name
if mlflow_logging_config is None:
mlflow_logging_config = {}
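        # Fill in defaults for the MLflow save_model call: the task falls back
        # to 'llm/v1/completions', and an input example matching the task
        # (chat vs. completions) is supplied if the user did not provide one.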
if self.mlflow_registered_model_name is not None:
import numpy as np
passed_metadata = mlflow_logging_config.get('metadata', {})
mlflow_logging_config['metadata'] = passed_metadata
mlflow_logging_config.setdefault('task', 'llm/v1/completions')
default_input_example = {'prompt': np.array(['What is Machine Learning?'])}
is_chat = mlflow_logging_config['task'].endswith('chat') or mlflow_logging_config['metadata'].get('task', '').endswith('chat')
if is_chat:
default_input_example = {'messages': np.array([{'role': 'user', 'content': 'What is Machine Learning?'}])}
mlflow_logging_config.setdefault('example_no_conversion', True)
mlflow_logging_config.setdefault('input_example', default_input_example)
self.mlflow_logging_config = mlflow_logging_config
self.huggingface_folder_name_fstr = os.path.join('huggingface', huggingface_folder_name)
self.save_interval: Time = Time.from_input(save_interval, TimeUnit.EPOCH)
self.check_interval = create_interval_scheduler(self.save_interval, include_end_of_training=True)
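        # If save_folder is a remote URI, checkpoints are staged in a
        # temporary directory and uploaded; otherwise they are written
        # directly to local disk.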
self.remote_ud = maybe_create_remote_uploader_downloader_from_uri(save_folder, loggers=[])
if self.remote_ud is not None:
self.remote_ud._num_concurrent_uploads = 4
self.last_checkpoint_batch: Optional[Time] = None
self.mlflow_loggers = []

    def run_event(self, event: Event, state: State, logger: Logger) -> None:
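        # Save whenever the interval scheduler fires, but at most once per
        # batch.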
if state.get_elapsed_duration() is not None and self.check_interval(state, event) and (self.last_checkpoint_batch != state.timestamp.batch):
self._save_checkpoint(state, logger)
elif event == Event.INIT:
if not isinstance(state.model, HuggingFaceModel):
                raise ValueError(
                    '`HuggingFaceCheckpointer` is only compatible with '
                    f'`HuggingFaceModel`s. Got {type(state.model)} instead.')
if self.remote_ud is not None:
self.remote_ud.init(state, logger)
state.callbacks.append(self.remote_ud)
if self.mlflow_registered_model_name is not None:
self.mlflow_loggers = [logger_destination for logger_destination in logger.destinations if isinstance(logger_destination, MLFlowLogger)]
if len(self.mlflow_loggers) == 0:
                    raise ValueError(
                        '`mlflow_registered_model_name` was set, but no '
                        '`MLFlowLogger` was found in the `logger.destinations` '
                        'list. Please add an `MLFlowLogger` or set '
                        '`mlflow_registered_model_name` to `None`.')
import mlflow
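                # Assumption: capping the Hugging Face shard size keeps
                # individual checkpoint files small enough for MLflow to
                # upload reliably.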
mlflow.environment_variables.MLFLOW_HUGGINGFACE_MODEL_MAX_SHARD_SIZE.set('5GB')

    def _is_last_batch(self, state: State):
elapsed_duration = state.get_elapsed_duration()
if elapsed_duration is not None and elapsed_duration >= 1.0:
return True
assert state.max_duration is not None
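        # Special case: a save_interval of 1dur combined with an epoch-based
        # max_duration should trigger a save exactly on the last batch of the
        # run, so compute the total batch count from the dataloader length.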
if self.save_interval.unit == TimeUnit.DURATION and self.save_interval.value == 1 and (state.max_duration.unit == TimeUnit.EPOCH):
assert state.dataloader_len is not None
return int(state.timestamp.batch) % math.ceil(state.max_duration.value * state.dataloader_len) == 0
return False

    def _save_checkpoint(self, state: State, logger: Logger):
del logger
self.last_checkpoint_batch = state.timestamp.batch
log.info('Saving HuggingFace formatted checkpoint')
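        # Register MPT with the transformers Auto* classes so the saved
        # checkpoint can be loaded via AutoConfig/AutoModelForCausalLM with
        # trust_remote_code.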
from transformers.models.auto.configuration_auto import CONFIG_MAPPING
CONFIG_MAPPING._extra_content['mpt'] = MPTConfig
MPTConfig.register_for_auto_class()
MPTForCausalLM.register_for_auto_class('AutoModelForCausalLM')
save_dir = format_name_with_dist_and_time(str(Path(self.save_dir_format_str) / self.huggingface_folder_name_fstr), state.run_name, state.timestamp)
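        # When uploading remotely, write to a temporary directory first;
        # otherwise save directly into the target folder.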
dir_context_mgr = tempfile.TemporaryDirectory() if self.remote_ud is not None else contextlib.nullcontext(enter_result=save_dir)
with dir_context_mgr as temp_save_dir:
assert isinstance(temp_save_dir, str)
log.debug('Gathering state dict')
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
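            # Unwrap the underlying PreTrainedModel depending on how the
            # Composer model is wrapped: DDP, FSDP, or neither.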
if state.is_model_ddp:
composer_model = state.model.module
original_model: PreTrainedModel = state.model.module.model
state_dict_model = state.model.module.model
original_tokenizer = state.model.module.tokenizer
elif isinstance(state.model.model, FSDP):
composer_model = state.model
original_model: PreTrainedModel = state.model.model.module
state_dict_model = state.model.model
original_tokenizer = state.model.tokenizer
else:
composer_model = state.model
original_model: PreTrainedModel = state.model.model
state_dict_model = state.model.model
original_tokenizer = state.model.tokenizer
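            # When FSDP-wrapped (and not DDP), gather a full, unsharded state
            # dict; otherwise a plain state_dict() call suffices.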
state_dict_context = fsdp_state_dict_type_context(original_model, state_dict_type='full') if not state.is_model_ddp and isinstance(state_dict_model, FSDP) else contextlib.nullcontext()
with state_dict_context:
state_dict = state_dict_model.state_dict()
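                # Cast all weights to the requested save precision.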
for k, v in state_dict.items():
if isinstance(v, torch.Tensor):
state_dict[k] = v.to(dtype=self.dtype)
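            # Only global rank 0 materializes the model and writes the
            # checkpoint files.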
if dist.get_global_rank() == 0:
log.debug('Saving Hugging Face checkpoint in global rank 0')
copied_config = copy.deepcopy(original_model.config)
if copied_config.model_type == 'mpt':
copied_config.attn_config['attn_impl'] = 'torch'
copied_config.init_device = 'cpu'
                log.debug('Creating new model instance')
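                # For PEFT models, rebuild the base model from the copied
                # config and re-wrap it with the active adapter's config.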
if composer_model.using_peft:
active_adapter = original_model.active_adapter
base_model = original_model.get_base_model()
new_base_model_instance = type(base_model)(copied_config)
new_model_instance = type(original_model)(new_base_model_instance, original_model.peft_config[active_adapter])
new_model_instance.to(dtype=self.dtype)
else:
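                    # Materialize the model on the meta device, then move the
                    # gathered weights in with load_state_dict(assign=True) so
                    # the parameters are not allocated twice.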
with init_empty_weights():
new_model_instance = type(original_model)(copied_config)
new_model_instance.load_state_dict(state_dict, assign=True)
del state_dict
log.debug('Saving Hugging Face checkpoint to disk')
new_model_instance.save_pretrained(temp_save_dir)
if original_tokenizer is not None:
assert isinstance(original_tokenizer, PreTrainedTokenizerBase)
original_tokenizer.save_pretrained(temp_save_dir)
if original_model.config.model_type == 'mpt':
log.debug('Editing MPT files for HuggingFace compatibility')
edit_files_for_hf_compatibility(temp_save_dir, self.flatten_imports)
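                # Upload every file in the checkpoint folder to the remote
                # save location.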
if self.remote_ud is not None:
for filename in os.listdir(temp_save_dir):
remote_file_name = os.path.join(save_dir, filename)
remote_file_uri = self.remote_ud.remote_backend.get_uri(remote_file_name)
log.info(f'Uploading HuggingFace formatted checkpoint to {remote_file_uri}')
self.remote_ud.upload_file(state=state, remote_file_name=remote_file_name, file_path=Path(os.path.join(temp_save_dir, filename)), overwrite=self.overwrite)
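                # Register with MLflow only once, on the final checkpoint of
                # the run.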
if self.mlflow_registered_model_name and self._is_last_batch(state):
components = {'model': new_model_instance}
if original_tokenizer is not None:
components['tokenizer'] = original_tokenizer
log.debug('Logging Hugging Face model to MLFlow')
for i, mlflow_logger in enumerate(self.mlflow_loggers):
log.debug(f'Registering model to UC at {mlflow_logger.model_registry_prefix}.{self.mlflow_registered_model_name}')
local_save_path = str(Path(temp_save_dir) / f'mlflow_save_{i}')
import mlflow
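                        # Monkeypatch: stub out MLflow's Unity Catalog
                        # feature-dependency lookup, presumably to avoid
                        # failures during model registration.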
mlflow.store._unity_catalog.registry.rest_store.get_feature_dependencies = lambda *args, **kwargs: ''
model_saving_kwargs: Dict[str, Any] = {'path': local_save_path}
if composer_model.using_peft:
model_saving_kwargs['flavor'] = 'peft'
model_saving_kwargs['save_pretrained_dir'] = temp_save_dir
model_saving_kwargs['metadata'] = self.mlflow_logging_config['metadata']
else:
model_saving_kwargs['flavor'] = 'transformers'
model_saving_kwargs['transformers_model'] = components
model_saving_kwargs.update(self.mlflow_logging_config)
mlflow_logger.save_model(**model_saving_kwargs)
license_filename = _maybe_get_license_filename(local_save_path, self.mlflow_logging_config['metadata'].get('pretrained_model_name', None))
if license_filename is not None:
mlflow_logger._mlflow_client.log_artifact(mlflow_logger._run_id, os.path.join(local_save_path, license_filename))
                        mlflow_logger.register_model_with_run_id(
                            model_uri=local_save_path,
                            name=self.mlflow_registered_model_name,
                            await_creation_for=3600)