NutriGenMePE / app.py
fadliaulawi's picture
Add time tracking for each process
e79e408 verified
# Author: Firqa Aqila Noor Arasyi
# Date: 2023-12-04
import os
import io
import json
import pandas as pd
import streamlit as st
from stqdm import stqdm
from ast import literal_eval
from tempfile import NamedTemporaryFile
from json_repair import repair_json
import PyPDF2
import pdf2image
import pytesseract
from utils import *
from schema import *
from summ import get_summ
from datetime import datetime
import time
import base64
import string
import random
import numpy as np
from langchain.llms import OpenAI
from langchain.chains import RetrievalQA
from langchain.vectorstores import Chroma
from langchain.chat_models import ChatOpenAI
from langchain.document_loaders import TextLoader
from chromadb.utils import embedding_functions
from unstructured.partition.pdf import partition_pdf
from unstructured.staging.base import elements_to_json
from langchain.text_splitter import CharacterTextSplitter
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.chains import create_extraction_chain
from Bio import Entrez
nltk.download("punkt")
os.environ['OPENAI_API_KEY'] = os.getenv("OPENAI_API_KEY")
Entrez.email = os.getenv("ENTREZ_EMAIL")
Entrez.api_key = os.getenv("ENTREZ_API_KEY")
fold = -1
buffer = io.BytesIO()
st.cache_data()
def convert_df(df):
return df.to_csv().encode("utf-8")
# Function to create a download link for an Excel file
# def create_excel_download_link(df, file_name):
# output = io.BytesIO()
# with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
# df.to_excel(writer, sheet_name='Sheet1', index=False)
# excel_data = output.getvalue()
# st.download_button(label="Download Excel File", data=excel_data, key=file_name, file_name=f"{file_name}.xlsx")
class Journal:
def __init__(self, name, bytes):
self.name = name
self.bytes = bytes
def __repr__(self):
return f"Journal(name='{self.name}', bytes='{self.bytes}')"
llm = ChatOpenAI(temperature=0, model="gpt-4-0125-preview")
textex_chain = create_extraction_chain(textex_schema, llm)
tablex_chain = create_extraction_chain(tablex_schema, llm)
st.set_page_config(page_title="NutriGenMe Paper Extractor")
st.title("NutriGenMe - Paper Extraction")
st.markdown("<div style='text-align: left; color: white; font-size: 16px'>In its latest version, the app is equipped to extract essential information from papers, including tables in both horizontal and vertical orientations, images, and text exclusively.</div><br>", unsafe_allow_html=True)
uploaded_files = st.file_uploader("Upload Paper(s) here :", type="pdf", accept_multiple_files=True)
if uploaded_files:
st.warning("""
Warning! Prior to proceeding, please take a moment to review the following : \n
Certain guidelines apply when utilizing this application, particularly if you intend to extract information from tables, whether they are oriented horizontally or vertically.
- If you intend to perform multiple PDF processes using Horizontal Table Extraction, ensure that all your PDF files adhere to a horizontal table format
- If you plan to undertake multiple PDF processes with Vertical Table Extraction, ensure that all your PDF files conform to a vertical table format
""", icon="⚠️")
col1, col2, col3 = st.columns(3)
if uploaded_files:
journals = []
strategy = "hi_res"
model_name = "yolox"
on_h, on_v, on_t = None, None, None
parseButtonH, parseButtonV, parseButtonT = None, None, None
# if uploaded_files:
with col1:
if on_v or on_t:
on_h = st.toggle("Horizontal Table Extraction", disabled=True)
else:
on_h = st.toggle("Horizontal Table Extraction")
if on_h:
chunk_size_h = st.selectbox(
'Tokens amounts per process :',
(120000, 96000, 64000, 32000), key='table_h'
)
parseButtonH = st.button("Get Result", key='table_H')
with col2:
if on_h or on_t:
on_v = st.toggle("Vertical Table Extraction", disabled=True)
else:
on_v = st.toggle("Vertical Table Extraction")
if on_v:
chunk_size_v = st.selectbox(
'Tokens amounts per process :',
(120000, 96000, 64000, 32000), key='table_v'
)
parseButtonV = st.button("Get Result", key='table_V')
with col3:
if on_h or on_v:
on_t = st.toggle("Text Extraction ", disabled=True)
else:
on_t = st.toggle("Text Extraction ")
if on_t:
chunk_size_t = st.selectbox(
'Tokens amounts per process :',
(120000, 96000, 64000, 32000), key='no_table'
)
parseButtonT = st.button("Get Result", key="no_Table")
if on_h:
if parseButtonH:
with st.status("Extraction in progress ...", expanded=True) as status:
st.write("Getting Result ...")
csv = pd.DataFrame()
for uploaded_file in stqdm(uploaded_files):
with NamedTemporaryFile(dir='.', suffix=".pdf") as pdf:
pdf.write(uploaded_file.getbuffer())
# st.write(pdf.name)
L = []
# Entity Extraction
start_time_ext = time.time()
st.write("β˜‘ Extracting Entities ...")
bytes_data = uploaded_file.read()
journal = Journal(uploaded_file.name, bytes_data)
images = pdf2image.convert_from_bytes(journal.bytes)
extracted_text = ""
for image in images[:-1]:
text = pytesseract.image_to_string(image)
text = clean_text(text)
extracted_text += text + " "
text = replace_quotes(extracted_text)
text_chunk = split_text(text, chunk_size_h)
chunkdf = []
for i, chunk in enumerate(text_chunk):
inp = chunk
try:
# Assuming tablex_chain.run(inp)[0] returns a dictionary
original_dict = tablex_chain.run(inp)[0]
# Convert the dictionary to a JSON string
json_str = json.dumps(original_dict)
# Replace single quotes with double quotes in the JSON string
json_str_fixed = json_str.replace("'", '"')
# Use literal_eval to safely evaluate the JSON string as a Python dictionary
fixed_dict = literal_eval(json_str_fixed)
# Create a DataFrame from the fixed dictionary
df = pd.DataFrame(fixed_dict, index=[0]).fillna('')
except:
try:
df = pd.DataFrame(literal_eval(str(json.dumps(tablex_chain.run(inp)[0])).replace("\'", '\"')), index=[0]).fillna('')
except:
try:
df = pd.DataFrame(literal_eval(str(json.dumps(tablex_chain.run(inp)[0]) + ']').replace("\'", '\"')), index=[0]).fillna('')
except SyntaxError:
df = pd.DataFrame(literal_eval('[' + str(json.dumps(tablex_chain.run(inp)[0]) + ']').replace("\'", '\"')), index=[0]).fillna('')
# df = pd.DataFrame(repair_json(tablex_chain.run(inp)[0]))
chunkdf.append(df)
concat = pd.concat(chunkdf, axis=0).reset_index().drop('index', axis=1).fillna('')
st.write("β˜‘ Entities Extraction Done ..", round((time.time() - start_time_ext) / 60, 2), "minutes")
time.sleep(0.1)
start_time_summ = time.time()
st.write("β˜‘ Generating Summary ...")
summary = get_summ(pdf.name)
st.write("β˜‘ Generating Summary Done ..", round((time.time() - start_time_summ) / 60, 2), "minutes")
time.sleep(0.1)
start_time_tab = time.time()
st.write("β˜‘ Table Extraction in progress ...")
# Table Extraction
# L = []
output_list = []
try:
elements = partition_pdf(filename=pdf.name, strategy=strategy, infer_table_structure=True, model_name=model_name)
except:
elements = partition_pdf(filename=pdf.name, strategy=strategy, infer_table_structure=True)
with NamedTemporaryFile(dir=".", suffix=".json") as f:
elements_to_json(elements, filename=f"{f.name.split('/')[-1]}")
json_file_path = os.path.abspath(f.name) # Get the absolute file path
with open(json_file_path, "r", encoding="utf-8") as jsonfile:
data = json.load(jsonfile)
extracted_elements = []
for entry in data:
if entry["type"] == "Table":
extracted_elements.append(entry["metadata"]["text_as_html"])
with NamedTemporaryFile(dir='.' , suffix='.txt') as txt_file:
text_file_path = os.path.abspath(txt_file.name)
with open(text_file_path, "w", encoding="utf-8") as txtfile:
for element in extracted_elements:
txtfile.write(element + "\n\n")
loader = TextLoader(text_file_path)
documents = loader.load()
# split it into chunks
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=200, separator="\n")
docs = text_splitter.split_documents(documents)
embeddings = OpenAIEmbeddings()
db = Chroma.from_documents(docs, embeddings)
llm_table = ChatOpenAI(model_name="gpt-4-0125-preview", temperature=0)
qa_chain = RetrievalQA.from_chain_type(llm_table, retriever=db.as_retriever())
# List of questions
questions = [
"""Mention all genes / locus name with respective rsID / SNP and potential diseases in a curly brackets like this:
Example 1 : {"Genes" : "FTO", "SNPs" : "rs9939609", "Diseases" : "Obesity"}
""",
"""Mention all genes / locus name with respective potential diseases in a curly brackets like this:
Example 2 : {"Genes" : "FTO", "SNPs" : "" (if not available), "Diseases" : "Obesity"}
""",
"""Mention all rsIDs / SNPs / Variant with respective potential diseases / traits in a curly brackets like this:
Example 3 : {"Genes" : "", "SNPs" : "rs9939609", "Diseases" : "Obesity"}
"""
]
try:
for query in questions:
response = qa_chain({"query" : query})
output_list.append(response)
except Exception as e:
pass
db.delete_collection()
# 1
for i in range(len(output_list[0]['result'].split('\n'))):
# st.write(output_list[0]['result'].split('\n'))
if output_list[0]['result'].split('\n')[i] != "":
try:
row = literal_eval(repair_json(output_list[0]['result'].split('\n')[i]))[0]
st.write(row)
row = {**row, **{
'Title' : concat['title'][0],
'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
# 'Population' : concat['population_race'][0],
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
'Recommendation' : summary,
# 'Sample Size' : concat['sample_size'][0]
}}
if len(row['Genes'].strip().split(',')) > 1:
for g in row['Genes'].strip().split(','):
L.append({
'Title' : concat['title'][0],
'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
# 'Population' : concat['population_race'][0],
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
'Recommendation' : summary,
# 'Sample Size' : concat['sample_size'][0],
'Genes' : g.strip().upper().replace('Unknown', ''),
'SNPs' : row['SNPs'].replace('Unknown', ''),
"Diseases" : ''.join(list(row['Diseases'].title() if row['Diseases'] not in ['T2D', 'T2DM', 'NAFLD', 'CVD'] else row['Diseases'])).replace('Unknown', '').replace('Unknown', '')
})
else:
L.append(row)
except KeyError:
row = literal_eval(repair_json(output_list[0]['result'].split('\n')[i]))
row = {**row, **{
'Title' : concat['title'][0],
'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
# 'Population' : concat['population_race'][0],
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
'Recommendation' : summary,
# 'Sample Size' : concat['sample_size'][0]
}
}
if len(row['Genes'].strip().split(',')) > 1:
for g in row['Genes'].strip().split(','):
L.append({
'Title' : concat['title'][0],
'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
'Recommendation' : summary,
'Genes' : g.strip().upper().replace('Unknown', ''),
'SNPs' : row['SNPs'].replace('Unknown', ''),
"Diseases" : ''.join(list(row['Diseases'].title() if row['Diseases'] not in ['T2D', 'T2DM', 'NAFLD', 'CVD'] else row['Diseases'])).replace('Unknown', '').replace('Unknown', '')
})
else:
L.append(row)
except SyntaxError:
row = literal_eval(repair_json(output_list[0]['result'].split('\n')[i]))
row = f"""{row}"""
row = {**row, **{
'Title' : concat['title'][0],
'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
'Recommendation' : summary,
# 'Population' : concat['population_race'][0],
# 'Sample Size' : concat['sample_size'][0]
}
}
if not row['SNPs'].startswith("rs"):
row.update({
'SNPs' : "-"
})
else:
L.append(row)
except ValueError:
if type(output_list[0]['result'].split('\n')[i]) is dict:
row = repair_json(output_list[0]['result'].split('\n')[i])
row = {**row, **{
'Title' : concat['title'][0],
'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
'Recommendation' : summary,
}
}
if not row['SNPs'].startswith("rs"):
row.update({
'SNPs' : "-"
})
else:
L.append(row)
# 2
for i in range(len(output_list[1]['result'].split('\n'))):
if output_list[1]['result'].split('\n')[i] != "":
try:
row = literal_eval(repair_json(output_list[1]['result'].split('\n')[i]))[0]
st.write(row)
row = {**row, **{
'Title' : concat['title'][0],
'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
'Recommendation' : summary,
}
}
if row['SNPs'] != "Not available":
row.update({
'SNPs' : "Not available"
})
if len(row['Genes'].strip().split(',')) > 1:
for g in row['Genes'].strip().split(','):
L.append({
'Title' : concat['title'][0],
'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
'Recommendation' : summary,
'Genes' : g.strip().upper().replace('Unknown', ''),
"SNPs" : "Not available",
"Diseases" : ''.join(list(row['Diseases'].title() if row['Diseases'] not in ['T2D', 'T2DM', 'NAFLD', 'CVD'] else row['Diseases'])).replace('Unknown', '').replace('Unknown', '')
})
else:
L.append(row)
except KeyError:
row = literal_eval(repair_json(output_list[1]['result'].split('\n')[i]))
row = {**row, **{
'Title' : concat['title'][0],
'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
'Recommendation' : summary,
}
}
if 'SNPs' in list(row.keys()):
if row['SNPs'] != "Not available":
row.update({
'SNPs' : "Not available"
})
else:
row.update({
'SNPs' : "Not available"
})
if 'Genes' in list(row.keys()):
if len(row['Genes'].strip().split(',')) > 1:
for g in row['Genes'].strip().split(','):
L.append({
'Title' : concat['title'][0],
'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
'Recommendation' : summary,
'Genes' : g.strip().upper().replace('Unknown', ''),
"SNPs" : "Not available",
"Diseases" : ''.join(list(row['Diseases'].title() if row['Diseases'] not in ['T2D', 'T2DM', 'NAFLD', 'CVD'] else row['Diseases'])).replace('Unknown', '').replace('Unknown', '')
})
else:
L.append(row)
except SyntaxError:
row = f"""{row}"""
row = {**row, **{
'Title' : concat['title'][0],
'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
'Recommendation' : summary,
}
}
if not row['SNPs'].startswith("rs"):
row.update({
'SNPs' : "-"
})
else:
L.append(row)
except ValueError:
if type(output_list[1]['result'].split('\n')[i]) is dict:
row = output_list[1]['result'].split('\n')[i]
row = {**row, **{
'Title' : concat['title'][0],
'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
'Recommendation' : summary,
}
}
if not row['SNPs'].startswith("rs"):
row.update({
'SNPs' : "-"
})
else:
L.append(row)
# 3
for i in range(len(output_list[2]['result'].split('\n'))):
if output_list[2]['result'].split('\n')[i] != "":
try:
row = literal_eval(repair_json(output_list[2]['result'].split('\n')[i]))[0]
row = {**row, **{
'Title' : concat['title'][0],
'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
'Recommendation' : summary,
}
}
if not row['SNPs'].startswith("rs"):
row.update({
'SNPs' : "-"
})
else:
L.append(row)
except KeyError:
row = literal_eval(repair_json(output_list[2]['result'].split('\n')[i]))
row = {**row, **{
'Title' : concat['title'][0],
'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
'Recommendation' : summary,
}
}
if not row['SNPs'].startswith("rs"):
row.update({
'SNPs' : "-"
})
else:
L.append(row)
except SyntaxError:
row = f"""{row}"""
row = {**row, **{
'Title' : concat['title'][0],
'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
'Recommendation' : summary,
}
}
if not row['SNPs'].startswith("rs"):
row.update({
'SNPs' : "-"
})
else:
L.append(row)
except ValueError:
if type(output_list[2]['result'].split('\n')[i]) is dict:
row = output_list[2]['result'].split('\n')[i]
row = {**row, **{
'Title' : concat['title'][0],
'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
'Recommendation' : summary,
}
}
if not row['SNPs'].startswith("rs"):
row.update({
'SNPs' : "-"
})
else:
L.append(row)
st.write("β˜‘ Table Extraction Done ...", round((time.time() - start_time_summ) / 60, 2), "minutes")
status.update(label="Gene and SNPs succesfully collected.")
L = [{key: ''.join(['' if item == 'Unknow' else item for item in value]) for key, value in d.items()} for d in L]
L = [{key: ''.join(['Not Available' if item == '' else item for item in value]) for key, value in d.items()} for d in L]
csv = pd.DataFrame(L)
st.dataframe(csv)
generated_key = ''.join(random.choice(string.ascii_letters + string.digits) for i in range(16))
# if st.button("Download Excel File", key=generated_key):
# excel_link = create_excel_download_link(csv, uploaded_file.name.replace('.pdf', ''))
# st.markdown(excel_link, unsafe_allow_html=True)
with pd.ExcelWriter(buffer, engine='xlsxwriter') as writer:
# Write each dataframe to a different worksheet
csv.to_excel(writer, sheet_name='Result')
writer.close()
# time_now = datetime.now()
# current_time = time_now.strftime("%H:%M:%S")
csv = convert_df(csv)
st.download_button(
label="Save Result",
data=buffer,
file_name=f'{uploaded_file.name}'.replace('.pdf', '') + '.xlsx',
mime='application/vnd.ms-excel',
key=generated_key
)
if on_v:
if parseButtonV:
with st.status("Extraction in progress ...", expanded=True) as status:
st.write("Getting Result ...")
csv = pd.DataFrame()
for uploaded_file in stqdm(uploaded_files):
L = []
with NamedTemporaryFile(dir='.', suffix=".pdf") as pdf:
pdf.write(uploaded_file.getbuffer())
# Open the PDF file in read-binary mode
with open(pdf.name, 'rb') as pdf_file:
# Create a PDF reader object
pdf_reader = PyPDF2.PdfReader(pdf_file)
# Create a PDF writer object to write the rotated pages to a new PDF
pdf_writer = PyPDF2.PdfWriter()
# Iterate through each page in the original PDF
for page_num in range(len(pdf_reader.pages)):
# Get the page object
page = pdf_reader.pages[page_num]
# Rotate the page 90 degrees clockwise (use -90 for counterclockwise)
page.rotate(90)
# Add the rotated page to the PDF writer
pdf_writer.add_page(page)
with NamedTemporaryFile(dir='.', suffix=".pdf") as rotated_pdf:
pdf_writer.write(rotated_pdf.name)
# Entity Extraction
start_time_ext = time.time()
st.write("β˜‘ Extracting Entities ...")
bytes_data = uploaded_file.read()
journal = Journal(uploaded_file.name, bytes_data)
images = pdf2image.convert_from_bytes(journal.bytes)
extracted_text = ""
for image in images[:-1]:
text = pytesseract.image_to_string(image)
text = clean_text(text)
extracted_text += text + " "
text = replace_quotes(extracted_text)
text_chunk = split_text(text, chunk_size_v)
chunkdf = []
for i, chunk in enumerate(text_chunk):
inp = chunk
# Assuming tablex_chain.run(inp)[0] returns a dictionary
original_dict = tablex_chain.run(inp)[0]
# Convert the dictionary to a JSON string
json_str = json.dumps(original_dict)
# Replace single quotes with double quotes in the JSON string
json_str_fixed = json_str.replace("'", '"')
# Use literal_eval to safely evaluate the JSON string as a Python dictionary
fixed_dict = literal_eval(json_str_fixed)
# Create a DataFrame from the fixed dictionary
df = pd.DataFrame(fixed_dict, index=[0]).fillna('')
# df = pd.DataFrame(literal_eval(str(json.dumps(tablex_chain.run(inp)[0])).replace("\'", '\"')), index=[0]).fillna('')
chunkdf.append(df)
concat = pd.concat(chunkdf, axis=0).reset_index().drop('index', axis=1).fillna('')
st.write("β˜‘ Entities Extraction Done ..", round((time.time() - start_time_ext) / 60, 2), "minutes")
time.sleep(0.1)
start_time_summ = time.time()
st.write("β˜‘ Generating Summary ...")
summary = get_summ(pdf.name)
st.write("β˜‘ Generating Summary Done ..", round((time.time() - start_time_summ) / 60, 2), "minutes")
time.sleep(0.1)
start_time_tab = time.time()
st.write("β˜‘ Table Extraction in progress ...")
# Table Extraction
output_list = []
elements = partition_pdf(filename=rotated_pdf.name, strategy=strategy, infer_table_structure=True, model_name=model_name)
with NamedTemporaryFile(dir=".", suffix=".json") as f:
elements_to_json(elements, filename=f"{f.name.split('/')[-1]}")
json_file_path = os.path.abspath(f.name) # Get the absolute file path
with open(json_file_path, "r", encoding="utf-8") as jsonfile:
data = json.load(jsonfile)
extracted_elements = []
for entry in data:
if entry["type"] == "Table":
extracted_elements.append(entry["metadata"]["text_as_html"])
with NamedTemporaryFile(dir='.' , suffix='.txt') as txt_file:
text_file_path = os.path.abspath(txt_file.name)
with open(text_file_path, "w", encoding="utf-8") as txtfile:
for element in extracted_elements:
txtfile.write(element + "\n\n")
loader = TextLoader(text_file_path)
documents = loader.load()
# split it into chunks
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=200, separator="\n")
docs = text_splitter.split_documents(documents)
embeddings = OpenAIEmbeddings()
db = Chroma.from_documents(docs, embeddings)
llm_table = ChatOpenAI(model_name="gpt-3.5-turbo-16k", temperature=0)
qa_chain = RetrievalQA.from_chain_type(llm_table, retriever=db.as_retriever())
# List of questions
questions = [
"""Mention all genes / locus name with respective rsID / SNP and potential diseases in a curly brackets like this:
Example 1 : {"Genes" : "FTO", "SNPs" : "rs9939609", "Diseases" : "Obesity"}
""",
"""Mention all genes / locus name with respective potential diseases in a curly brackets like this:
Example 2 : {"Genes" : "FTO", "SNPs" : "" (if not available), "Diseases" : "Obesitya"}
""",
"""Mention all rsIDs / SNPs / Variant with respective potential diseases / traits in a curly brackets like this:
Example 3 : {"Genes" : "", "SNPs" : "rs9939609", "Diseases" : "Obesity"}
"""
]
try:
for query in questions:
response = qa_chain({"query" : query})
output_list.append(response)
except Exception as e:
pass
db.delete_collection()
# 1
for i in range(len(output_list[0]['result'].split('\n'))):
if output_list[0]['result'].split('\n')[i] != "":
try:
row = literal_eval(output_list[0]['result'].split('\n')[i])[0]
row = {**row, **{
'Title' : concat['title'][0],
'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
}}
if len(row['Genes'].strip().split(',')) > 1:
for g in row['Genes'].strip().split(','):
L.append({
'Genes' : g.strip().upper(),
'SNPs' : row['SNPs'],
"Diseases" : ''.join(list(row['Diseases'].title() if row['Diseases'] not in ['T2D', 'T2DM', 'NAFLD', 'CVD'] else row['Diseases'])).replace('Unknown', ''),
'Title' : concat['title'][0],
'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
'Population' : ' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title(),
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
'Recommendation' : summary,
})
else:
L.append(row)
except KeyError:
row = literal_eval(output_list[0]['result'].split('\n')[i])
row = {**row, **{
'Title' : concat['title'][0],
'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
'Recommendation' : summary,
}}
if len(row['Genes'].strip().split(',')) > 1:
for g in row['Genes'].strip().split(','):
L.append({
'Genes' : g.strip().upper(),
'SNPs' : row['SNPs'],
"Diseases" : ''.join(list(row['Diseases'].title() if row['Diseases'] not in ['T2D', 'T2DM', 'NAFLD', 'CVD'] else row['Diseases'])).replace('Unknown', ''),
'Title' : concat['title'][0],
'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
'Recommendation' : summary,
})
else:
L.append(row)
except ValueError:
if type(output_list[0]['result'].split('\n')[i]) is dict:
row = output_list[0]['result'].split('\n')[i]
row = {**row, **{
'Title' : concat['title'][0],
'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
'Recommendation' : summary,
}
}
if not row['SNPs'].startswith("rs"):
row.update({
'SNPs' : "-"
})
else:
L.append(row)
except SyntaxError:
row = literal_eval("""{}""".format(output_list[2]['result'].split('\n')[i]))
row = {**row, **{
'Title' : concat['title'][0],
'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
'Recommendation' : summary,
}
}
if not row['SNPs'].startswith("rs"):
row.update({
'SNPs' : "-"
})
else:
L.append(row)
# 2
for i in range(len(output_list[1]['result'].split('\n'))):
if output_list[1]['result'].split('\n')[i] != "":
try:
row = literal_eval(output_list[1]['result'].split('\n')[i])[0]
row = {**row, **{
'Title' : concat['title'][0],
'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
'Recommendation' : summary,
}}
if row['SNPs'] != "Not available":
row.update({
'SNPs' : "Not available"
})
if len(row['Genes'].strip().split(',')) > 1:
for g in row['Genes'].strip().split(','):
L.append({
'Genes' : g.strip().upper(),
"SNPs" : "Not available",
"Diseases" : ''.join(list(row['Diseases'].title() if row['Diseases'] not in ['T2D', 'T2DM', 'NAFLD', 'CVD'] else row['Diseases'])).replace('Unknown', ''),
'Title' : concat['title'][0],
'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
'Recommendation' : summary,
})
else:
L.append(row)
except KeyError:
row = literal_eval(output_list[1]['result'].split('\n')[i])
row = {**row, **{
'Title' : concat['title'][0],
'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
'Recommendation' : summary,
}}
if row['SNPs'] != "Not available":
row.update({
'SNPs' : "Not available"
})
if len(row['Genes'].strip().split(',')) > 1:
for g in row['Genes'].strip().split(','):
L.append({
'Genes' : g.strip().upper(),
"SNPs" : "Not available",
"Diseases" : ''.join(list(row['Diseases'].title() if row['Diseases'] not in ['T2D', 'T2DM', 'NAFLD', 'CVD'] else row['Diseases'])).replace('Unknown', ''),
'Title' : concat['title'][0],
'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
'Recommendation' : summary,
})
else:
L.append(row)
except ValueError:
if type(output_list[1]['result'].split('\n')[i]) is dict:
row = output_list[1]['result'].split('\n')[i]
row = {**row, **{
'Title' : concat['title'][0],
'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
'Recommendation' : summary,
}
}
if not row['SNPs'].startswith("rs"):
row.update({
'SNPs' : "-"
})
else:
L.append(row)
except SyntaxError:
row = literal_eval("""{}""".format(output_list[2]['result'].split('\n')[i]))
row = {**row, **{
'Title' : concat['title'][0],
'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
'Recommendation' : summary,
}
}
if not row['SNPs'].startswith("rs"):
row.update({
'SNPs' : "-"
})
else:
L.append(row)
# 3
for i in range(len(output_list[2]['result'].split('\n'))):
if output_list[2]['result'].split('\n')[i] != "":
try:
row = literal_eval(output_list[2]['result'].split('\n')[i])[0]
row = {**row, **{
'Title' : concat['title'][0],
'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
'Recommendation' : summary,
}
}
if not row['SNPs'].startswith("rs"):
row.update({
'SNPs' : "-"
})
else:
L.append(row)
except KeyError:
row = literal_eval(output_list[2]['result'].split('\n')[i])
row = {**row, **{
'Title' : concat['title'][0],
'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
'Recommendation' : summary,
}
}
if not row['SNPs'].startswith("rs"):
row.update({
'SNPs' : "-"
})
else:
L.append(row)
except ValueError:
if type(output_list[2]['result'].split('\n')[i]) is dict:
row = output_list[2]['result'].split('\n')[i]
row = {**row, **{
'Title' : concat['title'][0],
'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
'Recommendation' : summary,
}
}
if not row['SNPs'].startswith("rs"):
row.update({
'SNPs' : "-"
})
else:
L.append(row)
except SyntaxError:
row = literal_eval("""{}""".format(output_list[2]['result'].split('\n')[i]))
row = {**row, **{
'Title' : concat['title'][0],
'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
'Recommendation' : summary,
}
}
if not row['SNPs'].startswith("rs"):
row.update({
'SNPs' : "-"
})
else:
L.append(row)
st.write("β˜‘ Table Extraction Done", round((time.time() - start_time_summ) / 60, 2), "minutes")
status.update(label="Gene and SNPs succesfully collected.")
L = [{key: ''.join(['' if item == 'Unknow' else item for item in value]) for key, value in d.items()} for d in L]
L = [{key: ''.join(['Not Available' if item == '' else item for item in value]) for key, value in d.items()} for d in L]
csv = pd.DataFrame(L)
st.dataframe(csv)
with pd.ExcelWriter(buffer, engine='xlsxwriter') as writer:
# Write each dataframe to a different worksheet
csv.to_excel(writer, sheet_name='Result')
writer.close()
time_now = datetime.now()
current_time = time_now.strftime("%H:%M:%S")
csv = convert_df(csv)
st.download_button(
label="Save Result",
data=buffer,
file_name=f'{uploaded_file.name}'.replace('.pdf', '') + '.xlsx',
mime='application/vnd.ms-excel'
)
if on_t:
if parseButtonT:
with st.status("Extraction in progress ...", expanded=True) as status:
st.write("Getting Result ...")
csv = pd.DataFrame()
for uploaded_file in stqdm(uploaded_files):
L = []
with NamedTemporaryFile(dir='.', suffix=".pdf") as pdf:
pdf.write(uploaded_file.getbuffer())
# Entity Extraction
start_time_ext = time.time()
st.write("β˜‘ Extracting Entities ...")
bytes_data = uploaded_file.read()
journal = Journal(uploaded_file.name, bytes_data)
images = pdf2image.convert_from_bytes(journal.bytes)
extracted_text = ""
for image in images[:-1]:
text = pytesseract.image_to_string(image)
text = clean_text(text)
extracted_text += text + " "
text = replace_quotes(extracted_text)
text_chunk = split_text(text, chunk_size_t)
chunkdf = []
for i, chunk in enumerate(text_chunk):
inp = chunk
df = pd.DataFrame(literal_eval(str(json.dumps(textex_chain.run(inp)[0])).replace("\'", "\"")), index=[0]).fillna('')
chunkdf.append(df)
concat = pd.concat(chunkdf, axis=0).reset_index().drop('index', axis=1).fillna('')
st.write("β˜‘ Entities Extraction Done ..", round((time.time() - start_time_ext) / 60, 2), "minutes")
time.sleep(0.1)
start_time_summ = time.time()
st.write("β˜‘ Generating Summary ...")
if 'SNPs' in list(concat.columns):
concat['SNPs'] = concat['SNPs'].apply(lambda x: x if x.startswith('rs') else '')
else:
concat['SNPs'] = ''
for col in list(concat.columns):
concat[col] = concat[col].apply(lambda x: x if x not in ['N/A', 'not mentioned', 'Not mentioned', 'Unknown'] else '')
summary = get_summ(pdf.name)
time.sleep(0.1)
st.write("β˜‘ Generating Summary Done...", round((time.time() - start_time_summ) / 60, 2), "minutes")
for i in range(len(concat)):
if (len(concat['genes_locus'][i].split(',')) >= 1) and concat['SNPs'][i] == '':
for g in concat['genes_locus'][i].split(','):
L.append({
'Title' : concat['title'][0],
'Author' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
'Publisher Name' : concat['publisher'][0] if 'publisher' in list(concat.columns) else '',
'Publication Year' : get_valid_year(' '.join(concat['publication_year'].values.tolist())),
'Genes' : g.upper(),
'Population' : upper_abbreviation(' '.join(np.unique(concat['population_race'].values.tolist())).title()),
'Diseases' : upper_abbreviation(' '.join(concat['diseases'].values.tolist()).title()),
'Sample Size' : sample_size_postproc(upper_abbreviation(' '.join(concat['sample_size'].values.tolist()).title())),
'SNPs' : concat['SNPs'][i],
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).title()),
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).title()),
'Recommendation' : summary,
})
elif (len(concat['SNPs'][i].split(',')) >= 1):
for s in concat['SNPs'][i].split(','):
try:
L.append({
'Title' : concat['title'][0],
'Author' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
'Publisher Name' : concat['publisher'][0] if 'publisher' in list(concat.columns) else '',
'Publication Year' : get_valid_year(' '.join(concat['publication_year'].values.tolist())),
'Genes' : get_geneName(s.strip()).upper(),
'Population' : upper_abbreviation(' '.join(np.unique(concat['population_race'].values.tolist())).title()),
'Diseases' : upper_abbreviation(' '.join(concat['diseases'].values.tolist()).title()),
'Sample Size' : sample_size_postproc(upper_abbreviation(' '.join(concat['sample_size'].values.tolist()).title())),
'SNPs' : s,
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).title()),
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).title()),
'Recommendation' : summary,
})
except Exception as e:
L.append({
'Title' : concat['title'][0],
'Author' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
'Publisher Name' : concat['publisher'][0] if 'publisher' in list(concat.columns) else '',
'Publication Year' : get_valid_year(' '.join(concat['publication_year'].values.tolist())),
'Genes' : '',
'Population' : upper_abbreviation(' '.join(np.unique(concat['population_race'].values.tolist())).title()),
'Diseases' : upper_abbreviation(' '.join(concat['diseases'].values.tolist()).title()),
'Sample Size' : sample_size_postproc(upper_abbreviation(' '.join(concat['sample_size'].values.tolist()).title())),
'SNPs' : s,
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).title()),
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).title()),
'Recommendation' : summary,
})
csv = pd.concat([csv, pd.DataFrame(L)], ignore_index=True).drop_duplicates(subset='Genes')
status.update(label="Gene and SNPs succesfully collected.")
st.dataframe(csv)
with pd.ExcelWriter(buffer, engine='xlsxwriter') as writer:
# Write each dataframe to a different worksheet
csv.to_excel(writer, sheet_name='Result')
writer.close()
time_now = datetime.now()
current_time = time_now.strftime("%H:%M:%S")
csv = convert_df(csv)
st.download_button(
label="Save Result",
data=buffer,
file_name=f'{uploaded_file.name}'.replace('.pdf', '') + '.xlsx',
mime='application/vnd.ms-excel'
)