MedNERN-CR-JA / predict.py
gabrielandrade2's picture
Add normalization methods
c837b79
raw
history blame
5.44 kB
# %%
import argparse
from tqdm import tqdm
import unicodedata
import re
import pickle
import torch
import NER_medNLP as ner
from EntityNormalizer import EntityNormalizer, DiseaseDict, DrugDict
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
# %% global変数として使う
dict_key = {}
# %%
def to_xml(data):
with open("key_attr.pkl", "rb") as tf:
key_attr = pickle.load(tf)
text = data['text']
count = 0
for i, entities in enumerate(data['entities_predicted']):
if entities == "":
return
span = entities['span']
type_id = id_to_tags[entities['type_id']].split('_')
tag = type_id[0]
if not type_id[1] == "":
attr = ' ' + value_to_key(type_id[1], key_attr) + '=' + '"' + type_id[1] + '"'
else:
attr = ""
if 'norm' in entities:
attr = attr + ' norm="' + str(entities['norm']) + '"'
add_tag = "<" + str(tag) + str(attr) + ">"
text = text[:span[0] + count] + add_tag + text[span[0] + count:]
count += len(add_tag)
add_tag = "</" + str(tag) + ">"
text = text[:span[1] + count] + add_tag + text[span[1] + count:]
count += len(add_tag)
return text
def predict_entities(modelpath, sentences_list, len_num_entity_type):
# model = ner.BertForTokenClassification_pl.load_from_checkpoint(
# checkpoint_path = modelpath + ".ckpt"
# )
# bert_tc = model.bert_tc.cuda()
model = ner.BertForTokenClassification_pl(modelpath, num_labels=81, lr=1e-5)
bert_tc = model.bert_tc.to(device)
MODEL_NAME = 'cl-tohoku/bert-base-japanese-whole-word-masking'
tokenizer = ner.NER_tokenizer_BIO.from_pretrained(
MODEL_NAME,
num_entity_type=len_num_entity_type # Entityの数を変え忘れないように!
)
# entities_list = [] # 正解の固有表現を追加していく
entities_predicted_list = [] # 抽出された固有表現を追加していく
text_entities_set = []
for dataset in sentences_list:
text_entities = []
for sample in tqdm(dataset):
text = sample
encoding, spans = tokenizer.encode_plus_untagged(
text, return_tensors='pt'
)
encoding = {k: v.to(device) for k, v in encoding.items()}
with torch.no_grad():
output = bert_tc(**encoding)
scores = output.logits
scores = scores[0].cpu().numpy().tolist()
# 分類スコアを固有表現に変換する
entities_predicted = tokenizer.convert_bert_output_to_entities(
text, scores, spans
)
# entities_list.append(sample['entities'])
entities_predicted_list.append(entities_predicted)
text_entities.append({'text': text, 'entities_predicted': entities_predicted})
text_entities_set.append(text_entities)
return text_entities_set
def combine_sentences(text_entities_set, insert: str):
documents = []
for text_entities in tqdm(text_entities_set):
document = []
for t in text_entities:
document.append(to_xml(t))
documents.append('\n'.join(document))
return documents
def value_to_key(value, key_attr): # attributeから属性名を取得
global dict_key
if dict_key.get(value) != None:
return dict_key[value]
for k in key_attr.keys():
for v in key_attr[k]:
if value == v:
dict_key[v] = k
return k
# %%
def normalize_entities(text_entities_set):
disease_normalizer = EntityNormalizer(DiseaseDict(), matching_threshold=50)
drug_normalizer = EntityNormalizer(DrugDict(), matching_threshold=50)
for entry in text_entities_set:
for text_entities in entry:
entities = text_entities['entities_predicted']
for entity in entities:
tag = id_to_tags[entity['type_id']].split('_')[0]
normalizer = drug_normalizer if tag == 'm-key' \
else disease_normalizer if tag == 'd' \
else None
if normalizer is None:
continue
normalization, score = normalizer.normalize(entity['name'])
entity['norm'] = str(normalization)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Predict entities from text')
parser.add_argument('--normalize', action=argparse.BooleanOptionalAction, help='Enable entity normalization')
args = parser.parse_args()
with open("id_to_tags.pkl", "rb") as tf:
id_to_tags = pickle.load(tf)
with open("key_attr.pkl", "rb") as tf:
key_attr = pickle.load(tf)
with open('text.txt') as f:
articles_raw = f.read()
article_norm = unicodedata.normalize('NFKC', articles_raw)
sentences_raw = [s for s in re.split(r'\n', articles_raw) if s != '']
sentences_norm = [s for s in re.split(r'\n', article_norm) if s != '']
text_entities_set = predict_entities("sociocom/RealMedNLP_CR_JA", [sentences_norm], len(id_to_tags))
for i, texts_ent in enumerate(text_entities_set[0]):
texts_ent['text'] = sentences_raw[i]
if args.normalize:
normalize_entities(text_entities_set)
documents = combine_sentences(text_entities_set, '\n')
print(documents[0])