import numpy as np


class DirectedCentralityRank(object):
    def __init__(self, document_feats, extract_num=20, beta=0.2,
                 lambda1=1, lambda2=0.8, alpha=1, processors=8):
        self.extract_num = extract_num
        self.processors = processors
        self.beta = beta
        self.lambda1 = lambda1
        self.lambda2 = lambda2
        self.alpha = alpha
        self.candidate_phrases = [x['candidate_phrases'] for x in document_feats]
        self.doc_embeddings = [x['sentence_embeddings'] for x in document_feats]
        self.tokens_embeddings = [x['candidate_phrases_embeddings'] for x in document_feats]

    def flat_list(self, l):
        return [x for ll in l for x in ll]

    def extract_summary(self):
        paired_scores = self.rank()
        rank_list_phrases = []
        for candidate, paired_score in zip(self.candidate_phrases, paired_scores):
            candidates = []
            for i in range(len(candidate)):
                phrase = candidate[i]
                candidates.append([phrase, paired_score[i][0], paired_score[i][1]])
            rank_list_phrases.append(candidates)

        predicted_candidates = []
        for i in range(len(rank_list_phrases)):
            final_score = []
            # Softmax over reciprocal positions: earlier candidates get larger weights.
            position_weight = 1 / np.arange(1, len(rank_list_phrases[i]) + 1)
            position_weight = np.exp(position_weight) / np.sum(np.exp(position_weight))
            cnt = 0
            for candidate, index, score in rank_list_phrases[i]:
                final_score.append([candidate, score * position_weight[cnt]])
                cnt += 1
            final_score.sort(key=lambda x: x[1], reverse=True)
            candidates = [x[0].strip() for x in final_score]
            predicted_candidates.append(candidates)
        return predicted_candidates

    def pairdown(self, scores, pair_indice, length):
        # Scatter the flat list of pairwise scores back into a symmetric matrix.
        out_matrix = np.ones((length, length))
        for pair in pair_indice:
            out_matrix[pair[0][0]][pair[0][1]] = scores[pair[1]]
            out_matrix[pair[0][1]][pair[0][0]] = scores[pair[1]]
        return out_matrix

    def get_similarity_matrix(self, sentence_embeddings):
        pairs = []
        scores = []
        cnt = 0
        for i in range(len(sentence_embeddings) - 1):
            for j in range(i, len(sentence_embeddings)):
                # A float entry marks a missing embedding; treat the pair as zero similarity.
                if isinstance(sentence_embeddings[i], float) or isinstance(sentence_embeddings[j], float):
                    scores.append(0)
                else:
                    scores.append(np.dot(sentence_embeddings[i], sentence_embeddings[j]))
                pairs.append(([i, j], cnt))
                cnt += 1
        return self.pairdown(scores, pairs, len(sentence_embeddings))

    def compute_scores(self, similarity_matrix, edge_threshold=0):
        forward_scores = [1e-10 for _ in range(len(similarity_matrix))]
        backward_scores = [1e-10 for _ in range(len(similarity_matrix))]
        edges = []
        n = len(similarity_matrix)
        alpha = self.alpha
        for i in range(len(similarity_matrix)):
            for j in range(i + 1, len(similarity_matrix[i])):
                edge_score = similarity_matrix[i][j]
                # Boundary position function: distance to the nearer document boundary,
                # with alpha weighting the end boundary against the start boundary.
                db_i = min(i, alpha * (n - i))
                db_j = min(j, alpha * (n - j))
                if edge_score > edge_threshold:
                    # Direct each edge from the node closer to a boundary
                    # towards the node nearer the middle of the document.
                    if db_i < db_j:
                        forward_scores[i] += edge_score
                        backward_scores[j] += edge_score
                        edges.append((i, j, edge_score))
                    else:
                        forward_scores[j] += edge_score
                        backward_scores[i] += edge_score
                        edges.append((j, i, edge_score))
        return np.asarray(forward_scores), np.asarray(backward_scores), edges

    def _rank_part(self, similarity_matrix, doc_vector, candidate_phrases_embeddings):
        # Shift the matrix so that similarities below the beta-interpolated
        # threshold become negative and are dropped by the edge_threshold test.
        min_score = np.min(similarity_matrix)
        max_score = np.max(similarity_matrix)
        threshold = min_score + self.beta * (max_score - min_score)
        new_matrix = similarity_matrix - threshold

        # Inverse L1 distance between each candidate and the document embedding,
        # used to boost candidates that lie close to the document vector.
        dist = []
        for emb in candidate_phrases_embeddings:
            if isinstance(doc_vector, float) or isinstance(emb, float):
                dist.append(0)
            else:
                dist.append(1 / np.sum(np.abs(emb - doc_vector)))

        forward_score, backward_score, _ = self.compute_scores(new_matrix)
        paired_scores = []
        for node in range(len(forward_score)):
            paired_scores.append([
                node,
                (self.lambda1 * forward_score[node]
                 + self.lambda2 * backward_score[node]) * dist[node],
            ])
        return paired_scores

    def rank(self):
        similarity_matrix = []
        extracted_list = []
        for embedded in self.tokens_embeddings:
            similarity_matrix.append(self.get_similarity_matrix(embedded))
        for matrix, doc_vector, candidate_phrases_embeddings in zip(
                similarity_matrix, self.doc_embeddings, self.tokens_embeddings):
            extracted_list.append(self._rank_part(matrix, doc_vector, candidate_phrases_embeddings))
        return extracted_list
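

# Minimal usage sketch, not part of the original module: it feeds the ranker a
# single toy document whose "embeddings" are random unit vectors. In practice
# the entries of document_feats would come from a sentence encoder; the phrase
# strings and vector dimension below are illustrative assumptions only.
if __name__ == "__main__":
    rng = np.random.default_rng(0)

    def random_unit_vector(dim=8):
        v = rng.standard_normal(dim)
        return v / np.linalg.norm(v)

    phrases = ["graph centrality", "keyphrase extraction", "directed edges"]
    document_feats = [{
        'candidate_phrases': phrases,
        'sentence_embeddings': random_unit_vector(),
        'candidate_phrases_embeddings': [random_unit_vector() for _ in phrases],
    }]

    ranker = DirectedCentralityRank(document_feats)
    # extract_summary() returns one position-weighted, score-sorted phrase
    # list per input document.
    print(ranker.extract_summary())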