|
from typing import List, Optional, Tuple, Union |
|
|
|
import numpy as np |
|
from transformers import (AlbertModel, AlbertTokenizer, BartModel, |
|
BartTokenizer, BertModel, BertTokenizer, |
|
CamembertModel, CamembertTokenizer, CTRLModel, |
|
CTRLTokenizer, DistilBertModel, DistilBertTokenizer, |
|
GPT2Model, GPT2Tokenizer, LongformerModel, |
|
LongformerTokenizer, OpenAIGPTModel, |
|
OpenAIGPTTokenizer, PreTrainedModel, |
|
PreTrainedTokenizer, RobertaModel, RobertaTokenizer, |
|
TransfoXLModel, TransfoXLTokenizer, XLMModel, |
|
XLMTokenizer, XLNetModel, XLNetTokenizer) |
|
|
|
from extractive_summarizer.bert_parent import BertParent |
|
from extractive_summarizer.cluster_features import ClusterFeatures |
|
from extractive_summarizer.sentence_handler import SentenceHandler |
|
|
|
|
|
class ModelProcessor(object):
    """
    Parent summarizer: embeds sentences with the wrapped model and clusters the embeddings to
    select the sentences (or sentence embeddings) that make up a summary.
    """

    # Maps the ``aggregate`` option of ``run_embeddings`` to the numpy reducer applied on axis 0.
    aggregate_map = {
        'mean': np.mean,
        'min': np.min,
        'median': np.median,
        'max': np.max,
    }

    def __init__(
        self,
        model: str = 'bert-large-uncased',
        custom_model: Optional['PreTrainedModel'] = None,
        custom_tokenizer: Optional['PreTrainedTokenizer'] = None,
        hidden: Union[List[int], int] = -2,
        reduce_option: str = 'mean',
        sentence_handler: Optional['SentenceHandler'] = None,
        random_state: int = 12345,
        hidden_concat: bool = False,
        gpu_id: int = 0,
    ):
        """
        This is the parent Bert Summarizer model. New methods should implement this class.

        :param model: This parameter is associated with the inherit string parameters from the transformers library.
        :param custom_model: If you have a pre-trained model, you can add the model class here.
        :param custom_tokenizer: If you have a custom tokenizer, you can add the tokenizer here.
        :param hidden: This signifies which layer(s) of the BERT model you would like to use as embeddings.
        :param reduce_option: Given the output of the bert model, this param determines how you want to reduce results.
        :param sentence_handler: The handler to process sentences. If you want to use coreference, instantiate and
            pass a CoreferenceHandler instance. When omitted (None), a default SentenceHandler is created.
        :param random_state: The random state to reproduce summarizations.
        :param hidden_concat: Whether or not to concat multiple hidden layers.
        :param gpu_id: GPU device index if CUDA is available.
        """
        # NOTE: this seeds numpy's global random generator, not a private one.
        np.random.seed(random_state)
        self.model = BertParent(model, custom_model, custom_tokenizer, gpu_id)
        self.hidden = hidden
        self.reduce_option = reduce_option
        # Built lazily so no handler is constructed at import time or shared through the signature default.
        self.sentence_handler = SentenceHandler() if sentence_handler is None else sentence_handler
        self.random_state = random_state
        self.hidden_concat = hidden_concat

    def cluster_runner(
        self,
        content: List[str],
        ratio: float = 0.2,
        algorithm: str = 'kmeans',
        use_first: bool = True,
        num_sentences: Optional[int] = None
    ) -> Tuple[List[str], np.ndarray]:
        """
        Runs the cluster algorithm based on the hidden state. Returns both the embeddings and sentences.

        :param content: Content list of sentences.
        :param ratio: The ratio to use for clustering.
        :param algorithm: Type of algorithm to use for clustering.
        :param use_first: Return the first sentence in the output (helpful for news stories, etc). The first
            sentence is added on top of the clustered selection when it was not already selected.
        :param num_sentences: Number of sentences to use for summarization.
        :return: A tuple of summarized sentences and embeddings
        """
        hidden = self.model(
            content, self.hidden, self.reduce_option, hidden_concat=self.hidden_concat)
        # ``hidden_args`` holds the indices (into ``content`` / ``hidden``) of the selected sentences.
        hidden_args = ClusterFeatures(
            hidden, algorithm, random_state=self.random_state).cluster(ratio, num_sentences)

        if use_first:
            # Force index 0 to lead the selection without duplicating it.
            if not hidden_args:
                hidden_args.append(0)
            elif hidden_args[0] != 0:
                hidden_args.insert(0, 0)

        sentences = [content[j] for j in hidden_args]
        embeddings = np.asarray([hidden[j] for j in hidden_args])

        return sentences, embeddings

    def __run_clusters(
        self,
        content: List[str],
        ratio: float = 0.2,
        algorithm: str = 'kmeans',
        use_first: bool = True,
        num_sentences: Optional[int] = None
    ) -> List[str]:
        """
        Runs clusters and returns sentences.

        :param content: The content of sentences.
        :param ratio: Ratio to use for clustering.
        :param algorithm: Algorithm selection for clustering.
        :param use_first: Whether to use first sentence
        :param num_sentences: Number of sentences. Overrides ratio.
        :return: summarized sentences
        """
        sentences, _ = self.cluster_runner(
            content, ratio, algorithm, use_first, num_sentences)
        return sentences

    def __retrieve_summarized_embeddings(
        self,
        content: List[str],
        ratio: float = 0.2,
        algorithm: str = 'kmeans',
        use_first: bool = True,
        num_sentences: Optional[int] = None
    ) -> np.ndarray:
        """
        Retrieves embeddings of the summarized sentences.

        :param content: The content of sentences.
        :param ratio: Ratio to use for clustering.
        :param algorithm: Algorithm selection for clustering.
        :param use_first: Whether to use first sentence
        :param num_sentences: Number of sentences. Overrides ratio.
        :return: Summarized embeddings
        """
        _, embeddings = self.cluster_runner(
            content, ratio, algorithm, use_first, num_sentences)
        return embeddings

    def __cluster_features_for(
        self,
        body: str,
        algorithm: str,
        min_length: int,
        max_length: int,
        k_max: Optional[int],
    ):
        """
        Splits the body into sentences, embeds them and wraps the embeddings in ClusterFeatures.

        :param body: The input body to process.
        :param algorithm: The algorithm to use for clustering.
        :param min_length: The min length to use.
        :param max_length: The max length to use.
        :param k_max: The maximum number of clusters to search; None means ``len(sentences) - 1``.
        :return: A tuple of the ClusterFeatures instance and the resolved k_max.
        """
        sentences = self.sentence_handler(body, min_length, max_length)

        if k_max is None:
            k_max = len(sentences) - 1

        hidden = self.model(
            sentences, self.hidden, self.reduce_option, hidden_concat=self.hidden_concat)
        features = ClusterFeatures(hidden, algorithm, random_state=self.random_state)

        return features, k_max

    def calculate_elbow(
        self,
        body: str,
        algorithm: str = 'kmeans',
        min_length: int = 40,
        max_length: int = 600,
        k_max: Optional[int] = None,
    ) -> List[float]:
        """
        Calculates elbow across the clusters.

        :param body: The input body to summarize.
        :param algorithm: The algorithm to use for clustering.
        :param min_length: The min length to use.
        :param max_length: The max length to use.
        :param k_max: The maximum number of clusters to search. Defaults to the number of sentences minus one.
        :return: List of elbow inertia values.
        """
        features, k_max = self.__cluster_features_for(
            body, algorithm, min_length, max_length, k_max)
        return features.calculate_elbow(k_max)

    def calculate_optimal_k(
        self,
        body: str,
        algorithm: str = 'kmeans',
        min_length: int = 40,
        max_length: int = 600,
        k_max: Optional[int] = None,
    ):
        """
        Calculates the optimal Elbow K.

        :param body: The input body to summarize.
        :param algorithm: The algorithm to use for clustering.
        :param min_length: The min length to use.
        :param max_length: The max length to use.
        :param k_max: The maximum number of clusters to search. Defaults to the number of sentences minus one.
        :return: The result of ``ClusterFeatures.calculate_optimal_cluster`` for the resolved k_max.
        """
        features, k_max = self.__cluster_features_for(
            body, algorithm, min_length, max_length, k_max)
        return features.calculate_optimal_cluster(k_max)

    def run_embeddings(
        self,
        body: str,
        ratio: float = 0.2,
        min_length: int = 40,
        max_length: int = 600,
        use_first: bool = True,
        algorithm: str = 'kmeans',
        num_sentences: Optional[int] = None,
        aggregate: Optional[str] = None,
    ) -> Optional[np.ndarray]:
        """
        Preprocesses the sentences, runs the clusters to find the centroids, then combines the embeddings.

        :param body: The raw string body to process
        :param ratio: Ratio of sentences to use
        :param min_length: Minimum length of sentence candidates to utilize for the summary.
        :param max_length: Maximum length of sentence candidates to utilize for the summary
        :param use_first: Whether or not to use the first sentence
        :param algorithm: Which clustering algorithm to use. (kmeans, gmm)
        :param num_sentences: Number of sentences to use. Overrides ratio.
        :param aggregate: One of mean, median, max, min. Applied on zero axis
        :return: A summary embedding, or None when the sentence handler yields no sentences.
        """
        sentences = self.sentence_handler(body, min_length, max_length)

        if not sentences:
            return None

        embeddings = self.__retrieve_summarized_embeddings(
            sentences, ratio, algorithm, use_first, num_sentences)

        if aggregate is not None:
            # Kept as an assert so callers keep seeing AssertionError for an invalid option.
            assert aggregate in [
                'mean', 'median', 'max', 'min'], "aggregate must be mean, min, max, or median"
            embeddings = self.aggregate_map[aggregate](embeddings, axis=0)

        return embeddings

    def run(
        self,
        body: str,
        ratio: float = 0.2,
        min_length: int = 40,
        max_length: int = 600,
        use_first: bool = True,
        algorithm: str = 'kmeans',
        num_sentences: Optional[int] = None,
        return_as_list: bool = False
    ) -> Union[List, str]:
        """
        Preprocesses the sentences, runs the clusters to find the centroids, then combines the sentences.

        :param body: The raw string body to process
        :param ratio: Ratio of sentences to use
        :param min_length: Minimum length of sentence candidates to utilize for the summary.
        :param max_length: Maximum length of sentence candidates to utilize for the summary
        :param use_first: Whether or not to use the first sentence
        :param algorithm: Which clustering algorithm to use. (kmeans, gmm)
        :param num_sentences: Number of sentences to use (overrides ratio).
        :param return_as_list: Whether or not to return sentences as list.
        :return: The summary sentences as a list when return_as_list is set, otherwise joined by single spaces.
        """
        sentences = self.sentence_handler(body, min_length, max_length)

        # With no candidate sentences, clustering is skipped and the empty result is returned as-is.
        if sentences:
            sentences = self.__run_clusters(
                sentences, ratio, algorithm, use_first, num_sentences)

        if return_as_list:
            return sentences
        return ' '.join(sentences)

    def __call__(
        self,
        body: str,
        ratio: float = 0.2,
        min_length: int = 40,
        max_length: int = 600,
        use_first: bool = True,
        algorithm: str = 'kmeans',
        num_sentences: Optional[int] = None,
        return_as_list: bool = False,
    ) -> Union[List, str]:
        """
        (utility that wraps around the run function)
        Preprocesses the sentences, runs the clusters to find the centroids, then combines the sentences.

        :param body: The raw string body to process.
        :param ratio: Ratio of sentences to use.
        :param min_length: Minimum length of sentence candidates to utilize for the summary.
        :param max_length: Maximum length of sentence candidates to utilize for the summary.
        :param use_first: Whether or not to use the first sentence.
        :param algorithm: Which clustering algorithm to use. (kmeans, gmm)
        :param num_sentences: Number of sentences to use (overrides ratio).
        :param return_as_list: Whether or not to return sentences as list.
        :return: A summary string, or a list of sentences when return_as_list is set.
        """
        return self.run(
            body, ratio, min_length, max_length, algorithm=algorithm, use_first=use_first,
            num_sentences=num_sentences, return_as_list=return_as_list
        )
|
|
|
|
|
class Summarizer(ModelProcessor):
    """
    Main Bert Summarizer class; forwards every constructor option to ModelProcessor.
    """

    def __init__(
        self,
        model: str = 'bert-large-uncased',
        custom_model: Optional['PreTrainedModel'] = None,
        custom_tokenizer: Optional['PreTrainedTokenizer'] = None,
        hidden: Union[List[int], int] = -2,
        reduce_option: str = 'mean',
        sentence_handler: Optional['SentenceHandler'] = None,
        random_state: int = 12345,
        hidden_concat: bool = False,
        gpu_id: int = 0,
    ):
        """
        This is the main Bert Summarizer class.

        :param model: This parameter is associated with the inherit string parameters from the transformers library.
        :param custom_model: If you have a pre-trained model, you can add the model class here.
        :param custom_tokenizer: If you have a custom tokenizer, you can add the tokenizer here.
        :param hidden: This signifies which layer of the BERT model you would like to use as embeddings.
        :param reduce_option: Given the output of the bert model, this param determines how you want to reduce results.
        :param sentence_handler: The handler to process sentences. When omitted (None), a default
            SentenceHandler is created.
        :param random_state: The random state to reproduce summarizations.
        :param hidden_concat: Whether or not to concat multiple hidden layers.
        :param gpu_id: GPU device index if CUDA is available.
        """
        # Resolve the default here so the parent always receives a usable handler instance.
        if sentence_handler is None:
            sentence_handler = SentenceHandler()

        super().__init__(
            model, custom_model, custom_tokenizer, hidden, reduce_option, sentence_handler, random_state,
            hidden_concat, gpu_id
        )
|
|
|
|
|
class TransformerSummarizer(ModelProcessor):
    """
    Another type of Summarizer class to choose keyword based model and tokenizer
    """

    # Supported ``transformer_type`` keys mapped to their (model class, tokenizer class) pair.
    MODEL_DICT = {
        'Bert': (BertModel, BertTokenizer),
        'OpenAIGPT': (OpenAIGPTModel, OpenAIGPTTokenizer),
        'GPT2': (GPT2Model, GPT2Tokenizer),
        'CTRL': (CTRLModel, CTRLTokenizer),
        'TransfoXL': (TransfoXLModel, TransfoXLTokenizer),
        'XLNet': (XLNetModel, XLNetTokenizer),
        'XLM': (XLMModel, XLMTokenizer),
        'DistilBert': (DistilBertModel, DistilBertTokenizer),
        'Roberta': (RobertaModel, RobertaTokenizer),
        'Albert': (AlbertModel, AlbertTokenizer),
        'Camembert': (CamembertModel, CamembertTokenizer),
        'Bart': (BartModel, BartTokenizer),
        'Longformer': (LongformerModel, LongformerTokenizer),
    }

    def __init__(
        self,
        transformer_type: str = 'Bert',
        transformer_model_key: str = 'bert-base-uncased',
        transformer_tokenizer_key: Optional[str] = None,
        hidden: Union[List[int], int] = -2,
        reduce_option: str = 'mean',
        sentence_handler: Optional['SentenceHandler'] = None,
        random_state: int = 12345,
        hidden_concat: bool = False,
        gpu_id: int = 0,
    ):
        """
        :param transformer_type: The Transformer type, such as Bert, GPT2, DistilBert, etc. Must be a key of
            MODEL_DICT.
        :param transformer_model_key: The transformer model key. This is the directory for the model.
        :param transformer_tokenizer_key: The transformer tokenizer key. This is the tokenizer directory.
            Falls back to transformer_model_key when None.
        :param hidden: The hidden output layers to use for the summarization.
        :param reduce_option: The reduce option, such as mean, max, min, median, etc.
        :param sentence_handler: The sentence handler class to process the raw text. When omitted (None), a
            default SentenceHandler is created.
        :param random_state: The random state to use.
        :param hidden_concat: Deprecated hidden concat option.
        :param gpu_id: GPU device index if CUDA is available.
        :raises KeyError: If transformer_type is not a key of MODEL_DICT.
        """
        if transformer_type not in self.MODEL_DICT:
            # Still a KeyError (as a plain dict lookup would raise), but with the valid options spelled out.
            raise KeyError(
                "Unknown transformer_type {!r}; expected one of: {}".format(
                    transformer_type, ', '.join(map(str, self.MODEL_DICT))
                )
            )

        model_clz, tokenizer_clz = self.MODEL_DICT[transformer_type]
        model = model_clz.from_pretrained(
            transformer_model_key, output_hidden_states=True)

        tokenizer_key = transformer_model_key if transformer_tokenizer_key is None else transformer_tokenizer_key
        tokenizer = tokenizer_clz.from_pretrained(tokenizer_key)

        # Resolve the default here so the parent always receives a usable handler instance.
        if sentence_handler is None:
            sentence_handler = SentenceHandler()

        # The model name is None because an already-loaded model and tokenizer are supplied instead.
        super().__init__(
            None, model, tokenizer, hidden, reduce_option, sentence_handler, random_state, hidden_concat, gpu_id
        )
|
|