# nlp_entities.py
#@title NLP Entities code
import re

import spacy
import pytextrank  # noqa: F401 -- registers the "textrank" pipeline factory with spaCy
FILT_GROUPS = ["CARDINAL", "TIME", "DATE", "PERCENT", "MONEY", "QUANTITY", "ORDINAL"]
POS = ["NOUN", "PROPN", "VERB"]
nlp = spacy.load("en_core_web_sm")
nlp.add_pipe("textrank", last=True, config={"pos_kept": POS, "token_lookback": 3})
all_stopwords = nlp.Defaults.stop_words
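# Note: the "textrank" component registered above comes from pytextrank, so
# the package and the small English model must be available first:
#   pip install pytextrank
#   python -m spacy download en_core_web_sm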

def er_data_cleaning(raw: str) -> str:
    """
    Strip HTML tags, HTML entities, and unicode escape artifacts from text.
    :param raw: raw input string (may be None)
    :return: str: stripped string
    """
# HTML tags
if raw is None:
raw = ""
html_removed = re.sub(r"<[^<]+?>", " ", raw)
# Remove /
raw_line_removed = str(html_removed).replace("/", " ")
    # remove special entities like &quot;, &amp;, etc.
    special_entities_removed = re.sub(r"&[\w]+;", "", raw_line_removed)
    # remove unicode characters like \u200c, \u200E, etc.
    unicode_chars_removed = special_entities_removed.encode("ascii", "ignore").decode("ascii")
    unicode_chars_removed = re.sub(r"\\u[\d]{3}[\w]", " ", unicode_chars_removed)
    return unicode_chars_removed.strip()
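
# Illustrative sketch (the sample string below is made up): a quick check of
# er_data_cleaning on a blob containing HTML tags, an HTML entity, and a slash.
def _demo_er_data_cleaning():
    sample = "<p>Fish &amp; Chips / Menu</p>"
    # -> "Fish  Chips   Menu"; note that runs of whitespace are not collapsed
    print(er_data_cleaning(raw=sample))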

def get_clean_text_blobs(text_blobs):
    """
    Clean up text blobs.
    :param text_blobs: list
    :return: cleaned_text_blobs: list
    """
cleaned_text_blobs = []
for text_blob in text_blobs:
cleaned_text_blobs.append(er_data_cleaning(raw=text_blob))
return cleaned_text_blobs

def get_phrases_pagerank(text_blobs, limit=1, token_len_min=2, token_len_max=3):
    """
    Return key phrases based on PageRank (TextRank).
    :param text_blobs: list of text
    :param limit: fraction (0-1) of total key phrases returned
    # TODO: limit param is redundant because we are returning all the key phrases. Probably get rid of it
    :param token_len_min: minimum number of non-stopword tokens in a key phrase
    :param token_len_max: maximum number of non-stopword tokens in a key phrase
    :return: dict mapping key phrase -> {"weight": float, "kp_length": int, "count": int}
    """
    assert 0 <= limit <= 1, "limit must be a fraction in [0, 1]"
    text = ". ".join(text_blobs)
    doc = nlp(text)
    total_len = len(doc._.phrases)
    return_phrases = int(total_len * limit)
    # examine the top-ranked phrases in the document
    out_phrases = dict()
    for p in doc._.phrases[:return_phrases]:
        # drop stopwords before measuring the phrase length
        tokenized_kp = p.text.split()
        filtered_tokens = [word for word in tokenized_kp if word not in all_stopwords]
        kp_length = len(filtered_tokens)
        if p.rank > 0 and token_len_min <= kp_length <= token_len_max:
            joined_kp = " ".join(filtered_tokens)
            if joined_kp in out_phrases:
                # merge duplicate phrases by accumulating their ranks
                out_phrases[joined_kp]["weight"] += p.rank
                out_phrases[joined_kp]["kp_length"] = kp_length
            else:
                # count is a dummy value
                result_dict = {"weight": p.rank, "kp_length": kp_length, "count": 1}
                out_phrases[joined_kp] = result_dict
    return out_phrases
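
# Illustrative sketch (the sample sentences are made up): extract ranked key
# phrases from a couple of blobs. Exact phrases and ranks depend on the spaCy
# model and pytextrank version.
def _demo_get_phrases_pagerank():
    blobs = [
        "Transformer models dominate natural language processing benchmarks",
        "Fine-tuning pretrained language models improves downstream accuracy",
    ]
    for kp, info in get_phrases_pagerank(text_blobs=blobs).items():
        print(kp, info["weight"], info["kp_length"])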

def dict_normalization(interest_dictionary, target=1.0):
    """
    Normalize the dictionary weights so they sum to target.
    :param interest_dictionary: dict of key phrases and scores
    :param target: normalization target for the summed weights
    :return: normalized interest dictionary
    """
    # skip normalization if no output was returned from pagerank
    if len(interest_dictionary) > 0:
        curr_score = sum(kp_info["weight"] for kp_info in interest_dictionary.values())
        factor = target / curr_score
        for kp_info in interest_dictionary.values():
            kp_info["weight"] = round(kp_info["weight"] * factor, 4)
return interest_dictionary
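
# Illustrative sketch with hypothetical scores: after normalization the
# weights sum to the target (1.0 by default), here 0.75 and 0.25.
def _demo_dict_normalization():
    kps = {
        "language models": {"weight": 0.30, "kp_length": 2, "count": 1},
        "downstream accuracy": {"weight": 0.10, "kp_length": 2, "count": 1},
    }
    print(dict_normalization(kps))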

def get_ners(text_blobs):
    """
    Get named entities from text blobs.
    :param text_blobs: list of text blobs
    :return: dict mapping named-entity text to occurrence count
    """
k_ners = dict()
for text_blob in text_blobs:
doc = nlp(text_blob)
for ent in doc.ents:
if ent.label_ not in FILT_GROUPS:
                # increment the count associated with this named entity
                k_ners[ent.text] = k_ners.get(ent.text, 0) + 1
return k_ners
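
# Illustrative sketch (the sample sentences are made up): repeated mentions
# are tallied per entity. Exact spans depend on the spaCy model.
def _demo_get_ners():
    blobs = [
        "Apple hired researchers in London.",
        "Apple also opened a lab in London.",
    ]
    print(get_ners(text_blobs=blobs))  # e.g. {"Apple": 2, "London": 2}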

def return_ners_and_kp(text_blobs, ret_ne=False):
    """
    Return named entities and key phrases corresponding to the text blobs.
    :param text_blobs: list of text blobs
    :param ret_ne: boolean; if True, also return named entities
    :return: dict: {"NE": {tag1: count, tag2: count},
                    "KP": {tag3: {"weight": float, "kp_length": int, "count": int},
                           tag4: {"weight": float, "kp_length": int, "count": int}}}
    """
return_tags = dict()
cleaned_text_blobs = get_clean_text_blobs(text_blobs=text_blobs)
kps = get_phrases_pagerank(text_blobs=cleaned_text_blobs)
kps = dict_normalization(kps)
return_tags["KP"] = kps
if ret_ne:
ners = get_ners(text_blobs=cleaned_text_blobs)
return_tags["NE"] = ners
return return_tags
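
if __name__ == "__main__":
    # End-to-end sketch: clean the blobs, extract key phrases, and optionally
    # named entities. The sample text below is illustrative only.
    sample_blobs = [
        "<p>OpenAI released a new language model.</p>",
        "Researchers benchmark language models on translation &amp; summarization.",
    ]
    print(return_ners_and_kp(text_blobs=sample_blobs, ret_ne=True))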