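# Page header captured together with the file (not part of the program):
#   uploader: coldn00dl3s
#   last commit: "Update app.py" (9cd9f70, verified)
#   viewer links: raw, history, blame
#   file size: 5.77 kB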
import streamlit as st
import pandas as pd
import requests
import json
from datetime import datetime
# Streamlit front end for a remote FHIR conversion service.
#
# The user picks a conversion type (CSV, CCDA XML, or free-text clinical
# notes), supplies the input, and presses "Convert". The input is POSTed to the
# matching API endpoint and the returned FHIR text is shown in a text area,
# followed by a link to the public FHIR validator.

# Base URL of the conversion API; endpoint paths are appended to it.
API_BASE_URL = "https://fhir-api-9jsn.onrender.com"

# (connect, read) timeouts in seconds, so an unreachable or stalled server
# cannot hang the app forever.
# NOTE(review): the read timeout is a guess -- tune it to the slowest
# conversion the service is expected to perform.
REQUEST_TIMEOUT = (10, 300)

# Resource types offered for each conversion type that needs one.
CSV_RESOURCE_TYPES = [
    "Patient",
    "CarePlan",
    "MedicationRequest",
    "Immunization",
    "Encounter",
    "Claim",
    "Procedure",
    "Observation",
]
CCDA_RESOURCE_TYPES = [
    "Patient",
    "Encounter",
    "Observation",
    "MedicationRequest",
    "Procedure",
    "Immunization",
    "Condition",
]


def _show_output(slot, fhir_text: str) -> None:
    """Render the success banner and the FHIR output inside *slot*.

    *slot* is an ``st.empty()`` placeholder. Such a placeholder holds a single
    element, so writing the banner and then the text area directly to it would
    replace the banner. Both are therefore grouped in one container.
    """
    with slot.container():
        st.success("Conversion successful!")
        st.text_area("FHIR Output", fhir_text, height=400)


def _show_validator_link() -> None:
    """Render the link to the public FHIR validator."""
    # A plain Markdown link needs no raw-HTML rendering.
    st.markdown("[Go to FHIR Validator](https://validator.fhir.org)")


def _error_from_response(response) -> str:
    """Return a displayable error message for a non-200 *response*.

    Uses the ``error`` field of a JSON object body when there is one, and
    falls back to the HTTP status code when the body is not JSON.
    """
    try:
        payload = response.json()
    except ValueError:
        return f"HTTP {response.status_code}"
    if isinstance(payload, dict):
        return payload.get("error", "Unknown error")
    return "Unknown error"


def _convert_via_json(endpoint: str, payload: dict, slot) -> bool:
    """POST *payload* as JSON to *endpoint* and render the outcome.

    Args:
        endpoint: Path under ``API_BASE_URL`` (without a leading slash).
        payload: JSON-serialisable request body.
        slot: ``st.empty()`` placeholder that receives the output.

    Returns:
        True when the service returned non-empty output, False otherwise
        (an error message has been shown in that case).
    """
    try:
        response = requests.post(
            f"{API_BASE_URL}/{endpoint}",
            json=payload,
            timeout=REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            st.error(f"An error occurred: {_error_from_response(response)}")
            return False
        fhir_text = response.json().get("output", "")
    except Exception as exc:  # UI boundary: report the failure, don't crash.
        st.error(f"An error occurred: {exc}")
        return False

    if not fhir_text:
        # A 200 response without output used to leave the page silently blank.
        st.error("An error occurred: the service returned no output")
        return False

    _show_output(slot, fhir_text)
    return True


def _stream_csv_conversion(
    csv_bytes: bytes,
    res_type: str,
    fallback_total: int,
    progress_bar,
    time_placeholder,
    items_placeholder,
    slot,
) -> bool:
    """Send the CSV to ``/convert`` and follow its line-delimited JSON stream.

    Each non-empty response line is parsed as a JSON object. An object with a
    ``progress`` key updates the progress widgets, one with ``output`` carries
    the final FHIR text, and one with ``error`` stops reading.

    Args:
        csv_bytes: Raw content of the uploaded CSV file.
        res_type: FHIR resource type selected by the user.
        fallback_total: Item count shown when a progress message carries no
            ``total_items`` value.
        progress_bar: ``st.progress`` element to update.
        time_placeholder: Placeholder for the elapsed-time text.
        items_placeholder: Placeholder for the processed-items text.
        slot: ``st.empty()`` placeholder that receives the output.

    Returns:
        True when output was received and displayed, False otherwise.
    """
    started_at = datetime.now()
    fhir_text = ""
    try:
        # The context manager releases the connection even when the loop is
        # left early because the server reported an error.
        with requests.post(
            f"{API_BASE_URL}/convert",
            files={"file": csv_bytes},
            data={"res_type": res_type},
            stream=True,
            timeout=REQUEST_TIMEOUT,
        ) as response:
            for raw_line in response.iter_lines():
                if not raw_line:
                    continue
                message = json.loads(raw_line.decode("utf-8"))
                if "progress" in message:
                    current_item = message.get("current_item", 0)
                    total_items = message.get("total_items", fallback_total)
                    # st.progress only accepts integers in [0, 100].
                    percent = min(max(int(message["progress"]), 0), 100)
                    progress_bar.progress(percent)
                    # The former time-left estimate was never displayed and
                    # divided by zero while no item had been processed yet,
                    # so it is no longer computed.
                    elapsed_time = datetime.now() - started_at
                    time_placeholder.text(f"Time elapsed: {elapsed_time}")
                    items_placeholder.text(
                        f"Items processed: {current_item}/{total_items}"
                    )
                elif "output" in message:
                    fhir_text = message["output"]
                elif "error" in message:
                    st.error(f"An error occurred: {message['error']}")
                    break
    except Exception as exc:  # UI boundary: report the failure, don't crash.
        st.error(f"An error occurred: {exc}")
        return False

    if not fhir_text:
        return False
    _show_output(slot, fhir_text)
    return True


def _render_csv_page() -> None:
    """Render the CSV upload, preview, progress widgets and Convert button."""
    res_type = st.selectbox("Select Resource Type", CSV_RESOURCE_TYPES)
    uploaded_file = st.file_uploader("Upload CSV", type=["csv"])
    if uploaded_file is None:
        return

    try:
        df = pd.read_csv(uploaded_file)
    except (
        pd.errors.ParserError,
        pd.errors.EmptyDataError,
        UnicodeDecodeError,
    ) as exc:
        st.error(f"Could not read the uploaded CSV: {exc}")
        return

    st.write("Uploaded CSV:")
    st.dataframe(df)

    # Widgets are created before the button so they keep their position on
    # the page above it.
    progress_bar = st.progress(0)
    time_placeholder = st.empty()
    items_placeholder = st.empty()
    result = st.empty()

    if not st.button("Convert"):
        return
    converted = _stream_csv_conversion(
        uploaded_file.getvalue(),
        res_type,
        len(df),
        progress_bar,
        time_placeholder,
        items_placeholder,
        result,
    )
    if converted:
        _show_validator_link()


def _render_ccda_page() -> None:
    """Render the CCDA XML input and run the conversion on request."""
    res_type = st.selectbox("Select Resource Type", CCDA_RESOURCE_TYPES)
    cda_content = st.text_area("Enter CCDA Content in XML format")
    result = st.empty()

    if not st.button("Convert"):
        return
    if not cda_content.strip():
        # Skip a pointless round trip to the service.
        st.warning("Please enter CCDA content before converting.")
        return
    payload = {"res_type": res_type, "cda_content": cda_content}
    if _convert_via_json("convert_ccda", payload, result):
        _show_validator_link()


def _render_notes_page() -> None:
    """Render the clinical-notes input and run the conversion on request."""
    note_content = st.text_area(
        "Copy and Paste your clinical notes for a patient"
    )
    result = st.empty()

    if not st.button("Convert"):
        return
    if not note_content.strip():
        # Skip a pointless round trip to the service.
        st.warning("Please enter clinical notes before converting.")
        return
    if _convert_via_json("convert_notes", {"note_content": note_content}, result):
        _show_validator_link()


st.title("FHIR Converter")

# Dropdown for conversion type
conversion_type = st.selectbox(
    "Select Conversion Type",
    ["CSV to FHIR", "CCDA to FHIR", "Clinical Notes to FHIR"],
)

if conversion_type == "CSV to FHIR":
    _render_csv_page()
elif conversion_type == "CCDA to FHIR":
    _render_ccda_page()
elif conversion_type == "Clinical Notes to FHIR":
    _render_notes_page()