#@title NLP Entities code
import re

import pytextrank  # noqa: F401  (registers the "textrank" spaCy pipeline factory)
import spacy

# Entity labels filtered out of the named-entity output (numeric/temporal labels).
FILT_GROUPS = ["CARDINAL", "TIME", "DATE", "PERCENT", "MONEY", "QUANTITY", "ORDINAL"]
# Parts of speech kept by TextRank when building the phrase graph.
POS = ["NOUN", "PROPN", "VERB"]

nlp = spacy.load("en_core_web_sm")
nlp.add_pipe("textrank", last=True, config={"pos_kept": POS, "token_lookback": 3})
all_stopwords = nlp.Defaults.stop_words

def er_data_cleaning(raw: str) -> str:
    """
    Strip HTML tags, HTML entities, and non-ASCII characters from a text blob.

    :param raw: raw text, possibly containing HTML
    :return: str: cleaned string
    """
    if raw is None:
        raw = ""

    # Remove HTML tags.
    html_removed = re.sub(r"<[^<]+?>", " ", raw)
    # Replace "/" with a space so slash-joined terms stay separate.
    slashes_removed = str(html_removed).replace("/", " ")
    # Remove HTML entities such as &quot; and &amp;.
    special_entities_removed = re.sub(r"&[\w]+;", "", slashes_removed)
    # Drop non-ASCII characters (e.g. \u200c, \u200e) and any literal "\uXXXX" escape sequences.
    unicode_chars_removed = special_entities_removed.encode("ascii", "ignore").decode("ascii")
    unicode_chars_removed = re.sub(r"\\u[\d]{3}[\w]", " ", unicode_chars_removed)

    return unicode_chars_removed.strip()


def get_clean_text_blobs(text_blobs):
    """
    Clean up each text blob in a list.

    :param text_blobs: list of raw text blobs
    :return: cleaned_text_blobs: list of cleaned text blobs
    """
    return [er_data_cleaning(raw=text_blob) for text_blob in text_blobs]


def get_phrases_pagerank(text_blobs, limit=1, token_len_min=2, token_len_max=3):
    """
    Return key phrases ranked by TextRank (PageRank over the token graph).

    :param text_blobs: list of text blobs
    :param limit: fraction (0..1) of the ranked phrases to return
    # TODO: limit param is redundant because we are returning all the key phrases. Probably get rid of it
    :param token_len_min: minimum number of non-stopword tokens in a key phrase
    :param token_len_max: maximum number of non-stopword tokens in a key phrase
    :return: dict of {key_phrase: {"weight": float, "kp_length": int, "count": int}}
    """
    assert 0 <= limit <= 1

    text = ". ".join(text_blobs)
    doc = nlp(text)

    total_len = len(doc._.phrases)
    return_phrases = int(total_len * limit)

    # Examine the top-ranked phrases in the document.
    out_phrases = dict()
    for p in doc._.phrases[:return_phrases]:
        # Drop stopwords before measuring phrase length so filler words do not
        # count towards token_len_min/token_len_max.
        tokenized_kp = p.text.split()
        filtered_tokens = [word for word in tokenized_kp if word not in all_stopwords]
        kp_length = len(filtered_tokens)
        if p.rank > 0 and token_len_min <= kp_length <= token_len_max:
            joined_kp = " ".join(filtered_tokens)
            if joined_kp in out_phrases:
                # Same phrase seen again after stopword removal: accumulate rank and count.
                out_phrases[joined_kp]["weight"] += p.rank
                out_phrases[joined_kp]["count"] += 1
            else:
                out_phrases[joined_kp] = {"weight": p.rank, "kp_length": kp_length, "count": 1}

    return out_phrases


def dict_normalization(interest_dictionary, target=1.0):
    """
    Normalize the key-phrase weights so they sum to target.

    For example, weights of 0.2 and 0.3 with target=1.0 become 0.4 and 0.6.

    :param interest_dictionary: dict of key phrases and their scores
    :param target: value the normalized weights should sum to
    :return: normalized interest dictionary
    """
    # Skip normalization if PageRank returned no key phrases.
    if len(interest_dictionary) > 0:
        curr_score = sum(kp_info["weight"] for kp_info in interest_dictionary.values())
        factor = target / curr_score
        for kp in interest_dictionary:
            interest_dictionary[kp]["weight"] = round(interest_dictionary[kp]["weight"] * factor, 4)
    return interest_dictionary


def get_ners(text_blobs):
    """
    Get named entities and their occurrence counts.

    :param text_blobs: list of text blobs
    :return: dict of {named_entity: count}
    """
    k_ners = dict()
    for text_blob in text_blobs:
        doc = nlp(text_blob)

        for ent in doc.ents:
            # Skip numeric/temporal entity labels listed in FILT_GROUPS.
            if ent.label_ not in FILT_GROUPS:
                # Increment the count associated with the named entity.
                if ent.text in k_ners:
                    k_ners[ent.text] += 1
                else:
                    k_ners[ent.text] = 1
    return k_ners


def return_ners_and_kp(text_blobs, ret_ne=False):
    """
    Return named entities and key phrases for the given text blobs.

    :param text_blobs: list of text blobs
    :param ret_ne: Boolean; also return named entities when True
    :return: dict of the form
             {"NE": {tag1: count, tag2: count},
              "KP": {tag3: {"weight": float, "kp_length": int, "count": int},
                     tag4: {"weight": float, "kp_length": int, "count": int}}}
    """
    return_tags = dict()
    cleaned_text_blobs = get_clean_text_blobs(text_blobs=text_blobs)
    kps = get_phrases_pagerank(text_blobs=cleaned_text_blobs)
    kps = dict_normalization(kps)
    return_tags["KP"] = kps
    if ret_ne:
        ners = get_ners(text_blobs=cleaned_text_blobs)
        return_tags["NE"] = ners
    return return_tags
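

# Usage sketch: a minimal, hypothetical example of how the pieces fit together,
# assuming the "en_core_web_sm" model and pytextrank are installed. The sample
# blobs below are made-up inputs purely for illustration.
if __name__ == "__main__":
    sample_blobs = [
        "<p>Apple is looking at buying a U.K. startup for $1 billion.</p>",
        "Natural language processing / machine learning powers the search &amp; ranking stack.",
    ]

    tags = return_ners_and_kp(text_blobs=sample_blobs, ret_ne=True)

    # "KP" maps each key phrase to its normalized TextRank weight; the weights of
    # the returned phrases sum to roughly 1.0 after dict_normalization.
    for phrase, info in tags["KP"].items():
        print(f"key phrase: {phrase!r} weight={info['weight']:.4f} tokens={info['kp_length']}")

    # "NE" maps each named entity whose label is outside FILT_GROUPS to its count.
    for entity, count in tags.get("NE", {}).items():
        print(f"named entity: {entity!r} count={count}")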