# Author: Firqa Aqila Noor Arasyi # Date: 2023-12-04 import os import io import json import pandas as pd import streamlit as st from stqdm import stqdm from ast import literal_eval from tempfile import NamedTemporaryFile from json_repair import repair_json import PyPDF2 import pdf2image import pytesseract from utils import * from schema import * from summ import get_summ from datetime import datetime import time import base64 import string import random import numpy as np from langchain.llms import OpenAI from langchain.chains import RetrievalQA from langchain.vectorstores import Chroma from langchain.chat_models import ChatOpenAI from langchain.document_loaders import TextLoader from chromadb.utils import embedding_functions from unstructured.partition.pdf import partition_pdf from unstructured.staging.base import elements_to_json from langchain.text_splitter import CharacterTextSplitter from langchain.embeddings.openai import OpenAIEmbeddings from langchain.chains import create_extraction_chain from Bio import Entrez nltk.download("punkt") os.environ['OPENAI_API_KEY'] = os.getenv("OPENAI_API_KEY") Entrez.email = os.getenv("ENTREZ_EMAIL") Entrez.api_key = os.getenv("ENTREZ_API_KEY") fold = -1 buffer = io.BytesIO() st.cache_data() def convert_df(df): return df.to_csv().encode("utf-8") # Function to create a download link for an Excel file # def create_excel_download_link(df, file_name): # output = io.BytesIO() # with pd.ExcelWriter(output, engine='xlsxwriter') as writer: # df.to_excel(writer, sheet_name='Sheet1', index=False) # excel_data = output.getvalue() # st.download_button(label="Download Excel File", data=excel_data, key=file_name, file_name=f"{file_name}.xlsx") class Journal: def __init__(self, name, bytes): self.name = name self.bytes = bytes def __repr__(self): return f"Journal(name='{self.name}', bytes='{self.bytes}')" llm = ChatOpenAI(temperature=0, model="gpt-3.5-turbo-1106") textex_chain = create_extraction_chain(textex_schema, llm) tablex_chain = 
create_extraction_chain(tablex_schema, llm) st.set_page_config(page_title="NutriGenMe Paper Extractor") st.title("NutriGenMe - Paper Extraction") st.markdown("
In its latest version, the app is equipped to extract essential information from papers, including tables in both horizontal and vertical orientations, images, and text exclusively.

", unsafe_allow_html=True) uploaded_files = st.file_uploader("Upload Paper(s) here :", type="pdf", accept_multiple_files=True) if uploaded_files: st.warning(""" Warning! Prior to proceeding, please take a moment to review the following : \n Certain guidelines apply when utilizing this application, particularly if you intend to extract information from tables, whether they are oriented horizontally or vertically. - If you intend to perform multiple PDF processes using Horizontal Table Extraction, ensure that all your PDF files adhere to a horizontal table format - If you plan to undertake multiple PDF processes with Vertical Table Extraction, ensure that all your PDF files conform to a vertical table format """, icon="⚠️") col1, col2, col3 = st.columns(3) if uploaded_files: journals = [] strategy = "hi_res" model_name = "yolox" on_h, on_v, on_t = None, None, None parseButtonH, parseButtonV, parseButtonT = None, None, None # if uploaded_files: with col1: if on_v or on_t: on_h = st.toggle("Horizontal Table Extraction", disabled=True) else: on_h = st.toggle("Horizontal Table Extraction") if on_h: chunk_size_h = st.selectbox( 'Tokens amounts per process :', (15000, 12000, 10000, 8000, 5000), key='table_h' ) parseButtonH = st.button("Get Result", key='table_H') with col2: if on_h or on_t: on_v = st.toggle("Vertical Table Extraction", disabled=True) else: on_v = st.toggle("Vertical Table Extraction") if on_v: chunk_size_v = st.selectbox( 'Tokens amounts per process :', (15000, 12000, 10000, 8000, 5000), key='table_v' ) parseButtonV = st.button("Get Result", key='table_V') with col3: if on_h or on_v: on_t = st.toggle("Text Extraction ", disabled=True) else: on_t = st.toggle("Text Extraction ") if on_t: chunk_size_t = st.selectbox( 'Tokens amounts per process :', (15000, 12000, 10000, 8000, 5000), key='no_table' ) parseButtonT = st.button("Get Result", key="no_Table") if on_h: if parseButtonH: with st.status("Extraction in progress ...", expanded=True) as status: 
st.write("Getting Result ...") csv = pd.DataFrame() for uploaded_file in stqdm(uploaded_files): with NamedTemporaryFile(dir='.', suffix=".pdf") as pdf: pdf.write(uploaded_file.getbuffer()) # st.write(pdf.name) L = [] # Entity Extraction st.write("☑ Extracting Entities ...") bytes_data = uploaded_file.read() journal = Journal(uploaded_file.name, bytes_data) images = pdf2image.convert_from_bytes(journal.bytes) extracted_text = "" for image in images[:-1]: text = pytesseract.image_to_string(image) text = clean_text(text) extracted_text += text + " " text = replace_quotes(extracted_text) text_chunk = split_text(text, chunk_size_h) chunkdf = [] for i, chunk in enumerate(text_chunk): inp = chunk try: df = pd.DataFrame(literal_eval(str(json.dumps(tablex_chain.run(inp)[0])).replace("\'", '\"')), index=[0]).fillna('') except: df = pd.DataFrame(literal_eval(str(json.dumps(tablex_chain.run(inp)[0]) + ']').replace("\'", '\"')), index=[0]).fillna('') # df = pd.DataFrame(repair_json(tablex_chain.run(inp)[0])) chunkdf.append(df) concat = pd.concat(chunkdf, axis=0).reset_index().drop('index', axis=1).fillna('') st.write("☑ Entities Extraction Done ..") time.sleep(0.1) st.write("☑ Generating Summary ...") summary = get_summ(pdf.name) st.write("☑ Generating Summary Done ..") time.sleep(0.1) st.write("☑ Table Extraction in progress ...") # Table Extraction # L = [] output_list = [] elements = partition_pdf(filename=pdf.name, strategy=strategy, infer_table_structure=True, model_name=model_name) with NamedTemporaryFile(dir=".", suffix=".json") as f: elements_to_json(elements, filename=f"{f.name.split('/')[-1]}") json_file_path = os.path.abspath(f.name) # Get the absolute file path with open(json_file_path, "r", encoding="utf-8") as jsonfile: data = json.load(jsonfile) extracted_elements = [] for entry in data: if entry["type"] == "Table": extracted_elements.append(entry["metadata"]["text_as_html"]) with NamedTemporaryFile(dir='.' 
, suffix='.txt') as txt_file: text_file_path = os.path.abspath(txt_file.name) with open(text_file_path, "w", encoding="utf-8") as txtfile: for element in extracted_elements: txtfile.write(element + "\n\n") loader = TextLoader(text_file_path) documents = loader.load() # split it into chunks text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=200, separator="\n") docs = text_splitter.split_documents(documents) embeddings = OpenAIEmbeddings() db = Chroma.from_documents(docs, embeddings) llm_table = ChatOpenAI(model_name="gpt-3.5-turbo-16k", temperature=0) qa_chain = RetrievalQA.from_chain_type(llm_table, retriever=db.as_retriever()) # List of questions questions = [ """Mention all genes / locus name with respective rsID / SNP and potential diseases in a curly brackets like this: Example 1 : {"Genes" : "FTO", "SNPs" : "rs9939609", "Diseases" : "Obesity"} """, """Mention all genes / locus name with respective potential diseases in a curly brackets like this: Example 2 : {"Genes" : "FTO", "SNPs" : "" (if not available), "Diseases" : "Obesity"} """, """Mention all rsIDs / SNPs / Variant with respective potential diseases / traits in a curly brackets like this: Example 3 : {"Genes" : "", "SNPs" : "rs9939609", "Diseases" : "Obesity"} """ ] try: for query in questions: response = qa_chain({"query" : query}) output_list.append(response) except Exception as e: pass db.delete_collection() # 1 for i in range(len(output_list[0]['result'].split('\n'))): st.write(output_list[0]['result'].split('\n')) if output_list[0]['result'].split('\n')[i] != "": try: row = literal_eval(repair_json(output_list[0]['result'].split('\n')[i]))[0] row = {**row, **{ 'Title' : concat['title'][0], 'Authors' : concat['authors'][0], 'Publisher Name' : concat['publisher_name'][0], 'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], # 
'Population' : concat['population_race'][0], 'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'], 'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], 'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'], 'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'], 'Recommendation' : summary, # 'Sample Size' : concat['sample_size'][0] }} if len(row['Genes'].strip().split(',')) > 1: for g in row['Genes'].strip().split(','): L.append({ 'Title' : concat['title'][0], 'Authors' : concat['authors'][0], 'Publisher Name' : concat['publisher_name'][0], 'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], # 'Population' : concat['population_race'][0], 'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'], 'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], 'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' 
in concat.columns else concat.assign(study_methodology='')['study_methodology'], 'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'], 'Recommendation' : summary, # 'Sample Size' : concat['sample_size'][0], 'Genes' : g.strip().upper().replace('Unknown', ''), 'SNPs' : row['SNPs'].replace('Unknown', ''), "Diseases" : ''.join(list(row['Diseases'].title() if row['Diseases'] not in ['T2D', 'T2DM', 'NAFLD', 'CVD'] else row['Diseases'])).replace('Unknown', '').replace('Unknown', '') }) else: L.append(row) except KeyError: row = literal_eval(repair_json(output_list[0]['result'].split('\n')[i])) row = {**row, **{ 'Title' : concat['title'][0], 'Authors' : concat['authors'][0], 'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], "Publisher Name" : "", # 'Population' : concat['population_race'][0], 'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'], 'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], 'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'], 'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'], 'Recommendation' : summary, # 'Sample Size' : 
concat['sample_size'][0] } } if len(row['Genes'].strip().split(',')) > 1: for g in row['Genes'].strip().split(','): L.append({ 'Title' : concat['title'][0], 'Authors' : concat['authors'][0], 'Publisher Name' : concat['publisher_name'][0], 'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], 'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'], 'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], 'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'], 'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'], 'Recommendation' : summary, 'Genes' : g.strip().upper().replace('Unknown', ''), 'SNPs' : row['SNPs'].replace('Unknown', ''), "Diseases" : ''.join(list(row['Diseases'].title() if row['Diseases'] not in ['T2D', 'T2DM', 'NAFLD', 'CVD'] else row['Diseases'])).replace('Unknown', '').replace('Unknown', '') }) else: L.append(row) except SyntaxError: row = literal_eval(repair_json(output_list[0]['result'].split('\n')[i])) row = f"""{row}""" row = {**row, **{ 'Title' : concat['title'][0], 'Authors' : concat['authors'][0], 'Publisher Name' : concat['publisher_name'][0], 'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns 
else concat.assign(year_of_publication='')['year_of_publication'], 'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'], 'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], 'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'], 'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'], 'Recommendation' : summary, # 'Population' : concat['population_race'][0], # 'Sample Size' : concat['sample_size'][0] } } if not row['SNPs'].startswith("rs"): row.update({ 'SNPs' : "-" }) else: L.append(row) except ValueError: if type(output_list[0]['result'].split('\n')[i]) is dict: row = repair_json(output_list[0]['result'].split('\n')[i]) row = {**row, **{ 'Title' : concat['title'][0], 'Authors' : concat['authors'][0], 'Publisher Name' : concat['publisher_name'][0], 'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], 'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'], 'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else 
concat.assign(sample_size='')['sample_size'], 'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'], 'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'], 'Recommendation' : summary, } } if not row['SNPs'].startswith("rs"): row.update({ 'SNPs' : "-" }) else: L.append(row) # 2 for i in range(len(output_list[1]['result'].split('\n'))): if output_list[1]['result'].split('\n')[i] != "": try: row = literal_eval(repair_json(output_list[1]['result'].split('\n')[i]))[0] row = {**row, **{ 'Title' : concat['title'][0], 'Authors' : concat['authors'][0], 'Publisher Name' : concat['publisher_name'][0], 'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], 'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'], 'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], 'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'], 'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'], 'Recommendation' : 
summary, } } if row['SNPs'] != "Not available": row.update({ 'SNPs' : "Not available" }) if len(row['Genes'].strip().split(',')) > 1: for g in row['Genes'].strip().split(','): L.append({ 'Title' : concat['title'][0], 'Authors' : concat['authors'][0], 'Publisher Name' : concat['publisher_name'][0], 'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], 'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'], 'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], 'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'], 'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'], 'Recommendation' : summary, 'Genes' : g.strip().upper().replace('Unknown', ''), "SNPs" : "Not available", "Diseases" : ''.join(list(row['Diseases'].title() if row['Diseases'] not in ['T2D', 'T2DM', 'NAFLD', 'CVD'] else row['Diseases'])).replace('Unknown', '').replace('Unknown', '') }) else: L.append(row) except KeyError: row = literal_eval(repair_json(output_list[1]['result'].split('\n')[i])) row = {**row, **{ 'Title' : concat['title'][0], 'Authors' : concat['authors'][0], 'Publisher Name' : concat['publisher_name'][0], 'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in 
concat.columns else concat.assign(year_of_publication='')['year_of_publication'], 'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'], 'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], 'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'], 'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'], 'Recommendation' : summary, } } if row['SNPs'] != "Not available": row.update({ 'SNPs' : "Not available" }) if len(row['Genes'].strip().split(',')) > 1: for g in row['Genes'].strip().split(','): L.append({ 'Title' : concat['title'][0], 'Authors' : concat['authors'][0], 'Publisher Name' : concat['publisher_name'][0], 'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], 'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'], 'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], 'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) 
if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'], 'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'], 'Recommendation' : summary, 'Genes' : g.strip().upper().replace('Unknown', ''), "SNPs" : "Not available", "Diseases" : ''.join(list(row['Diseases'].title() if row['Diseases'] not in ['T2D', 'T2DM', 'NAFLD', 'CVD'] else row['Diseases'])).replace('Unknown', '').replace('Unknown', '') }) else: L.append(row) except SyntaxError: row = f"""{row}""" row = {**row, **{ 'Title' : concat['title'][0], 'Authors' : concat['authors'][0], 'Publisher Name' : concat['publisher_name'][0], 'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], 'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'], 'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], 'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'], 'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'], 'Recommendation' : summary, } } if not row['SNPs'].startswith("rs"): row.update({ 'SNPs' : "-" }) else: L.append(row) except ValueError: if 
type(output_list[1]['result'].split('\n')[i]) is dict: row = output_list[1]['result'].split('\n')[i] row = {**row, **{ 'Title' : concat['title'][0], 'Authors' : concat['authors'][0], 'Publisher Name' : concat['publisher_name'][0], 'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], 'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'], 'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], 'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'], 'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'], 'Recommendation' : summary, } } if not row['SNPs'].startswith("rs"): row.update({ 'SNPs' : "-" }) else: L.append(row) # 3 for i in range(len(output_list[2]['result'].split('\n'))): if output_list[2]['result'].split('\n')[i] != "": try: row = literal_eval(repair_json(output_list[2]['result'].split('\n')[i]))[0] row = {**row, **{ 'Title' : concat['title'][0], 'Authors' : concat['authors'][0], 'Publisher Name' : concat['publisher_name'][0], 'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], 'Population' : upper_abbreviation(' 
'.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'], 'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], 'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'], 'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'], 'Recommendation' : summary, } } if not row['SNPs'].startswith("rs"): row.update({ 'SNPs' : "-" }) else: L.append(row) except KeyError: row = literal_eval(repair_json(output_list[2]['result'].split('\n')[i])) row = {**row, **{ 'Title' : concat['title'][0], 'Authors' : concat['authors'][0], 'Publisher Name' : concat['publisher_name'][0], 'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], 'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'], 'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], 'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else 
concat.assign(study_methodology='')['study_methodology'], 'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'], 'Recommendation' : summary, } } if not row['SNPs'].startswith("rs"): row.update({ 'SNPs' : "-" }) else: L.append(row) except SyntaxError: row = f"""{row}""" row = {**row, **{ 'Title' : concat['title'][0], 'Authors' : concat['authors'][0], 'Publisher Name' : concat['publisher_name'][0], 'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], 'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'], 'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], 'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'], 'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'], 'Recommendation' : summary, } } if not row['SNPs'].startswith("rs"): row.update({ 'SNPs' : "-" }) else: L.append(row) except ValueError: if type(output_list[2]['result'].split('\n')[i]) is dict: row = output_list[2]['result'].split('\n')[i] row = {**row, **{ 'Title' : concat['title'][0], 'Authors' : concat['authors'][0], 'Publisher Name' : concat['publisher_name'][0], 'Publication Year' : 
get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], 'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'], 'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], 'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'], 'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'], 'Recommendation' : summary, } } if not row['SNPs'].startswith("rs"): row.update({ 'SNPs' : "-" }) else: L.append(row) st.write("☑ Table Extraction Done ...") status.update(label="Gene and SNPs succesfully collected.") L = [{key: ''.join(['' if item == 'Unknow' else item for item in value]) for key, value in d.items()} for d in L] L = [{key: ''.join(['Not Available' if item == '' else item for item in value]) for key, value in d.items()} for d in L] csv = pd.DataFrame(L) st.dataframe(csv) generated_key = ''.join(random.choice(string.ascii_letters + string.digits) for i in range(16)) # if st.button("Download Excel File", key=generated_key): # excel_link = create_excel_download_link(csv, uploaded_file.name.replace('.pdf', '')) # st.markdown(excel_link, unsafe_allow_html=True) with pd.ExcelWriter(buffer, engine='xlsxwriter') as writer: # Write each dataframe to a different worksheet csv.to_excel(writer, sheet_name='Result') writer.close() # time_now = 
datetime.now() # current_time = time_now.strftime("%H:%M:%S") csv = convert_df(csv) st.download_button( label="Save Result", data=buffer, file_name=f'{uploaded_file.name}'.replace('.pdf', '') + '.xlsx', mime='application/vnd.ms-excel', key=generated_key )
# NOTE(review): the line above is the unchanged tail of the preceding branch,
# whose beginning lies outside this chunk; only the code below was rewritten.


# ---------------------------------------------------------------------------
# Helpers shared by the `on_v` and `on_t` extraction branches below.
# ---------------------------------------------------------------------------

# Correct MIME type for .xlsx workbooks (the legacy 'vnd.ms-excel' is for .xls).
_XLSX_MIME = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'

# Disease abbreviations that must not be title-cased.
_KEEP_CASE_DISEASES = ('T2D', 'T2DM', 'NAFLD', 'CVD')

# Cell values the text branch treats as "no information".
_BLANK_MARKERS = ['N/A', 'not mentioned', 'Not mentioned', 'Unknown']

# Values replaced by 'Not Available' in the table branch output.
_MISSING_VALUES = ('', 'Unknow', 'Unknown')

# Questions asked to the retrieval chain, in order: genes + SNPs, genes only,
# SNPs only. The wording is kept exactly as it was (it is prompt text).
_TABLE_QUESTIONS = (
    'Mention all genes / locus name with respective rsID / SNP and potential diseases in a curly brackets like this: '
    'Example 1 : {"Genes" : "FTO", "SNPs" : "rs9939609", "Diseases" : "Obesity"} ',
    'Mention all genes / locus name with respective potential diseases in a curly brackets like this: '
    'Example 2 : {"Genes" : "FTO", "SNPs" : "" (if not available), "Diseases" : "Obesitya"} ',
    'Mention all rsIDs / SNPs / Variant with respective potential diseases / traits in a curly brackets like this: '
    'Example 3 : {"Genes" : "", "SNPs" : "rs9939609", "Diseases" : "Obesity"} ',
)


def _first_value(frame, column):
    """Return the first row of *column*, or '' if the column is absent/empty."""
    if column not in frame.columns or frame.empty:
        return ''
    return frame[column].iloc[0]


def _joined(frame, column, unique=False):
    """Space-join every value of *column*; '' when the column is absent.

    With ``unique=True`` the values are de-duplicated (and sorted) through
    ``np.unique`` first.
    """
    if column not in frame.columns:
        return ''
    values = frame[column].astype(str).values.tolist()
    if unique:
        values = np.unique(values)
    return ' '.join(values)


def _ocr_text(pdf_bytes):
    """OCR a PDF and return its text after `clean_text` / `replace_quotes`.

    The last page is skipped, exactly as the original flow did.
    """
    pages = pdf2image.convert_from_bytes(pdf_bytes)
    cleaned = (clean_text(pytesseract.image_to_string(page)) for page in pages[:-1])
    return replace_quotes(''.join(f'{text} ' for text in cleaned))


def _extract_entities(chain, text, chunk_size):
    """Run the extraction *chain* on each text chunk; one result row per chunk.

    Returns an empty DataFrame when no chunk produced a result. The chain
    output is used directly: the former json.dumps -> quote replacement ->
    literal_eval round trip broke on apostrophes and on null/true/false.
    """
    frames = []
    for chunk in split_text(text, chunk_size):
        found = chain.run(chunk)
        if not found:
            continue
        frames.append(pd.DataFrame([found[0]]).fillna(''))
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True).fillna('')


def _offer_download(frame, pdf_name):
    """Render a "Save Result" button serving *frame* as an .xlsx workbook."""
    # A fresh buffer per export: reusing one module-level BytesIO across runs
    # appends workbooks to each other.
    workbook = io.BytesIO()
    with pd.ExcelWriter(workbook, engine='xlsxwriter') as writer:
        # The context manager closes the writer; no explicit close() needed.
        frame.to_excel(writer, sheet_name='Result')
    st.download_button(
        label="Save Result",
        data=workbook.getvalue(),
        file_name=f'{pdf_name}'.replace('.pdf', '') + '.xlsx',
        mime=_XLSX_MIME,
    )


def _write_rotated_copy(src_path, dst_path):
    """Write a copy of the PDF at *src_path* with every page rotated 90 degrees."""
    with open(src_path, 'rb') as handle:
        reader = PyPDF2.PdfReader(handle)
        writer = PyPDF2.PdfWriter()
        for page in reader.pages:
            page.rotate(90)
            writer.add_page(page)
        # Written while the source is still open: pages are read lazily.
        writer.write(dst_path)


def _table_html_snippets(pdf_path):
    """Partition the PDF and return the HTML of every element typed "Table"."""
    elements = partition_pdf(filename=pdf_path, strategy=strategy,
                             infer_table_structure=True, model_name=model_name)
    with NamedTemporaryFile(dir='.', suffix='.json') as dump:
        # The temp file lives in the working directory, so its base name
        # addresses the same file.
        elements_to_json(elements, filename=os.path.basename(dump.name))
        with open(os.path.abspath(dump.name), 'r', encoding='utf-8') as handle:
            entries = json.load(handle)
    return [
        entry['metadata']['text_as_html']
        for entry in entries
        if entry.get('type') == 'Table' and 'text_as_html' in entry.get('metadata', {})
    ]


def _ask_about_tables(table_html):
    """Ask every question in `_TABLE_QUESTIONS` against the extracted tables.

    Returns one answer string per question; '' stands for "no answer" (no
    tables found, or the query failed).
    """
    if not table_html:
        return [''] * len(_TABLE_QUESTIONS)

    with NamedTemporaryFile(dir='.', suffix='.txt') as scratch:
        scratch_path = os.path.abspath(scratch.name)
        with open(scratch_path, 'w', encoding='utf-8') as out:
            out.writelines(f'{snippet}\n\n' for snippet in table_html)
        documents = TextLoader(scratch_path).load()

    splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=200, separator="\n")
    db = Chroma.from_documents(splitter.split_documents(documents), OpenAIEmbeddings())
    try:
        llm_table = ChatOpenAI(model_name="gpt-3.5-turbo-16k", temperature=0)
        qa_chain = RetrievalQA.from_chain_type(llm_table, retriever=db.as_retriever())
        answers = []
        for question in _TABLE_QUESTIONS:
            try:
                answers.append(qa_chain({"query": question})['result'])
            except Exception as exc:
                # Best effort: one failed query must not abort the whole paper,
                # but it must not pass silently either.
                st.warning(f"Table query failed: {exc}")
                answers.append('')
        return answers
    finally:
        # Always drop the collection so the next paper starts from a clean store.
        db.delete_collection()


def _parse_answer_records(answer):
    """Return every dict literal found in an LLM answer (one candidate per line).

    Text around the braces (e.g. "Example 1 : ") is ignored, and lines that
    do not contain a parsable dict are skipped instead of raising.
    """
    records = []
    for line in answer.split('\n'):
        start, end = line.find('{'), line.rfind('}')
        if start == -1 or end < start:
            continue
        snippet = line[start:end + 1]
        try:
            parsed = literal_eval(snippet)
        except (ValueError, SyntaxError, TypeError):
            try:
                parsed = json.loads(snippet)  # handles null / true / false
            except ValueError:
                continue
        candidates = parsed if isinstance(parsed, (list, tuple)) else [parsed]
        records.extend(item for item in candidates if isinstance(item, dict))
    return records


def _gene_rows(record, meta, snps=None):
    """Build the output rows for one record, one row per comma-separated gene.

    *snps*, when given, overrides the record's own 'SNPs' value.
    """
    snp_value = record.get('SNPs', '') if snps is None else snps
    genes = str(record.get('Genes', '')).strip().split(',')
    if len(genes) <= 1:
        return [{**record, 'SNPs': snp_value, **meta}]

    disease = str(record.get('Diseases', ''))
    if disease not in _KEEP_CASE_DISEASES:
        disease = disease.title()
    disease = disease.replace('Unknown', '')
    return [
        {'Genes': gene.strip().upper(), 'SNPs': snp_value, 'Diseases': disease, **meta}
        for gene in genes
    ]


def _table_rows(answers, meta):
    """Turn the three QA answers into output rows carrying the paper metadata."""
    genes_and_snps, genes_only, snps_only = answers
    rows = []
    for record in _parse_answer_records(genes_and_snps):
        rows.extend(_gene_rows(record, meta))
    for record in _parse_answer_records(genes_only):
        rows.extend(_gene_rows(record, meta, snps="Not available"))
    for record in _parse_answer_records(snps_only):
        # Only proper rsIDs are kept for the SNP-only question.
        if str(record.get('SNPs', '')).startswith('rs'):
            rows.append({**record, **meta})
    return rows


def _fill_missing(row):
    """Replace empty / 'Unknow(n)' string cells of *row* with 'Not Available'."""
    return {
        key: 'Not Available' if isinstance(value, str) and value.strip() in _MISSING_VALUES else value
        for key, value in row.items()
    }


def _table_paper_fields(concat, summary):
    """Paper-level columns (table branch schema) shared by every output row."""
    def cleaned(column):
        return _joined(concat, column).replace('Unknown', '').title()

    columns = concat.columns
    return {
        'Title': _first_value(concat, 'title'),
        'Authors': _first_value(concat, 'authors'),
        'Publisher Name': _first_value(concat, 'publisher_name'),
        'Publication Year': (get_valid_year(_joined(concat, 'year_of_publication'))
                             if 'year_of_publication' in columns else ''),
        'Population': cleaned('population_race'),
        'Sample Size': (sample_size_postproc(cleaned('sample_size'))
                        if 'sample_size' in columns else ''),
        'Study Methodology': cleaned('study_methodology'),
        'Study Level': cleaned('study_level'),
        'Recommendation': summary,
    }


def _text_rows(concat, summary):
    """Build the output rows of the text branch from the per-chunk entities.

    Chunks without a valid rsID yield one row per gene; chunks with rsIDs
    yield one row per SNP, with the gene looked up through `get_geneName`.
    """
    for column in ('genes_locus', 'SNPs'):
        if column not in concat.columns:
            concat[column] = ''
    concat['SNPs'] = concat['SNPs'].apply(
        lambda value: value if isinstance(value, str) and value.startswith('rs') else '')
    for column in list(concat.columns):
        concat[column] = concat[column].apply(
            lambda value: '' if value in _BLANK_MARKERS else value)

    def abbreviated(column, unique=False):
        return upper_abbreviation(_joined(concat, column, unique=unique).title())

    # Paper-level values are identical for every row: compute them once.
    # 'Genes' and 'SNPs' are placeholders that fix the column order.
    template = {
        'Title': _first_value(concat, 'title'),
        'Author': _first_value(concat, 'authors'),
        'Publisher Name': _first_value(concat, 'publisher'),
        'Publication Year': (get_valid_year(_joined(concat, 'publication_year'))
                             if 'publication_year' in concat.columns else ''),
        'Genes': '',
        'Population': abbreviated('population_race', unique=True),
        'Diseases': abbreviated('diseases'),
        'Sample Size': sample_size_postproc(abbreviated('sample_size')),
        'SNPs': '',
        'Study Methodology': abbreviated('study_methodology'),
        'Study Level': abbreviated('study_level'),
        'Recommendation': summary,
    }

    rows = []
    for genes, snps in zip(concat['genes_locus'], concat['SNPs']):
        if snps == '':
            for gene in str(genes).split(','):
                rows.append({**template, 'Genes': gene.strip().upper()})
            continue
        for snp in snps.split(','):
            snp = snp.strip()
            try:
                gene = get_geneName(snp).upper()
            except Exception:
                # The gene lookup is best-effort; keep the SNP without a gene.
                gene = ''
            rows.append({**template, 'Genes': gene, 'SNPs': snp})
    return rows


# ---------------------------------------------------------------------------
# Table branch: pages are rotated by 90 degrees before table extraction.
# ---------------------------------------------------------------------------
if on_v and parseButtonV:
    with st.status("Extraction in progress ...", expanded=True) as status:
        st.write("Getting Result ...")
        results = pd.DataFrame()
        last_name = None
        for uploaded_file in stqdm(uploaded_files):
            last_name = uploaded_file.name
            # getvalue() is independent of the stream position, unlike read().
            pdf_bytes = uploaded_file.getvalue()
            with NamedTemporaryFile(dir='.', suffix=".pdf") as pdf:
                pdf.write(pdf_bytes)
                pdf.flush()  # the file is reopened by name below
                with NamedTemporaryFile(dir='.', suffix=".pdf") as rotated_pdf:
                    _write_rotated_copy(pdf.name, rotated_pdf.name)

                    # Entity Extraction
                    st.write("☑ Extracting Entities ...")
                    concat = _extract_entities(tablex_chain, _ocr_text(pdf_bytes), chunk_size_v)
                    if concat.empty:
                        st.warning(f"No entities could be extracted from {uploaded_file.name}; skipping.")
                        continue
                    st.write("☑ Entities Extraction Done ..")
                    time.sleep(0.1)

                    st.write("☑ Generating Summary ...")
                    summary = get_summ(pdf.name)
                    st.write("☑ Generating Summary Done ..")
                    time.sleep(0.1)

                    # Table Extraction
                    st.write("☑ Table Extraction in progress ...")
                    answers = _ask_about_tables(_table_html_snippets(rotated_pdf.name))

            meta = _table_paper_fields(concat, summary)
            rows = [_fill_missing(row) for row in _table_rows(answers, meta)]
            # Accumulate across papers, as the text branch does.
            results = pd.concat([results, pd.DataFrame(rows)], ignore_index=True)
            st.write("☑ Table Extraction Done")

        status.update(label="Gene and SNPs succesfully collected.")
        st.dataframe(results)
        if last_name is not None:
            _offer_download(results, last_name)


# ---------------------------------------------------------------------------
# Text branch: entities come from the OCR text only.
# ---------------------------------------------------------------------------
if on_t and parseButtonT:
    with st.status("Extraction in progress ...", expanded=True) as status:
        st.write("Getting Result ...")
        results = pd.DataFrame()
        last_name = None
        for uploaded_file in stqdm(uploaded_files):
            last_name = uploaded_file.name
            pdf_bytes = uploaded_file.getvalue()
            with NamedTemporaryFile(dir='.', suffix=".pdf") as pdf:
                pdf.write(pdf_bytes)
                pdf.flush()  # get_summ() reopens the file by name

                # Entity Extraction
                st.write("☑ Extracting Entities ...")
                concat = _extract_entities(textex_chain, _ocr_text(pdf_bytes), chunk_size_t)
                if concat.empty:
                    st.warning(f"No entities could be extracted from {uploaded_file.name}; skipping.")
                    continue
                st.write("☑ Entities Extraction Done ..")
                time.sleep(0.1)

                st.write("☑ Generating Summary ...")
                summary = get_summ(pdf.name)
                time.sleep(0.1)
                st.write("☑ Generating Summary Done...")

            results = pd.concat([results, pd.DataFrame(_text_rows(concat, summary))],
                                ignore_index=True)

        status.update(label="Gene and SNPs succesfully collected.")
        st.dataframe(results)
        if last_name is not None:
            _offer_download(results, last_name)