Swe_summarizer / LexRank.py
Gabriel's picture
Update LexRank.py
fe74ce1
import numpy as np
from scipy.sparse.csgraph import connected_components
from scipy.special import softmax
import logging
logger = logging.getLogger(__name__)
def degree_centrality_scores(
similarity_matrix,
threshold=None,
increase_power=True,
):
if not (
threshold is None
or isinstance(threshold, float)
and 0 <= threshold < 1
):
raise ValueError(
'\'threshold\' should be a floating-point number '
'from the interval [0, 1) or None',
)
if threshold is None:
markov_matrix = create_markov_matrix(similarity_matrix)
else:
markov_matrix = create_markov_matrix_discrete(
similarity_matrix,
threshold,
)
scores = stationary_distribution(
markov_matrix,
increase_power=increase_power,
normalized=False,
)
return scores
def _power_method(transition_matrix, increase_power=True, max_iter=10000):
eigenvector = np.ones(len(transition_matrix))
if len(eigenvector) == 1:
return eigenvector
transition = transition_matrix.transpose()
for _ in range(max_iter):
eigenvector_next = np.dot(transition, eigenvector)
if np.allclose(eigenvector_next, eigenvector):
return eigenvector_next
eigenvector = eigenvector_next
if increase_power:
transition = np.dot(transition, transition)
logger.warning("Maximum number of iterations for power method exceeded without convergence!")
return eigenvector_next
def connected_nodes(matrix):
_, labels = connected_components(matrix)
groups = []
for tag in np.unique(labels):
group = np.where(labels == tag)[0]
groups.append(group)
return groups
def create_markov_matrix(weights_matrix):
n_1, n_2 = weights_matrix.shape
if n_1 != n_2:
raise ValueError('\'weights_matrix\' should be square')
row_sum = weights_matrix.sum(axis=1, keepdims=True)
# normalize probability distribution differently if we have negative transition values
if np.min(weights_matrix) <= 0:
return softmax(weights_matrix, axis=1)
return weights_matrix / row_sum
def create_markov_matrix_discrete(weights_matrix, threshold):
discrete_weights_matrix = np.zeros(weights_matrix.shape)
ixs = np.where(weights_matrix >= threshold)
discrete_weights_matrix[ixs] = 1
return create_markov_matrix(discrete_weights_matrix)
def stationary_distribution(
transition_matrix,
increase_power=True,
normalized=True,
):
n_1, n_2 = transition_matrix.shape
if n_1 != n_2:
raise ValueError('\'transition_matrix\' should be square')
distribution = np.zeros(n_1)
grouped_indices = connected_nodes(transition_matrix)
for group in grouped_indices:
t_matrix = transition_matrix[np.ix_(group, group)]
eigenvector = _power_method(t_matrix, increase_power=increase_power)
distribution[group] = eigenvector
if normalized:
distribution /= n_1
return distribution