|
import pandas as pd |
|
import numpy as np |
|
import sqlite3, torch, json, re, os, torch, itertools, nltk |
|
from ast import literal_eval as leval |
|
from tqdm.auto import tqdm |
|
from utils.verbalisation_module import VerbModule |
|
from utils.sentence_retrieval_module import SentenceRetrievalModule |
|
from utils.textual_entailment_module import TextualEntailmentModule |
|
from importlib import reload |
|
from html.parser import HTMLParser |
|
from sentence_transformers import SentenceTransformer |
|
from sklearn.metrics.pairwise import cosine_similarity |
|
from tqdm import tqdm |
|
import gradio as gr |
|
from bs4 import BeautifulSoup |
|
from cleantext import clean |
|
|
|
|
|
def verbalisation(claim_df): |
|
verb_module = VerbModule() |
|
triples = [] |
|
for _, row in claim_df.iterrows(): |
|
triple = { |
|
'subject': row['entity_label'], |
|
'predicate': row['property_label'], |
|
'object': row['object_label'] |
|
} |
|
triples.append(triple) |
|
|
|
|
|
claim_df['verbalisation'] = verb_module.verbalise_triples(triples) |
|
claim_df['verbalisation_unks_replaced'] = claim_df['verbalisation'].apply(verb_module.replace_unks_on_sentence) |
|
claim_df['verbalisation_unks_replaced_then_dropped'] = claim_df['verbalisation'].apply(lambda x: verb_module.replace_unks_on_sentence(x, empty_after=True)) |
|
return claim_df |
|
|
|
def setencesSpliter(verbalised_claims_df_final, reference_text_df, update_progress): |
|
join_df = pd.merge(verbalised_claims_df_final, reference_text_df[['reference_id', 'url', 'html']], on='reference_id', how='left') |
|
SS_df = join_df[['reference_id','url','verbalisation', 'html']].copy() |
|
def clean_html(html_content): |
|
soup = BeautifulSoup(html_content, 'html.parser') |
|
text = soup.get_text(separator=' ', strip=True) |
|
cleaned_text = clean(text, |
|
fix_unicode=True, |
|
to_ascii=True, |
|
lower=False, |
|
no_line_breaks=False, |
|
no_urls=True, |
|
no_emails=True, |
|
no_phone_numbers=True, |
|
no_numbers=False, |
|
no_digits=False, |
|
no_currency_symbols=True, |
|
no_punct=False, |
|
replace_with_url="", |
|
replace_with_email="", |
|
replace_with_phone_number="", |
|
replace_with_number="", |
|
replace_with_digit="", |
|
replace_with_currency_symbol="") |
|
return cleaned_text |
|
def split_into_sentences(text): |
|
sentences = nltk.sent_tokenize(text) |
|
return sentences |
|
def slide_sentences(sentences, window_size=2): |
|
if len(sentences) < window_size: |
|
return [" ".join(sentences)] |
|
return [" ".join(sentences[i:i + window_size]) for i in range(len(sentences) - window_size + 1)] |
|
|
|
SS_df['html2text'] = SS_df['html'].apply(clean_html) |
|
SS_df['nlp_sentences'] = SS_df['html2text'].apply(split_into_sentences) |
|
SS_df['nlp_sentences_slide_2'] = SS_df['nlp_sentences'].apply(slide_sentences) |
|
|
|
return SS_df[['reference_id','verbalisation','url','nlp_sentences','nlp_sentences_slide_2']] |
|
|
|
def evidenceSelection(splited_sentences_from_html, BATCH_SIZE, N_TOP_SENTENCES): |
|
sr_module = SentenceRetrievalModule(max_len=512) |
|
sentence_relevance_df = splited_sentences_from_html.copy() |
|
sentence_relevance_df.rename(columns={'verbalisation': 'final_verbalisation'}, inplace=True) |
|
|
|
def chunks(l, n): |
|
n = max(1, n) |
|
return [l[i:i + n] for i in range(0, len(l), n)] |
|
|
|
def compute_scores(column_name): |
|
all_outputs = [] |
|
for _, row in tqdm(sentence_relevance_df.iterrows(), total=sentence_relevance_df.shape[0]): |
|
outputs = [] |
|
for batch in chunks(row[column_name], BATCH_SIZE): |
|
batch_outputs = sr_module.score_sentence_pairs([(row['final_verbalisation'], sentence) for sentence in batch]) |
|
outputs += batch_outputs |
|
all_outputs.append(outputs) |
|
sentence_relevance_df[f'{column_name}_scores'] = pd.Series(all_outputs) |
|
assert all(sentence_relevance_df.apply(lambda x: len(x[column_name]) == len(x[f'{column_name}_scores']), axis=1)) |
|
|
|
compute_scores('nlp_sentences') |
|
compute_scores('nlp_sentences_slide_2') |
|
|
|
def get_top_n_sentences(row, column_name, n): |
|
sentences_with_scores = [{'sentence': t[0], 'score': t[1], 'sentence_id': f"{row.name}_{j}"} for j, t in enumerate(zip(row[column_name], row[f'{column_name}_scores']))] |
|
return sorted(sentences_with_scores, key=lambda x: x['score'], reverse=True)[:n] |
|
|
|
|
|
def filter_overlaps(sentences): |
|
filtered = [] |
|
for evidence in sentences: |
|
if ';' in evidence['sentence_id']: |
|
start_id, end_id = evidence['sentence_id'].split(';') |
|
if not any(start_id in e['sentence_id'].split(';') or end_id in e['sentence_id'].split(';') for e in filtered): |
|
filtered.append(evidence) |
|
else: |
|
if not any(evidence['sentence_id'] in e['sentence_id'].split(';') for e in filtered): |
|
filtered.append(evidence) |
|
return filtered |
|
|
|
def limit_sentence_length(sentence, max_length): |
|
if len(sentence) > max_length: |
|
return sentence[:max_length] + '...' |
|
return sentence |
|
|
|
nlp_sentences_TOP_N, nlp_sentences_slide_2_TOP_N, nlp_sentences_all_TOP_N = [], [], [] |
|
|
|
for _, row in tqdm(sentence_relevance_df.iterrows(), total=sentence_relevance_df.shape[0]): |
|
top_n = get_top_n_sentences(row, 'nlp_sentences', N_TOP_SENTENCES) |
|
top_n = [{'sentence': limit_sentence_length(s['sentence'], 1024), 'score': s['score'], 'sentence_id': s['sentence_id']} for s in top_n] |
|
nlp_sentences_TOP_N.append(top_n) |
|
|
|
top_n_slide_2 = get_top_n_sentences(row, 'nlp_sentences_slide_2', N_TOP_SENTENCES) |
|
top_n_slide_2 = [{'sentence': limit_sentence_length(s['sentence'], 1024), 'score': s['score'], 'sentence_id': s['sentence_id']} for s in top_n_slide_2] |
|
nlp_sentences_slide_2_TOP_N.append(top_n_slide_2) |
|
|
|
all_sentences = top_n + top_n_slide_2 |
|
all_sentences_sorted = sorted(all_sentences, key=lambda x: x['score'], reverse=True) |
|
filtered_sentences = filter_overlaps(all_sentences_sorted) |
|
filtered_sentences = [{'sentence': limit_sentence_length(s['sentence'], 1024), 'score': s['score'], 'sentence_id': s['sentence_id']} for s in filtered_sentences] |
|
nlp_sentences_all_TOP_N.append(filtered_sentences[:N_TOP_SENTENCES]) |
|
|
|
sentence_relevance_df['nlp_sentences_TOP_N'] = pd.Series(nlp_sentences_TOP_N) |
|
sentence_relevance_df['nlp_sentences_slide_2_TOP_N'] = pd.Series(nlp_sentences_slide_2_TOP_N) |
|
sentence_relevance_df['nlp_sentences_all_TOP_N'] = pd.Series(nlp_sentences_all_TOP_N) |
|
|
|
return sentence_relevance_df |
|
|
|
def textEntailment(evidence_df, SCORE_THRESHOLD): |
|
textual_entailment_df = evidence_df.copy() |
|
te_module = TextualEntailmentModule() |
|
|
|
keys = ['TOP_N', 'slide_2_TOP_N', 'all_TOP_N'] |
|
te_columns = {f'evidence_TE_prob_{key}': [] for key in keys} |
|
te_columns.update({f'evidence_TE_prob_weighted_{key}': [] for key in keys}) |
|
te_columns.update({f'evidence_TE_labels_{key}': [] for key in keys}) |
|
te_columns.update({f'claim_TE_prob_weighted_sum_{key}': [] for key in keys}) |
|
te_columns.update({f'claim_TE_label_weighted_sum_{key}': [] for key in keys}) |
|
te_columns.update({f'claim_TE_label_malon_{key}': [] for key in keys}) |
|
|
|
def process_row(row): |
|
claim = row['final_verbalisation'] |
|
results = {} |
|
for key in keys: |
|
evidence = row[f'nlp_sentences_{key}'] |
|
evidence_size = len(evidence) |
|
if evidence_size == 0: |
|
results[key] = { |
|
'evidence_TE_prob': [], |
|
'evidence_TE_labels': [], |
|
'evidence_TE_prob_weighted': [], |
|
'claim_TE_prob_weighted_sum': [0, 0, 0], |
|
'claim_TE_label_weighted_sum': 'NOT ENOUGH INFO', |
|
'claim_TE_label_malon': 'NOT ENOUGH INFO' |
|
} |
|
continue |
|
|
|
evidence_TE_prob = te_module.get_batch_scores( |
|
claims=[claim] * evidence_size, |
|
evidence=[e['sentence'] for e in evidence] |
|
) |
|
|
|
evidence_TE_labels = [te_module.get_label_from_scores(s) for s in evidence_TE_prob] |
|
|
|
evidence_TE_prob_weighted = [ |
|
probs * ev['score'] for probs, ev in zip(evidence_TE_prob, evidence) |
|
if ev['score'] > SCORE_THRESHOLD |
|
] |
|
|
|
claim_TE_prob_weighted_sum = np.sum(evidence_TE_prob_weighted, axis=0) if evidence_TE_prob_weighted else [0, 0, 0] |
|
|
|
claim_TE_label_weighted_sum = te_module.get_label_from_scores(claim_TE_prob_weighted_sum) if evidence_TE_prob_weighted else 'NOT ENOUGH INFO' |
|
|
|
claim_TE_label_malon = te_module.get_label_malon( |
|
[probs for probs, ev in zip(evidence_TE_prob, evidence) if ev['score'] > SCORE_THRESHOLD] |
|
) |
|
|
|
results[key] = { |
|
'evidence_TE_prob': evidence_TE_prob, |
|
'evidence_TE_labels': evidence_TE_labels, |
|
'evidence_TE_prob_weighted': evidence_TE_prob_weighted, |
|
'claim_TE_prob_weighted_sum': claim_TE_prob_weighted_sum, |
|
'claim_TE_label_weighted_sum': claim_TE_label_weighted_sum, |
|
'claim_TE_label_malon': claim_TE_label_malon |
|
} |
|
return results |
|
|
|
for i, row in tqdm(textual_entailment_df.iterrows(), total=textual_entailment_df.shape[0]): |
|
try: |
|
result_sets = process_row(row) |
|
for key in keys: |
|
for k, v in result_sets[key].items(): |
|
te_columns[f'{k}_{key}'].append(v) |
|
except Exception as e: |
|
print(f"Error processing row {i}: {e}") |
|
print(row) |
|
raise |
|
|
|
for key in keys: |
|
for col in ['evidence_TE_prob', 'evidence_TE_prob_weighted', 'evidence_TE_labels', |
|
'claim_TE_prob_weighted_sum', 'claim_TE_label_weighted_sum', 'claim_TE_label_malon']: |
|
textual_entailment_df[f'{col}_{key}'] = pd.Series(te_columns[f'{col}_{key}']) |
|
|
|
return textual_entailment_df |
|
|
|
def TableMaking(verbalised_claims_df_final, result): |
|
verbalised_claims_df_final.set_index('reference_id', inplace=True) |
|
result.set_index('reference_id', inplace=True) |
|
results = pd.concat([verbalised_claims_df_final, result], axis=1) |
|
results['triple'] = results[['entity_label', 'property_label', 'object_label']].apply(lambda x: ', '.join(x), axis=1) |
|
all_result = pd.DataFrame() |
|
for idx, row in results.iterrows(): |
|
aResult = pd.DataFrame(row["nlp_sentences_TOP_N"])[['sentence','score']] |
|
aResult.rename(columns={'score': 'Relevance_score'}, inplace=True) |
|
aResult = pd.concat([aResult, pd.DataFrame(row["evidence_TE_labels_all_TOP_N"], columns=['TextEntailment'])], axis=1) |
|
aResult = pd.concat([aResult, pd.DataFrame(np.max(row["evidence_TE_prob_all_TOP_N"], axis=1), columns=['Entailment_score'])], axis=1) |
|
aResult = aResult.reindex(columns=['sentence', 'TextEntailment', 'Entailment_score','Relevance_score']) |
|
aBox = pd.DataFrame({'triple': [row["triple"]], 'url': row['url'],'Results': [aResult]}) |
|
all_result = pd.concat([all_result,aBox], axis=0) |
|
|
|
def dataframe_to_html(all_result): |
|
html = '<html><head><style>table {border-collapse: collapse; width: 100%;} th, td {border: 1px solid black; padding: 8px; text-align: left;} th {background-color: #f2f2f2;}</style></head><body>' |
|
for triple in all_result['triple'].unique(): |
|
html += f'<h3>Triple: {triple}</h3>' |
|
df = all_result[all_result['triple']==triple].copy() |
|
for idx, row in df.iterrows(): |
|
url = row['url'] |
|
results = row['Results'] |
|
html += f'<h3>Reference: {url}</h3>' |
|
html += results.to_html(index=False) |
|
html += '</body></html>' |
|
return html |
|
html_result = dataframe_to_html(all_result) |
|
return html_result |
|
|
|
if __name__ == '__main__': |
|
target_QID = 'Q245247' |
|
conn = sqlite3.connect('wikidata_claims_refs_parsed.db') |
|
query = f"SELECT * FROM claim_text WHERE entity_id = '{target_QID}'" |
|
claim_df = pd.read_sql_query(query, conn) |
|
query = f"SELECT * FROM html_text Where entity_id = '{target_QID}'" |
|
reference_text_df = pd.read_sql_query(query, conn) |
|
verbalised_claims_df_final = verbalisation(claim_df) |
|
progress = gr.Progress(len(verbalised_claims_df_final)) |
|
def update_progress(curr_step, total_steps): |
|
progress((curr_step + 1) / total_steps) |
|
|
|
splited_sentences_from_html = setencesSpliter(verbalised_claims_df_final, reference_text_df, update_progress) |
|
|
|
BATCH_SIZE = 512 |
|
N_TOP_SENTENCES = 5 |
|
SCORE_THRESHOLD = 0.6 |
|
evidence_df = evidenceSelection(splited_sentences_from_html, BATCH_SIZE, N_TOP_SENTENCES) |
|
result = textEntailment(evidence_df, SCORE_THRESHOLD) |
|
conn.commit() |
|
conn.close() |
|
display_df =TableMaking(verbalised_claims_df_final, result) |
|
|