from typing import Dict, List import numpy as np from numpy import ndarray from sklearn.cluster import KMeans from sklearn.decomposition import PCA from sklearn.mixture import GaussianMixture class ClusterFeatures(object): """ Basic handling of clustering features. """ def __init__( self, features: ndarray, algorithm: str = 'kmeans', pca_k: int = None, random_state: int = 12345, ): """ :param features: the embedding matrix created by bert parent. :param algorithm: Which clustering algorithm to use. :param pca_k: If you want the features to be ran through pca, this is the components number. :param random_state: Random state. """ if pca_k: self.features = PCA(n_components=pca_k).fit_transform(features) else: self.features = features self.algorithm = algorithm self.pca_k = pca_k self.random_state = random_state def __get_model(self, k: int): """ Retrieve clustering model. :param k: amount of clusters. :return: Clustering model. """ if self.algorithm == 'gmm': return GaussianMixture(n_components=k, random_state=self.random_state) return KMeans(n_clusters=k, random_state=self.random_state) def __get_centroids(self, model): """ Retrieve centroids of model. :param model: Clustering model. :return: Centroids. """ if self.algorithm == 'gmm': return model.means_ return model.cluster_centers_ def __find_closest_args(self, centroids: np.ndarray) -> Dict: """ Find the closest arguments to centroid. :param centroids: Centroids to find closest. :return: Closest arguments. """ centroid_min = 1e10 cur_arg = -1 args = {} used_idx = [] for j, centroid in enumerate(centroids): for i, feature in enumerate(self.features): value = np.linalg.norm(feature - centroid) if value < centroid_min and i not in used_idx: cur_arg = i centroid_min = value used_idx.append(cur_arg) args[j] = cur_arg centroid_min = 1e10 cur_arg = -1 return args def calculate_elbow(self, k_max: int) -> List[float]: """ Calculates elbow up to the provided k_max. :param k_max: K_max to calculate elbow for. :return: The inertias up to k_max. """ inertias = [] for k in range(1, min(k_max, len(self.features))): model = self.__get_model(k).fit(self.features) inertias.append(model.inertia_) return inertias def calculate_optimal_cluster(self, k_max: int): """ Calculates the optimal cluster based on Elbow. :param k_max: The max k to search elbow for. :return: The optimal cluster size. """ delta_1 = [] delta_2 = [] max_strength = 0 k = 1 inertias = self.calculate_elbow(k_max) for i in range(len(inertias)): delta_1.append(inertias[i] - inertias[i - 1] if i > 0 else 0.0) delta_2.append(delta_1[i] - delta_1[i - 1] if i > 1 else 0.0) for j in range(len(inertias)): strength = 0 if j <= 1 or j == len(inertias) - 1 else delta_2[j + 1] - delta_1[j + 1] if strength > max_strength: max_strength = strength k = j + 1 return k def cluster(self, ratio: float = 0.1, num_sentences: int = None) -> List[int]: """ Clusters sentences based on the ratio. :param ratio: Ratio to use for clustering. :param num_sentences: Number of sentences. Overrides ratio. :return: Sentences index that qualify for summary. """ if num_sentences is not None: if num_sentences == 0: return [] k = min(num_sentences, len(self.features)) else: k = max(int(len(self.features) * ratio), 1) model = self.__get_model(k).fit(self.features) centroids = self.__get_centroids(model) cluster_args = self.__find_closest_args(centroids) sorted_values = sorted(cluster_args.values()) return sorted_values def __call__(self, ratio: float = 0.1, num_sentences: int = None) -> List[int]: """ Clusters sentences based on the ratio. :param ratio: Ratio to use for clustering. :param num_sentences: Number of sentences. Overrides ratio. :return: Sentences index that qualify for summary. """ return self.cluster(ratio)