Spaces:
Runtime error
Runtime error
import signal | |
import sys | |
import os | |
import time | |
from typing import Union | |
import platform | |
from psutil import virtual_memory, cpu_count | |
import numpy as np | |
from torch.utils.data import DataLoader | |
import torch | |
from rich.progress import Progress, TextColumn, BarColumn, TimeElapsedColumn, TimeRemainingColumn | |
from transformers import PreTrainedTokenizerFast | |
from torch_optimizer import Adafactor | |
# import accelerate | |
from accelerate import Accelerator | |
from accelerate.utils import set_seed | |
# import 自定义类和函数 | |
from model.chat_model import TextToTextModel | |
from utils.logger import Logger | |
from model.dataset import MyDataset | |
from config import TrainConfig, T5ModelConfig | |
from utils.functions import ( | |
get_bleu4_score, | |
save_model_config, | |
get_free_space_of_disk, | |
my_average, | |
get_path_of_suffix_files, | |
get_T5_config, | |
) | |
class ChatTrainer: | |
def __init__(self, train_config: TrainConfig, model_config: T5ModelConfig, ) -> None: | |
self.train_config = train_config | |
self.model_config = model_config | |
# file_name=None会自动生成以当前日期命名的log文件名 | |
self.logger = Logger('chat_trainer', std_out=True, save2file=True, file_name=None) | |
self.model = None | |
self.accelerator = None | |
signal.signal(signal.SIGINT, self.process_exit_handler) | |
self.is_win_platform = True if platform.system().lower() == 'windows' else False | |
torch.manual_seed(train_config.seed) | |
torch.cuda.manual_seed_all(train_config.seed) | |
def process_exit_handler(self, signal_received, frame) -> None: | |
''' | |
进程退出时的操作,保存模型 | |
''' | |
if self.accelerator and self.model: | |
ask = "you are pressed `ctrl+c`, do you want to save checkpoint? Yes (y) or No (n)" | |
self.accelerator.print(ask) | |
ins = input() | |
if ins.lower() in ('yes', 'y'): | |
suffix = 'exit_save_{}'.format(str(time.strftime('%Y%m%d%H%M%S', time.localtime()))) | |
self.accelerator.wait_for_everyone() | |
self.accelerator.save_state(output_dir=self.train_config.train_state_dir) | |
self.accelerator.print('model ckeck point has been saved in {}'.format(self.train_config.train_state_dir)) | |
sys.exit(0) | |
else: | |
print('process not in trainingg, exit.') | |
sys.exit(0) | |
def save_model(self, suffix: Union[str, int]) -> None: | |
'''保存模型到文件 | |
注意:save_model不能放到is_main_process里面 | |
e.g: | |
>>> self.save_model(epoch) # 在这里使用 | |
>>> if accelerator.is_main_process: | |
>>> do_somthing() | |
''' | |
if self.model and self.accelerator: | |
# 先wait_for_everyone,再保存 | |
self.accelerator.wait_for_everyone() | |
if self.accelerator.is_main_process: | |
unwrap_model = self.accelerator.unwrap_model(self.model) | |
model_dict = self.accelerator.get_state_dict(unwrap_model) | |
torch.save(model_dict, self.train_config.model_file.format(suffix)) | |
def delete_early_checkpoint(self, epoch: int, keep_latest_n: int=3,) -> None: | |
''' | |
删除最早的模型,最保留最近keep_latest_n个模型文件 | |
''' | |
model_save_path = self.train_config.model_file | |
model_save_path = model_save_path.replace('\\', '/') # 针对win的路径,将\替换为/ | |
model_save_path = '/'.join(model_save_path.split('/')[0: -1]) # 删除末尾文件名后缀 | |
model_files = get_path_of_suffix_files(model_save_path, suffix='.bin', with_create_time=True) | |
# 进程异常退出保存模型文件不在删除范围 | |
train_save_model_fils = [] | |
for item in model_files: | |
if 'exit_save' not in item[0]: | |
# 大于当前epoch的文件不不删除 | |
f_epoch = int(item[0].split('.')[-2]) | |
if epoch >= f_epoch: | |
print(epoch, f_epoch, item) | |
train_save_model_fils.append(item) | |
train_save_model_fils.sort(key=lambda x: x[1]) # 按照时间从小到大排序 | |
if len(train_save_model_fils) <= keep_latest_n: | |
return | |
to_delete_files = train_save_model_fils[0: -keep_latest_n] | |
for item in to_delete_files: | |
os.remove(item[0]) | |
def train(self, is_keep_training: bool=False, is_finetune: bool=False) -> None: | |
''' | |
is_keep_training: 是否从断点处加载状态继续训练 | |
is_finetune: 是否微调,微调的话可能需要冻结部分参数 | |
''' | |
log = self.logger | |
train_config = self.train_config | |
save_steps = self.train_config.save_steps | |
logging_steps = self.train_config.logging_steps | |
# 梯度累计的步数 | |
accumulation_steps = train_config.gradient_accumulation_steps | |
set_seed(train_config.seed) | |
accelerator = Accelerator( | |
mixed_precision=train_config.mixed_precision, # 混合精度 | |
gradient_accumulation_steps=accumulation_steps, # 梯度累积 | |
project_dir=train_config.train_state_dir, | |
) | |
# 根据剩余内存大小决定是否完全加载数据集到内存中 | |
unuse_mem = virtual_memory().available / (1024 ** 3) # 单位:GB | |
unuse_disk = get_free_space_of_disk('./') | |
# 剩余内存≥48GB将把数据集留在内存中,因为2个显卡+全全部装载900多万的训练数据到内存需要大概43GB的CPU内存 | |
# 如果不放在内存中,将会使用迭代器生成数据,CPU 内存小于16GB也可以运行,但是不支持顺序打乱。 | |
# 多GPU keep_in_memory必须=True,否则无法进行分布式训练 | |
keep_in_memory = True if unuse_mem >= 48.0 or torch.cuda.device_count() >= 2 else False | |
if accelerator.is_main_process: | |
log.info('cpu memory available: {:.2f} GB, disk space available: {:.2f} GB, keep dataset in memory: {}.'\ | |
.format(unuse_mem, unuse_disk, keep_in_memory), save_to_file=True) | |
log.info('operation: {}, keep training: {}, loading datasets ...'.format('finetune' if is_finetune else 'train', is_keep_training)) | |
# args for dataloader | |
num_workers = 0 | |
# if not self.is_win_platform: | |
# cpu_cnt = cpu_count(logical=False) | |
# gpu_cnt = torch.cuda.device_count() | |
# if cpu_cnt >= 8 * gpu_cnt: | |
# # num_workers = 4 x number of available GPUs | |
# num_workers = int(4 * gpu_cnt) | |
# else: | |
# num_workers = int(cpu_cnt // 2) | |
train_dataset = MyDataset( | |
parquet_file=train_config.train_file, | |
tokenizer_dir=train_config.tokenizer_dir, | |
keep_in_memory=keep_in_memory, | |
max_seq_len=train_config.max_seq_len, | |
) | |
valid_dataset = MyDataset( | |
parquet_file=train_config.validation_file, | |
tokenizer_dir=train_config.tokenizer_dir, | |
keep_in_memory=keep_in_memory, | |
max_seq_len=train_config.max_seq_len, | |
) | |
batch_size = train_config.batch_size_per_gpu | |
train_dataloader = DataLoader( | |
train_dataset, | |
batch_size=batch_size, | |
shuffle=True, | |
collate_fn=train_dataset.collate_fn, | |
pin_memory=False, | |
num_workers=num_workers, #设置>1会导致cpu内存缓慢增涨,最后OOM,后面再研究为什么,num_workers=4,一个epoch只减少30分钟 | |
) | |
valid_dataloader = DataLoader( | |
valid_dataset, | |
batch_size=batch_size, | |
shuffle=False, | |
collate_fn=valid_dataset.collate_fn, | |
pin_memory=False, | |
num_workers=num_workers, | |
) | |
device = accelerator.device | |
log.info('using device: {} '.format(str(device)), save_to_file=True) | |
# T5: All labels set to `-100` are ignored (masked), the loss is only computed for labels in `[0, ..., config.vocab_size]` | |
tokenizer = train_dataset.tokenizer | |
decoder_start_token_id = tokenizer.pad_token_id | |
# for t5, set decoder_start_token_id = pad_token_id | |
t5_config = get_T5_config(T5ModelConfig(), vocab_size=len(tokenizer), decoder_start_token_id=decoder_start_token_id, eos_token_id=tokenizer.eos_token_id) | |
model = TextToTextModel(t5_config) | |
# 微调加载的模型并冻结embedding和encoder | |
if is_finetune: | |
model.load_state_dict(torch.load(train_config.finetune_from_ckp_file)) | |
# print(model) | |
layers_to_freeze = [model.shared, model.encoder] | |
for layer in layers_to_freeze: | |
for param in layer.parameters(): | |
param.requires_grad = False | |
# 保存模型配置,方便修改配置后恢复 | |
save_model_config(t5_config.to_diff_dict(), train_config.model_config_file) | |
# T5训练,论文推荐使用Adafactor | |
optimizer = Adafactor(params=model.parameters(), lr=train_config.learn_rate) | |
# 获取当前机器有多少个GPU,默认全部使用 | |
num_gpus_used = accelerator.state.num_processes | |
# 单机多卡,每个step总共的batch_size = batch_size_per_gpu * num_gpus_used | |
# total_batch_size 初始化为batch_size_per_gpu真的只有CPU的情况 | |
total_batch_size = train_config.batch_size_per_gpu | |
if num_gpus_used >= 1: | |
total_batch_size = num_gpus_used * train_config.batch_size_per_gpu | |
steps_per_epoch = int(np.ceil(len(train_dataset) // total_batch_size)) | |
eval_steps = int(np.ceil(len(valid_dataset) // total_batch_size)) | |
if accelerator.is_main_process: | |
log.info('train dataset size: {}, steps per epoch:{}; validation dataset size: {}, steps per validation: {}; datalodater num_workers: {}.'\ | |
.format(len(train_dataset), steps_per_epoch, len(valid_dataset), eval_steps, num_workers), save_to_file=True) | |
lr_scheduler = torch.optim.lr_scheduler.OneCycleLR( | |
optimizer=optimizer, | |
max_lr=train_config.div_factor * train_config.learn_rate, | |
epochs=train_config.epochs, | |
steps_per_epoch=int(np.ceil( len(train_dataset) / (batch_size * accumulation_steps) )), # 梯度累积相当于增大了batch_size | |
div_factor=train_config.div_factor, | |
cycle_momentum=False, | |
) | |
model, optimizer, lr_scheduler, train_dataloader, valid_dataloader = accelerator.prepare( | |
model, | |
optimizer, | |
lr_scheduler, | |
train_dataloader, | |
valid_dataloader, | |
) | |
if is_keep_training: | |
accelerator.load_state(input_dir=train_config.train_state_dir) | |
accelerator.register_for_checkpointing(lr_scheduler) | |
self.model = model | |
self.accelerator = accelerator | |
best_bleu4 = 0.0 | |
best_epoch = 0 | |
epoch_loss_list = [] | |
# 添加进度条,只在主进程更新 | |
if accelerator.is_main_process: | |
progress = Progress(TextColumn("[progress.description]{task.description}"), | |
BarColumn(), | |
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), | |
TimeRemainingColumn(), | |
TimeElapsedColumn(), | |
TextColumn("[bold blue]{task.fields[show_info]}"), | |
refresh_per_second=1, # 每1秒钟更新一次,不要频繁更新 | |
) | |
epoch_progress = progress.add_task(description='epoch: ', show_info='', total=train_config.epochs) | |
steps_progress = progress.add_task(description='steps: ', show_info='', \ | |
total=np.ceil(steps_per_epoch / logging_steps)) | |
eval_progress = progress.add_task(description='evaluate: ', show_info='', total=eval_steps, visible=False) | |
self.progress = progress | |
self.eval_progress = eval_progress | |
progress.start() | |
# end if | |
for epoch in range(train_config.epochs): | |
if accelerator.is_main_process: | |
epoch_show_txt = 'epoch: {}/{}, avg_loss: {:.6f}, best_epoch: {}, best_bleu: {}'.format( | |
epoch, train_config.epochs, my_average(epoch_loss_list), best_epoch, best_bleu4 | |
) | |
progress.update(epoch_progress, show_info=epoch_show_txt) | |
progress.reset(steps_progress) | |
epoch_loss_list = [] | |
model.train() | |
# torch.cuda.empty_cache() | |
for step, batch_data in enumerate(train_dataloader): | |
input_ids, input_mask = batch_data['input_ids'], batch_data['input_mask'] | |
target_ids = batch_data['target_ids'] | |
# for t5 model, all labels set to `-100` are ignored (masked) | |
target_ids[target_ids == decoder_start_token_id] = -100 | |
outputs = model( | |
input_ids=input_ids, | |
attention_mask=input_mask, | |
labels=target_ids, | |
) | |
loss = outputs.loss.mean() / accumulation_steps | |
# attention here! loss.backward() | |
accelerator.backward(loss) | |
# 梯度累计 | |
if (step + 1) % accumulation_steps == 0: | |
accelerator.clip_grad_norm_(model.parameters(), 1.0) | |
optimizer.step() | |
lr_scheduler.step() | |
optimizer.zero_grad() | |
# 每隔save_steps步保存一次模型 | |
if (step + 1) % save_steps == 0 or step == steps_per_epoch: | |
self.save_model('epoch_{}_latest'.format(epoch)) | |
accelerator.save_state(output_dir=train_config.train_state_dir) | |
# ==================================以下记录loss到日志============================================ | |
# 每n步更新一次,避免频繁的cpu-gpu数据复制 | |
# 参考:https://pytorch.org/tutorials/recipes/recipes/tuning_guide.html#avoid-unnecessary-cpu-gpu-synchronization | |
if step % logging_steps == 0 or step == steps_per_epoch: | |
loss_cpu = loss.detach().item() * accumulation_steps | |
epoch_loss_list.append(loss_cpu) | |
info_txt = 'training loss: epoch:{}, step:{}, loss:{}, device:{}'.\ | |
format(epoch, step, loss_cpu, str(accelerator.device)) | |
log.info(info_txt, std_out=False, save_to_file=True) # 保存 loss 到文件 | |
# 更新进度条 | |
if accelerator.is_main_process: | |
step_show_txt = 'step: {}/{}, loss: {:.6f}'.format(step, steps_per_epoch, loss_cpu) | |
progress.advance(steps_progress, advance=1) | |
progress.update(steps_progress, show_info=step_show_txt) | |
# ==================================以上记录loss到日志============================================ | |
# if step >= 20:break | |
# end for batch setps | |
model.eval() | |
cur_bleu4_score = self.evaluate( | |
model=model, | |
tokenizer=tokenizer, | |
valid_dataloader=valid_dataloader, | |
accelerator=accelerator, | |
eval_steps=eval_steps, | |
) | |
# save model | |
if cur_bleu4_score >= best_bleu4: | |
best_bleu4 = cur_bleu4_score | |
best_epoch = epoch | |
# 最多保存最近keep_latest_n_ckp个模型文件 | |
# self.delete_early_checkpoint(epoch=epoch, keep_latest_n=train_config.keep_latest_n_ckp) | |
self.save_model('best') | |
accelerator.save_state(output_dir=train_config.train_state_dir) | |
# 每个epoch打印一下日志 | |
if accelerator.is_main_process: | |
progress.advance(epoch_progress, advance=1) | |
info_txt = 'epoch log: epoch:{}, avg_loss:{}, cur_bleu4:{}, best_bleu4:{}, best_epoch:{}'.\ | |
format(epoch, my_average(epoch_loss_list), cur_bleu4_score, best_bleu4, best_epoch) | |
# log.info(info_txt, std_out=True, save_to_file=True) | |
self.print_and_log(info_txt, accelerator) | |
def evaluate(self, | |
model: TextToTextModel, | |
tokenizer: PreTrainedTokenizerFast, | |
valid_dataloader: DataLoader, | |
accelerator: Accelerator, | |
eval_steps: int, | |
) -> float: | |
''' | |
评估,返回平均的bleu分数 | |
''' | |
max_seq_len = self.train_config.max_seq_len | |
batch_decode = tokenizer.batch_decode | |
bleu4_scores = [] | |
if accelerator.is_main_process: | |
self.progress.reset(self.eval_progress) | |
self.progress.update(self.eval_progress, visible=True) | |
with torch.no_grad(): | |
for step, batch_data in enumerate(valid_dataloader): | |
if accelerator.is_main_process: | |
self.progress.advance(self.eval_progress, advance=1) | |
self.progress.update(self.eval_progress, show_info='step: {}/{}'.format(step, eval_steps)) | |
input_ids, input_mask = batch_data['input_ids'], batch_data['input_mask'] | |
target_ids = batch_data['target_ids'] | |
outputs = accelerator.unwrap_model(model).my_generate( | |
input_ids=input_ids, | |
attention_mask=input_mask, | |
max_seq_len=max_seq_len, | |
) | |
# gather data from multi-gpus (used when in ddp mode) | |
outputs = accelerator.gather_for_metrics(outputs).detach().cpu().numpy() | |
target_ids = accelerator.gather_for_metrics(target_ids).detach().cpu().numpy() | |
outputs = batch_decode(outputs, skip_special_tokens=True, clean_up_tokenization_spaces=False) | |
target_ids = batch_decode(target_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False) | |
# print(outputs, target_ids) | |
bleu4_scores = [get_bleu4_score(reference=target_ids[i], outputs=outputs[i]) for i in range(len(target_ids))] | |
bleu4_scores.extend(bleu4_scores) | |
# if step >= 5: break | |
avg_bleu4_score = my_average(bleu4_scores) | |
if accelerator.is_main_process: | |
self.progress.update(self.eval_progress, show_info='bleu4 score: {}'.format(avg_bleu4_score)) | |
self.progress.update(self.eval_progress, visible=False) | |
return avg_bleu4_score | |
def test(self, best_epoch: int=0) -> None: | |
''' | |
''' | |
import os | |
train_config = self.train_config | |
log = self.logger | |
# args for dataloader | |
num_workers = 0 if self.is_win_platform else 4 | |
test_dataset = MyDataset( | |
parquet_file=train_config.train_file, | |
tokenizer_dir=train_config.tokenizer_dir, | |
keep_in_memory=False if self.is_win_platform else True, | |
max_seq_len=train_config.max_seq_len, | |
) | |
test_dataloader = DataLoader( | |
test_dataset, | |
batch_size=train_config.batch_size_per_gpu, | |
shuffle=False, | |
collate_fn=test_dataset.collate_fn, | |
pin_memory=False, | |
num_workers=num_workers, | |
) | |
log.info('test dataset size: {}.'.format(len(test_dataset)), save_to_file=True) | |
set_seed(train_config.seed) | |
accelerator = Accelerator(mixed_precision=train_config.mixed_precision) | |
device = accelerator.device | |
log.info('using device: {} '.format(str(device)), save_to_file=True) | |
# 获取当前运行使用了多少个GPU | |
num_gpus_used = accelerator.state.num_processes | |
# 单机多卡,每个step总共的batch_size = batch_size_per_gpu * num_gpus_used | |
# total_batch_size 初始化为batch_size_per_gpu真的只有CPU的情况 | |
total_batch_size = train_config.batch_size_per_gpu | |
if num_gpus_used >= 1: | |
total_batch_size = num_gpus_used * train_config.batch_size_per_gpu | |
# T5: All labels set to `-100` are ignored (masked), the loss is only computed for labels in `[0, ..., config.vocab_size]` | |
tokenizer = test_dataset.tokenizer | |
model_file = train_config.model_file.format(best_epoch) | |
if os.path.isdir(model_file): | |
# 传入文件夹则 from_pretrained | |
model = TextToTextModel.from_pretrained(model_file) | |
else: | |
# load_state_dict | |
t5_config = get_T5_config(T5ModelConfig(), vocab_size=len(tokenizer), decoder_start_token_id=tokenizer.pad_token_id, eos_token_id=tokenizer.eos_token_id) | |
model = TextToTextModel(t5_config) | |
model.load_state_dict(torch.load(model_file, map_location='cpu')) # set cpu for no exception | |
model, test_dataloader = accelerator.prepare( | |
model, | |
test_dataloader, | |
) | |
steps = int(np.ceil(len(test_dataset) // total_batch_size)) | |
bleu4 = 0.0 | |
bleu4_scores = [] | |
batch_decode = tokenizer.batch_decode | |
max_seq_len = self.train_config.max_seq_len | |
model.eval() | |
if accelerator.is_main_process: | |
progress = Progress(TextColumn("[progress.description]{task.description}"), | |
BarColumn(), | |
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), | |
TimeRemainingColumn(), | |
TimeElapsedColumn(), | |
TextColumn("[bold blue]{task.fields[show_info]}"), | |
refresh_per_second=1.0, | |
) | |
steps_progress = progress.add_task(description='steps: ', show_info='', total=steps) | |
progress.start() | |
with torch.no_grad(): | |
for step, batch_data in enumerate(test_dataloader): | |
if accelerator.is_main_process: | |
progress.advance(steps_progress, advance=1) | |
progress.update(steps_progress, show_info='step: {}/{}'.format(step, steps)) | |
input_ids, input_mask = batch_data['input_ids'], batch_data['input_mask'] | |
target_ids = batch_data['target_ids'] | |
# s = time.time() | |
outputs = accelerator.unwrap_model(model).my_generate( | |
input_ids=input_ids, | |
attention_mask=input_mask, | |
max_seq_len=max_seq_len, | |
) | |
# accelerator.print('generate used: {}'.format(time.time() - s)) | |
# gather data from multi-gpus (used when in ddp mode) | |
outputs = accelerator.gather_for_metrics(outputs).cpu().numpy() | |
target_ids = accelerator.gather_for_metrics(target_ids).cpu().numpy() | |
outputs = batch_decode(outputs, skip_special_tokens=True, clean_up_tokenization_spaces=False) | |
target_ids = batch_decode(target_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False) | |
# print('outputs: {}'.format(outputs[0:5])) | |
# print('target_ids: {}'.format(target_ids[0:5])) | |
# print() | |
bleu4_scores = [get_bleu4_score(reference=target_ids[i], outputs=outputs[i]) for i in range(len(target_ids))] | |
bleu4_scores.extend(bleu4_scores) | |
# if step >= 10: break | |
avg_bleu4_score = my_average(bleu4_scores) | |
if accelerator.is_main_process: | |
progress.update(steps_progress, show_info='bleu4 score: {}'.format(avg_bleu4_score)) | |
info_txt = 'test_dataset_size: {}, avg_bleu4_score:{}.'.format(len(test_dataset), avg_bleu4_score) | |
log.info(info_txt, save_to_file=True) | |
return avg_bleu4_score | |
def print_and_log(self, info: str, accelerator: Accelerator=None) -> None: | |
''' | |
使用accelerator.print, 否则多进程打印会异常 | |
''' | |
if not accelerator: | |
print(info) | |
else: | |
accelerator.print(info) | |
self.logger.info(info, std_out=False, save_to_file=True) | |
if __name__ == '__main__': | |
# trainer = ChatTrainer() | |
train_config = TrainConfig() | |
model_config = T5ModelConfig() | |
chat_trainer = ChatTrainer(train_config=train_config, model_config=model_config) | |
chat_trainer.train() | |
# chat_trainer.test(best_epoch=0) |