# Summarizer / extractive_summarizer / cluster_features.py
# piecurus's picture
# extractive
# 7215c40
from typing import Dict, List, Optional

import numpy as np
from numpy import ndarray
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.mixture import GaussianMixture
class ClusterFeatures:
    """
    Basic handling of clustering features.

    Fits a clustering model on the feature matrix and, for every cluster,
    picks the index of the feature row lying closest to the cluster centre.
    """

    def __init__(
        self,
        features: ndarray,
        algorithm: str = 'kmeans',
        pca_k: Optional[int] = None,
        random_state: int = 12345,
    ):
        """
        :param features: the embedding matrix created by bert parent.
        :param algorithm: Which clustering algorithm to use. ``'gmm'`` selects a
            Gaussian mixture; any other value falls back to KMeans.
        :param pca_k: If you want the features to be ran through pca, this is the components number.
            ``None`` (or 0) leaves the features untouched.
        :param random_state: Random state handed to the clustering model.
        """
        if pca_k:
            self.features = PCA(n_components=pca_k).fit_transform(features)
        else:
            self.features = features

        self.algorithm = algorithm
        self.pca_k = pca_k
        self.random_state = random_state

    def __get_model(self, k: int):
        """
        Retrieve clustering model.

        :param k: amount of clusters.
        :return: Clustering model (not fitted yet).
        """
        if self.algorithm == 'gmm':
            return GaussianMixture(n_components=k, random_state=self.random_state)
        return KMeans(n_clusters=k, random_state=self.random_state)

    def __get_centroids(self, model):
        """
        Retrieve centroids of model.

        :param model: Fitted clustering model.
        :return: Centroids (``means_`` for gmm, ``cluster_centers_`` otherwise).
        """
        if self.algorithm == 'gmm':
            return model.means_
        return model.cluster_centers_

    def __find_closest_args(self, centroids: np.ndarray) -> Dict[int, int]:
        """
        Find the closest arguments to centroid.

        Every feature index is handed out at most once, so a later centroid
        falls back to its nearest feature that is still unused.

        :param centroids: Centroids to find closest.
        :return: Mapping of centroid position to the index of its closest feature.
            The value stays -1 when no candidate could be selected for a centroid
            (no unused feature left, or no distance compared smaller than infinity).
        """
        args = {}
        # A set keeps the "already taken" check O(1) per feature.
        used_idx = set()

        for j, centroid in enumerate(centroids):
            closest_idx = -1
            closest_dist = np.inf

            for i, feature in enumerate(self.features):
                if i in used_idx:
                    continue
                dist = np.linalg.norm(feature - centroid)
                # Strict comparison: on ties the lowest index wins.
                if dist < closest_dist:
                    closest_idx = i
                    closest_dist = dist

            used_idx.add(closest_idx)
            args[j] = closest_idx

        return args

    def calculate_elbow(self, k_max: int) -> List[float]:
        """
        Calculates elbow up to the provided k_max.

        NOTE: relies on the fitted model exposing ``inertia_``; scikit-learn
        documents that attribute for KMeans, not for GaussianMixture.

        :param k_max: K_max to calculate elbow for (exclusive upper bound, also
            capped by the number of features).
        :return: The inertias for k = 1 .. min(k_max, number of features) - 1.
        """
        inertias = []

        for k in range(1, min(k_max, len(self.features))):
            model = self.__get_model(k).fit(self.features)
            inertias.append(model.inertia_)

        return inertias

    def calculate_optimal_cluster(self, k_max: int) -> int:
        """
        Calculates the optimal cluster based on Elbow.

        :param k_max: The max k to search elbow for.
        :return: The optimal cluster size (1 when no elbow stands out).
        """
        delta_1 = []
        delta_2 = []

        max_strength = 0
        k = 1

        inertias = self.calculate_elbow(k_max)

        # First and second order differences of the inertia curve.
        for i in range(len(inertias)):
            delta_1.append(inertias[i] - inertias[i - 1] if i > 0 else 0.0)
            delta_2.append(delta_1[i] - delta_1[i - 1] if i > 1 else 0.0)

        for j in range(len(inertias)):
            # The first two and the last position have no usable differences.
            strength = 0 if j <= 1 or j == len(inertias) - 1 else delta_2[j + 1] - delta_1[j + 1]

            if strength > max_strength:
                max_strength = strength
                k = j + 1

        return k

    def cluster(self, ratio: float = 0.1, num_sentences: Optional[int] = None) -> List[int]:
        """
        Clusters sentences based on the ratio.

        :param ratio: Ratio to use for clustering.
        :param num_sentences: Number of sentences. Overrides ratio.
        :return: Sentences index that qualify for summary, in ascending order.
        """
        n_features = len(self.features)

        # Nothing to cluster: no index can qualify.
        if n_features == 0:
            return []

        if num_sentences is not None:
            if num_sentences == 0:
                return []
            k = min(num_sentences, n_features)
        else:
            # At least one cluster, but never more clusters than features
            # (a ratio above 1.0 would otherwise request too many).
            k = min(max(int(n_features * ratio), 1), n_features)

        model = self.__get_model(k).fit(self.features)

        centroids = self.__get_centroids(model)
        cluster_args = self.__find_closest_args(centroids)

        sorted_values = sorted(cluster_args.values())
        return sorted_values

    def __call__(self, ratio: float = 0.1, num_sentences: Optional[int] = None) -> List[int]:
        """
        Clusters sentences based on the ratio.

        :param ratio: Ratio to use for clustering.
        :param num_sentences: Number of sentences. Overrides ratio.
        :return: Sentences index that qualify for summary.
        """
        # Forward num_sentences as well so it really overrides the ratio.
        return self.cluster(ratio, num_sentences)