Spaces:
Sleeping
Sleeping
import streamlit as st | |
import pandas as pd | |
import requests | |
import json | |
from datetime import datetime | |
st.title("FHIR Converter") | |
# Dropdown for conversion type | |
conversion_type = st.selectbox( | |
"Select Conversion Type", | |
["CSV to FHIR", "CCDA to FHIR","Clinical Notes to FHIR"] | |
) | |
# Dropdown for resource type | |
if conversion_type == "CSV to FHIR": | |
res_type = st.selectbox( | |
"Select Resource Type", | |
["Patient","CarePlan", "MedicationRequest", "Immunization", "Encounter", "Claim", "Procedure", "Observation"] | |
) | |
# File uploader for CSV file | |
uploaded_file = st.file_uploader("Upload CSV", type=["csv"]) | |
if uploaded_file is not None: | |
# Display the uploaded CSV file | |
df = pd.read_csv(uploaded_file) | |
st.write("Uploaded CSV:") | |
st.dataframe(df) | |
# Placeholders for progress bar and time left | |
progress_bar = st.progress(0) | |
time_placeholder = st.empty() | |
items_placeholder = st.empty() | |
# Variables to store the result and state | |
result = st.empty() | |
conversion_successful = False | |
# Button to trigger conversion | |
if st.button("Convert"): | |
# Convert the CSV to FHIR using the FastAPI endpoint | |
start_time = datetime.now() | |
total_items = len(df) | |
try: | |
files = {"file": uploaded_file.getvalue()} | |
data = {"res_type": res_type} | |
response = requests.post("https://fhir-api-9jsn.onrender.com/convert", files=files, data=data, stream=True) | |
result_text = "" | |
for chunk in response.iter_lines(): | |
if chunk: | |
chunk_data = chunk.decode("utf-8") | |
json_chunk = json.loads(chunk_data) | |
if "progress" in json_chunk: | |
progress = json_chunk["progress"] | |
current_item = json_chunk["current_item"] | |
total_items = json_chunk["total_items"] | |
elapsed_time = datetime.now() - start_time | |
items_per_second = current_item / elapsed_time.total_seconds() | |
time_left = (total_items - current_item) / items_per_second | |
progress_bar.progress(int(progress)) | |
time_placeholder.text(f"Time elapsed: {elapsed_time}") | |
items_placeholder.text(f"Items processed: {current_item}/{total_items}") | |
elif "output" in json_chunk: | |
result_text = json_chunk["output"] | |
elif "error" in json_chunk: | |
st.error(f"An error occurred: {json_chunk['error']}") | |
break | |
if result_text: | |
result.success("Conversion successful!") | |
result.text_area("FHIR Output", result_text, height=400) | |
conversion_successful = True | |
except Exception as e: | |
st.error(f"An error occurred: {str(e)}") | |
# Display validator button | |
if conversion_successful: | |
st.markdown("[Go to FHIR Validator](https://validator.fhir.org)", unsafe_allow_html=True) | |
elif conversion_type == "CCDA to FHIR": | |
res_type = st.selectbox( | |
"Select Resource Type", | |
["Patient", "Encounter", "Observation","MedicationRequest","Procedure","Immunization","Condition"] | |
) | |
cda_content = st.text_area("Enter CCDA Content in XML format") | |
# Variables to store the result and state | |
result = st.empty() | |
conversion_successful = False | |
# Button to trigger conversion | |
if st.button("Convert"): | |
try: | |
data = {"res_type": res_type, "cda_content": cda_content} | |
response = requests.post("https://fhir-api-9jsn.onrender.com/convert_ccda", json=data) | |
if response.status_code == 200: | |
result_text = response.json().get("output", "") | |
if result_text: | |
result.success("Conversion successful!") | |
result.text_area("FHIR Output", result_text, height=400) | |
conversion_successful = True | |
else: | |
st.error(f"An error occurred: {response.json().get('error', 'Unknown error')}") | |
except Exception as e: | |
st.error(f"An error occurred: {str(e)}") | |
if conversion_successful: | |
st.markdown("[Go to FHIR Validator](https://validator.fhir.org)", unsafe_allow_html=True) | |
elif conversion_type == "Clinical Notes to FHIR": | |
note_content = st.text_area("Copy and Paste your clinical notes for a patient") | |
# Variables to store the result and state | |
result = st.empty() | |
conversion_successful = False | |
# Button to trigger conversion | |
if st.button("Convert"): | |
try: | |
data = {"note_content": note_content} | |
response = requests.post("https://fhir-api-9jsn.onrender.com/convert_notes", json=data) | |
if response.status_code == 200: | |
result_text = response.json().get("output", "") | |
if result_text: | |
result.success("Conversion successful!") | |
result.text_area("FHIR Output", result_text, height=400) | |
conversion_successful = True | |
else: | |
st.error(f"An error occurred: {response.json().get('error', 'Unknown error')}") | |
except Exception as e: | |
st.error(f"An error occurred: {str(e)}") | |
# Display validator button | |
if conversion_successful: | |
st.markdown("[Go to FHIR Validator](https://validator.fhir.org)", unsafe_allow_html=True) | |