# Source: NER_RC / src / scripts / functionsner.py
# Uploaded by SantiagoMoreno-Col ("Add files to repo", commit 42d6a0f), 16.1 kB
# -*- coding: utf-8 -*-
"""
Created on Tue Oct 11 16:46:45 2022
@author: Santiago Moreno
"""
from upsampling import upsampling_ner
from flair.datasets import ColumnCorpus
from flair.data import Corpus
from flair.trainers import ModelTrainer
from flair.models import SequenceTagger
from flair.embeddings import TransformerWordEmbeddings
from torch.optim.lr_scheduler import OneCycleLR
from flair.data import Sentence
from sklearn.model_selection import StratifiedGroupKFold
from distutils.dir_util import copy_tree
import numpy as np
import torch
import pandas as pd
import json
import os
import operator
import flair
import argparse
# Absolute directory of this script; the data/ and models/ locations used by
# the functions below are built relative to it ('/../../data/train', '/../../models/<name>').
default_path = os.path.dirname(os.path.abspath(__file__))
# Module-level caches for loaded taggers. 0 means "nothing loaded yet";
# use_model() fills tagger_document and tag_sentence() fills tagger_sentence.
tagger_document = 0
tagger_sentence = 0
def check_create(path):
    """Create directory *path* (and any missing parents) if it is not there yet.

    Args:
        path: Directory path to ensure exists.

    Raises:
        FileExistsError: If *path* exists but is not a directory.
    """
    # exist_ok=True avoids the check-then-create race of isdir() + makedirs().
    os.makedirs(path, exist_ok=True)
def str2bool(v):
    """Convert a command-line style truth value to ``bool``.

    Args:
        v: A ``bool`` (returned unchanged) or a string compared
            case-insensitively against the accepted spellings.

    Returns:
        True for yes/true/t/y/1, False for no/false/f/n/0.

    Raises:
        argparse.ArgumentTypeError: If the string is not a recognised spelling.
    """
    if isinstance(v, bool):
        return v
    lowered = v.lower()
    if lowered in {'yes', 'true', 't', 'y', '1'}:
        return True
    if lowered in {'no', 'false', 'f', 'n', '0'}:
        return False
    raise argparse.ArgumentTypeError('Boolean value expected.')
def copy_data(original_path):
    """Copy the directory tree at *original_path* into the training data folder.

    The destination is ``data/train`` two levels above this script's directory.

    Args:
        original_path: Source directory whose contents are copied.
    """
    # NOTE(review): copy_tree comes from distutils, which was removed from the
    # standard library in Python 3.12 - confirm the supported interpreter range.
    copy_tree(original_path, default_path + '/../../data/train')
def characterize_data():
    """Load the training corpus and return its label distribution.

    The corpus is read from ``train.txt`` and ``test.txt`` in the
    ``data/train`` folder (two levels above this script), using a two-column
    layout: column 0 is the token text, column 1 the NER tag.

    Returns:
        The result of ``corpus.get_label_distribution()``, or the integer
        error code ``8`` when the corpus cannot be loaded.
    """
    data_folder = default_path + '/../../data/train'
    columns = {0: 'text', 1: 'ner'}
    # Build the corpus from the column-formatted train and test files.
    try:
        corpus: Corpus = ColumnCorpus(
            data_folder,
            columns,
            train_file='train.txt',
            test_file='test.txt',
        )
    except Exception:
        # Exception (not a bare except) so Ctrl-C / SystemExit still propagate.
        print('Invalid input document in training')
        return 8
    return corpus.get_label_distribution()
def upsampling_data(entities_to_upsample, probability, entities):
    """Augment ``data/train/train.txt`` in place with upsampled sentences.

    For each method code in the list below, the current train file is loaded
    through ``upsampling_ner``, new samples are generated for that single
    method, appended to the loaded data, and the file is rewritten.

    Args:
        entities_to_upsample: Passed to ``upsampler.upsampling`` as its first argument.
        probability: Passed to ``upsampler.upsampling`` as its second argument.
        entities: List of entity labels; ``'O'`` is appended before it is
            handed to ``upsampling_ner``.
    """
    print('-'*20,'upsampling','-'*20)
    data_folder = default_path + '/../../data/train'
    columns = {'text':0, 'ner':1}
    for m in ["SiS","LwTR","MR","SR", "MBT"]:
        # The upsampler is rebuilt from the file each iteration.
        upsampler = upsampling_ner(data_folder+'/train.txt', entities+['O'], columns)
        data, data_labels = upsampler.get_dataset()
        new_samples, new_labels = upsampler.upsampling(entities_to_upsample,probability,[m])
        data += new_samples
        data_labels += new_labels
        # NOTE(review): the original indentation was lost; the rewrite of
        # train.txt is assumed to happen inside the loop so that each method
        # sees the samples added by the previous one - confirm against the repo.
        with open(data_folder+'/train.txt', mode='w', encoding='utf-8') as f:
            for l,sentence in enumerate(data):
                # One "word label" pair per line.
                for j,word in enumerate(sentence):
                    f.write(word+' '+ data_labels[l][j])
                    f.write('\n')
                # Blank line between sentences, but not after the last one.
                if l < (len(data)-1):
                    f.write('\n')
    print('-'*20,'upsampling complete','-'*20)
def usage_cuda(cuda):
    """Select the device flair will use and describe the choice.

    Args:
        cuda: Truthy to request the GPU (``cuda:0``), falsy to force the CPU.

    Returns:
        A human-readable message stating which device was selected. When the
        GPU is requested but CUDA is unavailable, the CPU is used instead.
    """
    if not cuda:
        flair.device = torch.device('cpu')
        return 'CPU will be used'
    if torch.cuda.is_available():
        flair.device = torch.device('cuda:0')
        return 'GPU detected, GPU will be used'
    # GPU requested but unavailable: fall back to the CPU.
    flair.device = torch.device('cpu')
    return 'Error handling GPU, CPU will be used'
def training_model(name, epochs=20):
    """Fine-tune an XLM-RoBERTa based sequence tagger and save it under ``models/<name>``.

    The corpus is read from ``train.txt`` / ``test.txt`` in ``data/train``
    (two levels above this script). An existing model directory with the
    same name is reused as the output location, after a warning is printed.

    Args:
        name: Name of the model; used as the output directory under ``models/``.
        epochs: Value passed as ``max_epochs`` to the trainer.

    Returns:
        ``None`` on success, otherwise an integer error code:
        ``8`` the corpus could not be loaded, ``5`` the embeddings could not
        be loaded, ``6`` the tagger could not be built, ``7`` training failed.
    """
    data_folder = default_path + '/../../data/train'
    path_model = default_path + '/../../models/{}'.format(name)
    if os.path.isdir(path_model):
        print('WARNING, model already exists will be overwritten')

    # 1. Build the corpus from the column-formatted train and test files.
    columns = {0: 'text', 1: 'ner'}
    try:
        corpus: Corpus = ColumnCorpus(
            data_folder,
            columns,
            train_file='train.txt',
            test_file='test.txt',
        )
    except Exception:
        print('Invalid input document in training')
        return 8

    # 2. Which tag do we want to predict?
    tag_type = 'ner'

    # 3. Make the tag dictionary from the corpus.
    tag_dictionary = corpus.make_label_dictionary(label_type=tag_type)

    # 4. Load the transformer embeddings (fine-tuned during training).
    try:
        embeddings = TransformerWordEmbeddings(
            model='xlm-roberta-large',
            layers="-1",
            subtoken_pooling="first",
            fine_tune=True,
            use_context=True,
        )
    except Exception:
        print('Error while loading embeddings from RoBERTa')
        return 5

    # 5. Initialize a bare-bones sequence tagger (no CRF, no RNN, no reprojection).
    try:
        tagger_train = SequenceTagger(
            hidden_size=256,
            embeddings=embeddings,
            tag_dictionary=tag_dictionary,
            tag_type=tag_type,
            use_crf=False,
            use_rnn=False,
            reproject_embeddings=False,
        )
    except Exception:
        print('Error making tagger')
        return 6

    # 6. Initialize the trainer.
    trainer = ModelTrainer(tagger_train, corpus)

    # 7. Run training with AdamW, a one-cycle LR schedule and a small learning rate.
    try:
        trainer.train(
            path_model,
            learning_rate=5.0e-6,
            mini_batch_size=1,
            mini_batch_chunk_size=1,
            max_epochs=epochs,
            scheduler=OneCycleLR,
            embeddings_storage_mode='cpu',
            optimizer=torch.optim.AdamW,
        )
    except Exception:
        # Exception (not a bare except) so Ctrl-C during training is not
        # misreported as a training error.
        print('Error training the model, try setting CUDA False')
        return 7

    print("Model {} trained and saved in {}".format(name, 'models/{}'.format(name)))
def tag_sentence(sentence, name):
    """Tag a single sentence with the model stored under ``models/<name>``.

    The tagger is loaded once (``best-model.pt``, falling back to
    ``final-model.pt``) and cached in the module-level ``tagger_sentence``.

    NOTE(review): the cache is not keyed by ``name``; once a tagger is loaded,
    later calls with a different ``name`` reuse it - confirm this is intended.

    Args:
        sentence: Raw text of the sentence to tag.
        name: Name of the model directory under ``models/``.

    Returns:
        A dict with ``'Sentence_tagged'`` (tokens joined by spaces, entity
        tokens replaced by the label's ``shortstring``) and ``'Highligth'``
        (``{'text': ..., 'entities': [...]}`` with one entry per non-``'O'``
        token), or the integer error code ``1`` if no model could be loaded.
    """
    global tagger_sentence

    results = {'Sentence_tagged': '', 'Highligth': {}}
    Highligth_dict = {"text": "", "entities": []}

    # -------------- Load the trained model --------------
    path_model = default_path + '/../../models/{}'.format(name)
    if not tagger_sentence:
        try:
            tagger_sentence = SequenceTagger.load(path_model + '/best-model.pt')
        except Exception:
            try:
                tagger_sentence = SequenceTagger.load(path_model + '/final-model.pt')
            except Exception:
                print('Invalid model')
                return 1

    # -------------- Tag the sentence --------------
    print('-'*20, 'Tagging', '-'*20)
    sentence_f = Sentence(sentence)
    tagger_sentence.predict(sentence_f)
    Highligth_dict['text'] = sentence_f.to_plain_string()

    sentence_tokenized = []
    for indx, token in enumerate(sentence_f.tokens):
        t = token.get_label()
        if t.value == 'O':
            sentence_tokenized.append(token.text)
        else:
            # NOTE(review): original indentation was lost; recording entity
            # info only for non-'O' tokens is assumed - confirm.
            sentence_tokenized.append(t.shortstring)
            Highligth_dict["entities"].append({
                'entity': t.value,
                'index': indx,
                'word': token.text,
                'start': token.start_position,
                'end': token.end_position,
            })

    results['Highligth'] = Highligth_dict
    results['Sentence_tagged'] = ' '.join(sentence_tokenized)
    print('-'*20, 'Tagged complete', '-'*20)
    return results
def use_model(name, path_data, output_dir):
    """Tag every sentence of a JSON document and write the result as JSON.

    The tagger is loaded once (``best-model.pt``, falling back to
    ``final-model.pt``) and cached in the module-level ``tagger_document``.

    NOTE(review): the cache is not keyed by ``name``; once a tagger is loaded,
    later calls with a different ``name`` reuse it - confirm this is intended.

    Args:
        name: Name of the model directory under ``models/``.
        path_data: Path to the input JSON file; it must provide a
            ``'sentences'`` list whose items have a ``'text'`` field.
        output_dir: Path of the output file (opened for writing, despite the name).

    Returns:
        The results dict (``'text'``, ``'text_labeled'``, ``'sentences'``,
        ``'entities'``) on success, otherwise an integer error code:
        ``10`` model directory missing, ``9`` input is not a file,
        ``1`` model could not be loaded, ``2`` input could not be read or is
        empty, ``3`` unexpected JSON structure, ``11`` output could not be written.
    """
    global tagger_document

    # -------------- Load the trained model --------------
    path_model = default_path + '/../../models/{}'.format(name)
    if not os.path.isdir(path_model):
        print('Model does not exists')
        return 10
    if not os.path.isfile(path_data):
        print('Input file is not a file')
        return 9

    if not tagger_document:
        try:
            tagger_document = SequenceTagger.load(path_model + '/best-model.pt')
        except Exception:
            try:
                tagger_document = SequenceTagger.load(path_model + '/final-model.pt')
            except Exception:
                print('Invalid model')
                return 1

    # -------------- Load the document --------------
    try:
        data = pd.read_json(path_data, orient='index', encoding='utf-8')[0]
    except Exception:
        print('Can\'t open the input file')
        return 2
    if len(data) <= 0:
        print(f"length of document greater than 0 expected, got: {len(data)}")
        return 2
    try:
        sentences = data['sentences']
        # Accessed only to validate that the expected structure is present.
        _ = sentences[0]['text']
    except Exception:
        print('Invalid JSON format in document {}'.format(path_data))
        return 3

    print('-'*20, 'Tagging', '-'*20)
    # -------------- Tag the document --------------
    results = {'text': "", 'text_labeled': "", 'sentences': [], 'entities': []}
    # Running token-index and character offsets, so that entity positions refer
    # to the concatenated document text rather than to a single sentence.
    indx_prev = 0
    pos_prev = 0
    for s in sentences:
        sentence = Sentence(s['text'])
        tagger_document.predict(sentence, mini_batch_size=1)
        plain_text = sentence.to_plain_string()
        sen_dict_temp = {'text': plain_text, 'text_labeled': '', 'tokens': []}

        sentence_tokenized = []
        for indx, token in enumerate(sentence.tokens):
            t = token.get_label('ner')
            sen_dict_temp['tokens'].append({'text': token.text, 'label': t.value})
            if t.value == 'O':
                sentence_tokenized.append(token.text)
            else:
                # NOTE(review): original indentation was lost; recording entity
                # info only for non-'O' tokens is assumed - confirm.
                sentence_tokenized.append(t.shortstring)
                results["entities"].append({
                    'entity': t.value,
                    'index': indx + indx_prev,
                    'word': token.text,
                    'start': token.start_position + pos_prev,
                    'end': token.end_position + pos_prev,
                })

        indx_prev += len(sentence.tokens)
        pos_prev += len(plain_text)

        sen_tagged = ' '.join(sentence_tokenized)
        sen_dict_temp['text_labeled'] = sen_tagged
        results['sentences'].append(sen_dict_temp)
        # Sentences are concatenated without a separator, matching pos_prev.
        results['text'] += plain_text
        results['text_labeled'] += sen_tagged

    # -------------- Save the results --------------
    try:
        with open(output_dir, "w", encoding='utf-8') as write_file:
            json.dump(results, write_file)
        print('-'*20, 'Tagged complete', '-'*20)
        print('Document tagged saved in {}'.format(output_dir))
    except Exception:
        print('Error in output file')
        return 11
    return results
def _token_tag(sentence_id, token, mentions):
    """Return the type of the first mention covering *token*, or ``'O'`` if none does."""
    for mention in mentions:
        # A mention belongs to the sentence whose id prefixes the mention id
        # (the part before the first '-') and covers the token when the
        # token's begin offset falls in [mention begin, mention end).
        if (sentence_id == mention['id'].split('-')[0]
                and mention['begin'] <= token['begin'] < mention['end']):
            return mention['type']
    return 'O'


def json_to_txt(path_data_documents):
    """Convert a folder of annotated JSON documents into train/test column files.

    Every file in *path_data_documents* is read, each token is paired with
    the type of the mention covering it (``'O'`` otherwise), and the tokens
    are split by sentence group with the first fold of a 10-fold
    ``StratifiedGroupKFold``. The result is written as ``train.txt`` and
    ``test.txt`` in ``data/train`` (two levels above this script), one
    ``"word tag"`` pair per line.

    Args:
        path_data_documents: Folder containing the JSON documents. Each
            document must provide ``'sentences'`` (items with ``'text'``,
            ``'id'`` and ``'tokens'``; tokens with ``'text'``, ``'begin'``,
            ``'end'``) and ``'mentions'`` (items with ``'id'``, ``'begin'``,
            ``'end'``, ``'type'``).

    Returns:
        ``None`` on success, ``4`` if the folder is empty, ``3`` if a
        document cannot be read or does not have the expected structure.
    """
    # -------------- List the documents in the path --------------
    documents = os.listdir(path_data_documents)
    if not documents:
        print('There are not documents in the folder')
        return 4

    data_from_documents = {'id': [], 'document': [], 'sentence': [], 'word': [], 'tag': []}

    # -------------- Verify and collect each document --------------
    for num, doc in enumerate(documents):
        try:
            # Reading is part of validation: an unreadable file is reported
            # with the same error code as a structurally invalid one.
            df = pd.read_json(path_data_documents + '/' + doc, orient='index')[0]
            sentences = df['sentences']
            first_sentence = sentences[0]
            for key in ('text', 'id', 'tokens'):
                _ = first_sentence[key]
            first_token = first_sentence['tokens'][0]
            for key in ('text', 'begin', 'end'):
                _ = first_token[key]
            tags = df['mentions']
            if tags:
                for key in ('id', 'begin', 'end', 'type'):
                    _ = tags[0][key]
        except Exception:
            print('Invalid JSON input format in document {}'.format(doc))
            return 3

        # -------------- Organize the data --------------
        for s in sentences:
            id_senten = s['id']
            # Group id is unique per (document, sentence).
            group_id = 'd' + str(num) + '_' + id_senten
            for tk in s['tokens']:
                data_from_documents['id'].append(group_id)
                data_from_documents['document'].append(doc)
                data_from_documents['word'].append(tk['text'])
                data_from_documents['sentence'].append(s['text'])
                data_from_documents['tag'].append(_token_tag(id_senten, tk, tags))

    X = np.array(data_from_documents['word'])
    y = np.array(data_from_documents['tag'])
    groups = np.array(data_from_documents['id'])

    # -------------- Save the data in CoNLL-style column format --------------
    group_kfold = StratifiedGroupKFold(n_splits=10, shuffle=True, random_state=42)
    # Only the first of the ten folds is used as the train/test split.
    train_index, test_index = next(group_kfold.split(X, y, groups))

    data_folder = default_path + '/../../data/train'
    check_create(data_folder)

    for arch, index in (('train', train_index), ('test', test_index)):
        X_temp, y_temp, groups_temp = X[index], y[index], groups[index]
        id_in = groups_temp[0]
        count = 0
        with open(data_folder + '/{}.txt'.format(arch), mode='w', encoding='utf-8') as f:
            for word, tag, group in zip(X_temp, y_temp, groups_temp):
                # Blank line whenever the sentence group changes.
                if group != id_in:
                    id_in = group
                    f.write('\n')
                    count = 0
                count += 1
                f.write(word + ' ' + tag)
                f.write('\n')
                # Also break after 150 consecutive tokens of the same group.
                if count >= 150:
                    count = 0
                    f.write('\n')
# print("Before check")
# checkpoint = "xlm-roberta-large"
# config = AutoConfig.from_pretrained(checkpoint)
# with init_empty_weights():
# model = AutoModelForSequenceClassification.from_config(config)
# print("After check")
# try:
# tagger = load_checkpoint_and_dispatch(model, path_model+'/best-model.pt', device_map="auto")
# except:
# try:
# tagger = load_checkpoint_and_dispatch(model, path_model+'/final-model.pt', device_map="auto")
# except:
# print('Invalid model')
# return 1