Spaces:
Build error
Build error
# Author: Firqa Aqila Noor Arasyi | |
# Date: 2023-12-04 | |
import os | |
import io | |
import json | |
import pandas as pd | |
import streamlit as st | |
from stqdm import stqdm | |
from ast import literal_eval | |
from tempfile import NamedTemporaryFile | |
from json_repair import repair_json | |
import PyPDF2 | |
import pdf2image | |
import pytesseract | |
from utils import * | |
from schema import * | |
from summ import get_summ | |
from datetime import datetime | |
import time | |
import base64 | |
import string | |
import random | |
import numpy as np | |
from langchain.llms import OpenAI | |
from langchain.chains import RetrievalQA | |
from langchain.vectorstores import Chroma | |
from langchain.chat_models import ChatOpenAI | |
from langchain.document_loaders import TextLoader | |
from chromadb.utils import embedding_functions | |
from unstructured.partition.pdf import partition_pdf | |
from unstructured.staging.base import elements_to_json | |
from langchain.text_splitter import CharacterTextSplitter | |
from langchain.embeddings.openai import OpenAIEmbeddings | |
from langchain.chains import create_extraction_chain | |
from Bio import Entrez | |
nltk.download("punkt") | |
os.environ['OPENAI_API_KEY'] = os.getenv("OPENAI_API_KEY") | |
Entrez.email = os.getenv("ENTREZ_EMAIL") | |
Entrez.api_key = os.getenv("ENTREZ_API_KEY") | |
fold = -1 | |
buffer = io.BytesIO() | |
st.cache_data() | |
def convert_df(df): | |
return df.to_csv().encode("utf-8") | |
# Function to create a download link for an Excel file | |
# def create_excel_download_link(df, file_name): | |
# output = io.BytesIO() | |
# with pd.ExcelWriter(output, engine='xlsxwriter') as writer: | |
# df.to_excel(writer, sheet_name='Sheet1', index=False) | |
# excel_data = output.getvalue() | |
# st.download_button(label="Download Excel File", data=excel_data, key=file_name, file_name=f"{file_name}.xlsx") | |
class Journal: | |
def __init__(self, name, bytes): | |
self.name = name | |
self.bytes = bytes | |
def __repr__(self): | |
return f"Journal(name='{self.name}', bytes='{self.bytes}')" | |
llm = ChatOpenAI(temperature=0, model="gpt-3.5-turbo-1106") | |
textex_chain = create_extraction_chain(textex_schema, llm) | |
tablex_chain = create_extraction_chain(tablex_schema, llm) | |
st.set_page_config(page_title="NutriGenMe Paper Extractor") | |
st.title("NutriGenMe - Paper Extraction") | |
st.markdown("<div style='text-align: left; color: white; font-size: 16px'>In its latest version, the app is equipped to extract essential information from papers, including tables in both horizontal and vertical orientations, images, and text exclusively.</div><br>", unsafe_allow_html=True) | |
uploaded_files = st.file_uploader("Upload Paper(s) here :", type="pdf", accept_multiple_files=True) | |
if uploaded_files: | |
st.warning(""" | |
Warning! Prior to proceeding, please take a moment to review the following : \n | |
Certain guidelines apply when utilizing this application, particularly if you intend to extract information from tables, whether they are oriented horizontally or vertically. | |
- If you intend to perform multiple PDF processes using Horizontal Table Extraction, ensure that all your PDF files adhere to a horizontal table format | |
- If you plan to undertake multiple PDF processes with Vertical Table Extraction, ensure that all your PDF files conform to a vertical table format | |
""", icon="β οΈ") | |
col1, col2, col3 = st.columns(3) | |
if uploaded_files: | |
journals = [] | |
strategy = "hi_res" | |
model_name = "yolox" | |
on_h, on_v, on_t = None, None, None | |
parseButtonH, parseButtonV, parseButtonT = None, None, None | |
# if uploaded_files: | |
with col1: | |
if on_v or on_t: | |
on_h = st.toggle("Horizontal Table Extraction", disabled=True) | |
else: | |
on_h = st.toggle("Horizontal Table Extraction") | |
if on_h: | |
chunk_size_h = st.selectbox( | |
'Tokens amounts per process :', | |
(15000, 12000, 10000, 8000, 5000), key='table_h' | |
) | |
parseButtonH = st.button("Get Result", key='table_H') | |
with col2: | |
if on_h or on_t: | |
on_v = st.toggle("Vertical Table Extraction", disabled=True) | |
else: | |
on_v = st.toggle("Vertical Table Extraction") | |
if on_v: | |
chunk_size_v = st.selectbox( | |
'Tokens amounts per process :', | |
(15000, 12000, 10000, 8000, 5000), key='table_v' | |
) | |
parseButtonV = st.button("Get Result", key='table_V') | |
with col3: | |
if on_h or on_v: | |
on_t = st.toggle("Text Extraction ", disabled=True) | |
else: | |
on_t = st.toggle("Text Extraction ") | |
if on_t: | |
chunk_size_t = st.selectbox( | |
'Tokens amounts per process :', | |
(15000, 12000, 10000, 8000, 5000), key='no_table' | |
) | |
parseButtonT = st.button("Get Result", key="no_Table") | |
if on_h: | |
if parseButtonH: | |
with st.status("Extraction in progress ...", expanded=True) as status: | |
st.write("Getting Result ...") | |
csv = pd.DataFrame() | |
for uploaded_file in stqdm(uploaded_files): | |
with NamedTemporaryFile(dir='.', suffix=".pdf") as pdf: | |
pdf.write(uploaded_file.getbuffer()) | |
# st.write(pdf.name) | |
L = [] | |
# Entity Extraction | |
st.write("β Extracting Entities ...") | |
bytes_data = uploaded_file.read() | |
journal = Journal(uploaded_file.name, bytes_data) | |
images = pdf2image.convert_from_bytes(journal.bytes) | |
extracted_text = "" | |
for image in images[:-1]: | |
text = pytesseract.image_to_string(image) | |
text = clean_text(text) | |
extracted_text += text + " " | |
text = replace_quotes(extracted_text) | |
text_chunk = split_text(text, chunk_size_h) | |
chunkdf = [] | |
for i, chunk in enumerate(text_chunk): | |
inp = chunk | |
df = pd.DataFrame(literal_eval(str(json.dumps(tablex_chain.run(inp)[0])).replace("\'", '\"')), index=[0]).fillna('') | |
# df = pd.DataFrame(repair_json(tablex_chain.run(inp)[0])) | |
chunkdf.append(df) | |
concat = pd.concat(chunkdf, axis=0).reset_index().drop('index', axis=1).fillna('') | |
st.write("β Entities Extraction Done ..") | |
time.sleep(0.1) | |
st.write("β Generating Summary ...") | |
summary = get_summ(pdf.name) | |
st.write("β Generating Summary Done ..") | |
time.sleep(0.1) | |
st.write("β Table Extraction in progress ...") | |
# Table Extraction | |
# L = [] | |
output_list = [] | |
elements = partition_pdf(filename=pdf.name, strategy=strategy, infer_table_structure=True, model_name=model_name) | |
with NamedTemporaryFile(dir=".", suffix=".json") as f: | |
elements_to_json(elements, filename=f"{f.name.split('/')[-1]}") | |
json_file_path = os.path.abspath(f.name) # Get the absolute file path | |
with open(json_file_path, "r", encoding="utf-8") as jsonfile: | |
data = json.load(jsonfile) | |
extracted_elements = [] | |
for entry in data: | |
if entry["type"] == "Table": | |
extracted_elements.append(entry["metadata"]["text_as_html"]) | |
with NamedTemporaryFile(dir='.' , suffix='.txt') as txt_file: | |
text_file_path = os.path.abspath(txt_file.name) | |
with open(text_file_path, "w", encoding="utf-8") as txtfile: | |
for element in extracted_elements: | |
txtfile.write(element + "\n\n") | |
loader = TextLoader(text_file_path) | |
documents = loader.load() | |
# split it into chunks | |
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=200, separator="\n") | |
docs = text_splitter.split_documents(documents) | |
embeddings = OpenAIEmbeddings() | |
db = Chroma.from_documents(docs, embeddings) | |
llm_table = ChatOpenAI(model_name="gpt-3.5-turbo-16k", temperature=0) | |
qa_chain = RetrievalQA.from_chain_type(llm_table, retriever=db.as_retriever()) | |
# List of questions | |
questions = [ | |
"""Mention all genes / locus name with respective rsID / SNP and potential diseases in a curly brackets like this: | |
Example 1 : {"Genes" : "FTO", "SNPs" : "rs9939609", "Diseases" : "Obesity"} | |
""", | |
"""Mention all genes / locus name with respective potential diseases in a curly brackets like this: | |
Example 2 : {"Genes" : "FTO", "SNPs" : "" (if not available), "Diseases" : "Obesity"} | |
""", | |
"""Mention all rsIDs / SNPs / Variant with respective potential diseases / traits in a curly brackets like this: | |
Example 3 : {"Genes" : "", "SNPs" : "rs9939609", "Diseases" : "Obesity"} | |
""" | |
] | |
try: | |
for query in questions: | |
response = qa_chain({"query" : query}) | |
output_list.append(response) | |
except Exception as e: | |
pass | |
db.delete_collection() | |
st.write(concat) | |
# 1 | |
for i in range(len(output_list[0]['result'].split('\n'))): | |
st.write(output_list[0]['result'].split('\n')) | |
if output_list[0]['result'].split('\n')[i] != "": | |
try: | |
row = literal_eval(repair_json(output_list[0]['result'].split('\n')[i]))[0] | |
st.write(row) | |
row = {**row, **{ | |
'Title' : concat['title'][0], | |
'Authors' : concat['authors'][0], | |
'Publisher Name' : concat['publisher_name'][0], | |
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], | |
# 'Population' : concat['population_race'][0], | |
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'], | |
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], | |
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'], | |
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'], | |
'Recommendation' : summary, | |
# 'Sample Size' : concat['sample_size'][0] | |
}} | |
if len(row['Genes'].strip().split(',')) > 1: | |
for g in row['Genes'].strip().split(','): | |
L.append({ | |
'Title' : concat['title'][0], | |
'Authors' : concat['authors'][0], | |
'Publisher Name' : concat['publisher_name'][0], | |
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], | |
# 'Population' : concat['population_race'][0], | |
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'], | |
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], | |
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'], | |
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'], | |
'Recommendation' : summary, | |
# 'Sample Size' : concat['sample_size'][0], | |
'Genes' : g.strip().upper().replace('Unknown', ''), | |
'SNPs' : row['SNPs'].replace('Unknown', ''), | |
"Diseases" : ''.join(list(row['Diseases'].title() if row['Diseases'] not in ['T2D', 'T2DM', 'NAFLD', 'CVD'] else row['Diseases'])).replace('Unknown', '').replace('Unknown', '') | |
}) | |
else: | |
L.append(row) | |
except KeyError: | |
row = literal_eval(repair_json(output_list[0]['result'].split('\n')[i])) | |
row = {**row, **{ | |
'Title' : concat['title'][0], | |
'Authors' : concat['authors'][0], | |
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], | |
"Publisher Name" : concat['publisher_name'][0], | |
# 'Population' : concat['population_race'][0], | |
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'], | |
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], | |
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'], | |
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'], | |
'Recommendation' : summary, | |
# 'Sample Size' : concat['sample_size'][0] | |
} | |
} | |
if len(row['Genes'].strip().split(',')) > 1: | |
for g in row['Genes'].strip().split(','): | |
L.append({ | |
'Title' : concat['title'][0], | |
'Authors' : concat['authors'][0], | |
'Publisher Name' : concat['publisher_name'][0], | |
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], | |
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'], | |
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], | |
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'], | |
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'], | |
'Recommendation' : summary, | |
'Genes' : g.strip().upper().replace('Unknown', ''), | |
'SNPs' : row['SNPs'].replace('Unknown', ''), | |
"Diseases" : ''.join(list(row['Diseases'].title() if row['Diseases'] not in ['T2D', 'T2DM', 'NAFLD', 'CVD'] else row['Diseases'])).replace('Unknown', '').replace('Unknown', '') | |
}) | |
else: | |
L.append(row) | |
except SyntaxError: | |
row = literal_eval(repair_json(output_list[0]['result'].split('\n')[i])) | |
row = f"""{row}""" | |
row = {**row, **{ | |
'Title' : concat['title'][0], | |
'Authors' : concat['authors'][0], | |
'Publisher Name' : concat['publisher_name'][0], | |
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], | |
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'], | |
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], | |
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'], | |
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'], | |
'Recommendation' : summary, | |
# 'Population' : concat['population_race'][0], | |
# 'Sample Size' : concat['sample_size'][0] | |
} | |
} | |
if not row['SNPs'].startswith("rs"): | |
row.update({ | |
'SNPs' : "-" | |
}) | |
else: | |
L.append(row) | |
except ValueError: | |
if type(output_list[0]['result'].split('\n')[i]) is dict: | |
row = repair_json(output_list[0]['result'].split('\n')[i]) | |
row = {**row, **{ | |
'Title' : concat['title'][0], | |
'Authors' : concat['authors'][0], | |
'Publisher Name' : concat['publisher_name'][0], | |
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], | |
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'], | |
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], | |
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'], | |
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'], | |
'Recommendation' : summary, | |
} | |
} | |
if not row['SNPs'].startswith("rs"): | |
row.update({ | |
'SNPs' : "-" | |
}) | |
else: | |
L.append(row) | |
# 2 | |
for i in range(len(output_list[1]['result'].split('\n'))): | |
if output_list[1]['result'].split('\n')[i] != "": | |
try: | |
row = literal_eval(repair_json(output_list[1]['result'].split('\n')[i]))[0] | |
row = {**row, **{ | |
'Title' : concat['title'][0], | |
'Authors' : concat['authors'][0], | |
'Publisher Name' : concat['publisher_name'][0], | |
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], | |
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'], | |
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], | |
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'], | |
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'], | |
'Recommendation' : summary, | |
} | |
} | |
if row['SNPs'] != "Not available": | |
row.update({ | |
'SNPs' : "Not available" | |
}) | |
if len(row['Genes'].strip().split(',')) > 1: | |
for g in row['Genes'].strip().split(','): | |
L.append({ | |
'Title' : concat['title'][0], | |
'Authors' : concat['authors'][0], | |
'Publisher Name' : concat['publisher_name'][0], | |
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], | |
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'], | |
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], | |
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'], | |
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'], | |
'Recommendation' : summary, | |
'Genes' : g.strip().upper().replace('Unknown', ''), | |
"SNPs" : "Not available", | |
"Diseases" : ''.join(list(row['Diseases'].title() if row['Diseases'] not in ['T2D', 'T2DM', 'NAFLD', 'CVD'] else row['Diseases'])).replace('Unknown', '').replace('Unknown', '') | |
}) | |
else: | |
L.append(row) | |
except KeyError: | |
row = literal_eval(repair_json(output_list[1]['result'].split('\n')[i])) | |
row = {**row, **{ | |
'Title' : concat['title'][0], | |
'Authors' : concat['authors'][0], | |
'Publisher Name' : concat['publisher_name'][0], | |
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], | |
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'], | |
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], | |
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'], | |
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'], | |
'Recommendation' : summary, | |
} | |
} | |
if row['SNPs'] != "Not available": | |
row.update({ | |
'SNPs' : "Not available" | |
}) | |
if len(row['Genes'].strip().split(',')) > 1: | |
for g in row['Genes'].strip().split(','): | |
L.append({ | |
'Title' : concat['title'][0], | |
'Authors' : concat['authors'][0], | |
'Publisher Name' : concat['publisher_name'][0], | |
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], | |
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'], | |
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], | |
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'], | |
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'], | |
'Recommendation' : summary, | |
'Genes' : g.strip().upper().replace('Unknown', ''), | |
"SNPs" : "Not available", | |
"Diseases" : ''.join(list(row['Diseases'].title() if row['Diseases'] not in ['T2D', 'T2DM', 'NAFLD', 'CVD'] else row['Diseases'])).replace('Unknown', '').replace('Unknown', '') | |
}) | |
else: | |
L.append(row) | |
except SyntaxError: | |
row = f"""{row}""" | |
row = {**row, **{ | |
'Title' : concat['title'][0], | |
'Authors' : concat['authors'][0], | |
'Publisher Name' : concat['publisher_name'][0], | |
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], | |
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'], | |
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], | |
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'], | |
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'], | |
'Recommendation' : summary, | |
} | |
} | |
if not row['SNPs'].startswith("rs"): | |
row.update({ | |
'SNPs' : "-" | |
}) | |
else: | |
L.append(row) | |
except ValueError: | |
if type(output_list[1]['result'].split('\n')[i]) is dict: | |
row = output_list[1]['result'].split('\n')[i] | |
row = {**row, **{ | |
'Title' : concat['title'][0], | |
'Authors' : concat['authors'][0], | |
'Publisher Name' : concat['publisher_name'][0], | |
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], | |
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'], | |
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], | |
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'], | |
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'], | |
'Recommendation' : summary, | |
} | |
} | |
if not row['SNPs'].startswith("rs"): | |
row.update({ | |
'SNPs' : "-" | |
}) | |
else: | |
L.append(row) | |
# 3 | |
for i in range(len(output_list[2]['result'].split('\n'))): | |
if output_list[2]['result'].split('\n')[i] != "": | |
try: | |
row = literal_eval(repair_json(output_list[2]['result'].split('\n')[i]))[0] | |
row = {**row, **{ | |
'Title' : concat['title'][0], | |
'Authors' : concat['authors'][0], | |
'Publisher Name' : concat['publisher_name'][0], | |
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], | |
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'], | |
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], | |
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'], | |
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'], | |
'Recommendation' : summary, | |
} | |
} | |
if not row['SNPs'].startswith("rs"): | |
row.update({ | |
'SNPs' : "-" | |
}) | |
else: | |
L.append(row) | |
except KeyError: | |
row = literal_eval(repair_json(output_list[2]['result'].split('\n')[i])) | |
row = {**row, **{ | |
'Title' : concat['title'][0], | |
'Authors' : concat['authors'][0], | |
'Publisher Name' : concat['publisher_name'][0], | |
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], | |
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'], | |
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], | |
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'], | |
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'], | |
'Recommendation' : summary, | |
} | |
} | |
if not row['SNPs'].startswith("rs"): | |
row.update({ | |
'SNPs' : "-" | |
}) | |
else: | |
L.append(row) | |
except SyntaxError: | |
row = f"""{row}""" | |
row = {**row, **{ | |
'Title' : concat['title'][0], | |
'Authors' : concat['authors'][0], | |
'Publisher Name' : concat['publisher_name'][0], | |
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], | |
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'], | |
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], | |
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'], | |
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'], | |
'Recommendation' : summary, | |
} | |
} | |
if not row['SNPs'].startswith("rs"): | |
row.update({ | |
'SNPs' : "-" | |
}) | |
else: | |
L.append(row) | |
except ValueError: | |
if type(output_list[2]['result'].split('\n')[i]) is dict: | |
row = output_list[2]['result'].split('\n')[i] | |
row = {**row, **{ | |
'Title' : concat['title'][0], | |
'Authors' : concat['authors'][0], | |
'Publisher Name' : concat['publisher_name'][0], | |
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], | |
'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'], | |
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], | |
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'], | |
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'], | |
'Recommendation' : summary, | |
} | |
} | |
if not row['SNPs'].startswith("rs"): | |
row.update({ | |
'SNPs' : "-" | |
}) | |
else: | |
L.append(row) | |
st.write(output_list[2]['result'].split('\n')) | |
st.write("β Table Extraction Done ...") | |
status.update(label="Gene and SNPs succesfully collected.") | |
L = [{key: ''.join(['' if item == 'Unknow' else item for item in value]) for key, value in d.items()} for d in L] | |
L = [{key: ''.join(['Not Available' if item == '' else item for item in value]) for key, value in d.items()} for d in L] | |
csv = pd.DataFrame(L) | |
st.dataframe(csv) | |
generated_key = ''.join(random.choice(string.ascii_letters + string.digits) for i in range(16)) | |
# if st.button("Download Excel File", key=generated_key): | |
# excel_link = create_excel_download_link(csv, uploaded_file.name.replace('.pdf', '')) | |
# st.markdown(excel_link, unsafe_allow_html=True) | |
with pd.ExcelWriter(buffer, engine='xlsxwriter') as writer: | |
# Write each dataframe to a different worksheet | |
csv.to_excel(writer, sheet_name='Result') | |
writer.close() | |
# time_now = datetime.now() | |
# current_time = time_now.strftime("%H:%M:%S") | |
csv = convert_df(csv) | |
st.download_button( | |
label="Save Result", | |
data=buffer, | |
file_name=f'{uploaded_file.name}'.replace('.pdf', '') + '.xlsx', | |
mime='application/vnd.ms-excel', | |
key=generated_key | |
) | |
if on_v: | |
if parseButtonV: | |
with st.status("Extraction in progress ...", expanded=True) as status: | |
st.write("Getting Result ...") | |
csv = pd.DataFrame() | |
for uploaded_file in stqdm(uploaded_files): | |
L = [] | |
with NamedTemporaryFile(dir='.', suffix=".pdf") as pdf: | |
pdf.write(uploaded_file.getbuffer()) | |
# Open the PDF file in read-binary mode | |
with open(pdf.name, 'rb') as pdf_file: | |
# Create a PDF reader object | |
pdf_reader = PyPDF2.PdfReader(pdf_file) | |
# Create a PDF writer object to write the rotated pages to a new PDF | |
pdf_writer = PyPDF2.PdfWriter() | |
# Iterate through each page in the original PDF | |
for page_num in range(len(pdf_reader.pages)): | |
# Get the page object | |
page = pdf_reader.pages[page_num] | |
# Rotate the page 90 degrees clockwise (use -90 for counterclockwise) | |
page.rotate(90) | |
# Add the rotated page to the PDF writer | |
pdf_writer.add_page(page) | |
with NamedTemporaryFile(dir='.', suffix=".pdf") as rotated_pdf: | |
pdf_writer.write(rotated_pdf.name) | |
# Entity Extraction | |
st.write("β Extracting Entities ...") | |
bytes_data = uploaded_file.read() | |
journal = Journal(uploaded_file.name, bytes_data) | |
images = pdf2image.convert_from_bytes(journal.bytes) | |
extracted_text = "" | |
for image in images[:-1]: | |
text = pytesseract.image_to_string(image) | |
text = clean_text(text) | |
extracted_text += text + " " | |
text = replace_quotes(extracted_text) | |
text_chunk = split_text(text, chunk_size_v) | |
chunkdf = [] | |
for i, chunk in enumerate(text_chunk): | |
inp = chunk | |
df = pd.DataFrame(literal_eval(str(json.dumps(tablex_chain.run(inp)[0])).replace("\'", '\"')), index=[0]).fillna('') | |
chunkdf.append(df) | |
concat = pd.concat(chunkdf, axis=0).reset_index().drop('index', axis=1).fillna('') | |
st.write("β Entities Extraction Done ..") | |
time.sleep(0.1) | |
st.write("β Generating Summary ...") | |
summary = get_summ(pdf.name) | |
st.write("β Generating Summary Done ..") | |
time.sleep(0.1) | |
st.write("β Table Extraction in progress ...") | |
# Table Extraction | |
output_list = [] | |
elements = partition_pdf(filename=rotated_pdf.name, strategy=strategy, infer_table_structure=True, model_name=model_name) | |
with NamedTemporaryFile(dir=".", suffix=".json") as f: | |
elements_to_json(elements, filename=f"{f.name.split('/')[-1]}") | |
json_file_path = os.path.abspath(f.name) # Get the absolute file path | |
with open(json_file_path, "r", encoding="utf-8") as jsonfile: | |
data = json.load(jsonfile) | |
extracted_elements = [] | |
for entry in data: | |
if entry["type"] == "Table": | |
extracted_elements.append(entry["metadata"]["text_as_html"]) | |
with NamedTemporaryFile(dir='.' , suffix='.txt') as txt_file: | |
text_file_path = os.path.abspath(txt_file.name) | |
with open(text_file_path, "w", encoding="utf-8") as txtfile: | |
for element in extracted_elements: | |
txtfile.write(element + "\n\n") | |
loader = TextLoader(text_file_path) | |
documents = loader.load() | |
# split it into chunks | |
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=200, separator="\n") | |
docs = text_splitter.split_documents(documents) | |
embeddings = OpenAIEmbeddings() | |
db = Chroma.from_documents(docs, embeddings) | |
llm_table = ChatOpenAI(model_name="gpt-3.5-turbo-16k", temperature=0) | |
qa_chain = RetrievalQA.from_chain_type(llm_table, retriever=db.as_retriever()) | |
# List of questions | |
questions = [ | |
"""Mention all genes / locus name with respective rsID / SNP and potential diseases in a curly brackets like this: | |
Example 1 : {"Genes" : "FTO", "SNPs" : "rs9939609", "Diseases" : "Obesity"} | |
""", | |
"""Mention all genes / locus name with respective potential diseases in a curly brackets like this: | |
Example 2 : {"Genes" : "FTO", "SNPs" : "" (if not available), "Diseases" : "Obesitya"} | |
""", | |
"""Mention all rsIDs / SNPs / Variant with respective potential diseases / traits in a curly brackets like this: | |
Example 3 : {"Genes" : "", "SNPs" : "rs9939609", "Diseases" : "Obesity"} | |
""" | |
] | |
try: | |
for query in questions: | |
response = qa_chain({"query" : query}) | |
output_list.append(response) | |
except Exception as e: | |
pass | |
db.delete_collection() | |
# 1 | |
for i in range(len(output_list[0]['result'].split('\n'))): | |
if output_list[0]['result'].split('\n')[i] != "": | |
try: | |
row = literal_eval(output_list[0]['result'].split('\n')[i])[0] | |
row = {**row, **{ | |
'Title' : concat['title'][0], | |
'Authors' : concat['authors'][0], | |
'Publisher Name' : concat['publisher_name'][0], | |
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], | |
'Population' : ' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title(), | |
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], | |
'Study Methodology' : ' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title(), | |
'Study Level' : ' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title(), | |
'Recommendation' : summary, | |
}} | |
if len(row['Genes'].strip().split(',')) > 1: | |
for g in row['Genes'].strip().split(','): | |
L.append({ | |
'Genes' : g.strip().upper(), | |
'SNPs' : row['SNPs'], | |
"Diseases" : ''.join(list(row['Diseases'].title() if row['Diseases'] not in ['T2D', 'T2DM', 'NAFLD', 'CVD'] else row['Diseases'])).replace('Unknown', ''), | |
'Title' : concat['title'][0], | |
'Authors' : concat['authors'][0], | |
'Publisher Name' : concat['publisher_name'][0], | |
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], | |
'Population' : ' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title(), | |
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], | |
'Study Methodology' : ' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title(), | |
'Study Level' : ' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title(), | |
'Recommendation' : summary, | |
}) | |
else: | |
L.append(row) | |
except KeyError: | |
row = literal_eval(output_list[0]['result'].split('\n')[i]) | |
row = {**row, **{ | |
'Title' : concat['title'][0], | |
'Authors' : concat['authors'][0], | |
'Publisher Name' : concat['publisher_name'][0], | |
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], | |
'Population' : ' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title(), | |
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], | |
'Study Methodology' : ' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title(), | |
'Study Level' : ' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title(), | |
'Recommendation' : summary, | |
}} | |
if len(row['Genes'].strip().split(',')) > 1: | |
for g in row['Genes'].strip().split(','): | |
L.append({ | |
'Genes' : g.strip().upper(), | |
'SNPs' : row['SNPs'], | |
"Diseases" : ''.join(list(row['Diseases'].title() if row['Diseases'] not in ['T2D', 'T2DM', 'NAFLD', 'CVD'] else row['Diseases'])).replace('Unknown', ''), | |
'Title' : concat['title'][0], | |
'Authors' : concat['authors'][0], | |
'Publisher Name' : concat['publisher_name'][0], | |
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], | |
'Population' : ' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title(), | |
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], | |
'Study Methodology' : ' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title(), | |
'Study Level' : ' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title(), | |
'Recommendation' : summary, | |
}) | |
else: | |
L.append(row) | |
except ValueError: | |
if type(output_list[0]['result'].split('\n')[i]) is dict: | |
row = output_list[0]['result'].split('\n')[i] | |
row = {**row, **{ | |
'Title' : concat['title'][0], | |
'Authors' : concat['authors'][0], | |
'Publisher Name' : concat['publisher_name'][0], | |
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], | |
'Population' : ' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title(), | |
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], | |
'Study Methodology' : ' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title(), | |
'Study Level' : ' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title(), | |
'Recommendation' : summary, | |
} | |
} | |
if not row['SNPs'].startswith("rs"): | |
row.update({ | |
'SNPs' : "-" | |
}) | |
else: | |
L.append(row) | |
except SyntaxError: | |
row = literal_eval("""{}""".format(output_list[2]['result'].split('\n')[i])) | |
row = {**row, **{ | |
'Title' : concat['title'][0], | |
'Authors' : concat['authors'][0], | |
'Publisher Name' : concat['publisher_name'][0], | |
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], | |
'Population' : ' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title(), | |
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], | |
'Study Methodology' : ' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title(), | |
'Study Level' : ' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title(), | |
'Recommendation' : summary, | |
} | |
} | |
if not row['SNPs'].startswith("rs"): | |
row.update({ | |
'SNPs' : "-" | |
}) | |
else: | |
L.append(row) | |
# 2 | |
for i in range(len(output_list[1]['result'].split('\n'))): | |
if output_list[1]['result'].split('\n')[i] != "": | |
try: | |
row = literal_eval(output_list[1]['result'].split('\n')[i])[0] | |
row = {**row, **{ | |
'Title' : concat['title'][0], | |
'Authors' : concat['authors'][0], | |
'Publisher Name' : concat['publisher_name'][0], | |
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], | |
'Population' : ' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title(), | |
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], | |
'Study Methodology' : ' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title(), | |
'Study Level' : ' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title(), | |
'Recommendation' : summary, | |
}} | |
if row['SNPs'] != "Not available": | |
row.update({ | |
'SNPs' : "Not available" | |
}) | |
if len(row['Genes'].strip().split(',')) > 1: | |
for g in row['Genes'].strip().split(','): | |
L.append({ | |
'Genes' : g.strip().upper(), | |
"SNPs" : "Not available", | |
"Diseases" : ''.join(list(row['Diseases'].title() if row['Diseases'] not in ['T2D', 'T2DM', 'NAFLD', 'CVD'] else row['Diseases'])).replace('Unknown', ''), | |
'Title' : concat['title'][0], | |
'Authors' : concat['authors'][0], | |
'Publisher Name' : concat['publisher_name'][0], | |
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], | |
'Population' : ' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title(), | |
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], | |
'Study Methodology' : ' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title(), | |
'Study Level' : ' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title(), | |
'Recommendation' : summary, | |
}) | |
else: | |
L.append(row) | |
except KeyError: | |
row = literal_eval(output_list[1]['result'].split('\n')[i]) | |
row = {**row, **{ | |
'Title' : concat['title'][0], | |
'Authors' : concat['authors'][0], | |
'Publisher Name' : concat['publisher_name'][0], | |
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], | |
'Population' : ' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title(), | |
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], | |
'Study Methodology' : ' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title(), | |
'Study Level' : ' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title(), | |
'Recommendation' : summary, | |
}} | |
if row['SNPs'] != "Not available": | |
row.update({ | |
'SNPs' : "Not available" | |
}) | |
if len(row['Genes'].strip().split(',')) > 1: | |
for g in row['Genes'].strip().split(','): | |
L.append({ | |
'Genes' : g.strip().upper(), | |
"SNPs" : "Not available", | |
"Diseases" : ''.join(list(row['Diseases'].title() if row['Diseases'] not in ['T2D', 'T2DM', 'NAFLD', 'CVD'] else row['Diseases'])).replace('Unknown', ''), | |
'Title' : concat['title'][0], | |
'Authors' : concat['authors'][0], | |
'Publisher Name' : concat['publisher_name'][0], | |
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], | |
'Population' : ' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title(), | |
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], | |
'Study Methodology' : ' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title(), | |
'Study Level' : ' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title(), | |
'Recommendation' : summary, | |
}) | |
else: | |
L.append(row) | |
except ValueError: | |
if type(output_list[1]['result'].split('\n')[i]) is dict: | |
row = output_list[1]['result'].split('\n')[i] | |
row = {**row, **{ | |
'Title' : concat['title'][0], | |
'Authors' : concat['authors'][0], | |
'Publisher Name' : concat['publisher_name'][0], | |
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], | |
'Population' : ' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title(), | |
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], | |
'Study Methodology' : ' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title(), | |
'Study Level' : ' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title(), | |
'Recommendation' : summary, | |
} | |
} | |
if not row['SNPs'].startswith("rs"): | |
row.update({ | |
'SNPs' : "-" | |
}) | |
else: | |
L.append(row) | |
except SyntaxError: | |
row = literal_eval("""{}""".format(output_list[2]['result'].split('\n')[i])) | |
row = {**row, **{ | |
'Title' : concat['title'][0], | |
'Authors' : concat['authors'][0], | |
'Publisher Name' : concat['publisher_name'][0], | |
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], | |
'Population' : ' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title(), | |
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], | |
'Study Methodology' : ' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title(), | |
'Study Level' : ' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title(), | |
'Recommendation' : summary, | |
} | |
} | |
if not row['SNPs'].startswith("rs"): | |
row.update({ | |
'SNPs' : "-" | |
}) | |
else: | |
L.append(row) | |
# 3 | |
for i in range(len(output_list[2]['result'].split('\n'))): | |
if output_list[2]['result'].split('\n')[i] != "": | |
try: | |
row = literal_eval(output_list[2]['result'].split('\n')[i])[0] | |
row = {**row, **{ | |
'Title' : concat['title'][0], | |
'Authors' : concat['authors'][0], | |
'Publisher Name' : concat['publisher_name'][0], | |
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], | |
'Population' : ' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title(), | |
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], | |
'Study Methodology' : ' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title(), | |
'Study Level' : ' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title(), | |
'Recommendation' : summary, | |
} | |
} | |
if not row['SNPs'].startswith("rs"): | |
row.update({ | |
'SNPs' : "-" | |
}) | |
else: | |
L.append(row) | |
except KeyError: | |
row = literal_eval(output_list[2]['result'].split('\n')[i]) | |
row = {**row, **{ | |
'Title' : concat['title'][0], | |
'Authors' : concat['authors'][0], | |
'Publisher Name' : concat['publisher_name'][0], | |
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], | |
'Population' : ' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title(), | |
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], | |
'Study Methodology' : ' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title(), | |
'Study Level' : ' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title(), | |
'Recommendation' : summary, | |
} | |
} | |
if not row['SNPs'].startswith("rs"): | |
row.update({ | |
'SNPs' : "-" | |
}) | |
else: | |
L.append(row) | |
except ValueError: | |
if type(output_list[2]['result'].split('\n')[i]) is dict: | |
row = output_list[2]['result'].split('\n')[i] | |
row = {**row, **{ | |
'Title' : concat['title'][0], | |
'Authors' : concat['authors'][0], | |
'Publisher Name' : concat['publisher_name'][0], | |
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], | |
'Population' : ' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title(), | |
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], | |
'Study Methodology' : ' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title(), | |
'Study Level' : ' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title(), | |
'Recommendation' : summary, | |
} | |
} | |
if not row['SNPs'].startswith("rs"): | |
row.update({ | |
'SNPs' : "-" | |
}) | |
else: | |
L.append(row) | |
except SyntaxError: | |
row = literal_eval("""{}""".format(output_list[2]['result'].split('\n')[i])) | |
row = {**row, **{ | |
'Title' : concat['title'][0], | |
'Authors' : concat['authors'][0], | |
'Publisher Name' : concat['publisher_name'][0], | |
'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], | |
'Population' : ' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title(), | |
'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], | |
'Study Methodology' : ' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title(), | |
'Study Level' : ' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title(), | |
'Recommendation' : summary, | |
} | |
} | |
if not row['SNPs'].startswith("rs"): | |
row.update({ | |
'SNPs' : "-" | |
}) | |
else: | |
L.append(row) | |
st.write("β Table Extraction Done") | |
status.update(label="Gene and SNPs succesfully collected.") | |
L = [{key: ''.join(['' if item == 'Unknow' else item for item in value]) for key, value in d.items()} for d in L] | |
L = [{key: ''.join(['Not Available' if item == '' else item for item in value]) for key, value in d.items()} for d in L] | |
csv = pd.DataFrame(L) | |
st.dataframe(csv) | |
with pd.ExcelWriter(buffer, engine='xlsxwriter') as writer: | |
# Write each dataframe to a different worksheet | |
csv.to_excel(writer, sheet_name='Result') | |
writer.close() | |
time_now = datetime.now() | |
current_time = time_now.strftime("%H:%M:%S") | |
csv = convert_df(csv) | |
st.download_button( | |
label="Save Result", | |
data=buffer, | |
file_name=f'{uploaded_file.name}'.replace('.pdf', '') + '.xlsx', | |
mime='application/vnd.ms-excel' | |
) | |
if on_t: | |
if parseButtonT: | |
with st.status("Extraction in progress ...", expanded=True) as status: | |
st.write("Getting Result ...") | |
csv = pd.DataFrame() | |
for uploaded_file in stqdm(uploaded_files): | |
L = [] | |
with NamedTemporaryFile(dir='.', suffix=".pdf") as pdf: | |
pdf.write(uploaded_file.getbuffer()) | |
# Entity Extraction | |
st.write("β Extracting Entities ...") | |
bytes_data = uploaded_file.read() | |
journal = Journal(uploaded_file.name, bytes_data) | |
images = pdf2image.convert_from_bytes(journal.bytes) | |
extracted_text = "" | |
for image in images[:-1]: | |
text = pytesseract.image_to_string(image) | |
text = clean_text(text) | |
extracted_text += text + " " | |
text = replace_quotes(extracted_text) | |
text_chunk = split_text(text, chunk_size_t) | |
chunkdf = [] | |
for i, chunk in enumerate(text_chunk): | |
inp = chunk | |
df = pd.DataFrame(literal_eval(str(json.dumps(textex_chain.run(inp)[0])).replace("\'", "\"")), index=[0]).fillna('') | |
chunkdf.append(df) | |
concat = pd.concat(chunkdf, axis=0).reset_index().drop('index', axis=1).fillna('') | |
st.write("β Entities Extraction Done ..") | |
time.sleep(0.1) | |
st.write("β Generating Summary ...") | |
concat['SNPs'] = concat['SNPs'].apply(lambda x: x if x.startswith('rs') else '') | |
for col in list(concat.columns): | |
concat[col] = concat[col].apply(lambda x: x if x not in ['N/A', 'not mentioned', 'Not mentioned', 'Unknown'] else '') | |
summary = get_summ(pdf.name) | |
time.sleep(0.1) | |
st.write("β Generating Summary Done...") | |
for i in range(len(concat)): | |
if (len(concat['genes_locus'][i].split(',')) >= 1) and concat['SNPs'][i] == '': | |
for g in concat['genes_locus'][i].split(','): | |
L.append({ | |
'Title' : concat['title'][0], | |
'Author' : concat['authors'][0], | |
'Publisher Name' : concat['publisher'][0], | |
'Publication Year' : get_valid_year(' '.join(concat['publication_year'].values.tolist())), | |
'Genes' : g.upper(), | |
'Population' : upper_abbreviation(' '.join(np.unique(concat['population_race'].values.tolist())).title()), | |
'Diseases' : upper_abbreviation(' '.join(concat['diseases'].values.tolist()).title()), | |
'Sample Size' : sample_size_postproc(upper_abbreviation(' '.join(concat['sample_size'].values.tolist()).title())), | |
'SNPs' : concat['SNPs'][i], | |
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).title()), | |
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).title()), | |
'Recommendation' : summary, | |
}) | |
elif (len(concat['SNPs'][i].split(',')) >= 1): | |
for s in concat['SNPs'][i].split(','): | |
try: | |
L.append({ | |
'Title' : concat['title'][0], | |
'Author' : concat['authors'][0], | |
'Publisher Name' : concat['publisher'][0], | |
'Publication Year' : get_valid_year(' '.join(concat['publication_year'].values.tolist())), | |
'Genes' : get_geneName(s.strip()).upper(), | |
'Population' : upper_abbreviation(' '.join(np.unique(concat['population_race'].values.tolist())).title()), | |
'Diseases' : upper_abbreviation(' '.join(concat['diseases'].values.tolist()).title()), | |
'Sample Size' : sample_size_postproc(upper_abbreviation(' '.join(concat['sample_size'].values.tolist()).title())), | |
'SNPs' : s, | |
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).title()), | |
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).title()), | |
'Recommendation' : summary, | |
}) | |
except Exception as e: | |
L.append({ | |
'Title' : concat['title'][0], | |
'Author' : concat['authors'][0], | |
'Publisher Name' : concat['publisher'][0], | |
'Publication Year' : get_valid_year(' '.join(concat['publication_year'].values.tolist())), | |
'Genes' : '', | |
'Population' : upper_abbreviation(' '.join(np.unique(concat['population_race'].values.tolist())).title()), | |
'Diseases' : upper_abbreviation(' '.join(concat['diseases'].values.tolist()).title()), | |
'Sample Size' : sample_size_postproc(upper_abbreviation(' '.join(concat['sample_size'].values.tolist()).title())), | |
'SNPs' : s, | |
'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).title()), | |
'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).title()), | |
'Recommendation' : summary, | |
}) | |
csv = pd.concat([csv, pd.DataFrame(L)], ignore_index=True) | |
status.update(label="Gene and SNPs succesfully collected.") | |
st.dataframe(csv) | |
with pd.ExcelWriter(buffer, engine='xlsxwriter') as writer: | |
# Write each dataframe to a different worksheet | |
csv.to_excel(writer, sheet_name='Result') | |
writer.close() | |
time_now = datetime.now() | |
current_time = time_now.strftime("%H:%M:%S") | |
csv = convert_df(csv) | |
st.download_button( | |
label="Save Result", | |
data=buffer, | |
file_name=f'{uploaded_file.name}'.replace('.pdf', '') + '.xlsx', | |
mime='application/vnd.ms-excel' | |
) |