import os import re import string import contractions import datasets import evaluate import pandas as pd import torch from datasets import Dataset from tqdm import tqdm from transformers import (AutoConfig, AutoModelForSeq2SeqLM, AutoTokenizer, DataCollatorForSeq2Seq, Seq2SeqTrainer, Seq2SeqTrainingArguments) def clean_text(texts): """This fonction makes clean text for the future use""" texts = texts.lower() texts = contractions.fix(texts) texts = texts.translate(str.maketrans("", "", string.punctuation)) texts = re.sub(r"\n", " ", texts) return texts def datasetmaker(path=str): """This fonction take the jsonl file, read it to a dataframe, remove the colums not needed for the task and turn it into a file type Dataset """ data = pd.read_json(path, lines=True) df = data.drop( [ "url", "archive", "title", "date", "compression", "coverage", "density", "compression_bin", "coverage_bin", "density_bin", ], axis=1, ) tqdm.pandas() df["text"] = df.text.apply(lambda texts: clean_text(texts)) df["summary"] = df.summary.apply(lambda summary: clean_text(summary)) dataset = Dataset.from_dict(df) return dataset # voir si le model par hasard esr déjà bien # test_text = dataset['text'][0] # pipe = pipeline('summarization', model = model_ckpt) # pipe_out = pipe(test_text) # print(pipe_out[0]['summary_text'].replace('.', '.\n')) # print(dataset['summary'][0]) def generate_batch_sized_chunks(list_elements, batch_size): """this fonction split the dataset into smaller batches that we can process simultaneously Yield successive batch-sized chunks from list_of_elements.""" for i in range(0, len(list_elements), batch_size): yield list_elements[i: i + batch_size] def calculate_metric(dataset, metric, model, tokenizer, batch_size, device, column_text='text', column_summary='summary'): """this fonction evaluate the model with metric rouge and print a table of rouge scores rouge1', 'rouge2', 'rougeL', 'rougeLsum'""" article_batches = list( str(generate_batch_sized_chunks(dataset[column_text], batch_size)) ) target_batches = list( str(generate_batch_sized_chunks(dataset[column_summary], batch_size)) ) for article_batch, target_batch in tqdm( zip(article_batches, target_batches), total=len(article_batches) ): inputs = tokenizer( article_batch, max_length=1024, truncation=True, padding="max_length", return_tensors="pt", ) # parameter for length penalty ensures that the model does not # generate sequences that are too long. summaries = model.generate( input_ids=inputs["input_ids"].to(device), attention_mask=inputs["attention_mask"].to(device), length_penalty=0.8, num_beams=8, max_length=128, ) # Décode les textes # renplacer les tokens, ajouter des textes décodés avec les rédéfences # vers la métrique. decoded_summaries = [ tokenizer.decode( s, skip_special_tokens=True, clean_up_tokenization_spaces=True ) for s in summaries ] decoded_summaries = [d.replace("", " ") for d in decoded_summaries] metric.add_batch( predictions=decoded_summaries, references=target_batch) # compute et return les ROUGE scores. results = metric.compute() rouge_names = ["rouge1", "rouge2", "rougeL", "rougeLsum"] rouge_dict = dict((rn, results[rn]) for rn in rouge_names) return pd.DataFrame(rouge_dict, index=["T5"]) def convert_ex_to_features(example_batch): """this fonction takes for input a list of inputExemples and convert to InputFeatures""" input_encodings = tokenizer(example_batch['text'], max_length=1024, truncation=True) labels = tokenizer( example_batch["summary"], max_length=128, truncation=True) return { "input_ids": input_encodings["input_ids"], "attention_mask": input_encodings["attention_mask"], "labels": labels["input_ids"], } if __name__ == '__main__': # réalisation des datasets propres train_dataset = datasetmaker('data/train_extract.jsonl') test_dataset = datasetmaker("data/test_extract.jsonl") test_dataset = datasetmaker('data/test_extract.jsonl') dataset = datasets.DatasetDict({'train': train_dataset, 'dev': dev_dataset, 'test': test_dataset}) # définition de device device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') # faire appel au model à entrainer tokenizer = AutoTokenizer.from_pretrained('google/mt5-small') mt5_config = AutoConfig.from_pretrained( "google/mt5-small", max_length=128, length_penalty=0.6, no_repeat_ngram_size=2, num_beams=15, ) model = (AutoModelForSeq2SeqLM .from_pretrained('google/mt5-small', config=mt5_config) .to(device)) #convertir les exemples en inputFeatures dataset_pt = dataset.map( convert_ex_to_features, remove_columns=["summary", "text"], batched=True, batch_size=128, ) data_collator = DataCollatorForSeq2Seq( tokenizer, model=model, return_tensors="pt") #définir les paramètres d'entrainement(fine tuning) training_args = Seq2SeqTrainingArguments( output_dir="t5_summary", log_level="error", num_train_epochs=10, learning_rate=5e-4, warmup_steps=0, optim="adafactor", weight_decay=0.01, per_device_train_batch_size=2, per_device_eval_batch_size=1, gradient_accumulation_steps=16, evaluation_strategy="steps", eval_steps=100, predict_with_generate=True, generation_max_length=128, save_steps=500, logging_steps=10, # push_to_hub = True ) #donner au entraineur(trainer) le model # et les éléments nécessaire pour l'entrainement trainer = Seq2SeqTrainer( model=model, args=training_args, data_collator=data_collator, # compute_metrics = calculate_metric, train_dataset=dataset_pt["train"], eval_dataset=dataset_pt["dev"].select(range(10)), tokenizer=tokenizer, ) trainer.train() rouge_metric = evaluate.load("rouge") #évluer ensuite le model selon les résultats d'entrainement score = calculate_metric( test_dataset, rouge_metric, trainer.model, tokenizer, batch_size=2, device=device, column_text="text", column_summary="summary", ) print(score) # Fine Tuning terminés et à sauvgarder # sauvegarder fine-tuned model à local os.makedirs("t5_summary", exist_ok=True) if hasattr(trainer.model, "module"): trainer.model.module.save_pretrained("t5_summary") else: trainer.model.save_pretrained("t5_summary") tokenizer.save_pretrained("t5_summary") # faire appel au model en local model = (AutoModelForSeq2SeqLM .from_pretrained("t5_summary") .to(device)) # mettre en usage : TEST # gen_kwargs = {"length_penalty" : 0.8, "num_beams" : 8, "max_length" : 128} # sample_text = dataset["test"][0]["text"] # reference = dataset["test"][0]["summary"] # pipe = pipeline("summarization", model='./summarization_t5') # print("Text :") # print(sample_text) # print("\nReference Summary :") # print(reference) # print("\nModel Summary :") # print(pipe(sample_text, **gen_kwargs)[0]["summary_text"])