File size: 7,521 Bytes
a01f2fe
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
"""
This module contains utils for preprocessing the text before converting it to embeddings.

- TextPreprocessorBuilder preprocesses individual strings.
    * lowering cases
    * converting numbers to words or characters
    * merging and stripping spaces
    * removing punctuation
    * removing stop words
    * lemmatizing
    * removing specific parts of speech (adverbs and interjections)
- TextSummarizer extracts the most important sentences from a long string using text-ranking.
"""
import pytextrank
import string
import spacy
import math
import nltk
import re

from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from num2words import num2words


class TextPreprocessorBuilder:
     # Define class variables as None initially
    _stop_words = set(stopwords.words('english'))
    _lemmatizer = WordNetLemmatizer()
    
    # Some of the functions are expensive. We cache the results.
    _lemmatizer_cache = {}
    _pos_remove_cache = {}


    def __init__(self, text: str):
        self.text = text


    def to_lower(self):
        # Match both words and non-word characters
        tokens = re.findall(r'\b\w+\b|\W+', self.text)
        for i, token in enumerate(tokens):
            # Check if token is a word
            if re.match(r'^\w+$', token):
                # Check if token is not an abbreviation or constant
                if not re.match(r'^[A-Z]+$', token) and not re.match(r'^[A-Z_]+$', token):
                    tokens[i] = token.lower()
        self.text = "".join(tokens)
        return self


    def num_to_word(self, min_len: int = 1):
        # Match both words and non-word characters
        tokens = re.findall(r'\b\w+\b|\W+', self.text)
        for i, token in enumerate(tokens):
            # Check if token is a number of length `min_len` or more
            if token.isdigit() and len(token) >= min_len:
                # This is done to pay better attention to numbers (e.g. ticket numbers, thread numbers, post numbers)
                # 740700 will become "seven hundred and forty thousand seven hundred".
                tokens[i] = num2words(int(token)).replace(",","") # Remove commas from num2words.
        self.text = "".join(tokens)
        return self


    def num_to_char_long(self, min_len: int = 1):
        # Match both words and non-word characters
        tokens = re.findall(r'\b\w+\b|\W+', self.text)
        for i, token in enumerate(tokens):
            # Check if token is a number of length `min_len` or more
            if token.isdigit() and len(token) >= min_len:
                # This is done to pay better attention to numbers (e.g. ticket numbers, thread numbers, post numbers)
                # 740700 will become HHHHHHEEEEEAAAAHHHAAA
                convert_token = lambda token: ''.join((chr(int(digit) + 65) * (i + 1)) for i, digit in enumerate(token[::-1]))[::-1]
                tokens[i] = convert_token(tokens[i])
        self.text = "".join(tokens)
        return self
    
    def num_to_char(self, min_len: int = 1):
        # Match both words and non-word characters
        tokens = re.findall(r'\b\w+\b|\W+', self.text)
        for i, token in enumerate(tokens):
            # Check if token is a number of length `min_len` or more
            if token.isdigit() and len(token) >= min_len:
                # This is done to pay better attention to numbers (e.g. ticket numbers, thread numbers, post numbers)
                # 740700 will become HEAHAA
                tokens[i] = ''.join(chr(int(digit) + 65) for digit in token)
        self.text = "".join(tokens)
        return self
    
    def merge_spaces(self):
        self.text = re.sub(' +', ' ', self.text)
        return self
    
    def strip(self):
        self.text = self.text.strip()
        return self
        
    def remove_punctuation(self):
        self.text = self.text.translate(str.maketrans('', '', string.punctuation))
        return self

    def remove_stopwords(self):
        self.text = "".join([word for word in re.findall(r'\b\w+\b|\W+', self.text) if word not in TextPreprocessorBuilder._stop_words])
        return self
    
    def remove_specific_pos(self):
        """
        In the English language, adverbs and interjections rarely provide meaningul information.
        Removing them improves the embedding precision. Don't tell JK Rowling, though.
        """
        processed_text = TextPreprocessorBuilder._pos_remove_cache.get(self.text)
        if processed_text:
            self.text = processed_text
            return self

        # Match both words and non-word characters
        tokens = re.findall(r'\b\w+\b|\W+', self.text)

        # Exclude adverbs and interjections
        excluded_tags = ['RB', 'RBR', 'RBS', 'UH']

        for i, token in enumerate(tokens):
            # Check if token is a word
            if re.match(r'^\w+$', token):
                # Part-of-speech tag the word
                pos = nltk.pos_tag([token])[0][1]
                # If the word's POS tag is in the excluded list, remove the word
                if pos in excluded_tags:
                    tokens[i] = ''

        new_text = "".join(tokens)
        TextPreprocessorBuilder._pos_remove_cache[self.text] = new_text
        self.text = new_text

        return self

    def lemmatize(self):
        processed_text = TextPreprocessorBuilder._lemmatizer_cache.get(self.text)
        if processed_text:
            self.text = processed_text
            return self
        
        new_text = "".join([TextPreprocessorBuilder._lemmatizer.lemmatize(word) for word in re.findall(r'\b\w+\b|\W+', self.text)])
        TextPreprocessorBuilder._lemmatizer_cache[self.text] = new_text
        self.text = new_text

        return self

    def build(self):
        return self.text

class TextSummarizer:
    _nlp_pipeline = None
    _cache = {}

    @staticmethod
    def _load_nlp_pipeline():
        # Lazy-load it.
        if TextSummarizer._nlp_pipeline is None:
            TextSummarizer._nlp_pipeline = spacy.load('en_core_web_sm')
            TextSummarizer._nlp_pipeline.add_pipe("textrank", last=True)
        return TextSummarizer._nlp_pipeline

    @staticmethod
    def process_long_text(text: str, min_num_sent: int) -> list[str]:
        """
        This function applies a text summarization process on a given text string, extracting 
        the most important sentences based on the principle that 20% of the content is responsible
        for 80% of the meaning (the Pareto Principle).

        Returns:
        list: A list of the most important sentences
        """

        # Attempt to get the result from cache
        cache_key = (text, min_num_sent)
        cached_result = TextSummarizer._cache.get(cache_key, None)
        if cached_result is not None:
            return cached_result

        nlp_pipeline = TextSummarizer._load_nlp_pipeline()
        doc = nlp_pipeline(text)

        num_sent = len(list(doc.sents))
        result = []

        if num_sent >= min_num_sent:

            limit_phrases = math.ceil(len(doc._.phrases) * 0.20)  # 20% of the phrases, rounded up
            limit_sentences = math.ceil(num_sent * 0.20)  # 20% of the sentences, rounded up
            result = [str(sent) for sent in doc._.textrank.summary(limit_phrases=limit_phrases, limit_sentences=limit_sentences)]

        else:
            result = [text]
        
        # Store the result in cache before returning it
        TextSummarizer._cache[cache_key] = result
        return result