from minicheck_web.minicheck import MiniCheck
# search_google, scrape_url, order_doc_score_url, and extract_entities are
# assumed to be provided by this star import.
from web_retrieval import *

import concurrent.futures
import itertools
from heapq import heappush, heappop
from time import time

import evaluate
import numpy as np
from nltk.tokenize import sent_tokenize
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity


def sort_chunks_single_doc_claim(used_chunk, support_prob_per_chunk):
    '''
    Sort the chunks of a single document by the probability of "supported",
    in descending order. This function is used when a user document is provided.
    '''
    flattened_docs = [doc for chunk in used_chunk for doc in chunk]
    flattened_scores = [score for chunk in support_prob_per_chunk for score in chunk]

    doc_score = list(zip(flattened_docs, flattened_scores))
    ranked_doc_score = sorted(doc_score, key=lambda x: x[1], reverse=True)

    ranked_docs, scores = zip(*ranked_doc_score)
    return ranked_docs, scores


def rank_documents_TFIDF(claim, scraped_results):
    """
    Each element in scraped_results is a tuple of (document, URL).
    """
    documents = [result[0] for result in scraped_results]
    corpus = [claim] + documents

    vectorizer = TfidfVectorizer()
    tfidf_matrix = vectorizer.fit_transform(corpus)

    # Cosine similarity between the claim (row 0) and each document
    claim_vector = tfidf_matrix[0]
    similarity_scores = cosine_similarity(claim_vector, tfidf_matrix[1:])

    ranked_results = [
        (scraped_results[i][0], scraped_results[i][1], score)
        for i, score in enumerate(similarity_scores[0])
    ]
    ranked_results.sort(key=lambda x: x[2], reverse=True)

    ranked_documents = [(result[0], result[1]) for result in ranked_results]
    return ranked_documents


class EndpointHandler():
    def __init__(self, path="./"):
        self.scorer = MiniCheck(path=path)
        self.rouge = evaluate.load('rouge')

        self.tfidf_order = True
        self.num_highlights = 1

        self.default_chunk_size = 500
        self.chunk_size = 500

    def __call__(self, data):
        # This is necessary for setting the chunk size for retrieved docs
        if 'chunk_size' in data['inputs']:
            self.chunk_size = int(data['inputs']['chunk_size'])
        else:
            self.chunk_size = self.default_chunk_size

        claim = data['inputs']['claims'][0]
        ents = extract_entities(claim)

        # Use the user-provided document to do fact-checking
        if len(data['inputs']['docs']) == 1 and data['inputs']['docs'][0] != '':
            _, _, used_chunk, support_prob_per_chunk = self.scorer.score(data=data)
            ranked_docs, scores = sort_chunks_single_doc_claim(used_chunk, support_prob_per_chunk)

            span_to_highlight, rouge_scores = [], []
            for doc_chunk in ranked_docs:
                highest_score_sent, rouge_score = self.chunk_and_highest_rouge_score(
                    doc_chunk, claim, k=self.num_highlights)
                span_to_highlight.append(highest_score_sent)
                rouge_scores.append(rouge_score)

            outputs = {
                'ranked_docs': ranked_docs,
                'scores': scores,
                'span_to_highlight': span_to_highlight,
                'entities': ents,
                'rouge_score': rouge_scores
            }

        else:
            assert len(data['inputs']['claims']) == 1, \
                "Only one claim is allowed for web retrieval for the current version."
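            # Web-retrieval path: no usable user document was supplied, so
            # search the web for candidate evidence and score each retrieved
            # page against the claim (see search_relevant_docs below).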
            ranked_docs, scores, ranked_urls = self.search_relevant_docs(
                claim, tfidf_order=self.tfidf_order)

            span_to_highlight, rouge_scores = [], []
            for doc_chunk in ranked_docs:
                highest_score_sent, rouge_score = self.chunk_and_highest_rouge_score(
                    doc_chunk, claim, k=self.num_highlights)
                span_to_highlight.append(highest_score_sent)
                rouge_scores.append(rouge_score)

            outputs = {
                'ranked_docs': ranked_docs,
                'scores': scores,
                'ranked_urls': ranked_urls,
                'span_to_highlight': span_to_highlight,
                'entities': ents,
                'rouge_score': rouge_scores
            }

        return outputs

    def search_relevant_docs(self, claim, timeout=10, max_search_results_per_query=5,
                             allow_duplicated_urls=False, tfidf_order=False):
        """
        If tfidf_order is True, display the docs in the order of TF-IDF
        similarity with the claim, regardless of the entailment score;
        otherwise, display the docs in the order of the entailment score.
        """
        search_results = search_google(claim, timeout=timeout)

        print('Searching webpages...')
        start = time()
        with concurrent.futures.ThreadPoolExecutor() as e:
            scraped_results = e.map(scrape_url, search_results, itertools.repeat(timeout))
        end = time()
        print(f"Finished searching in {round((end - start), 1)} seconds.\n")

        # Drop empty pages and pages containing the Unicode replacement
        # character (a sign of decoding errors), and truncate each document
        # to 20,000 characters
        scraped_results = [(r[0][:20000], r[1]) for r in scraped_results
                           if r[0] and '\ufffd' not in r[0]]

        # Rank the documents by TF-IDF similarity with the claim so that only
        # the most relevant ones are scored
        scraped_results = rank_documents_TFIDF(claim, scraped_results)
        retrieved_docs, urls = zip(*scraped_results[:max_search_results_per_query])

        print('Scoring webpages...')
        start = time()
        retrieved_data = {
            'inputs': {
                'docs': list(retrieved_docs),
                'claims': [claim] * len(retrieved_docs),
                'chunk_size': self.chunk_size
            }
        }
        _, _, used_chunk, support_prob_per_chunk = self.scorer.score(data=retrieved_data)
        end = time()

        num_chunks = len([item for items in used_chunk for item in items])
        print(f'Finished {num_chunks} entailment checks in {round((end - start), 1)} '
              f'seconds ({round(num_chunks / (end - start) * 60)} chunks/min).')

        if tfidf_order:
            tfidf_docs, scores = [], []
            for used_c, support_prob_per_c in zip(used_chunk, support_prob_per_chunk):
                # If the doc can support the claim, find the chunk with the
                # highest entailment score; otherwise, use the first chunk
                if max(support_prob_per_c) > 0.5:
                    tfidf_docs.append(used_c[np.argmax(support_prob_per_c)])
                    scores.append(max(support_prob_per_c))
                else:
                    tfidf_docs.append(used_c[0])
                    scores.append(support_prob_per_c[0])
            return tfidf_docs, scores, urls
        else:
            ranked_docs, scores, ranked_urls = order_doc_score_url(
                used_chunk, support_prob_per_chunk, urls,
                allow_duplicated_urls=allow_duplicated_urls)
            return ranked_docs, scores, ranked_urls

    def chunk_and_highest_rouge_score(self, doc, claim, k=1):
        '''
        Given a document and a claim, return the top k sentences with the
        highest ROUGE-1 scores, together with their scores.
        '''
        doc_sentences = sent_tokenize(doc)
        claims = [claim] * len(doc_sentences)

        results = self.rouge.compute(
            predictions=doc_sentences,
            references=claims,
            use_aggregator=False)

        # Initialize a min-heap to store the top k sentences and their scores
        top_k_heap = []
        for i in range(len(doc_sentences)):
            score = results['rouge1'][i]
            sentence = doc_sentences[i]

            # If the heap has fewer than k elements, push the current
            # sentence and score
            if len(top_k_heap) < k:
                heappush(top_k_heap, (score, sentence))
            else:
                # If the current score is higher than the minimum score in
                # the heap, remove the minimum and push the current sentence
                # and score
                if score > top_k_heap[0][0]:
                    heappop(top_k_heap)
                    heappush(top_k_heap, (score, sentence))

        # Extract the top k sentences and scores from the heap
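        # (popping everything from a min-heap yields the pairs in ascending
        # score order, hence the reversal below to put the best match first)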
        top_k_sentences = []
        top_k_scores = []
        while top_k_heap:
            score, sentence = heappop(top_k_heap)
            top_k_sentences.append(sentence)
            top_k_scores.append(score)

        # Reverse the order of sentences and scores to get them in descending order
        top_k_sentences = top_k_sentences[::-1]
        top_k_scores = top_k_scores[::-1]

        return top_k_sentences, top_k_scores
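

# ---------------------------------------------------------------------------
# Minimal usage sketch (an illustration, not part of the deployed handler):
# it assumes a MiniCheck checkpoint is available at "./" and that the NLTK
# 'punkt' tokenizer data is installed. The claim/document pair is purely
# illustrative; the input schema mirrors what __call__ expects above.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    handler = EndpointHandler(path='./')

    # Fact-check a claim against a user-provided document (first branch of
    # __call__). Passing docs=[''] would trigger web retrieval instead.
    example = {
        'inputs': {
            'docs': ['The Eiffel Tower, completed in 1889, stands in Paris, France.'],
            'claims': ['The Eiffel Tower is located in Paris.'],
            'chunk_size': 500
        }
    }

    outputs = handler(example)
    print(outputs['scores'])             # per-chunk support probabilities
    print(outputs['span_to_highlight'])  # best-matching sentence(s) per chunk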