test_app / text_input.py
sanmoy27's picture
Upload text_input.py
c55f43f
# -*- coding: utf-8 -*-
"""
Created on Tue Jun 6 10:31:42 2023
@author: sanmpaul
"""
#Import the required Libraries
import json
import os
import pathlib
import re
import time
from ast import literal_eval
from collections import Counter
from io import StringIO
from operator import itemgetter
from string import punctuation

import matplotlib.pyplot as plt
import pandas as pd
import s3fs
import spacy
import streamlit as st
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize, sent_tokenize
from sklearn.metrics.pairwise import cosine_similarity
from transformers import AutoTokenizer, AutoModelForTokenClassification
# AWS credentials are taken from the standard AWS environment variables
# (set them as deployment secrets); they must never be committed to source.
# SECURITY: any key, secret or session token that was ever hard-coded in this
# file has to be treated as compromised and revoked/rotated in AWS.
aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID")
aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
aws_session_token = os.environ.get("AWS_SESSION_TOKEN")

# Values that are unset are passed as None; per the s3fs documentation the
# filesystem then falls back to botocore's own credential resolution.
fs = s3fs.S3FileSystem(
    key=aws_access_key_id,
    secret=aws_secret_access_key,
    token=aws_session_token,
)

# Location of the ICD reference CSV inside the bucket.
bucket_name = 'mimic3data'
folder_path = 'cleanedmimic'
file_name = 'extracted_diseases_DIAGNOSES.csv'
s3_file_path = f'{bucket_name}/{folder_path}/{file_name}'
# Page header: the app title followed by a one-line description.
_APP_TITLE = 'Medical Coding Explorer'
_APP_INTRO = 'This is a web app to allow exploration of the capabilities of Medical coding'
st.title(_APP_TITLE)
st.text(_APP_INTRO)
# Section headings that introduce the diagnosis part of a discharge summary.
# NOTE(review): the upload handler in this file strips "**" from the text
# before calling extractSummary, so "[**Hospital **] Diagnosis:" cannot match
# on that path as written - confirm whether a stripped variant is needed.
headings = [
    "Discharge Diagnosis:",
    "Final Diagnosis:",
    "Final Report:",
    "FINAL DIAGNOSES:",
    "DISCHARGE DIAGNOSES:",
    "[**Hospital **] Diagnosis:",
]

# Case-insensitive. A heading must start the text or follow a newline; the
# captured body runs lazily until a blank line followed by a word character,
# or until the end of the text.
pattern = r"(?i)(?:^|\n)(?:{})\s*([\s\S]*?)(?=\n\n\w|$)".format(
    "|".join(re.escape(heading) for heading in headings)
)


def extractSummary(text):
    """Extract the text under every recognised diagnosis heading.

    Args:
        text: Full document text.

    Returns:
        The bodies of all matching sections, each stripped and with newlines
        replaced by spaces, joined by a single space. Returns an empty string
        when no heading is found.
    """
    # The pattern only understands "\n". Without this normalisation a CRLF
    # document never hits the blank-line terminator, so the capture would run
    # to the end of the text and leave "\r" characters in the result.
    text = text.replace("\r\n", "\n").replace("\r", "\n")
    sections = re.findall(pattern, text)
    return " ".join(section.strip().replace("\n", " ") for section in sections)
# sci_nlp = spacy.load('en_ner_bc5cdr_md')
# Clinical shorthand tokens dropped from extracted phrases.
abbreviations_to_remove = [
    "s/p",
    "d/c'd",
    "w/",
]
# Single characters stripped from extracted phrases.
chars_to_remove = list("#.)([]")
# Regex matching any run of digits.
numeric_pattern = r"\d+"
# Function to extract all diseases
# def extract_diseases(text):
# docx = sci_nlp(text)
# results_diseases = [ent.text.lower() for ent in docx.ents if (ent.label_ == 'DISEASE')]
# results_diseases2 = [' '.join(word for word in disease.split() if word not in abbreviations_to_remove) for disease in results_diseases]
# results_diseases3 = [re.sub(numeric_pattern, '', phrase) for phrase in results_diseases2]
# results_diseases4 = ["".join(c for c in phrase if c not in chars_to_remove) for phrase in results_diseases3]
# results_diseases_cleaned = list(set(results_diseases4))
# return results_diseases_cleaned
def convert_df(df):
    """Serialise a DataFrame to CSV (without the index) as UTF-8 bytes."""
    csv_text = df.to_csv(index=False)
    return csv_text.encode('utf-8')
# Seed the button flags once so later reads of st.session_state find them.
for _flag_name in ("extract_button", "ner_button"):
    if _flag_name not in st.session_state:
        st.session_state[_flag_name] = False
def extractCallback():
    """Mark the extract step as triggered and switch its spinner flag on."""
    state = st.session_state
    state["extract_button"] = True
    state["extract_spinner"] = True
def nercallback():
    """Mark the extract and NER steps as triggered; hand the spinner flag to NER."""
    state = st.session_state
    state["extract_button"] = True
    state["ner_button"] = True
    state["extract_spinner"] = False
    state["ner_spinner"] = True
def icdcallback():
    """Mark the ICD, extract and NER steps as triggered; hand the spinner flag to ICD."""
    state = st.session_state
    state["icd_button"] = True
    state["extract_button"] = True
    state["ner_button"] = True
    state["icd_spinner"] = True
    state["ner_spinner"] = False
# Hugging Face model identifier; only its tokenizer is loaded here.
model_name = "emilyalsentzer/Bio_ClinicalBERT"
tokenizer = AutoTokenizer.from_pretrained(model_name)


def compareWith_ICD(row1, row2):
    """Score one text against one ICD reference row.

    Args:
        row1: Text to compare.
        row2: Row (or mapping) providing 'DISEASES' and 'ICD9_CODE'.

    Returns:
        A dict with the row's 'ICD9_CODE' and a 'score': the cosine
        similarity of the two padded token-id sequences, rounded to two
        decimals.
    """
    reference_text = row2['DISEASES']
    icd_code = row2['ICD9_CODE']
    encoded = tokenizer(
        [row1, reference_text],
        padding=True,
        truncation=True,
        max_length=512,
    )
    token_ids = encoded['input_ids']
    # NOTE(review): this compares raw vocabulary ids rather than model
    # embeddings - confirm that is the intended similarity measure.
    similarity = cosine_similarity([token_ids[0]], [token_ids[1]])
    return {'ICD9_CODE': icd_code, 'score': round(similarity[0][0], 2)}
def compare(row):
    """Score `row` against every row of icd_df1.

    Returns:
        A JSON string: the list of per-row result dicts from compareWith_ICD,
        ordered by 'score' from highest to lowest.
    """
    scored = icd_df1.apply(lambda icd_row: compareWith_ICD(row, icd_row), axis=1).to_list()
    scored.sort(key=itemgetter('score'), reverse=True)
    return json.dumps(scored)
def top_icd(row):
    """Return the five best-scoring entries, one per ICD-9 code, as JSON.

    Args:
        row: Serialised list of dicts, each with 'ICD9_CODE' and 'score'.
            JSON is expected; a Python-literal string is still accepted.

    Returns:
        A JSON array string holding at most five dicts, highest score first.
        For a code that occurs several times only its highest-scoring entry
        is kept (the earliest one on ties).

    Raises:
        ValueError or SyntaxError: if `row` is neither valid JSON nor a
            valid Python literal.
    """
    try:
        # JSON is the native format here; literal_eval alone rejects JSON
        # tokens such as null, true, false and NaN.
        candidates = json.loads(row)
    except (ValueError, TypeError):
        # Backward compatibility for callers passing a Python-literal string.
        candidates = literal_eval(row)

    # Single pass; dict insertion order keeps codes in first-seen order so the
    # stable sort below breaks score ties the same way as before.
    best_by_code = {}
    for candidate in candidates:
        code = candidate['ICD9_CODE']
        current = best_by_code.get(code)
        if current is None or candidate['score'] > current['score']:
            best_by_code[code] = candidate

    ranked = sorted(best_by_code.values(), key=itemgetter('score'), reverse=True)
    return json.dumps(ranked[:5])
# Load the ICD reference CSV from S3 into a DataFrame.
with fs.open(s3_file_path, 'rb') as csv_handle:
    icd_df = pd.read_csv(csv_handle)

# DISEASES cells are stored as Python-literal strings (presumably lists -
# verify against the CSV); parse them so they can be exploded.
icd_df['DISEASES'] = icd_df['DISEASES'].apply(literal_eval)

# One row per (code, disease): explode, drop rows without a disease, drop the
# unused columns, de-duplicate and renumber the index.
icd_df1 = (
    icd_df.explode('DISEASES')
    .dropna(subset=['DISEASES'])
    .drop(columns=['SEQ_NUM', 'SHORT_TITLE', 'LONG_TITLE'])
    .drop_duplicates(subset=["ICD9_CODE", "DISEASES"])
    .reset_index(drop=True)
)
# Upload handler: accept a .txt file, pull out the diagnosis sections and
# display them. The NER / ICD steps below are still disabled.
upload_file = st.file_uploader('Upload a file containing medical data')
if upload_file is not None:
    # Compare the extension case-insensitively so e.g. "NOTE.TXT" is accepted.
    file_extension = pathlib.Path(upload_file.name).suffix.lower()
    if file_extension == '.txt':
        bytes_data = upload_file.getvalue()
        try:
            # utf-8-sig reads plain UTF-8 and also drops a leading BOM, which
            # would otherwise sit in front of a heading on the first line.
            string_data = bytes_data.decode('utf-8-sig')
        except UnicodeDecodeError:
            # Report undecodable uploads to the user instead of crashing.
            string_data = None
            st.error('The uploaded file is not valid UTF-8 text', icon="🚨")
        if string_data is not None:
            # string_data = string_data.lstrip('"')
            string_data = string_data.replace('**', '')
            # st.subheader('Display string_data')
            # st.write(string_data)
            ex_txt = extractSummary(string_data)
            st.subheader('Extracted Discharge Summary information')
            st.write(ex_txt)
            # if st.button('Run NER Model', on_click=nercallback) or st.session_state.ner_button:
            # with st.spinner('Running NER...'):
            # time.sleep(15)
            # extracted_disease_lst = extract_diseases(extracted_txt)
            # st.info('Extracted Diseases------------------')
            # st.write(extracted_disease_lst)
            # disease_df = pd.DataFrame(extracted_disease_lst, columns=['DISEASES'])
            # disease_df.drop_duplicates(subset=["DISEASES"], keep="first", inplace=True)
            # disease_df.reset_index(drop=True, inplace=True)
            # st.info('Extracted Disease')
            # st.write(disease_df.head())
            # if st.button('Run ICD Model'):
            # disease_df['icd_map'] = disease_df.apply(lambda x: compare(x['DISEASES']), axis=1)
            # disease_df['top5_icd'] = disease_df.apply(lambda x: top_icd(x['icd_map']), axis=1)
            # disease_df.drop(columns=['icd_map'], inplace=True)
            # st.info('Display top 5 rows of icd mapping')
            # st.write(disease_df.head())
            # csv = convert_df(disease_df)
            # st.download_button(
            # "Press to Download",
            # csv,
            # "top5icd.csv",
            # "text/csv",
            # key='top5icd-csv'
            # )
    else:
        st.error('Upload a txt file', icon="🚨")
else:
    st.error('Upload a file', icon="🚨")