"""Streamlit app: download a DICOM series from IDC and export its headers to Parquet."""

import os
import shutil
from pathlib import Path
from tempfile import TemporaryDirectory

import pandas as pd
import polars
import pyarrow as pa
import pyarrow.parquet as pq
import pydicom
import pydicom.datadict as dd
import streamlit as st
from idc_index import index

# Characters stripped by sanitize_name(): parentheses, comma, space, underscore.
_NAME_STRIP_TABLE = str.maketrans("", "", "(), _")


def get_tag_description(tag, description):
    """Return the DICOM dictionary name for ``tag``.

    Args:
        tag: The DICOM tag to look up in pydicom's data dictionary.
        description: Fallback text returned when the tag is not in the
            dictionary.

    Returns:
        The dictionary name for ``tag``, or ``description`` when the lookup
        raises ``KeyError``.
    """
    try:
        # The third item of the dictionary entry is the element name.
        return dd.get_entry(tag)[2]
    except KeyError:
        return description


def _store_unique(target, key, tag, value):
    """Store ``value`` in ``target`` under ``key`` without clobbering an entry.

    Different elements can end up with the same sanitized name (for example
    elements that fall back to a shared generic description). When ``key`` is
    already present, the element's 8-digit hexadecimal tag is appended so both
    values are kept.
    """
    if key in target:
        key = f"{key}{int(tag):08X}"
    target[key] = value


def convert_value(value):
    """Convert a pydicom value to a Python-native representation.

    Sequences become lists of dicts, multi-valued elements become lists,
    person names and UIDs become ``str``; anything else is returned unchanged.
    """
    # Sequence is tested before MultiValue: in some pydicom releases Sequence
    # is a MultiValue subclass, and would otherwise be handled as a plain list
    # of unconverted datasets.
    if isinstance(value, pydicom.sequence.Sequence):
        return [convert_sequence_item(item) for item in value]
    if isinstance(value, pydicom.multival.MultiValue):
        return [convert_value(v) for v in value]
    if isinstance(value, (pydicom.valuerep.PersonName, pydicom.uid.UID)):
        return str(value)
    return value


def sanitize_name(name):
    """Return ``name`` with parentheses, commas, spaces and underscores removed."""
    return name.translate(_NAME_STRIP_TABLE)


def convert_sequence_item(item):
    """Convert one pydicom sequence item (a dataset) to a plain ``dict``.

    Keys are the sanitized element names; values are converted with
    :func:`convert_value`. A key that repeats within the item gets the
    element's hexadecimal tag appended.
    """
    converted = {}
    for elem in item:
        # ``elem.name`` is used instead of the older ``elem.description()``
        # accessor, which is not available in recent pydicom releases.
        key = sanitize_name(get_tag_description(elem.tag, elem.name))
        _store_unique(converted, key, elem.tag, convert_value(elem.value))
    return converted


def clean_column_name(column_name):
    """Return ``column_name`` keeping only its alphanumeric characters."""
    return "".join(ch for ch in column_name if ch.isalnum())


def serialize_element(value):
    """Recursively convert a DICOM value while preserving its nesting.

    A ``Dataset`` becomes a dict keyed by sanitized element names, a
    ``Sequence`` becomes a list of serialized items, and any other value is
    passed through :func:`convert_value`.
    """
    if isinstance(value, pydicom.Dataset):
        nested = {}
        for elem in value:
            key = sanitize_name(get_tag_description(elem.tag, elem.name))
            _store_unique(nested, key, elem.tag, serialize_element(elem.value))
        return nested
    if isinstance(value, pydicom.sequence.Sequence):
        return [serialize_element(item) for item in value]
    return convert_value(value)


def extract_dicom_header(dicom_file):
    """Read the header of ``dicom_file`` and return it as a ``dict``.

    Pixel data is not read (``stop_before_pixels=True``). Keys are the cleaned
    and sanitized element names; values are serialized with
    :func:`serialize_element`. A key that repeats within the file gets the
    element's hexadecimal tag appended instead of overwriting the earlier one.
    """
    ds = pydicom.dcmread(dicom_file, stop_before_pixels=True)
    header_data = {}
    for elem in ds:
        key = sanitize_name(
            clean_column_name(get_tag_description(elem.tag, elem.name))
        )
        _store_unique(header_data, key, elem.tag, serialize_element(elem.value))
    return header_data


def save_dicom_header_to_parquet(dicom_files, parquet_file):
    """Write the headers of ``dicom_files`` to ``parquet_file``.

    Args:
        dicom_files: Iterable of paths to DICOM files; each becomes one row.
        parquet_file: Destination path of the Parquet file.
    """
    all_header_data = [extract_dicom_header(path) for path in dicom_files]
    df = pd.DataFrame(all_header_data)
    df.columns = [sanitize_name(col) for col in df.columns]
    table = pa.Table.from_pandas(df)
    pq.write_table(table, parquet_file)


# --- Streamlit app ---------------------------------------------------------

st.title("DICOM to Parquet Converter")
st.write("Select IDC data to download and extract metadata into a Parquet file.")

# Fetch the IDC index.
client = index.IDCClient()
index_df = client.index

# Each selectbox narrows the index further:
# collection -> patient -> modality -> study -> series.
st.subheader("Choose IDC Data to Process")
collection_ids = index_df["collection_id"].unique()
selected_collection_id = st.selectbox("Select Collection ID", collection_ids)

df_filtered_by_collection = index_df[
    index_df["collection_id"] == selected_collection_id
]
patients = df_filtered_by_collection["PatientID"].unique()
selected_patient_id = st.selectbox("Select Patient ID", patients)

df_filtered_by_patient = df_filtered_by_collection[
    df_filtered_by_collection["PatientID"] == selected_patient_id
]
modalities = df_filtered_by_patient["Modality"].unique()
selected_modality = st.selectbox("Select Modality", modalities)

df_filtered_by_modality = df_filtered_by_patient[
    df_filtered_by_patient["Modality"] == selected_modality
]
studies = df_filtered_by_modality["StudyInstanceUID"].unique()
selected_study = st.selectbox("Select Study", studies)

df_filtered_by_study = df_filtered_by_modality[
    df_filtered_by_modality["StudyInstanceUID"] == selected_study
]
series = df_filtered_by_study["SeriesInstanceUID"].unique()
selected_series = st.selectbox("Select Series", series)

if st.button("Extract DICOM Header into a parquet"):
    # Resolve the chosen series against the full index.
    selection = index_df[index_df["SeriesInstanceUID"] == selected_series]
    series_instance_uids = selection["SeriesInstanceUID"].tolist()

    download_errors = []

    # Start from an empty download directory so files from a previous run
    # are not mixed into this export.
    input_dir = Path("input_data/")
    if input_dir.exists():
        shutil.rmtree(input_dir)
    input_dir.mkdir(parents=True, exist_ok=True)

    try:
        client.download_from_selection(
            seriesInstanceUID=series_instance_uids,
            downloadDir=input_dir,
        )
    except Exception as e:
        # Top-level UI boundary: report the failure to the user instead of
        # letting the app crash.
        download_errors.append(f"Error downloading data: {str(e)}")

    if download_errors:
        st.error("\n".join(download_errors))
    else:
        st.success("Data downloaded successfully.")

        dicom_files = [str(file) for file in input_dir.glob("**/*.dcm")]
        if not dicom_files:
            # Nothing to convert: tell the user rather than offering an
            # empty Parquet file.
            st.warning("No DICOM (.dcm) files were found in the downloaded data.")
        else:
            parquet_file = "dcm2parquet_output.parquet"
            save_dicom_header_to_parquet(dicom_files, parquet_file)

            st.success("Processing complete.")

            with open(parquet_file, "rb") as f:
                st.download_button(
                    label="Download the metadata parquet file",
                    data=f,
                    file_name="dcm2parquet_output.parquet",
                    mime="application/parquet",
                )