#@title NLP Entities code
import re

import spacy
import pytextrank  # noqa: F401  # registers the "textrank" pipeline factory with spaCy

FILT_GROUPS = ["CARDINAL", "TIME", "DATE", "PERCENT", "MONEY", "QUANTITY", "ORDINAL"]
POS = ["NOUN", "PROPN", "VERB"]

nlp = spacy.load("en_core_web_sm")
nlp.add_pipe("textrank", last=True, config={"pos_kept": POS, "token_lookback": 3})
all_stopwords = nlp.Defaults.stop_words

def er_data_cleaning(raw: str) -> str:
    """
    Strip HTML tags, HTML entities, and non-ASCII characters from text.

    :param raw: raw input string (may be None)
    :return: str: stripped string
    """
    if raw is None:
        raw = ""
    # Remove HTML tags
    html_removed = re.sub(r"<[^<]+?>", " ", raw)
    # Replace "/" with a space
    raw_line_removed = str(html_removed).replace("/", " ")
    # Remove special entities like &quot;, &amp; etc.
    special_entites_removed = re.sub(r"&[\w]+;", "", raw_line_removed)
    # Remove unicode characters like \u200c, \u200E etc.
    unicode_chars_removed = special_entites_removed.encode("ascii", "ignore").decode("utf-8")
    unicode_chars_removed = re.sub(r"\\u[\d]{3}[\w]", " ", unicode_chars_removed)
    return unicode_chars_removed.strip()

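# Illustrative example (not from the original source) of er_data_cleaning behaviour:
#   er_data_cleaning("<p>AC/DC &amp; friends\u200c</p>")  ->  "AC DC  friends"
# (tags, entities, and zero-width characters are dropped; consecutive spaces are kept).
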
def get_clean_text_blobs(text_blobs):
    """
    Clean up a list of text blobs.

    :param text_blobs: list
    :return: cleaned_text_blobs: list
    """
    cleaned_text_blobs = []
    for text_blob in text_blobs:
        cleaned_text_blobs.append(er_data_cleaning(raw=text_blob))
    return cleaned_text_blobs

def get_phrases_pagerank(text_blobs, limit=1, token_len_min=2, token_len_max=3):
    """
    Return key phrases based on TextRank (PageRank over the token graph).

    :param text_blobs: list of text blobs
    :param limit: fraction (0 to 1) of the top-ranked phrases to return
    # TODO: limit param is redundant because we are returning all the key phrases. Probably get rid of it
    :param token_len_min: minimum number of non-stopword tokens in a key phrase
    :param token_len_max: maximum number of non-stopword tokens in a key phrase
    :return: dict of key phrases -> {"weight": float, "kp_length": int, "count": int}
    """
    assert 0 <= limit <= 1
    text = ". ".join(text_blobs)
    doc = nlp(text)
    # doc._.textrank.pos_kept = POS
    # doc._.textrank.token_lookback = token_lookback
    total_len = len(doc._.phrases)
    return_phrases = int(total_len * limit)
    # Examine the top-ranked phrases in the document
    out_phrases = dict()
    for p in doc._.phrases[:return_phrases]:
        # Drop stopwords before measuring the phrase length
        tokenized_kp = p.text.split()
        filtered_tokens = [word for word in tokenized_kp if word not in all_stopwords]
        kp_length = len(filtered_tokens)
        if p.rank > 0 and token_len_min <= kp_length <= token_len_max:
            joined_kp = " ".join(filtered_tokens)
            if joined_kp in out_phrases:
                # Same phrase seen again after stopword removal: accumulate its rank
                out_phrases[joined_kp]["weight"] += p.rank
                out_phrases[joined_kp]["kp_length"] = kp_length
            else:
                # count is a dummy value
                out_phrases[joined_kp] = {"weight": p.rank, "kp_length": kp_length, "count": 1}
    return out_phrases

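# Illustrative shape of the value returned by get_phrases_pagerank (the weight shown
# here is made up; actual ranks depend on the model and the input text):
#   {"machine learning": {"weight": 0.1123, "kp_length": 2, "count": 1}, ...}
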
def dict_normalization(interest_dictionary, target=1.0):
    """
    Normalize the dictionary weights so that they sum to target.

    :param interest_dictionary: dict of key phrases and their scores
    :param target: value the normalized weights should sum to
    :return: normalized interest dictionary
    """
    curr_score = 0
    # Skip normalization if pagerank returned no output
    if len(interest_dictionary) > 0:
        for kp_info in interest_dictionary.values():
            curr_score += kp_info["weight"]
        factor = target / curr_score
        for kp in interest_dictionary:
            interest_dictionary[kp]["weight"] = round(interest_dictionary[kp]["weight"] * factor, 4)
    return interest_dictionary

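# Illustrative arithmetic (not from the original source): with target=1.0, two phrases
# weighted 0.2 and 0.3 give factor = 1.0 / 0.5 = 2.0, so their normalized weights
# become 0.4 and 0.6.
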
def get_ners(text_blobs):
    """
    Get named entities.

    :param text_blobs: list of text blobs
    :return: dict mapping named entity text -> occurrence count
    """
    k_ners = dict()
    for text_blob in text_blobs:
        doc = nlp(text_blob)
        for ent in doc.ents:
            if ent.label_ not in FILT_GROUPS:
                # Increment the count associated with the named entity
                if ent.text in k_ners:
                    k_ners[ent.text] += 1
                else:
                    k_ners[ent.text] = 1
    return k_ners

def return_ners_and_kp(text_blobs, ret_ne=False):
    """
    Return named entities and key phrases corresponding to the text blobs.

    :param text_blobs: list of text blobs
    :param ret_ne: boolean, also return named entities when True
    :return: dict: {"NE": {tag1: count, tag2: count},
                    "KP": {tag3: {"weight": float, "kp_length": int, "count": int},
                           tag4: {"weight": float, "kp_length": int, "count": int}}}
    """
    return_tags = dict()
    cleaned_text_blobs = get_clean_text_blobs(text_blobs=text_blobs)
    kps = get_phrases_pagerank(text_blobs=cleaned_text_blobs)
    kps = dict_normalization(kps)
    return_tags["KP"] = kps
    if ret_ne:
        ners = get_ners(text_blobs=cleaned_text_blobs)
        return_tags["NE"] = ners
    return return_tags

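# Minimal usage sketch (not part of the original module); assumes en_core_web_sm and
# pytextrank are installed, and the sample blobs below are placeholders.
if __name__ == "__main__":
    sample_blobs = [
        "<p>Machine learning engineers at <b>Acme Corp</b> build recommendation systems.</p>",
        "Natural language processing &amp; knowledge graphs power the search experience.",
    ]
    tags = return_ners_and_kp(text_blobs=sample_blobs, ret_ne=True)
    print(tags["KP"])  # normalized key phrases: {phrase: {"weight", "kp_length", "count"}}
    print(tags["NE"])  # named entity counts (labels in FILT_GROUPS are excluded)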