# -*- coding: utf-8 -*- """ Created on Tue Oct 11 16:46:45 2022 @author: Santiago Moreno """ from upsampling import upsampling_ner from flair.datasets import ColumnCorpus from flair.data import Corpus from flair.trainers import ModelTrainer from flair.models import SequenceTagger from flair.embeddings import TransformerWordEmbeddings from torch.optim.lr_scheduler import OneCycleLR from flair.data import Sentence from sklearn.model_selection import StratifiedGroupKFold from distutils.dir_util import copy_tree import numpy as np import torch import pandas as pd import json import os import operator import flair import argparse default_path = os.path.dirname(os.path.abspath(__file__)) tagger_document = 0 tagger_sentence = 0 def check_create(path): import os if not (os.path.isdir(path)): os.makedirs(path) def str2bool(v): if isinstance(v, bool): return v if v.lower() in ('yes', 'True','true', 't', 'y', '1'): return True elif v.lower() in ('no', 'False', 'false', 'f', 'n', '0'): return False else: raise argparse.ArgumentTypeError('Boolean value expected.') def copy_data(original_path): data_folder = default_path + '/../../data/NER/train' copy_tree(original_path, data_folder) def characterize_data(): data_folder = default_path + '/../../data/NER/train' columns = {0: 'text', 1:'ner'} # init a corpus using column format, data folder and the names of the train, dev and test files try: corpus: Corpus = ColumnCorpus(data_folder, columns, train_file='train.txt', test_file='test.txt' ) #dev_file='dev.txt') except: print('Invalid input document in training') return 8 # 2. what tag do we want to predict? tag_type = 'ner' #tag_dictionary = corpus.make_label_dictionary(label_type=tag_type) tag_dictionary = corpus.get_label_distribution() return tag_dictionary #return corpus def upsampling_data(entities_to_upsample, probability, entities): print('-'*20,'upsampling','-'*20) data_folder = default_path + '/../../data/NER/train' columns = {'text':0, 'ner':1} for m in ["SiS","LwTR","MR","SR", "MBT"]: upsampler = upsampling_ner(data_folder+'/train.txt', entities+['O'], columns) data, data_labels = upsampler.get_dataset() new_samples, new_labels = upsampler.upsampling(entities_to_upsample,probability,[m]) data += new_samples data_labels += new_labels with open(data_folder+'/train.txt', mode='w', encoding='utf-8') as f: for l,sentence in enumerate(data): for j,word in enumerate(sentence): f.write(word+' '+ data_labels[l][j]) f.write('\n') if l < (len(data)-1): f.write('\n') print('-'*20,'upsampling complete','-'*20) def usage_cuda(cuda): if cuda: flair.device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu') if flair.device == torch.device('cpu'): return 'Error handling GPU, CPU will be used' elif flair.device == torch.device('cuda:0'): return 'GPU detected, GPU will be used' else: flair.device = torch.device('cpu') return 'CPU will be used' def training_model(name, epochs=20): #FUNCION data_folder = default_path + '/../../data/NER/train' path_model = default_path + '/../../models/NER/{}'.format(name) if (os.path.isdir(path_model)): print('WARNING, model already exists will be overwritten') columns = {0: 'text', 1:'ner'} # init a corpus using column format, data folder and the names of the train, dev and test files try: corpus: Corpus = ColumnCorpus(data_folder, columns, train_file='train.txt', test_file='test.txt' ) #dev_file='dev.txt') except: print('Invalid input document in training') return 8 # 2. what tag do we want to predict? tag_type = 'ner' # 3. make the tag dictionary from the corpus #tag_dictionary = corpus.make_label_dictionary(label_type=tag_type) tag_dictionary = corpus.make_label_dictionary(label_type=tag_type) try: embeddings = TransformerWordEmbeddings( model='xlm-roberta-large', layers="-1", subtoken_pooling="first", fine_tune=True, use_context=True, ) except: print('Error while loading embeddings from RoBERTa') return 5 # 5. initialize bare-bones sequence tagger (no CRF, no RNN, no reprojection) try: tagger_train = SequenceTagger( hidden_size=256, embeddings=embeddings, tag_dictionary=tag_dictionary, tag_type='ner', use_crf=False, use_rnn=False, reproject_embeddings=False, ) except: print('Error making tagger') return 6 # 6. initialize trainer with AdamW optimizer trainer = ModelTrainer(tagger_train, corpus) # 7. run training with XLM parameters (20 epochs, small LR) try: trainer.train(path_model, learning_rate=5.0e-6, mini_batch_size=1, mini_batch_chunk_size=1, max_epochs=epochs, scheduler=OneCycleLR, embeddings_storage_mode='cpu', optimizer=torch.optim.AdamW, ) except: pass print('Error training the model, try setting CUDA False') return 7 print("Model {} trained and saved in {}".format(name,'models/{}'.format(name))) def tag_sentence(sentence, name): results={'Sentence_tagged':'', 'Highligth':{}} Highligth_dict={"text": "", "entities": []} #--------------Load the trained model------------------------- path_model = default_path + '/../../models/NER/{}'.format(name) global tagger_sentence if (not tagger_sentence): try: tagger_sentence = SequenceTagger.load(path_model+'/best-model.pt') except: try: tagger_sentence = SequenceTagger.load(path_model+'/final-model.pt') except: print('Invalid model') return 1 #------------------Tagged sentence--------------------- print('-'*20,'Tagging','-'*20) sentence_f = Sentence(sentence) tagger_sentence.predict(sentence_f) sentence_tokenized = [] Highligth_dict['text'] = sentence_f.to_plain_string() for indx,token in enumerate(sentence_f.tokens): t = token.get_label() if t.value == 'O': sentence_tokenized += [token.text] else: sentence_tokenized += [t.shortstring] token_info={ 'entity': t.value , 'index' : indx, 'word' : token.text, 'start': token.start_position, 'end' : token.end_position } Highligth_dict["entities"].append(token_info) sen_tagged = ' ' .join(sentence_tokenized) results['Highligth'] = Highligth_dict results['Sentence_tagged'] = sen_tagged print('-'*20,'Tagged complete','-'*20) return results def use_model(name, path_data, output_dir): #--------------Load the trained model------------------------- path_model = default_path + '/../../models/NER/{}'.format(name) if not (os.path.isdir(path_model)): print('Model does not exists') return 10 if not os.path.isfile(path_data): print('Input file is not a file') return 9 global tagger_document if (not tagger_document): try: tagger_document = SequenceTagger.load(path_model+'/best-model.pt') except: try: tagger_document = SequenceTagger.load(path_model+'/final-model.pt') except: print('Invalid model') return 1 #-----------------Load the document------------------------- try: data = pd.read_json(path_data, orient ='index', encoding='utf-8')[0] except: print('Can\'t open the input file') return 2 if len(data) <= 0: print(f"length of document greater than 0 expected, got: {len(data)}") return 2 try: sentences=data['sentences'] t = sentences[0]['text'] except: print('Invalid JSON format in document {}'.format(path_data)) return 3 print('-'*20,'Tagging','-'*20) #-----------------Tagged the document------------------------- results = {'text':"", 'text_labeled':"",'sentences':[], 'entities': []} indx_prev = 0 pos_prev = 0 for s in sentences: sentence = Sentence(s['text']) tagger_document.predict(sentence, mini_batch_size = 1) sen_dict_temp = {'text':sentence.to_plain_string(), 'text_labeled':'', 'tokens':[]} #return sentence sentence_tokenized = [] for indx,token in enumerate(sentence.tokens): token_dict = {'text':token.text, 'label':token.get_label('ner').value} sen_dict_temp['tokens'].append(token_dict) t = token.get_label('ner') if t.value == 'O': sentence_tokenized += [token.text] else: sentence_tokenized += [t.shortstring] token_info={ 'entity': t.value , 'index' : indx + indx_prev, 'word' : token.text, 'start': token.start_position + pos_prev, 'end' : token.end_position +pos_prev } results["entities"].append(token_info) indx_prev += len(sentence.tokens) pos_prev += len(sentence.to_plain_string()) sen_tagged = ' ' .join(sentence_tokenized) sen_dict_temp['text_labeled'] = sen_tagged results['sentences'].append(sen_dict_temp) results['text'] += sentence.to_plain_string() #return sentence results['text_labeled'] += sen_tagged #-----------------Save the results------------------------- try: with open(output_dir, "w", encoding='utf-8') as write_file: json.dump(results, write_file) print('-'*20,'Tagged complete','-'*20) print('Document tagged saved in {}'.format(output_dir)) except: print('Error in output file') return 11 return results def json_to_txt(path_data_documents): #-------------List the documents in the path------------ documents=os.listdir(path_data_documents) if len(documents) <= 0: print('There are not documents in the folder') return 4 data_from_documents={'id':[],'document':[],'sentence':[],'word':[],'tag':[]} #--------------Verify each documment------------- for num,doc in enumerate(documents): data=path_data_documents+'/'+doc df = pd.read_json(data, orient ='index')[0] try: sentences = df['sentences'] t = sentences[0]['text'] t = sentences[0]['id'] t = sentences[0]['tokens'] j = t[0]['text'] j = t[0]['begin'] j = t[0]['end'] tags = df['mentions'] if tags: tg = tags[0]['id'] tg = tags[0]['begin'] tg = tags[0]['end'] tg = tags[0]['type'] except: print('Invalid JSON input format in document {}'.format(doc)) return 3 #-----------------Organize the data---------------- for s in sentences: id_senten=s['id'] for tk in s['tokens']: if len(tk['text'])==1: #if ord(tk['text'])>=48 and ord(tk['text'])<=57 and ord(tk['text'])>=65 and ord(tk['text'])<=90 and ord(tk['text'])>=97 and ord(tk['text'])<=122: tk_beg=tk['begin'] tk_end=tk['end'] data_from_documents['id'].append('d'+str(num)+'_'+id_senten) data_from_documents['document'].append(doc) data_from_documents['word'].append(tk['text']) data_from_documents['sentence'].append(s['text']) data_from_documents['tag'].append('O') for tg in tags: if id_senten == tg['id'].split('-')[0] and tk['begin']>=tg['begin'] and tk['begin']=tg['begin'] and tk['begin']= 150: count = 0 f.write('\n') # print("Before check") # checkpoint = "xlm-roberta-large" # config = AutoConfig.from_pretrained(checkpoint) # with init_empty_weights(): # model = AutoModelForSequenceClassification.from_config(config) # print("After check") # try: # tagger = load_checkpoint_and_dispatch(model, path_model+'/best-model.pt', device_map="auto") # except: # try: # tagger = load_checkpoint_and_dispatch(model, path_model+'/final-model.pt', device_map="auto") # except: # print('Invalid model') # return 1