from minicheck_web.minicheck import MiniCheck from web_retrieval import * from nltk.tokenize import sent_tokenize import evaluate from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.metrics.pairwise import cosine_similarity def sort_chunks_single_doc_claim(used_chunk, support_prob_per_chunk): ''' Sort the chunks in a single document based on the probability of "supported" in descending order. This function is used when a user document is provided. ''' flattened_docs = [doc for chunk in used_chunk for doc in chunk] flattened_scores = [score for chunk in support_prob_per_chunk for score in chunk] doc_score = list(zip(flattened_docs, flattened_scores)) ranked_doc_score = sorted(doc_score, key=lambda x: x[1], reverse=True) ranked_docs, scores = zip(*ranked_doc_score) return ranked_docs, scores def rank_documents_TFIDF(claim, scraped_results): """ each element in scraped_results is a tuple of (document, URL) """ documents = [result[0] for result in scraped_results] corpus = [claim] + documents vectorizer = TfidfVectorizer() tfidf_matrix = vectorizer.fit_transform(corpus) claim_vector = tfidf_matrix[0] similarity_scores = cosine_similarity(claim_vector, tfidf_matrix[1:]) ranked_results = [(scraped_results[i][0], scraped_results[i][1], score) for i, score in enumerate(similarity_scores[0])] ranked_results.sort(key=lambda x: x[2], reverse=True) ranked_documents = [(result[0], result[1]) for result in ranked_results] return ranked_documents class EndpointHandler(): def __init__(self, path="./"): self.scorer = MiniCheck(path=path) self.rouge = evaluate.load('rouge') self.tfidf_order = True def __call__(self, data): claim = data['inputs']['claims'][0] ents = extract_entities(claim) # Using user-provided document to do fact-checking if len(data['inputs']['docs']) == 1 and data['inputs']['docs'][0] != '': _, _, used_chunk, support_prob_per_chunk = self.scorer.score(data=data) ranked_docs, scores = sort_chunks_single_doc_claim(used_chunk, support_prob_per_chunk) span_to_highlight = [] for doc_chunk, score in zip(ranked_docs, scores): # If the chunk can support the claim, find the sentence with the highest rouge score if score > 0.5: highest_score_sent, _ = self.chunk_and_highest_rouge_score(doc_chunk, claim) span_to_highlight.append(highest_score_sent) else: span_to_highlight.append("") outputs = { 'ranked_docs': ranked_docs, 'scores': scores, 'span_to_highlight': span_to_highlight, 'entities': ents } else: assert len(data['inputs']['claims']) == 1, "Only one claim is allowed for web retrieval for the current version." ranked_docs, scores, ranked_urls = self.search_relevant_docs(claim, tfidf_order=self.tfidf_order) span_to_highlight = [] for doc_chunk, score in zip(ranked_docs, scores): # If the chunk can support the claim, find the sentence with the highest rouge score if score > 0.5: highest_score_sent, _ = self.chunk_and_highest_rouge_score(doc_chunk, claim) span_to_highlight.append(highest_score_sent) else: span_to_highlight.append("") outputs = { 'ranked_docs': ranked_docs, 'scores': scores, 'ranked_urls': ranked_urls, 'span_to_highlight': span_to_highlight, 'entities': ents } return outputs def search_relevant_docs(self, claim, timeout=10, max_search_results_per_query=5, allow_duplicated_urls=False, tfidf_order=False): """ if tfidf_order == True, then display the docs in the order of TF-IDF similarity with the claim, regardless of the entailment score otherwise, display the docs in the order of the entailment score """ search_results = search_google(claim, timeout=timeout) print('Searching webpages...') start = time() with concurrent.futures.ThreadPoolExecutor() as e: scraped_results = e.map(scrape_url, search_results, itertools.repeat(timeout)) end = time() print(f"Finished searching in {round((end - start), 1)} seconds.\n") scraped_results = [(r[0][:20000], r[1]) for r in scraped_results if r[0] and '��' not in r[0]] # those can be ranked based on TF-IDF to be more efficient scraped_results = rank_documents_TFIDF(claim, scraped_results) retrieved_docs, urls = zip(*scraped_results[:max_search_results_per_query]) print('Scoring webpages...') start = time() retrieved_data = { 'inputs': { 'docs': list(retrieved_docs), 'claims': [claim]*len(retrieved_docs) } } _, _, used_chunk, support_prob_per_chunk = self.scorer.score(data=retrieved_data) end = time() num_chunks = len([item for items in used_chunk for item in items]) print(f'Finished {num_chunks} entailment checks in {round((end - start), 1)} seconds ({round(num_chunks / (end - start) * 60)} Doc./min).') if tfidf_order: tfidf_docs, scores = [], [] for used_c, support_prob_per_c in zip(used_chunk, support_prob_per_chunk): # If the doc can support the claim, find the chunk with the # highest entailment score; otherwise, use the first chunk if max(support_prob_per_c) > 0.5: tfidf_docs.append(used_c[np.argmax(support_prob_per_c)]) scores.append(max(support_prob_per_c)) else: tfidf_docs.append(used_c[0]) scores.append(support_prob_per_c[0]) return tfidf_docs, scores, urls else: ranked_docs, scores, ranked_urls = order_doc_score_url(used_chunk, support_prob_per_chunk, urls, allow_duplicated_urls=allow_duplicated_urls) return ranked_docs, scores, ranked_urls def chunk_and_highest_rouge_score(self, doc, claim): ''' Given a document and a claim, return the sentence with the highest rouge score and the score ''' doc_sentences = sent_tokenize(doc) claims = [claim] * len(doc_sentences) results = self.rouge.compute( predictions=doc_sentences, references=claims, use_aggregator=False) highest_score = 0 highest_score_sent = "" for i in range(len(doc_sentences)): if results['rouge1'][i] > highest_score: highest_score = results['rouge1'][i] highest_score_sent = doc_sentences[i] return highest_score_sent, highest_score