import logging

import numpy as np
from scipy.sparse.csgraph import connected_components
from scipy.special import softmax

logger = logging.getLogger(__name__)


def degree_centrality_scores(
    similarity_matrix,
    threshold=None,
    increase_power=True,
):
    """Score every node of a similarity graph by its stationary probability.

    Args:
        similarity_matrix: Square 2-D array of pairwise similarities.
        threshold: ``None`` to use the similarities as continuous weights, or
            a ``float`` in ``[0, 1)``; entries ``>= threshold`` become 1 and
            all others 0 before the Markov matrix is built.
        increase_power: Forwarded to the power method (see
            ``stationary_distribution``).

    Returns:
        1-D array with one un-normalised score per row of
        ``similarity_matrix``.

    Raises:
        ValueError: If ``threshold`` is neither ``None`` nor a ``float`` in
            ``[0, 1)``, or if the matrix is not square.
    """
    # Explicit grouping: `and` binds tighter than `or`, so this is exactly
    # the original condition, just readable at a glance.
    threshold_ok = threshold is None or (
        isinstance(threshold, float) and 0 <= threshold < 1
    )
    if not threshold_ok:
        raise ValueError(
            '\'threshold\' should be a floating-point number '
            'from the interval [0, 1) or None',
        )

    if threshold is None:
        markov_matrix = create_markov_matrix(similarity_matrix)
    else:
        markov_matrix = create_markov_matrix_discrete(
            similarity_matrix,
            threshold,
        )

    return stationary_distribution(
        markov_matrix,
        increase_power=increase_power,
        normalized=False,
    )


def _power_method(transition_matrix, increase_power=True, max_iter=10000):
    """Iterate an all-ones vector under the transposed transition matrix.

    Args:
        transition_matrix: Square 2-D array.
        increase_power: When true, the multiplier is squared after every
            non-converged step, so step ``k`` applies the ``2**k``-th power.
        max_iter: Maximum number of multiplications to perform.

    Returns:
        The first iterate that ``np.allclose`` considers equal to its
        predecessor, or the last iterate computed if ``max_iter`` is reached
        (a warning is logged in that case). A 1x1 matrix returns ``[1.0]``
        without iterating.
    """
    eigenvector = np.ones(len(transition_matrix))

    if len(eigenvector) == 1:
        return eigenvector

    transition = transition_matrix.transpose()

    for _ in range(max_iter):
        eigenvector_next = np.dot(transition, eigenvector)

        if np.allclose(eigenvector_next, eigenvector):
            return eigenvector_next

        eigenvector = eigenvector_next

        if increase_power:
            transition = np.dot(transition, transition)

    logger.warning("Maximum number of iterations for power method exceeded without convergence!")
    # `eigenvector` is the latest iterate whenever the loop ran at least once,
    # and it is still defined when max_iter <= 0 (the previous
    # `return eigenvector_next` raised UnboundLocalError in that case).
    return eigenvector


def connected_nodes(matrix):
    """Return the node indices of each connected component of ``matrix``.

    Args:
        matrix: Graph representation accepted by
            ``scipy.sparse.csgraph.connected_components``.

    Returns:
        List of 1-D index arrays, one per component label, in ascending
        label order.
    """
    _, labels = connected_components(matrix)
    return [np.where(labels == tag)[0] for tag in np.unique(labels)]


def create_markov_matrix(weights_matrix):
    """Turn a square weight matrix into a row-normalised matrix.

    If the smallest entry is ``<= 0`` each row is normalised with softmax;
    otherwise each row is divided by its sum.

    Args:
        weights_matrix: Square 2-D array of weights.

    Returns:
        Array of the same shape as ``weights_matrix``.

    Raises:
        ValueError: If ``weights_matrix`` is not square.
    """
    n_1, n_2 = weights_matrix.shape
    if n_1 != n_2:
        raise ValueError('\'weights_matrix\' should be square')

    # Normalise differently when any weight is zero or negative: softmax
    # always yields a valid distribution, whereas dividing by the row sum
    # would not for such rows.
    if np.min(weights_matrix) <= 0:
        return softmax(weights_matrix, axis=1)

    # Only needed on this branch; all entries are strictly positive here.
    row_sum = weights_matrix.sum(axis=1, keepdims=True)
    return weights_matrix / row_sum


def create_markov_matrix_discrete(weights_matrix, threshold):
    """Binarise ``weights_matrix`` at ``threshold`` and row-normalise it.

    Entries ``>= threshold`` become 1.0 and all others 0.0; the result is
    passed to ``create_markov_matrix`` (which takes its softmax branch as
    soon as the binarised matrix contains a zero).

    Args:
        weights_matrix: Square 2-D array of weights.
        threshold: Cut-off compared against every entry.

    Returns:
        Array of the same shape as ``weights_matrix``.
    """
    discrete_weights_matrix = np.zeros(weights_matrix.shape)
    ixs = np.where(weights_matrix >= threshold)
    discrete_weights_matrix[ixs] = 1

    return create_markov_matrix(discrete_weights_matrix)


def stationary_distribution(
    transition_matrix,
    increase_power=True,
    normalized=True,
):
    """Run the power method separately on every connected component.

    Args:
        transition_matrix: Square 2-D transition matrix.
        increase_power: Forwarded to the power method; when true the
            multiplier is squared after each non-converged step.
        normalized: When true, divide the result by the number of rows.

    Returns:
        1-D array with one value per row of ``transition_matrix``.

    Raises:
        ValueError: If ``transition_matrix`` is not square.
    """
    n_1, n_2 = transition_matrix.shape
    if n_1 != n_2:
        raise ValueError('\'transition_matrix\' should be square')

    distribution = np.zeros(n_1)

    grouped_indices = connected_nodes(transition_matrix)

    for group in grouped_indices:
        # Sub-matrix restricted to the rows and columns of this component.
        t_matrix = transition_matrix[np.ix_(group, group)]
        eigenvector = _power_method(t_matrix, increase_power=increase_power)
        distribution[group] = eigenvector

    if normalized:
        distribution /= n_1

    return distribution