#!/usr/bin/env python
# coding=utf-8
# Copyright 2021 The HuggingFace Team All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# You can also adapt this script on your own masked language modeling task.
# Pointers for this are left as comments.
"""Flax/JAX pre-training script for RotoBART with a BART-style denoising objective."""
import os
import sys
import time
from collections import defaultdict
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional

import jax
import jax.numpy as jnp
import numpy as np
import optax
from datasets import load_dataset
from flax import jax_utils, traverse_util
from flax.training import train_state
from flax.training.common_utils import get_metrics, onehot, shard
from prefetch_generator import BackgroundGenerator
from torch.utils.data import DataLoader
from tqdm import tqdm
from transformers import (
    DebertaV2Tokenizer,
    HfArgumentParser,
    TrainingArguments,
    is_tensorboard_available,
    set_seed,
)

from configuration_rotobart import *
from data_collator import DataCollatorForDenoisingTasks, SentenceTokenize

# RotoBART imports
from modeling_flax_rotobart import *


@dataclass
class ModelArguments:
    """
    Arguments pertaining to which model/config/tokenizer we are going to fine-tune, or train from scratch.
    """

    model_name_or_path: Optional[str] = field(
        default=None,
        metadata={
            "help": "The model checkpoint for weights initialization."
            "Don't set if you want to train a model from scratch."
        },
    )
    config_name: Optional[str] = field(
        default=None, metadata={"help": "Pretrained config name or path if not the same as model_name"}
    )
    tokenizer_name: Optional[str] = field(
        default=None, metadata={"help": "Pretrained tokenizer name or path if not the same as model_name"}
    )
    cache_dir: Optional[str] = field(
        default=None, metadata={"help": "Where do you want to store the pretrained models downloaded from s3"}
    )
    use_fast_tokenizer: bool = field(
        default=True,
        metadata={"help": "Whether to use one of the fast tokenizer (backed by the tokenizers library) or not."},
    )
    dtype: Optional[str] = field(
        default="float32",
        metadata={
            "help": "Floating-point format in which the model weights should be initialized and trained. Choose one of `[float32, float16, bfloat16]`."
        },
    )
    encoder_layers: Optional[int] = field(
        default=2,
        metadata={"help": "Number of encoder layers"},
    )
    decoder_layers: Optional[int] = field(
        default=2,
        metadata={"help": "Number of decoder layers"},
    )
    encoder_ffn_dim: Optional[int] = field(
        default=256,
        metadata={"help": "Dimension of encoder feedforward network"},
    )
    # NOTE(review): the original declared `decoder_ffn_dim` twice with the same
    # default (256); the redundant duplicate was removed — no behavior change.
    decoder_ffn_dim: Optional[int] = field(
        default=256,
        metadata={"help": "Dimension of decoder feedforward network"},
    )
    d_model: Optional[int] = field(default=1024, metadata={"help": "Dimension of model"})
    vocab_size: Optional[int] = field(default=128100, metadata={"help": "Vocab size"})
    max_position_embeddings: Optional[int] = field(default=1024, metadata={"help": "Max position embeddings"})
    # The two help strings below were copy-pasted as "Max position embeddings"
    # in the original; corrected to describe LayerDrop.
    encoder_layerdrop: Optional[float] = field(default=0.0, metadata={"help": "Encoder LayerDrop probability"})
    decoder_layerdrop: Optional[float] = field(default=0.0, metadata={"help": "Decoder LayerDrop probability"})
    use_bf16: bool = field(default=False, metadata={"help": "Train in bf16 or not"})
    grad_accum: Optional[int] = field(default=4, metadata={"help": "Number of steps to accumulate gradients over"})


@dataclass
class DataTrainingArguments:
    """
    Arguments pertaining to what data we are going to input our model for training and eval.
    """

    dataset_name: Optional[str] = field(
        default=None, metadata={"help": "The name of the dataset to use (via the datasets library)."}
    )
    dataset_path: Optional[str] = field(default="./pile.py", metadata={"help": "Path to custom dataset file."})
    dataset_config_name: Optional[str] = field(
        default=None, metadata={"help": "The configuration name of the dataset to use (via the datasets library)."}
    )
    train_file: Optional[str] = field(default=None, metadata={"help": "The input training data file (a text file)."})
    validation_file: Optional[str] = field(
        default=None,
        metadata={"help": "An optional input evaluation data file to evaluate the perplexity on (a text file)."},
    )
    train_ref_file: Optional[str] = field(
        default=None,
        metadata={"help": "An optional input train ref data file for whole word masking in Chinese."},
    )
    validation_ref_file: Optional[str] = field(
        default=None,
        metadata={"help": "An optional input validation ref data file for whole word masking in Chinese."},
    )
    overwrite_cache: bool = field(
        default=False, metadata={"help": "Overwrite the cached training and evaluation sets"}
    )
    validation_split_percentage: Optional[int] = field(
        default=5,
        metadata={"help": "The percentage of the train set used as validation set in case there's no validation split"},
    )
    max_seq_length: Optional[int] = field(
        default=None,
        metadata={
            "help": "The maximum total input sequence length after tokenization and masking. Sequences longer than this will be truncated. Default to the max input length of the model."
        },
    )
    preprocessing_num_workers: Optional[int] = field(
        default=None,
        metadata={"help": "The number of processes to use for the preprocessing."},
    )
    mlm_probability: float = field(
        default=0.15, metadata={"help": "Ratio of tokens to mask for span masked language modeling loss"}
    )
    mean_noise_span_length: float = field(
        default=3.0,
        metadata={"help": "Mean span length of masked tokens"},
    )
    shuffle_buffer_size: int = field(
        default=1000, metadata={"help": "The number of examples to pre-load for shuffling."}
    )
    auth_token: bool = field(default=False, metadata={"help": "Use authorisation token"})
    num_train_steps: int = field(default=50000, metadata={"help": "The number of training steps."})
    num_eval_samples: int = field(default=50000, metadata={"help": "The number of samples to be used for evaluation"})
    use_wandb: bool = field(default=False, metadata={"help": "Use Weights & Biases for experiment tracking"})
    testing: bool = field(default=False, metadata={"help": "If testing, only 1 train batch will be used"})
    colab_tpu: bool = field(default=False, metadata={"help": "Whether you are training on a colab TPU"})


def generate_batch_splits(samples_idx: np.ndarray, batch_size: int) -> list:
    """Split an array of sample indices into equally sized batches.

    Indices that do not fill a complete batch are dropped.

    Args:
        samples_idx: 1-D array of dataset indices.
        batch_size: number of samples per batch.

    Returns:
        A list of index arrays, one per full batch (empty list if fewer than
        ``batch_size`` samples are available).
    """
    num_samples = len(samples_idx)
    samples_to_remove = num_samples % batch_size
    if samples_to_remove != 0:
        # Drop the trailing partial batch.
        samples_idx = samples_idx[:-samples_to_remove]
    sections_split = num_samples // batch_size
    # NOTE(review): the original called np.split(..., 0) when num_samples <
    # batch_size, which raises; return no batches instead.
    if sections_split == 0:
        return []
    batch_idx = np.split(samples_idx, sections_split)
    return batch_idx


def write_train_metric(summary_writer, train_metrics, train_time, step):
    """Write accumulated per-step training metrics to TensorBoard.

    ``train_metrics`` is a list of per-step metric dicts; each value is logged
    at its originating step (reconstructed from ``step`` and the list length).
    """
    summary_writer.scalar("train_time", train_time, step)

    train_metrics = get_metrics(train_metrics)
    for key, vals in train_metrics.items():
        tag = f"train_{key}"
        for i, val in enumerate(vals):
            summary_writer.scalar(tag, val, step - len(vals) + i + 1)


def write_eval_metric(summary_writer, eval_metrics, step):
    """Write a dict of evaluation metrics to TensorBoard at ``step``."""
    for metric_name, value in eval_metrics.items():
        summary_writer.scalar(f"eval_{metric_name}", value, step)
if __name__ == "__main__":
    # See all possible arguments in src/transformers/training_args.py
    # or by passing the --help flag to this script.
    # We now keep distinct sets of args, for a cleaner separation of concerns.

    # NOTE(review): the original referenced an undefined `logger` below
    # (NameError when TensorBoard is unavailable); define one here.
    import logging

    logger = logging.getLogger(__name__)

    parser = HfArgumentParser((ModelArguments, DataTrainingArguments, TrainingArguments))

    if len(sys.argv) == 2 and sys.argv[1].endswith(".json"):
        # If we pass only one argument to the script and it's the path to a json file,
        # let's parse it to get our arguments.
        model_args, data_args, training_args = parser.parse_json_file(json_file=os.path.abspath(sys.argv[1]))
    else:
        model_args, data_args, training_args = parser.parse_args_into_dataclasses()

    if (
        os.path.exists(training_args.output_dir)
        and os.listdir(training_args.output_dir)
        and training_args.do_train
        and not training_args.overwrite_output_dir
    ):
        raise ValueError(
            f"Output directory ({training_args.output_dir}) already exists and is not empty."
            " Use --overwrite_output_dir to overcome."
        )

    # Make our output dir
    os.makedirs(training_args.output_dir, exist_ok=True)

    # Setup Colab TPU
    if data_args.colab_tpu:
        print("Setting up colab TPU")
        import jax.tools.colab_tpu

        jax.tools.colab_tpu.setup_tpu()
        print(f"Colab TPU setup complete, jax.device_count: {jax.device_count()}")

    print(f"\nDEVICE COUNT: {jax.local_device_count()}\n")

    # Set seed before initializing model.
    set_seed(training_args.seed)

    wikitext = data_args.dataset_path == "wikitext"
    if not wikitext:
        # Load Datasets
        # Train Dataset - Stream The Pile dataset
        print("Loading train data")
        train_dataset = load_dataset(
            data_args.dataset_path,
            split="train",
            cache_dir=model_args.cache_dir,
            streaming=True,
            use_auth_token=data_args.auth_token,
        )

        # Test Dataset - Stream The Pile dataset
        print("Loading eval data")
        eval_dataset = load_dataset(
            data_args.dataset_path,
            split="validation",
            streaming=True,
            use_auth_token=data_args.auth_token,
            cache_dir=model_args.cache_dir,
        )
    else:
        # Wikitext variant (useful for quick experiments).
        print("Loading train data")
        train_dataset = load_dataset(
            data_args.dataset_path, "wikitext-103-raw-v1", split="train", cache_dir=model_args.cache_dir, streaming=True
        )

        print("Loading eval data")
        eval_dataset = load_dataset(
            data_args.dataset_path,
            "wikitext-103-raw-v1",
            split="validation",
            streaming=True,
            cache_dir=model_args.cache_dir,
        )

    # Sentence Tokenization — used for Sentence Permutation in the denoising collator.
    sent_tok = SentenceTokenize()
    sent_tokenized_train_dataset = train_dataset.map(sent_tok, batched=True, batch_size=1)
    sent_tokenized_eval_dataset = eval_dataset.map(sent_tok, batched=True, batch_size=1)

    # Do Tokenization
    # NOTE(review): all special tokens are set to the empty string here —
    # presumably placeholders; verify against the tokenizer's vocab.
    tokenizer = DebertaV2Tokenizer.from_pretrained(
        model_args.tokenizer_name,
        unk_token="",
        sep_token="",
        pad_token="",
        cls_token="",
        mask_token="",
        eos_token="",
        bos_token="",
    )

    # NOTE(review): the original crashed with TypeError when --max_seq_length
    # was left unset (min(None, int)); fall back to the tokenizer's max length.
    max_seq_length = min(data_args.max_seq_length or tokenizer.model_max_length, tokenizer.model_max_length)
    text_column_name = "text"

    def tokenize_function(examples):
        """Tokenize a batch of examples to fixed-length, padded input ids."""
        return tokenizer(
            examples[text_column_name],
            truncation=True,
            max_length=max_seq_length,
            padding="max_length",
            return_token_type_ids=False,
        )

    tokenized_train_dataset = sent_tokenized_train_dataset.map(tokenize_function, batched=True, batch_size=1)
    tokenized_eval_dataset = sent_tokenized_eval_dataset.map(tokenize_function, batched=True, batch_size=1)

    # Shuffle the training dataset
    shuffled_train_dataset = tokenized_train_dataset.shuffle(
        buffer_size=data_args.shuffle_buffer_size, seed=training_args.seed
    )

    # Items returned by the shuffled dataset are nested in a list; flatten them,
    # e.g. {"input_ids": [[1,2,3,4]]} -> {"input_ids": [1,2,3,4]}
    def flatten(example):
        for k, v in example.items():
            if isinstance(v[0], list):
                example[k] = v[0]
            else:
                example[k] = v
        return example

    shuffled_train_dataset = shuffled_train_dataset.map(flatten)

    # Log to Weights and Biases (master process only).
    if data_args.use_wandb and jax.process_index() == 0:
        import wandb

        wandb.init(entity="wandb", project="hf-flax-rotobart", sync_tensorboard=False)
        wandb.config.update(training_args)  # optional, log your configs
        wandb.config.update(model_args)  # optional, log your configs
        wandb.config.update(data_args)  # optional, log your configs

        # Set up model logging to Weights & Biases
        model_artifact = wandb.Artifact(f'{wandb.run.id}', type='model')
        model_artifact.add_dir(training_args.output_dir)

    # Enable tensorboard only on the master node
    has_tensorboard = is_tensorboard_available()
    if has_tensorboard and jax.process_index() == 0:
        try:
            from flax.metrics.tensorboard import SummaryWriter

            summary_writer = SummaryWriter(log_dir=Path(training_args.output_dir))
        except ImportError as ie:
            has_tensorboard = False
            logger.warning(
                f"Unable to display metrics through TensorBoard because some package are not installed: {ie}"
            )
    else:
        logger.warning(
            "Unable to display metrics through TensorBoard because the package is not installed: "
            "Please run pip install tensorboard to enable."
        )

    # Initialize our training RNGs: one dropout RNG per local device.
    rng = jax.random.PRNGKey(training_args.seed)
    dropout_rngs = jax.random.split(rng, jax.local_device_count())

    # Load Model
    # TODO: Leverage AutoConfig
    config = RotoBARTConfig(
        encoder_layers=model_args.encoder_layers,
        encoder_ffn_dim=model_args.encoder_ffn_dim,
        decoder_layers=model_args.decoder_layers,
        decoder_ffn_dim=model_args.decoder_ffn_dim,
        d_model=model_args.d_model,
        vocab_size=model_args.vocab_size,
        max_position_embeddings=model_args.max_position_embeddings,
        encoder_layerdrop=model_args.encoder_layerdrop,
        decoder_layerdrop=model_args.decoder_layerdrop,
    )

    model = FlaxRotoBARTForConditionalGeneration(
        config=config, seed=training_args.seed, dtype=getattr(jnp, model_args.dtype)
    )

    # Convert model weights to bf16 if requested.
    if model_args.use_bf16:

        def to_bf16(t):
            return jax.tree_map(lambda x: x.astype(jnp.bfloat16) if x.dtype == jnp.float32 else x, t)

        model.params = to_bf16(model.params)

    # Store some constants
    num_epochs = int(training_args.num_train_epochs)
    train_batch_size = int(training_args.per_device_train_batch_size) * jax.device_count()
    eval_batch_size = int(training_args.per_device_eval_batch_size) * jax.device_count()

    # Log batch sizes to Weights and Biases
    if data_args.use_wandb and jax.process_index() == 0:
        wandb.config.update({'virtual_train_bs': int(model_args.grad_accum * train_batch_size)})
        wandb.config.update({'train_bs': train_batch_size})
        wandb.config.update({'eval_bs': eval_batch_size})
        wandb.config.update({'num_train_steps': data_args.num_train_steps})

    # define number steps per stream epoch
    num_train_steps = data_args.num_train_steps

    # Create learning rate schedule: linear warmup followed by linear decay to 0.
    warmup_fn = optax.linear_schedule(
        init_value=0.0, end_value=training_args.learning_rate, transition_steps=training_args.warmup_steps
    )
    decay_fn = optax.linear_schedule(
        init_value=training_args.learning_rate,
        end_value=0,
        transition_steps=num_train_steps - training_args.warmup_steps,
    )
    linear_decay_lr_schedule_fn = optax.join_schedules(
        schedules=[warmup_fn, decay_fn], boundaries=[training_args.warmup_steps]
    )

    # We use Optax's "masking" functionality to not apply weight decay
    # to bias and LayerNorm scale parameters. decay_mask_fn returns a
    # mask boolean with the same structure as the parameters.
    # The mask is True for parameters that should be decayed.
    # Note that this mask is specifically adapted for FlaxBERT-like models.
    # For other models, one should correct the layer norm parameter naming
    # accordingly.
    def decay_mask_fn(params):
        flat_params = traverse_util.flatten_dict(params)
        layer_norm_params = [
            (name, "scale") for name in ["self_attn_layer_norm", "layernorm_embedding", "final_layer_norm"]
        ]
        flat_mask = {path: (path[-1] != "bias" and path[-2:] not in layer_norm_params) for path in flat_params}
        return traverse_util.unflatten_dict(flat_mask)

    # create optimizer
    if training_args.adafactor:
        # We use the default parameters here to initialize adafactor,
        # For more details about the parameters please check
        # https://github.com/deepmind/optax/blob/ed02befef9bf81cbbf236be3d2b0e032e9ed4a40/optax/_src/alias.py#L74
        optimizer = optax.adafactor(
            learning_rate=linear_decay_lr_schedule_fn,
            weight_decay_rate=training_args.weight_decay,
        )
    else:
        optimizer = optax.adamw(
            learning_rate=linear_decay_lr_schedule_fn,
            b1=training_args.adam_beta1,
            b2=training_args.adam_beta2,
            eps=training_args.adam_epsilon,
            weight_decay=training_args.weight_decay,
            mask=decay_mask_fn,
        )

    # Global-norm clipping + gradient accumulation around the base optimizer.
    clip = 1.0
    my_optimizer = optax.chain(
        optax.clip_by_global_norm(clip),
        optimizer,
        optax.apply_every(model_args.grad_accum),
    )

    # Setup train state
    state = train_state.TrainState.create(apply_fn=model.__call__, params=model.params, tx=my_optimizer)

    num_parameters = sum(p.size for p in jax.tree_leaves(model.params))
    print(f"Number of Parameters: {num_parameters}")

    # Log num model parameters to Weights and Biases
    if data_args.use_wandb and jax.process_index() == 0:
        wandb.config.update({'model_parameters': num_parameters})

    def loss_fn(logits, labels):
        """Shifted next-token cross-entropy loss, averaged over all positions."""
        shift_logits = logits[..., :-1, :]
        shift_labels = labels[..., 1:]
        loss = optax.softmax_cross_entropy(shift_logits, onehot(shift_labels, shift_logits.shape[-1]))
        return loss.mean()

    # Define gradient update step fn
    def train_step(state, batch, dropout_rng):
        """One pmapped optimization step; returns (new_state, metrics, new rng)."""
        dropout_rng, new_dropout_rng = jax.random.split(dropout_rng)

        def compute_loss(params):
            labels = batch.pop("labels")
            logits = state.apply_fn(**batch, params=params, dropout_rng=dropout_rng, train=True)[0]
            loss = loss_fn(logits, labels)
            return loss

        grad_fn = jax.value_and_grad(compute_loss)
        loss, grad = grad_fn(state.params)
        # Average gradients across devices before applying them.
        grad = jax.lax.pmean(grad, "batch")
        new_state = state.apply_gradients(grads=grad)

        metrics = jax.lax.pmean(
            {"loss": loss, "learning_rate": linear_decay_lr_schedule_fn(state.step)}, axis_name="batch"
        )

        return new_state, metrics, new_dropout_rng

    # Create parallel version of the train step
    p_train_step = jax.pmap(train_step, "batch", donate_argnums=(0,))

    # Define eval fn
    def eval_step(params, batch):
        """Compute summed loss/accuracy/normalizer over one eval batch."""
        labels = batch.pop("labels")

        logits = model(**batch, params=params, train=False)[0]

        # compute loss, ignore padded input tokens
        label_mask = jnp.where(labels > 0, 1.0, 0.0)
        loss = optax.softmax_cross_entropy(logits, onehot(labels, logits.shape[-1])) * label_mask

        # compute accuracy
        accuracy = jnp.equal(jnp.argmax(logits, axis=-1), labels) * label_mask

        # summarize metrics
        metrics = {"loss": loss.sum(), "accuracy": accuracy.sum(), "normalizer": label_mask.sum()}
        metrics = jax.lax.psum(metrics, axis_name="batch")

        return metrics

    # NOTE(review): the original passed donate_argnums=(0,) here, donating the
    # replicated params buffer — which is reused on every later train/eval
    # step. Eval must not donate its inputs.
    p_eval_step = jax.pmap(eval_step, "batch")

    # Replicate the train state on each device
    state = jax_utils.replicate(state)

    train_time = 0
    train_start = time.time()
    train_metrics = []
    eval_metrics = []

    data_collator = DataCollatorForDenoisingTasks(tokenizer)

    # Prefetch batches in background threads so devices are never starved.
    training_iter = BackgroundGenerator(
        DataLoader(shuffled_train_dataset.with_format("torch"), batch_size=train_batch_size, collate_fn=data_collator),
        max_prefetch=128,
    )
    eval_iter = BackgroundGenerator(
        DataLoader(tokenized_eval_dataset.with_format("torch"), batch_size=eval_batch_size, collate_fn=data_collator),
        max_prefetch=128,
    )

    print("Start training")
    steps = tqdm(range(num_train_steps), desc="Training...", position=0)

    # NOTE(review): log_dict was previously first assigned inside the logging
    # branch, so wandb.log(log_dict) below raised NameError on early steps.
    log_dict = {}

    for step in range(num_train_steps):
        # ======================== Training ================================
        model_inputs = next(training_iter)

        # Model forward
        model_inputs = shard(model_inputs.data)
        state, train_metric, dropout_rngs = p_train_step(state, model_inputs, dropout_rngs)
        train_metrics.append(train_metric)

        if step % training_args.logging_steps == 0 and step > 0:
            steps.write(
                f"Step... ({step} | Loss: {train_metric['loss'].mean()}, Learning Rate: {train_metric['learning_rate'].mean()})"
            )
            train_time += time.time() - train_start
            # Restart the timer so elapsed time is not double-counted.
            train_start = time.time()

            if jax.process_index() == 0:
                train_metrics = get_metrics(train_metrics)
                for key, vals in train_metrics.items():
                    tag = f"train/train_{key}"
                    # Only the most recent value per tag survives; wandb.log
                    # below records one point per training step.
                    for i, val in enumerate(vals):
                        log_dict[tag] = val

            # Reset accumulated metrics on every host, not just process 0,
            # so non-zero hosts do not accumulate without bound.
            train_metrics = []

        # ======================== Evaluating ==============================
        if step % training_args.eval_steps == 0 and step > 0:
            num_eval_batches = data_args.num_eval_samples // eval_batch_size
            for _ in tqdm(range(num_eval_batches), desc="Evaluating ...", position=1):
                # process input samples
                model_inputs = next(eval_iter)

                # Model forward
                model_inputs = shard(model_inputs.data)
                metrics = p_eval_step(state.params, model_inputs)
                eval_metrics.append(metrics)

            # normalize eval metrics
            eval_metrics = get_metrics(eval_metrics)
            eval_metrics = jax.tree_map(jnp.sum, eval_metrics)
            eval_normalizer = eval_metrics.pop("normalizer")
            eval_metrics = jax.tree_map(lambda x: x / eval_normalizer, eval_metrics)

            # Update progress bar
            steps.desc = (
                f"Step... ({step + 1}/{num_train_steps} | Loss: {eval_metrics['loss']}, Acc: {eval_metrics['accuracy']})"
            )

            if jax.process_index() == 0:
                log_dict["eval/eval_loss"] = np.asarray(eval_metrics['loss'])
                log_dict["eval/eval_accuracy"] = np.asarray(eval_metrics['accuracy'])

            eval_metrics = []

        # save checkpoint after each epoch and push checkpoint to the hub
        # NOTE(review): with a streaming dataset there is no real epoch
        # boundary, so under the EPOCH strategy this saves every step — TODO
        # confirm intended.
        if jax.process_index() == 0 and str(training_args.save_strategy) == "IntervalStrategy.EPOCH":
            print(f'Saving model in training_args.output_dir')
            params = jax.device_get(jax.tree_map(lambda x: x[0], state.params))
            model.save_pretrained(
                training_args.output_dir,
                params=params,
                push_to_hub=training_args.push_to_hub,
                commit_message=f"Saving weights and logs of step {step+1}",
            )

            # Log model to Weights and Biases too
            if data_args.use_wandb:
                print('Saving checkpoint to W&B Artifacts')
                wandb.log_artifact(model_artifact, aliases=[f'{step}'])

        # save checkpoint on steps and push checkpoint to the hub
        # NOTE(review): the original condition was inverted
        # (`save_steps % (step + 1) == 0`), which saved at step 0 and at every
        # divisor of save_steps instead of every save_steps steps.
        if (step + 1) % training_args.save_steps == 0 and str(training_args.save_strategy) == "IntervalStrategy.STEPS":
            print(f'Saving model in training_args.output_dir')
            params = jax.device_get(jax.tree_map(lambda x: x[0], state.params))
            model.save_pretrained(
                training_args.output_dir,
                params=params,
                push_to_hub=training_args.push_to_hub,
                commit_message=f"Saving weights and logs of step {step+1}",
            )

            # Log model to Weights and Biases too
            if data_args.use_wandb:
                print('Saving checkpoint to W&B Artifacts')
                wandb.log_artifact(model_artifact, aliases=[f'{step}'])

        # Log all train metrics, and sometimes eval metrics, to W&B
        if data_args.use_wandb:
            wandb.log(log_dict)

        # update tqdm bar
        steps.update(1)