import logging
from functools import partial
from pathlib import Path
from typing import Any, Dict, List, Tuple, Union

import click
import numpy as np
from datasets import Dataset, load_dataset
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    DataCollatorForLanguageModeling,
    PreTrainedTokenizer,
    Trainer,
    TrainingArguments,
    set_seed,
)

logger = logging.getLogger(__name__)

# PROMPT_WITH_INPUT_FORMAT, PROMPT_NO_INPUT_FORMAT, DEFAULT_SEED, preprocess_batch,
# get_model_tokenizer, and DataCollatorForCompletionOnlyLM are assumed to be defined
# elsewhere in the project.
Loading the Dataset
def load_training_dataset(path_or_dataset: str = "databricks/databricks-dolly-15k") -> Dataset:
    logger.info(f"Loading dataset from {path_or_dataset}")
    dataset = load_dataset(path_or_dataset)["train"]
    logger.info("Found %d rows", dataset.num_rows)

    def _add_text(rec):
        instruction = rec["instruction"]
        response = rec["response"]
        context = rec.get("context")

        if not instruction:
            raise ValueError(f"Expected an instruction in: {rec}")

        if not response:
            raise ValueError(f"Expected a response in: {rec}")

        if context:
            rec["text"] = PROMPT_WITH_INPUT_FORMAT.format(instruction=instruction, response=response, input=context)
        else:
            rec["text"] = PROMPT_NO_INPUT_FORMAT.format(instruction=instruction, response=response)
        return rec

    dataset = dataset.map(_add_text)

    return dataset
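The two prompt templates used in _add_text are defined elsewhere in the project. As a rough illustration only (the exact wording and marker strings below are assumptions, not the project's canonical constants), they could look like this:

# Hypothetical sketch of the prompt templates referenced above; the marker strings
# and wording are assumptions for illustration.
INSTRUCTION_KEY = "### Instruction:"
INPUT_KEY = "### Input:"
RESPONSE_KEY = "### Response:"
END_KEY = "### End"

PROMPT_NO_INPUT_FORMAT = (
    f"{INSTRUCTION_KEY}\n{{instruction}}\n\n{RESPONSE_KEY}\n{{response}}\n\n{END_KEY}"
)
PROMPT_WITH_INPUT_FORMAT = (
    f"{INSTRUCTION_KEY}\n{{instruction}}\n\n{INPUT_KEY}\n{{input}}\n\n"
    f"{RESPONSE_KEY}\n{{response}}\n\n{END_KEY}"
)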
Data Preprocessing and Tokenization
def preprocess_dataset(tokenizer: AutoTokenizer, max_length: int, seed=DEFAULT_SEED) -> Dataset:
    """Loads the training dataset and tokenizes it so it is ready for training.

    Args:
        tokenizer (AutoTokenizer): Tokenizer tied to the model.
        max_length (int): Maximum number of tokens to emit from the tokenizer.
        seed (int): Seed used when shuffling the dataset.

    Returns:
        Dataset: HuggingFace dataset
    """

    dataset = load_training_dataset()

    logger.info("Preprocessing dataset")
    _preprocessing_function = partial(preprocess_batch, max_length=max_length, tokenizer=tokenizer)
    dataset = dataset.map(
        _preprocessing_function,
        batched=True,
        remove_columns=["instruction", "context", "response", "text", "category"],
    )

    # Make sure we don't have any truncated records, as this would mean the end keyword is missing.
    logger.info("Processed dataset has %d rows", dataset.num_rows)
    dataset = dataset.filter(lambda rec: len(rec["input_ids"]) < max_length)
    logger.info("Processed dataset has %d rows after filtering for truncated records", dataset.num_rows)

    logger.info("Shuffling dataset")
    dataset = dataset.shuffle(seed=seed)

    logger.info("Done preprocessing")

    return dataset
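preprocess_batch is likewise assumed to be defined elsewhere. Given how it is called above (a batched map with tokenizer and max_length), a minimal sketch could be:

# Minimal sketch of preprocess_batch, inferred from the call site above; the real
# implementation may differ.
def preprocess_batch(batch: Dict[str, List], tokenizer: AutoTokenizer, max_length: int) -> dict:
    # Tokenize the prompt-formatted "text" column. Truncation is applied here, and
    # truncated records are dropped afterwards in preprocess_dataset.
    return tokenizer(batch["text"], max_length=max_length, truncation=True)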
Model Training
def train(
    *,
    input_model: str,
    local_output_dir: str,
    dbfs_output_dir: str,
    epochs: int,
    per_device_train_batch_size: int,
    per_device_eval_batch_size: int,
    lr: float,
    seed: int,
    deepspeed: str,
    gradient_checkpointing: bool,
    local_rank: str,
    bf16: bool,
    logging_steps: int,
    save_steps: int,
    eval_steps: int,
    test_size: Union[float, int],
    save_total_limit: int,
    warmup_steps: int,
):
    set_seed(seed)

    model, tokenizer = get_model_tokenizer(
        pretrained_model_name_or_path=input_model, gradient_checkpointing=gradient_checkpointing
    )
    # Use the same max length that the model supports. Fall back to 1024 if the setting can't be found.
    # The configuration for the length can be stored under different names depending on the model.
    # Here we attempt a few possible names we've encountered.
    conf = model.config
    max_length = None
    for length_setting in ["n_positions", "max_position_embeddings", "seq_length"]:
        max_length = getattr(model.config, length_setting, None)
        if max_length:
            logger.info(f"Found max length: {max_length}")
            break
    if not max_length:
        max_length = 1024
        logger.info(f"Using default max length: {max_length}")
    processed_dataset = preprocess_dataset(tokenizer=tokenizer, max_length=max_length, seed=seed)

    split_dataset = processed_dataset.train_test_split(test_size=test_size, seed=seed)

    logger.info("Train data size: %d", split_dataset["train"].num_rows)
    logger.info("Test data size: %d", split_dataset["test"].num_rows)
    data_collator = DataCollatorForCompletionOnlyLM(
        tokenizer=tokenizer, mlm=False, return_tensors="pt", pad_to_multiple_of=8
    )

    if not dbfs_output_dir:
        logger.warning("Will NOT save to DBFS")
    training_args = TrainingArguments(
        output_dir=local_output_dir,
        per_device_train_batch_size=per_device_train_batch_size,
        per_device_eval_batch_size=per_device_eval_batch_size,
        fp16=False,
        bf16=bf16,
        learning_rate=lr,
        num_train_epochs=epochs,
        deepspeed=deepspeed,
        gradient_checkpointing=gradient_checkpointing,
        logging_dir=f"{local_output_dir}/runs",
        logging_strategy="steps",
        logging_steps=logging_steps,
        evaluation_strategy="steps",
        eval_steps=eval_steps,
        save_strategy="steps",
        save_steps=save_steps,
        save_total_limit=save_total_limit,
        load_best_model_at_end=False,
        report_to="tensorboard",
        disable_tqdm=True,
        remove_unused_columns=False,
        local_rank=local_rank,
        warmup_steps=warmup_steps,
    )
logger.info("Instantiating Trainer")
trainer = Trainer( model=model, tokenizer=tokenizer, args=training_args, train_dataset=split_dataset["train"], eval_dataset=split_dataset["test"], data_collator=data_collator, )
logger.info("Training") trainer.train()
logger.info(f"Saving Model to {local_output_dir}") trainer.save_model(output_dir=local_output_dir)
if dbfs_output_dir: logger.info(f"Saving Model to {dbfs_output_dir}") trainer.save_model(output_dir=dbfs_output_dir)
logger.info("Done.")
Feedback and Iteration
from transformers import AutoTokenizer, AutoModelForCausalLM, Trainer, TrainingArguments
# Load the pre-trained language model and tokenizer
model_name = "microsoft/CodeGPT-small-java" tokenizer = AutoTokenizer.from_pretrained(model_name) model = AutoModelForCausalLM.from_pretrained(model_name)
# Prepare the data for fine-tuning
train_dataset = ...
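The training data itself is elided here. One hypothetical way to populate it (the file name and the "code" field are illustrative assumptions) is to load a JSON Lines file of code snippets:

# Hypothetical data preparation; "java_snippets.jsonl" and the "code" field are
# assumptions. Each record is expected to look like {"code": "public class Foo { ... }"}.
from datasets import load_dataset

train_dataset = load_dataset("json", data_files="java_snippets.jsonl", split="train")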
# Fine-tune the model
training_args = TrainingArguments(
    output_dir="./results",
    evaluation_strategy="no",  # no eval_dataset is passed to the Trainer below
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=64,
    num_train_epochs=1,
    weight_decay=0.01,
    push_to_hub=False,
    remove_unused_columns=False,  # keep the raw "code" column for the collator
)


def collate(batch):
    # The collator receives a list of records; tokenize their "code" field once
    # and use the input ids as labels for standard causal-LM fine-tuning.
    enc = tokenizer(
        [example["code"] for example in batch],
        padding=True,
        truncation=True,
        max_length=512,
        return_tensors="pt",
    )
    labels = enc["input_ids"].clone()
    labels[enc["attention_mask"] == 0] = -100  # don't compute loss on padding
    enc["labels"] = labels
    return enc


trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    data_collator=collate,
)
trainer.train()