import signal
import sys
import os
import time
from typing import Union
import platform
from psutil import virtual_memory, cpu_count
import numpy as np
from torch.utils.data import DataLoader
import torch
from rich.progress import Progress, TextColumn, BarColumn, TimeElapsedColumn, TimeRemainingColumn
from transformers import PreTrainedTokenizerFast
from torch_optimizer import Adafactor
# import accelerate
from accelerate import Accelerator
from accelerate.utils import set_seed
# import custom classes and functions
from model.chat_model import TextToTextModel
from utils.logger import Logger
from model.dataset import MyDataset
from config import TrainConfig, T5ModelConfig
from utils.functions import (
get_bleu4_score,
save_model_config,
get_free_space_of_disk,
my_average,
get_path_of_suffix_files,
get_T5_config,
)
class ChatTrainer:
def __init__(self, train_config: TrainConfig, model_config: T5ModelConfig, ) -> None:
self.train_config = train_config
self.model_config = model_config
        # file_name=None auto-generates a log file named after the current date
self.logger = Logger('chat_trainer', std_out=True, save2file=True, file_name=None)
self.model = None
self.accelerator = None
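        # register a SIGINT handler so that Ctrl+C can optionally save a checkpoint before exiting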
signal.signal(signal.SIGINT, self.process_exit_handler)
        self.is_win_platform = platform.system().lower() == 'windows'
torch.manual_seed(train_config.seed)
torch.cuda.manual_seed_all(train_config.seed)
def process_exit_handler(self, signal_received, frame) -> None:
        '''
        Actions on process exit: save the model.
        '''
if self.accelerator and self.model:
            ask = "You pressed `ctrl+c`. Do you want to save a checkpoint? Yes (y) or No (n)"
self.accelerator.print(ask)
ins = input()
if ins.lower() in ('yes', 'y'):
suffix = 'exit_save_{}'.format(str(time.strftime('%Y%m%d%H%M%S', time.localtime())))
self.accelerator.wait_for_everyone()
self.accelerator.save_state(output_dir=self.train_config.train_state_dir)
                self.accelerator.print('model checkpoint has been saved in {}'.format(self.train_config.train_state_dir))
sys.exit(0)
else:
            print('process not in training, exit.')
sys.exit(0)
def save_model(self, suffix: Union[str, int]) -> None:
        '''Save the model to a file.
        Note: save_model must not be placed inside an is_main_process block,
        e.g.:
        >>> self.save_model(epoch)  # call it here
        >>> if accelerator.is_main_process:
        >>>     do_something()
        '''
if self.model and self.accelerator:
            # call wait_for_everyone first, then save
self.accelerator.wait_for_everyone()
if self.accelerator.is_main_process:
unwrap_model = self.accelerator.unwrap_model(self.model)
model_dict = self.accelerator.get_state_dict(unwrap_model)
torch.save(model_dict, self.train_config.model_file.format(suffix))
def delete_early_checkpoint(self, epoch: int, keep_latest_n: int=3,) -> None:
        '''
        Delete the earliest checkpoints, keeping only the latest keep_latest_n model files.
        '''
model_save_path = self.train_config.model_file
        model_save_path = model_save_path.replace('\\', '/')  # normalize Windows paths: replace \ with /
        model_save_path = '/'.join(model_save_path.split('/')[0: -1])  # drop the trailing filename, keeping only the directory
model_files = get_path_of_suffix_files(model_save_path, suffix='.bin', with_create_time=True)
        # checkpoints saved by the exit handler are excluded from deletion
        train_save_model_files = []
        for item in model_files:
            if 'exit_save' not in item[0]:
                # files from epochs later than the current one are not deleted
                f_epoch = int(item[0].split('.')[-2])
                if epoch >= f_epoch:
                    print(epoch, f_epoch, item)
                    train_save_model_files.append(item)
        train_save_model_files.sort(key=lambda x: x[1])  # sort by creation time, oldest first
        if len(train_save_model_files) <= keep_latest_n:
            return
        to_delete_files = train_save_model_files[0: -keep_latest_n]
for item in to_delete_files:
os.remove(item[0])
def train(self, is_keep_training: bool=False, is_finetune: bool=False) -> None:
        '''
        is_keep_training: whether to resume training from a saved checkpoint state
        is_finetune: whether to finetune; finetuning may require freezing some parameters
        '''
log = self.logger
train_config = self.train_config
save_steps = self.train_config.save_steps
logging_steps = self.train_config.logging_steps
        # number of gradient accumulation steps
accumulation_steps = train_config.gradient_accumulation_steps
set_seed(train_config.seed)
accelerator = Accelerator(
            mixed_precision=train_config.mixed_precision,            # mixed precision
            gradient_accumulation_steps=accumulation_steps,          # gradient accumulation
project_dir=train_config.train_state_dir,
)
        # decide whether to load the whole dataset into memory based on available RAM
        unuse_mem = virtual_memory().available / (1024 ** 3)  # unit: GB
        unuse_disk = get_free_space_of_disk('./')
        # with >= 48 GB of free RAM the dataset is kept in memory: loading all ~9 million training samples
        # for 2 GPUs takes roughly 43 GB of CPU memory
        # otherwise the data is produced by an iterator; this also runs with < 16 GB of CPU memory, but shuffling is not supported
        # for multi-GPU training keep_in_memory must be True, otherwise distributed training will not work
        keep_in_memory = unuse_mem >= 48.0 or torch.cuda.device_count() >= 2
if accelerator.is_main_process:
log.info('cpu memory available: {:.2f} GB, disk space available: {:.2f} GB, keep dataset in memory: {}.'\
.format(unuse_mem, unuse_disk, keep_in_memory), save_to_file=True)
log.info('operation: {}, keep training: {}, loading datasets ...'.format('finetune' if is_finetune else 'train', is_keep_training))
# args for dataloader
num_workers = 0
# if not self.is_win_platform:
# cpu_cnt = cpu_count(logical=False)
# gpu_cnt = torch.cuda.device_count()
# if cpu_cnt >= 8 * gpu_cnt:
# # num_workers = 4 x number of available GPUs
# num_workers = int(4 * gpu_cnt)
# else:
# num_workers = int(cpu_cnt // 2)
train_dataset = MyDataset(
parquet_file=train_config.train_file,
tokenizer_dir=train_config.tokenizer_dir,
keep_in_memory=keep_in_memory,
max_seq_len=train_config.max_seq_len,
)
valid_dataset = MyDataset(
parquet_file=train_config.validation_file,
tokenizer_dir=train_config.tokenizer_dir,
keep_in_memory=keep_in_memory,
max_seq_len=train_config.max_seq_len,
)
batch_size = train_config.batch_size_per_gpu
train_dataloader = DataLoader(
train_dataset,
batch_size=batch_size,
shuffle=True,
collate_fn=train_dataset.collate_fn,
pin_memory=False,
            num_workers=num_workers,  # values > 1 make CPU memory grow slowly until OOM (to be investigated); num_workers=4 only saves about 30 minutes per epoch
)
valid_dataloader = DataLoader(
valid_dataset,
batch_size=batch_size,
shuffle=False,
collate_fn=valid_dataset.collate_fn,
pin_memory=False,
num_workers=num_workers,
)
device = accelerator.device
log.info('using device: {} '.format(str(device)), save_to_file=True)
# T5: All labels set to `-100` are ignored (masked), the loss is only computed for labels in `[0, ..., config.vocab_size]`
tokenizer = train_dataset.tokenizer
decoder_start_token_id = tokenizer.pad_token_id
# for t5, set decoder_start_token_id = pad_token_id
t5_config = get_T5_config(T5ModelConfig(), vocab_size=len(tokenizer), decoder_start_token_id=decoder_start_token_id, eos_token_id=tokenizer.eos_token_id)
model = TextToTextModel(t5_config)
        # for finetuning: load the checkpoint and freeze the embedding and encoder
if is_finetune:
model.load_state_dict(torch.load(train_config.finetune_from_ckp_file))
# print(model)
layers_to_freeze = [model.shared, model.encoder]
for layer in layers_to_freeze:
for param in layer.parameters():
param.requires_grad = False
        # save the model config so it can be restored after modifying the config
save_model_config(t5_config.to_diff_dict(), train_config.model_config_file)
        # for T5 training, the paper recommends Adafactor
optimizer = Adafactor(params=model.parameters(), lr=train_config.learn_rate)
        # number of GPUs on this machine; all are used by default
num_gpus_used = accelerator.state.num_processes
        # single machine, multiple GPUs: the total batch_size per step = batch_size_per_gpu * num_gpus_used
        # total_batch_size falls back to batch_size_per_gpu for the CPU-only case
total_batch_size = train_config.batch_size_per_gpu
if num_gpus_used >= 1:
total_batch_size = num_gpus_used * train_config.batch_size_per_gpu
        steps_per_epoch = int(np.ceil(len(train_dataset) / total_batch_size))
        eval_steps = int(np.ceil(len(valid_dataset) / total_batch_size))
if accelerator.is_main_process:
            log.info('train dataset size: {}, steps per epoch:{}; validation dataset size: {}, steps per validation: {}; dataloader num_workers: {}.'\
.format(len(train_dataset), steps_per_epoch, len(valid_dataset), eval_steps, num_workers), save_to_file=True)
lr_scheduler = torch.optim.lr_scheduler.OneCycleLR(
optimizer=optimizer,
max_lr=train_config.div_factor * train_config.learn_rate,
epochs=train_config.epochs,
            steps_per_epoch=int(np.ceil(len(train_dataset) / (batch_size * accumulation_steps))),  # gradient accumulation effectively enlarges the batch size
div_factor=train_config.div_factor,
cycle_momentum=False,
)
model, optimizer, lr_scheduler, train_dataloader, valid_dataloader = accelerator.prepare(
model,
optimizer,
lr_scheduler,
train_dataloader,
valid_dataloader,
)
if is_keep_training:
accelerator.load_state(input_dir=train_config.train_state_dir)
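        # register the lr_scheduler so its state is included in save_state / load_state checkpoints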
accelerator.register_for_checkpointing(lr_scheduler)
self.model = model
self.accelerator = accelerator
best_bleu4 = 0.0
best_epoch = 0
epoch_loss_list = []
        # add a progress bar, updated only by the main process
if accelerator.is_main_process:
progress = Progress(TextColumn("[progress.description]{task.description}"),
BarColumn(),
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
TimeRemainingColumn(),
TimeElapsedColumn(),
TextColumn("[bold blue]{task.fields[show_info]}"),
                                refresh_per_second=1,  # refresh about once per second; avoid frequent updates
)
epoch_progress = progress.add_task(description='epoch: ', show_info='', total=train_config.epochs)
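            # the steps bar advances once every logging_steps batches, hence total = steps_per_epoch / logging_steps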
steps_progress = progress.add_task(description='steps: ', show_info='', \
total=np.ceil(steps_per_epoch / logging_steps))
eval_progress = progress.add_task(description='evaluate: ', show_info='', total=eval_steps, visible=False)
self.progress = progress
self.eval_progress = eval_progress
progress.start()
# end if
for epoch in range(train_config.epochs):
if accelerator.is_main_process:
epoch_show_txt = 'epoch: {}/{}, avg_loss: {:.6f}, best_epoch: {}, best_bleu: {}'.format(
epoch, train_config.epochs, my_average(epoch_loss_list), best_epoch, best_bleu4
)
progress.update(epoch_progress, show_info=epoch_show_txt)
progress.reset(steps_progress)
epoch_loss_list = []
model.train()
# torch.cuda.empty_cache()
for step, batch_data in enumerate(train_dataloader):
input_ids, input_mask = batch_data['input_ids'], batch_data['input_mask']
target_ids = batch_data['target_ids']
# for t5 model, all labels set to `-100` are ignored (masked)
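                # target_ids are assumed to be padded with pad_token_id by the collate_fn; those positions become -100 and are excluded from the loss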
target_ids[target_ids == decoder_start_token_id] = -100
outputs = model(
input_ids=input_ids,
attention_mask=input_mask,
labels=target_ids,
)
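                # the loss is scaled by 1/accumulation_steps so the accumulated gradient matches a single large-batch update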
loss = outputs.loss.mean() / accumulation_steps
                # note: use accelerator.backward() here instead of loss.backward()
accelerator.backward(loss)
                # gradient accumulation
if (step + 1) % accumulation_steps == 0:
accelerator.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
lr_scheduler.step()
optimizer.zero_grad()
                # save the model every save_steps steps and at the end of the epoch
                if (step + 1) % save_steps == 0 or step == steps_per_epoch - 1:
self.save_model('epoch_{}_latest'.format(epoch))
accelerator.save_state(output_dir=train_config.train_state_dir)
                # ================================== log the loss below ============================================
                # update every few steps to avoid frequent CPU-GPU data copies
                # see: https://pytorch.org/tutorials/recipes/recipes/tuning_guide.html#avoid-unnecessary-cpu-gpu-synchronization
                if step % logging_steps == 0 or step == steps_per_epoch - 1:
loss_cpu = loss.detach().item() * accumulation_steps
epoch_loss_list.append(loss_cpu)
info_txt = 'training loss: epoch:{}, step:{}, loss:{}, device:{}'.\
format(epoch, step, loss_cpu, str(accelerator.device))
                    log.info(info_txt, std_out=False, save_to_file=True)  # save the loss to file
                    # update the progress bar
if accelerator.is_main_process:
step_show_txt = 'step: {}/{}, loss: {:.6f}'.format(step, steps_per_epoch, loss_cpu)
progress.advance(steps_progress, advance=1)
progress.update(steps_progress, show_info=step_show_txt)
                # ================================== log the loss above ============================================
# if step >= 20:break
            # end for batch steps
model.eval()
cur_bleu4_score = self.evaluate(
model=model,
tokenizer=tokenizer,
valid_dataloader=valid_dataloader,
accelerator=accelerator,
eval_steps=eval_steps,
)
# save model
if cur_bleu4_score >= best_bleu4:
best_bleu4 = cur_bleu4_score
best_epoch = epoch
                # keep at most the latest keep_latest_n_ckp model files
# self.delete_early_checkpoint(epoch=epoch, keep_latest_n=train_config.keep_latest_n_ckp)
self.save_model('best')
accelerator.save_state(output_dir=train_config.train_state_dir)
            # print a log entry once per epoch
if accelerator.is_main_process:
progress.advance(epoch_progress, advance=1)
info_txt = 'epoch log: epoch:{}, avg_loss:{}, cur_bleu4:{}, best_bleu4:{}, best_epoch:{}'.\
format(epoch, my_average(epoch_loss_list), cur_bleu4_score, best_bleu4, best_epoch)
# log.info(info_txt, std_out=True, save_to_file=True)
self.print_and_log(info_txt, accelerator)
def evaluate(self,
model: TextToTextModel,
tokenizer: PreTrainedTokenizerFast,
valid_dataloader: DataLoader,
accelerator: Accelerator,
eval_steps: int,
) -> float:
        '''
        Evaluate the model and return the average BLEU-4 score.
        '''
max_seq_len = self.train_config.max_seq_len
batch_decode = tokenizer.batch_decode
bleu4_scores = []
if accelerator.is_main_process:
self.progress.reset(self.eval_progress)
self.progress.update(self.eval_progress, visible=True)
with torch.no_grad():
for step, batch_data in enumerate(valid_dataloader):
if accelerator.is_main_process:
self.progress.advance(self.eval_progress, advance=1)
self.progress.update(self.eval_progress, show_info='step: {}/{}'.format(step, eval_steps))
input_ids, input_mask = batch_data['input_ids'], batch_data['input_mask']
target_ids = batch_data['target_ids']
outputs = accelerator.unwrap_model(model).my_generate(
input_ids=input_ids,
attention_mask=input_mask,
max_seq_len=max_seq_len,
)
# gather data from multi-gpus (used when in ddp mode)
outputs = accelerator.gather_for_metrics(outputs).detach().cpu().numpy()
target_ids = accelerator.gather_for_metrics(target_ids).detach().cpu().numpy()
outputs = batch_decode(outputs, skip_special_tokens=True, clean_up_tokenization_spaces=False)
target_ids = batch_decode(target_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)
# print(outputs, target_ids)
                batch_bleu4_scores = [get_bleu4_score(reference=target_ids[i], outputs=outputs[i]) for i in range(len(target_ids))]
                bleu4_scores.extend(batch_bleu4_scores)
# if step >= 5: break
avg_bleu4_score = my_average(bleu4_scores)
if accelerator.is_main_process:
self.progress.update(self.eval_progress, show_info='bleu4 score: {}'.format(avg_bleu4_score))
self.progress.update(self.eval_progress, visible=False)
return avg_bleu4_score
    def test(self, best_epoch: int=0) -> float:
        '''
        Evaluate a saved model checkpoint and return the average BLEU-4 score.
        '''
train_config = self.train_config
log = self.logger
# args for dataloader
num_workers = 0 if self.is_win_platform else 4
test_dataset = MyDataset(
parquet_file=train_config.train_file,
tokenizer_dir=train_config.tokenizer_dir,
keep_in_memory=False if self.is_win_platform else True,
max_seq_len=train_config.max_seq_len,
)
test_dataloader = DataLoader(
test_dataset,
batch_size=train_config.batch_size_per_gpu,
shuffle=False,
collate_fn=test_dataset.collate_fn,
pin_memory=False,
num_workers=num_workers,
)
log.info('test dataset size: {}.'.format(len(test_dataset)), save_to_file=True)
set_seed(train_config.seed)
accelerator = Accelerator(mixed_precision=train_config.mixed_precision)
device = accelerator.device
log.info('using device: {} '.format(str(device)), save_to_file=True)
        # number of GPUs used in the current run
        num_gpus_used = accelerator.state.num_processes
        # single machine, multiple GPUs: the total batch_size per step = batch_size_per_gpu * num_gpus_used
        # total_batch_size falls back to batch_size_per_gpu for the CPU-only case
total_batch_size = train_config.batch_size_per_gpu
if num_gpus_used >= 1:
total_batch_size = num_gpus_used * train_config.batch_size_per_gpu
# T5: All labels set to `-100` are ignored (masked), the loss is only computed for labels in `[0, ..., config.vocab_size]`
tokenizer = test_dataset.tokenizer
model_file = train_config.model_file.format(best_epoch)
if os.path.isdir(model_file):
            # a directory was passed: load with from_pretrained
model = TextToTextModel.from_pretrained(model_file)
else:
# load_state_dict
t5_config = get_T5_config(T5ModelConfig(), vocab_size=len(tokenizer), decoder_start_token_id=tokenizer.pad_token_id, eos_token_id=tokenizer.eos_token_id)
model = TextToTextModel(t5_config)
            model.load_state_dict(torch.load(model_file, map_location='cpu'))  # map to CPU to avoid device-mismatch errors
model, test_dataloader = accelerator.prepare(
model,
test_dataloader,
)
        steps = int(np.ceil(len(test_dataset) / total_batch_size))
bleu4 = 0.0
bleu4_scores = []
batch_decode = tokenizer.batch_decode
max_seq_len = self.train_config.max_seq_len
model.eval()
if accelerator.is_main_process:
progress = Progress(TextColumn("[progress.description]{task.description}"),
BarColumn(),
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
TimeRemainingColumn(),
TimeElapsedColumn(),
TextColumn("[bold blue]{task.fields[show_info]}"),
refresh_per_second=1.0,
)
steps_progress = progress.add_task(description='steps: ', show_info='', total=steps)
progress.start()
with torch.no_grad():
for step, batch_data in enumerate(test_dataloader):
if accelerator.is_main_process:
progress.advance(steps_progress, advance=1)
progress.update(steps_progress, show_info='step: {}/{}'.format(step, steps))
input_ids, input_mask = batch_data['input_ids'], batch_data['input_mask']
target_ids = batch_data['target_ids']
# s = time.time()
outputs = accelerator.unwrap_model(model).my_generate(
input_ids=input_ids,
attention_mask=input_mask,
max_seq_len=max_seq_len,
)
# accelerator.print('generate used: {}'.format(time.time() - s))
# gather data from multi-gpus (used when in ddp mode)
outputs = accelerator.gather_for_metrics(outputs).cpu().numpy()
target_ids = accelerator.gather_for_metrics(target_ids).cpu().numpy()
outputs = batch_decode(outputs, skip_special_tokens=True, clean_up_tokenization_spaces=False)
target_ids = batch_decode(target_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)
# print('outputs: {}'.format(outputs[0:5]))
# print('target_ids: {}'.format(target_ids[0:5]))
# print()
                batch_bleu4_scores = [get_bleu4_score(reference=target_ids[i], outputs=outputs[i]) for i in range(len(target_ids))]
                bleu4_scores.extend(batch_bleu4_scores)
# if step >= 10: break
avg_bleu4_score = my_average(bleu4_scores)
if accelerator.is_main_process:
progress.update(steps_progress, show_info='bleu4 score: {}'.format(avg_bleu4_score))
info_txt = 'test_dataset_size: {}, avg_bleu4_score:{}.'.format(len(test_dataset), avg_bleu4_score)
log.info(info_txt, save_to_file=True)
return avg_bleu4_score
def print_and_log(self, info: str, accelerator: Accelerator=None) -> None:
        '''
        Use accelerator.print when available; plain print from multiple processes garbles the output.
        '''
if not accelerator:
print(info)
else:
accelerator.print(info)
self.logger.info(info, std_out=False, save_to_file=True)
if __name__ == '__main__':
# trainer = ChatTrainer()
train_config = TrainConfig()
model_config = T5ModelConfig()
chat_trainer = ChatTrainer(train_config=train_config, model_config=model_config)
chat_trainer.train()
# chat_trainer.test(best_epoch=0)
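    # hypothetical usage, based on the train() signature above:
    # chat_trainer.train(is_keep_training=True)   # resume from a saved training state
    # chat_trainer.train(is_finetune=True)        # finetune from finetune_from_ckp_file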