#@title NLP Entities code
import re

import spacy
import pytextrank  # noqa: F401 -- registers the "textrank" pipeline factory

# NER labels to drop from results (numeric/temporal entity types).
FILT_GROUPS = ["CARDINAL", "TIME", "DATE", "PERCENT", "MONEY", "QUANTITY", "ORDINAL"]
# Part-of-speech tags kept by TextRank when building the phrase graph.
POS = ["NOUN", "PROPN", "VERB"]

nlp = spacy.load("en_core_web_sm")
nlp.add_pipe("textrank", last=True, config={"pos_kept": POS, "token_lookback": 3})
all_stopwords = nlp.Defaults.stop_words


def er_data_cleaning(raw: str) -> str:
    """
    Strip HTML tags, HTML entities, and non-ASCII characters from text.

    :param raw: raw input string (may be None)
    :return: str: stripped string
    """
    if raw is None:
        raw = ""
    # Remove HTML tags.
    html_removed = re.sub(r"<[^<]+?>", " ", raw)
    # Replace forward slashes with spaces.
    raw_line_removed = str(html_removed).replace("/", " ")
    # Remove HTML entities such as &quot;, &amp;, etc.
    special_entities_removed = re.sub(r"&[\w]+;", "", raw_line_removed)
    # Remove unicode characters such as \u200c, \u200E, etc.
    unicode_chars_removed = special_entities_removed.encode("ascii", "ignore").decode("utf-8")
    unicode_chars_removed = re.sub(r"\\u[\d]{3}[\w]", " ", unicode_chars_removed)
    return unicode_chars_removed.strip()


def get_clean_text_blobs(text_blobs):
    """
    Clean up text blobs.

    :param text_blobs: list
    :return: cleaned_text_blobs: list
    """
    return [er_data_cleaning(raw=text_blob) for text_blob in text_blobs]


def get_phrases_pagerank(text_blobs, limit=1, token_len_min=2, token_len_max=3):
    """
    Return key phrases based on PageRank.

    :param text_blobs: list of text blobs
    # TODO: limit param is redundant because we are returning all the key phrases. Probably get rid of it
    :param limit: fractional limit (0-1) on total key phrases returned
    :param token_len_min: minimum number of non-stopword tokens in a key phrase
    :param token_len_max: maximum number of non-stopword tokens in a key phrase
    :return: dict mapping key phrase -> {"weight", "kp_length", "count"}
    """
    assert 0 <= limit <= 1, "limit must be between 0 and 1"
    text = ". ".join(text_blobs)
    doc = nlp(text)
    # doc._.textrank.pos_kept = POS
    # doc._.textrank.token_lookback = token_lookback
    total_len = len(doc._.phrases)
    return_phrases = int(total_len * limit)
    # Examine the top-ranked phrases in the document.
    out_phrases = dict()
    for p in doc._.phrases[:return_phrases]:
        # Adding token_length would reduce the total score from 100.
        tokenized_kp = p.text.split()
        filtered_tokens = [word for word in tokenized_kp if word not in all_stopwords]
        kp_length = len(filtered_tokens)
        if p.rank > 0 and token_len_min <= kp_length <= token_len_max:
            joined_kp = " ".join(filtered_tokens)
            if joined_kp in out_phrases:
                out_phrases[joined_kp]["weight"] += p.rank
                out_phrases[joined_kp]["kp_length"] = kp_length
            else:
                # count is a dummy value
                out_phrases[joined_kp] = {"weight": p.rank, "kp_length": kp_length, "count": 1}
    return out_phrases


def dict_normalization(interest_dictionary, target=1.0):
    """
    Normalize the dictionary weights so they sum to target.

    :param interest_dictionary: dict of key phrases and scores
    :param target: normalization score
    :return: normalized interest dictionary
    """
    curr_score = sum(kp_info["weight"] for kp_info in interest_dictionary.values())
    # Skip normalization if PageRank returned no (or zero-weight) output.
    if curr_score > 0:
        factor = target / curr_score
        for kp in interest_dictionary:
            interest_dictionary[kp]["weight"] = round(interest_dictionary[kp]["weight"] * factor, 4)
    return interest_dictionary
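
# Illustrative sanity check for dict_normalization (the phrases and raw
# weights below are made-up, not real PageRank output): 0.3 + 0.1 = 0.4,
# so the normalized weights become 0.3 / 0.4 = 0.75 and 0.1 / 0.4 = 0.25.
#
# >>> kps = dict_normalization({"machine learning": {"weight": 0.3, "kp_length": 2, "count": 1},
# ...                           "neural network": {"weight": 0.1, "kp_length": 2, "count": 1}})
# >>> kps["machine learning"]["weight"], kps["neural network"]["weight"]
# (0.75, 0.25)
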
def get_ners(text_blobs):
    """
    Get named entities.

    :param text_blobs: list of text blobs
    :return: dict mapping named entity -> occurrence count
    """
    k_ners = dict()
    for text_blob in text_blobs:
        doc = nlp(text_blob)
        for ent in doc.ents:
            if ent.label_ not in FILT_GROUPS:
                # Increment the count associated with the named entity.
                if ent.text in k_ners:
                    k_ners[ent.text] += 1
                else:
                    k_ners[ent.text] = 1
    return k_ners


def return_ners_and_kp(text_blobs, ret_ne=False):
    """
    Return named entities and key phrases corresponding to the text blobs.

    :param ret_ne: boolean flag to also return named entities
    :param text_blobs: list of text blobs
    :return: dict: {"NE": {tag1: count, tag2: count},
                    "KP": {tag3: {"weight": float, "kp_length": int, "count": int},
                           tag4: {"weight": float, "kp_length": int, "count": int}}}
    """
    return_tags = dict()
    cleaned_text_blobs = get_clean_text_blobs(text_blobs=text_blobs)
    kps = get_phrases_pagerank(text_blobs=cleaned_text_blobs)
    kps = dict_normalization(kps)
    return_tags["KP"] = kps
    if ret_ne:
        ners = get_ners(text_blobs=cleaned_text_blobs)
        return_tags["NE"] = ners
    return return_tags
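
# A minimal usage sketch (the sample blobs below are made-up, and the exact
# output depends on the en_core_web_sm model version):
if __name__ == "__main__":
    sample_blobs = [
        "<p>Apple is looking at buying a U.K. startup for $1 billion.</p>",
        "The startup builds machine learning tools for autonomous driving.",
    ]
    tags = return_ners_and_kp(text_blobs=sample_blobs, ret_ne=True)
    print(tags["KP"])  # normalized key phrases with weights summing to ~1.0
    print(tags["NE"])  # named-entity counts; numeric/temporal labels filtered out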