import asyncio
import json
import os
import re
import sys
import time
from itertools import combinations as itertools_combinations
from urllib.parse import quote, unquote

import aiohttp
import numpy as np
import pandas as pd
import requests
import streamlit as st
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from openai import OpenAI
from sentence_transformers import SentenceTransformer
from SPARQLWrapper import SPARQLWrapper, JSON
# Request headers carrying a randomly chosen User-Agent string; they are
# attached to the Google search request issued by ``mains``.
ua = UserAgent()
headers = {"User-Agent": f"{ua.random}"}

# Working directory for the per-mention QID lists written by ``mains``.
folder_path = '/home/user/app/qids_folder'
if os.path.exists(folder_path):
    print(f"folder already exists.")
else:
    os.mkdir(folder_path)
    print(f"folder created at {folder_path}")

# Working directory for the per-mention candidate records written by ``main``.
folder_path_1 = '/home/user/app/info_extraction'
if os.path.exists(folder_path_1):
    print(f"folder already exists.")
else:
    os.mkdir(folder_path_1)
    print(f"Folder created at {folder_path_1}")

# Sentence-embedding model shared by the candidate-selection step in ``main_cli``.
model = SentenceTransformer(
    "Lajavaness/bilingual-embedding-large",
    trust_remote_code=True,
)
async def fetch_json(url, session):
    """GET *url* through *session* and return the decoded JSON body.

    Args:
        url: Address to request.
        session: Object whose ``get(url)`` returns an async context manager
            yielding a response with an awaitable ``json()`` method
            (the callers in this file pass an ``aiohttp.ClientSession``).

    Returns:
        Whatever ``response.json()`` produces for the response.
    """
    async with session.get(url) as reply:
        payload = await reply.json()
    return payload
async def combination_method(name, session):
    """Search Wikipedia for every two-word combination of *name*.

    The words of *name* (split on whitespace) are paired with
    ``itertools.combinations`` and each pair is sent to the MediaWiki search
    API (at most 5 hits per pair).

    Args:
        name: Mention text; expected to contain several words.
        session: ``aiohttp.ClientSession`` to issue the requests on. When it
            is ``None`` or already closed, a temporary session is opened
            instead, so callers that relied on the function creating its own
            session keep working.

    Returns:
        set[str]: Titles of all pages found, without duplicates.
    """
    if session is None or getattr(session, "closed", False):
        async with aiohttp.ClientSession() as own_session:
            return await combination_method(name, own_session)

    titles = set()
    for first, second in itertools_combinations(name.split(), 2):
        # Percent-encode the term so characters such as '&', '#' or '+'
        # cannot corrupt the query string.
        term = quote(f"{first} {second}", safe="")
        url = f"https://en.wikipedia.org/w/api.php?action=query&list=search&srsearch={term}&srlimit=5&srprop=&srenablerewrites=True&srinfo=suggestion&format=json"
        json_suggestion = await fetch_json(url, session)
        # 'search' is missing when the API answers with an error payload.
        hits = json_suggestion.get('query', {}).get('search') or []
        titles.update(hit['title'] for hit in hits if hit.get('title'))
    return titles
async def single_method(name, session):
    """Search Wikipedia for each individual word of *name*.

    Hyphens and slashes in *name* are treated as word separators; every
    resulting word is sent to the MediaWiki search API (at most 10 hits per
    word).

    Args:
        name: Mention text.
        session: ``aiohttp.ClientSession`` to issue the requests on. When it
            is ``None`` or already closed, a temporary session is opened
            instead, so callers that relied on the function creating its own
            session keep working.

    Returns:
        set[str]: Titles of all pages found, without duplicates.
    """
    if session is None or getattr(session, "closed", False):
        async with aiohttp.ClientSession() as own_session:
            return await single_method(name, own_session)

    titles = set()
    for word in name.replace("-", " ").replace("/", " ").split():
        # Percent-encode the word so characters such as '&', '#' or '+'
        # cannot corrupt the query string.
        term = quote(word, safe="")
        url = f"https://en.wikipedia.org/w/api.php?action=query&list=search&srsearch={term}&srlimit=10&srprop=&srenablerewrites=True&srinfo=suggestion&format=json"
        json_suggestion = await fetch_json(url, session)
        # 'search' is missing when the API answers with an error payload.
        hits = json_suggestion.get('query', {}).get('search') or []
        titles.update(hit['title'] for hit in hits if hit.get('title'))
    return titles
async def mains(name, single, combi):
    """Collect candidate Wikidata QIDs for *name* and save them as JSON.

    Steps:
      1. Scrape a Google search restricted to English Wikipedia for page
         titles.
      2. Ask the MediaWiki search API for a spelling suggestion and, if one is
         offered, add the pages found for it.
      3. Replace every disambiguation page (a page linking to
         ``Help:Disambiguation``) by the pages it links to.
      4. Optionally add the results of ``combination_method`` and
         ``single_method``.
      5. Resolve all titles to Wikidata QIDs.

    Args:
        name: Mention to look up. It is also used verbatim as the output file
            name, so it should not contain path separators.
        single: ``"Yes"`` to also search each word of *name* on its own
            (only applied when *name* has at least two words).
        combi: ``"Yes"`` to also search every two-word combination of *name*
            (only applied when *name* has at least three words).

    Returns:
        None. The QIDs are written to
        ``/home/user/app/qids_folder/{name}.json``.
    """
    search_url = (
        "https://en.wikipedia.org/w/api.php?action=query&list=search"
        "&srsearch={term}&srlimit={limit}&srprop=&srenablerewrites=True"
        "&srinfo=suggestion&format=json"
    )
    candidate_titles = set()
    resolved_titles = set()
    qids = set()

    async with aiohttp.ClientSession() as session:
        # 1. Google search. NOTE: requests is synchronous and blocks the event
        # loop for the duration of the call; nothing else is scheduled here.
        google_query = f"{name} site:en.wikipedia.org inurl:/wiki/ -inurl:? -inurl:Category: -inurl:File: -inurl:Special: -inurl:Help:"
        try:
            html = requests.get(
                "https://www.google.com/search",
                params={"q": google_query},  # let requests encode the query
                headers=headers,
                timeout=30,
            )
        except requests.RequestException as exc:
            # Google is only one of several sources; carry on without it.
            print(f"Google search failed for '{name}': {exc}")
        else:
            soup = BeautifulSoup(html.text, "html.parser")
            for element in soup.find_all(href=True):
                link = element['href']
                if link.startswith('https://en.wikipedia.org/wiki/'):
                    # Drop any '#section' suffix and undo the URL encoding so
                    # the title can be re-encoded uniformly further down.
                    title = unquote(link.split("/")[-1].split("#")[0])
                    if title:
                        candidate_titles.add(title)

        # 2. Spelling suggestion from the MediaWiki search API.
        wikipedia_url = search_url.format(term=quote(name, safe=""), limit=1)
        json_data = await fetch_json(wikipedia_url, session)
        suggestion = json_data.get('query', {}).get('searchinfo', {}).get('suggestion')
        if suggestion:
            suggested_url = search_url.format(term=quote(suggestion, safe=""), limit=10)
            json_suggestion = await fetch_json(suggested_url, session)
            hits = json_suggestion.get('query', {}).get('search') or []
            candidate_titles.update(hit['title'] for hit in hits if hit.get('title'))

        # 3. Expand disambiguation pages. The candidates are page *titles*, so
        # they are passed as `titles=`; sending them as `pageids=` makes the
        # API reject the request and the expansion never happens.
        for title in candidate_titles:
            wikipedia_disambiguation = f"https://en.wikipedia.org/w/api.php?action=query&generator=links&format=json&redirects=1&titles={quote(title, safe='')}&prop=pageprops&gpllimit=50&ppprop=wikibase_item"
            json_id = await fetch_json(wikipedia_disambiguation, session)
            pages = (json_id.get('query') or {}).get('pages') or {}
            linked_titles = {page.get("title") for page in pages.values()}
            if "Help:Disambiguation" in linked_titles:
                # Keep only main-namespace links (no "Namespace:" prefix).
                resolved_titles.update(
                    linked for linked in linked_titles if linked and ":" not in linked
                )
            else:
                resolved_titles.add(title)

        # 4. Optional extra searches for hard mentions.
        if combi == "Yes" and len(name.replace("-", " ").split()) >= 3:
            resolved_titles.update(await combination_method(name, session))
        if single == "Yes" and len(name.replace("-", " ").replace("/", " ").split()) >= 2:
            resolved_titles.update(await single_method(name, session))

        # 5. Title -> QID. Best effort: one failing lookup must not discard
        # the QIDs already collected.
        for title in resolved_titles:
            if not title:
                continue
            wikibase_url = f"https://en.wikipedia.org/w/api.php?action=query&titles={quote(title, safe='')}&prop=pageprops&format=json"
            try:
                json_qid = await fetch_json(wikibase_url, session)
            except (aiohttp.ClientError, asyncio.TimeoutError, ValueError) as exc:
                print(f"QID lookup failed for '{title}': {exc}")
                continue
            for page_data in json_qid.get('query', {}).get('pages', {}).values():
                wikibase_item = page_data.get('pageprops', {}).get('wikibase_item')
                if wikibase_item:
                    qids.add(wikibase_item)

    with open(f"/home/user/app/qids_folder/{name}.json", "w") as f:
        # Sorted so the file content is reproducible between runs.
        json.dump(sorted(qids), f)
async def get_results(query):
    """Run a SPARQL *query* against the Wikidata Query Service.

    The request itself is performed by the synchronous ``get_resultss``
    (same endpoint and user agent); it is executed in the default thread-pool
    executor so the blocking HTTP call does not stall the event loop.

    Args:
        query: SPARQL query text.

    Returns:
        The converted JSON result, exactly as returned by ``get_resultss``.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, get_resultss, query)
def get_resultss(query):
    """Synchronously run a SPARQL *query* on the Wikidata Query Service.

    Args:
        query: SPARQL query text.

    Returns:
        The response converted by SPARQLWrapper with the JSON return format.
    """
    python_version = (sys.version_info[0], sys.version_info[1])
    endpoint = SPARQLWrapper(
        "https://query.wikidata.org/sparql",
        agent="WDQS-example Python/%s.%s" % python_version,
    )
    endpoint.setReturnFormat(JSON)
    endpoint.setQuery(query)
    response = endpoint.query()
    return response.convert()
def cleaner(text):
    """Normalise an extract into a single clean line of text.

    Backslashes are removed, newlines become spaces, every ``{...}`` span
    (shortest match) is deleted, runs of spaces are collapsed to one space and
    the result is stripped.

    Args:
        text: Raw text.

    Returns:
        str: The cleaned text.
    """
    single_line = text.replace('\\', '').replace('\n', ' ')
    without_braces = re.sub(r'\{.*?\}', '', single_line)
    collapsed = re.sub(' +', ' ', without_braces)
    return collapsed.strip()
def _sparql_values(results):
    """Return the non-empty bound values of a SPARQL JSON result, in order."""
    return [
        cell["value"]
        for row in results["results"]["bindings"]
        for cell in row.values()
        if cell.get("value")
    ]


async def retriever(qid):
    """Gather the English label, aliases and a description for one entity.

    The description is the cleaned two-sentence intro of the entity's English
    Wikipedia article. When the entity has no such article, or the article
    yields no extract, the English Wikidata description is used instead. The
    string ``"None"`` is kept when neither is available.

    Args:
        qid: Wikidata identifier, e.g. ``"Q42"``.

    Returns:
        list[dict]: One record ``{"qid", "label", "description"}`` for the
        label (``label`` is ``None`` when the entity has no English label),
        followed by one record per English alias. Labels and aliases are
        lower-cased.
    """
    query_label = f"""SELECT ?subjectLabel
    WHERE {{
    wd:{qid} rdfs:label ?subjectLabel .
    FILTER(LANG(?subjectLabel) = "en")
    }}
    """
    labels = _sparql_values(await get_results(query_label))
    # The last binding wins when several are returned.
    label = labels[-1].lower() if labels else None

    query_alias = f"""SELECT ?alias
    WHERE {{
    wd:{qid} skos:altLabel ?alias
    FILTER(LANG(?alias) = "en")
    }}
    """
    alias_list = _sparql_values(await get_results(query_alias))

    query_desci = f"""SELECT ?subjectLabel
    WHERE {{
    ?subjectLabel schema:about wd:{qid} ;
    schema:inLanguage "en" ;
    schema:isPartOf <https://en.wikipedia.org/> .
    }}
    """
    cleaned_first_para = "None"
    article_urls = _sparql_values(await get_results(query_desci))
    if article_urls:
        # A session is only needed when there is an article to fetch.
        async with aiohttp.ClientSession() as session:
            for article_url in article_urls:
                _, found, title = article_url.partition("/wiki/")
                if not found:
                    continue
                # `title` comes from the sitelink URL and is used as-is.
                url = f"https://en.wikipedia.org/w/api.php?action=query&prop=extracts&titles={title}&exintro=&exsentences=2&explaintext=&redirects=&formatversion=2&format=json"
                json_data = await fetch_json(url, session)
                # Guard against a missing or empty 'pages' list.
                pages = json_data.get('query', {}).get('pages') or [{}]
                cleaned_first_para = cleaner(pages[0].get('extract', 'None'))

    if not cleaned_first_para or cleaned_first_para == "None":
        query_desc = f"""SELECT ?subjectLabel
        WHERE {{
        wd:{qid} schema:description ?subjectLabel .
        FILTER(LANG(?subjectLabel) = "en")
        }}
        """
        descriptions = _sparql_values(await get_results(query_desc))
        if descriptions:
            cleaned_first_para = descriptions[-1]

    list_with_sent = [{"qid": qid, "label": label, "description": cleaned_first_para}]
    list_with_sent.extend(
        {"qid": qid, "label": alias.lower(), "description": cleaned_first_para}
        for alias in alias_list
    )
    return list_with_sent
async def main(name):
    """Build the candidate records for the mention *name*.

    Reads the QID list stored in ``/home/user/app/qids_folder/{name}.json``,
    calls ``retriever`` for each QID in turn, and writes the concatenated
    records to ``/home/user/app/info_extraction/{name}.json``.

    Args:
        name: Mention whose QID file should be processed.
    """
    source_path = f"/home/user/app/qids_folder/{name}.json"
    target_path = f"/home/user/app/info_extraction/{name}.json"

    with open(source_path, "r") as qid_file:
        stored_qids = json.load(qid_file)

    collected = []
    for qid in stored_qids:
        records = await retriever(qid)
        if records:
            collected.extend(records)

    with open(target_path, "w", encoding="utf-8") as out_file:
        json.dump(collected, out_file)
# System prompt for the acronym / nested-mention expansion request.
_ACRONYM_SYSTEM_PROMPT = """
I will give you one or more labels within a sentence. Your task is as follows:
Identify each label in the sentence, and check if it is an acronym.
If the label is an acronym, respond with the full name of the acronym.
If the label is not an acronym, respond with the label exactly as it was given to you.
If a label contains multiple terms (e.g., 'phase and DIC microscopy'), treat each term within the label as a separate label.
This means you should identify and explain each part of the label individually.
Each part should be on its own line in the response.
Context-Specific Terms: If the sentence context suggests a relevant term that applies to each label (such as "study" in 'morphological, sedimentological, and stratigraphical study'), add that term to each label’s explanation.
Use context clues to determine the appropriate term to add (e.g., 'study' or 'microscopy').
Output Format: Your response should contain only the explanations, formatted as follows:
Each label or part of a label should be on a new line.
Do not include any additional text, and do not repeat the original sentence.
Example 1:
Input:
label: phase and DIC microscopy
context: Tardigrades have been extracted from samples using centrifugation with Ludox AM™ and mounted on individual microscope slides in Hoyer's medium for identification under phase and DIC microscopy.
Expected response:
phase: phase microscopy
DIC microscopy: Differential interference contrast microscopy
Example 2:
Input:
label: morphological, sedimentological, and stratigraphical study
context: This paper presents results of a morphological, sedimentological, and stratigraphical study of relict beach ridges formed on a prograded coastal barrier in Bream Bay, North Island New Zealand.
Expected response:
morphological: morphological study
sedimentological: sedimentological study
stratigraphical: stratigraphical study
Each label, even if nested within another, should be treated as an individual item.
Each individual label or acronym should be output on a separate line.
"""

# System prompt for the "describe each label" request.
_DESCRIPTION_SYSTEM_PROMPT = "Given a label or labels within a sentence, provide a brief description (2-3 sentences) explaining what the label represents, similar to how a Wikipedia entry would. Format your response as follows: label: description. I want only the description of the label, not the role in the context. Include the label in the description as well. For example: Sentiment analysis: Sentiment analysis is the use of natural language processing, text analysis, computational linguistics, and biometrics to systematically identify, extract, quantify, and study affective states and subjective information.\nText analysis: Text mining, text data mining (TDM) or text analytics is the process of deriving high-quality information from text. It involves the discovery by computer of new, previously unknown information, by automatically extracting information from different written resources."

# Same absolute locations that ``mains`` and ``main`` write to.
_QIDS_DIR = "/home/user/app/qids_folder"
_INFO_DIR = "/home/user/app/info_extraction"


def _ask_llm(client, model_name, system_prompt, user_prompt):
    """Send one system + user exchange to the chat model and return its text."""
    response = client.chat.completions.create(
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        temperature=1.0,
        top_p=1.0,
        max_tokens=1000,
        model=model_name,
    )
    return response.choices[0].message.content or ""


def _parse_labelled_lines(reply):
    """Split ``label: value`` lines into ``(label, value)`` pairs.

    Blank lines are skipped. Only the first colon separates label and value,
    so colons inside the value are preserved. A line without a colon (or with
    nothing after it) yields the label for both items.
    """
    pairs = []
    for line in reply.splitlines():
        if not line.strip():
            continue
        head, _, tail = line.partition(":")
        head, tail = head.strip(), tail.strip()
        pairs.append((head, tail or head))
    return pairs


async def _collect_candidates(mentions, single, combi):
    """Run candidate retrieval (2/5) and information gathering (3/5)."""
    with st.spinner("Applying Candidate Retrieval module... (2/5)"):
        for mention in mentions:
            await mains(mention, single, combi)
    st.write("✅ Applied Candidate Retrieval module (2/5)")
    with st.spinner("Applying Information Gathering module... (3/5)"):
        for mention in mentions:
            await main(mention)
    st.write("✅ Applied Information Gathering module (3/5)")


def _rank_candidates(candidates, mention_label, mention_description):
    """Return the Wikidata link of the best-scoring candidate, or ``None``.

    A candidate's score is the mean of the label similarity and the
    description similarity between the mention and the candidate, both
    computed with the module-level embedding ``model``. The first candidate
    wins a tie.
    """
    progress_bar = st.progress(0)
    scored = []
    if candidates:
        mention_label_emb = model.encode([mention_label])
        mention_desc_emb = model.encode([mention_description])
    for index, element in enumerate(candidates):
        qid = element.get("qid")
        label = element.get("label")
        description = element.get("description")
        # Records without an English label or description cannot be embedded.
        if label and description:
            label_emb = model.encode([label])
            desc_emb = model.encode([description])
            cossim_label = model.similarity(mention_label_emb, label_emb[0])
            desc_label = model.similarity(mention_desc_emb, desc_emb[0])
            link = f"https://www.wikidata.org/wiki/{qid}"
            scored.append((link, np.mean([cossim_label, desc_label])))
        progress_bar.progress((index + 1) / len(candidates))
        print(qid)
    progress_bar.empty()
    if not scored:
        return None
    return max(scored, key=lambda pair: pair[1])[0]


def _lookup_links(qid):
    """Return ``(wikipedia_url, dbpedia_uri)`` for *qid*.

    Each item is the string ``"None"`` when the corresponding SPARQL query
    returns no binding. The last binding wins when several are returned.
    """
    wikipedia = "None"
    wikidata2wikipedia = f"""
    SELECT ?wikipedia
    WHERE {{
    ?wikipedia schema:about wd:{qid} .
    ?wikipedia schema:isPartOf <https://en.wikipedia.org/> .
    }}
    """
    results = get_resultss(wikidata2wikipedia)
    for result in results["results"]["bindings"]:
        for value in result.values():
            wikipedia = value.get("value", "None")

    dbpedia = "None"
    sparql = SPARQLWrapper("http://dbpedia.org/sparql")
    wikidata2dbpedia = f"""
    SELECT ?dbpedia
    WHERE {{
    ?dbpedia owl:sameAs <http://www.wikidata.org/entity/{qid}>.
    }}
    """
    sparql.setQuery(wikidata2dbpedia)
    sparql.setReturnFormat(JSON)
    results = sparql.query().convert()
    for result in results["results"]["bindings"]:
        dbpedia = result["dbpedia"]["value"]
    return wikipedia, dbpedia


def _clear_folder(path):
    """Delete every regular file directly inside *path*, if it exists."""
    if not os.path.isdir(path):
        return
    for filename in os.listdir(path):
        file_path = os.path.join(path, filename)
        if os.path.isfile(file_path):
            os.remove(file_path)


def _run_pipeline(client, model_name, sentence, mention, single, combi, disambi):
    """Run the five entity-linking stages and render their results."""
    with st.spinner("Applying Data Normalization module... (1/5)"):
        list_with_full_names = []
        list_with_names_to_show = []
        if disambi == "Yes":
            reply = _ask_llm(
                client,
                model_name,
                _ACRONYM_SYSTEM_PROMPT,
                f"label:{mention}, context:{sentence}",
            )
            print(reply)
            for shown_name, full_name in _parse_labelled_lines(reply):
                list_with_names_to_show.append(shown_name)
                list_with_full_names.append(full_name)
            name = ",".join(list_with_full_names)
        else:
            name = mention
            list_with_full_names.append(name)
            list_with_names_to_show.append(name)
        sentence = sentence.replace(mention, name)
        reply = _ask_llm(
            client,
            model_name,
            _DESCRIPTION_SYSTEM_PROMPT,
            f"label:{name}, context:{sentence}",
        )
        print(reply)
        list_with_contexts = [
            description for _, description in _parse_labelled_lines(reply)
        ]
        # If the model returned fewer descriptions than labels, fall back to
        # the label text so no mention is silently dropped by zip() below.
        list_with_contexts.extend(list_with_full_names[len(list_with_contexts):])
    st.write("✅ Applied Data Normilzation module (1/5)")

    # Hand over the list itself: joining on "," and splitting again would
    # break any mention that contains a comma. Duplicates are fetched once.
    asyncio.run(
        _collect_candidates(list(dict.fromkeys(list_with_full_names)), single, combi)
    )

    total = len(list_with_full_names)
    triples = zip(list_with_full_names, list_with_contexts, list_with_names_to_show)
    for number, (full_name, context, shown_name) in enumerate(triples, start=1):
        with st.spinner(f"Applying Candidate Selection module... (4/5) [{number}/{total}] (This may take a while)"):
            with open(f"{_INFO_DIR}/{full_name}.json", "r", encoding="utf-8") as f:
                candidates = json.load(f)
            best_link = _rank_candidates(candidates, full_name, context)
        st.write(f"✅ Applined Candidate Selection module (4/5) [{number}/{total}]")
        with st.spinner(f"Applying Candidate Matching module... (5/5) [{number}/{total}]"):
            if best_link:
                qid = best_link.split("/")[-1]
                wikipedia, dbpedia = _lookup_links(qid)
                st.write(f"✅ Applied Candidate Matching module (5/5) [{number}/{total}]")
                st.text(f"The correct entity for '{shown_name}' is:")
                st.success(f"Wikipedia: {wikipedia}")
                st.success(f"Wikidata: {best_link}")
                st.success(f"DBpedia: {dbpedia}")
            else:
                st.warning(f"The entity: {shown_name} is NIL.")


def main_cli():
    """Render the Streamlit page and run entity linking on request.

    The page collects an API token, a sentence, a mention inside that sentence
    and three Yes/No options. Pressing "Run Entity Linking" validates the
    input, runs the pipeline, reports the execution time, shows a "Rerun"
    button and finally removes the intermediate JSON files.
    """
    st.title("✨ Entity Linking Application ✨")
    st.caption("This web application is part of my master’s dissertation.")

    # While the run button's state is set, all input widgets are disabled.
    st.session_state.running = bool(
        'run_button' in st.session_state and st.session_state.run_button
    )
    running = st.session_state.running

    api_token = st.text_input("Enter your API key from [GitHub](https://github.com/marketplace/models/azure-openai/gpt-4o):", "", type="password", disabled=running)
    client = None
    model_name = "gpt-4o"
    if api_token:
        client = OpenAI(
            base_url="https://models.inference.ai.azure.com",
            api_key=api_token,
        )
        st.success("API Token is set for this session.")
    else:
        st.warning("Please enter an API token to proceed.")

    input_sentence_user = st.text_input("Enter a sentence:", "", disabled=running)
    input_mention_user = st.text_input("Enter a textural reference (mention) that is inside the sentence:", "", disabled=running)
    single = st.selectbox("Search each word individually? (Useful for difficult mentions)", ['Yes', 'No'], index=1, disabled=running)
    combi = st.selectbox("Make combinations of each word? (Useful for difficult mentions)", ['Yes', 'No'], index=1, disabled=running)
    disambi = st.selectbox("Run acronym disambiguation? (Enable it if the mention include an acronym or if it is nested)", ['Yes', 'No'], index=0, disabled=running)

    if not st.button("Run Entity Linking", key="run_button", disabled=running):
        return

    # Started before validation so the timing report below is always defined.
    start_time = time.time()
    try:
        if not (input_sentence_user and input_mention_user):
            st.warning("Please fill in both fields.")
        elif input_mention_user not in input_sentence_user:
            st.warning(f"The mention '{input_mention_user}' was NOT found in the sentence.")
        elif client is None:
            # Without a token there is no client to query the model with.
            st.warning("Please enter an API token to proceed.")
        else:
            _run_pipeline(
                client,
                model_name,
                input_sentence_user,
                input_mention_user,
                single,
                combi,
                disambi,
            )
    finally:
        # Remove the intermediate files even when the pipeline failed.
        _clear_folder(_QIDS_DIR)
        _clear_folder(_INFO_DIR)

    end_time = time.time()
    execution_time = end_time - start_time
    ETA = time.strftime("%H:%M:%S", time.gmtime(execution_time))
    st.write(f"⌛ Execution time: {ETA}")
    # Clicking any widget triggers a rerun in which the run button's state is
    # cleared, which re-enables the inputs.
    st.button("Rerun", disabled=False)
# Build the Streamlit page only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main_cli()