Spaces:
Sleeping
Sleeping
File size: 4,448 Bytes
a3290d1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
"""
@author: louisblankemeier
"""
import os
import shutil
from pathlib import Path
from typing import Dict, Union
# import dicom2nifti
import dosma as dm
import pydicom
import SimpleITK as sitk
from comp2comp.inference_class_base import InferenceClass
class DicomLoader(InferenceClass):
    """Pipeline step that reads one DICOM series with a dosma ``DicomReader``."""

    def __init__(self, input_path: Union[str, Path]):
        """Store the series location and create the reader.

        Args:
            input_path: Location handed to ``DicomReader.load`` when the step
                runs; it is normalised to a ``Path``.
        """
        super().__init__()
        self.dicom_dir = Path(input_path)
        self.dr = dm.DicomReader()

    def __call__(self, inference_pipeline) -> Dict:
        """Load the series and return its first volume.

        Args:
            inference_pipeline: The running pipeline (not used by this step).

        Returns:
            Dict: ``{"medical_volume": <first item returned by the reader>}``.
        """
        # With group_by=None the reader still returns a sequence; only its
        # first entry is forwarded to the next step.
        loaded = self.dr.load(
            self.dicom_dir,
            group_by=None,
            sort_by="InstanceNumber",
        )
        first_volume = loaded[0]
        return {"medical_volume": first_volume}
class NiftiSaver(InferenceClass):
    """Pipeline step that writes a dosma medical volume with ``dm.NiftiWriter``."""

    def __init__(self):
        """Create the NIfTI writer used by ``__call__``."""
        super().__init__()
        # self.output_dir = Path(output_path)
        self.nw = dm.NiftiWriter()

    def __call__(
        self, inference_pipeline, medical_volume: dm.MedicalVolume
    ) -> Dict[str, Path]:
        """Write ``medical_volume`` to the pipeline's output location.

        Args:
            inference_pipeline: Pipeline object; its ``output_dir`` attribute is
                used as the write target.
            medical_volume: Volume to serialise.

        Returns:
            Dict[str, Path]: ``{"nifti_file": <the target passed to the writer>}``.
        """
        # NOTE(review): the target is ``output_dir`` itself, with no file name
        # appended -- confirm that this is what the writer expects.
        destination = inference_pipeline.output_dir
        self.nw.write(medical_volume, destination)
        return {"nifti_file": destination}
class DicomFinder(InferenceClass):
    """Find dicom files in a directory."""

    def __init__(self, input_path: Union[str, Path]) -> None:
        """Remember the directory to search.

        Args:
            input_path: Root directory that is searched recursively for
                ``*.dcm`` files when the step runs.
        """
        super().__init__()
        self.input_path = Path(input_path)

    def __call__(self, inference_pipeline) -> Dict[str, Path]:
        """Collect every ``*.dcm`` file below ``input_path``.

        The result is not returned; it is stored on the pipeline object as
        ``inference_pipeline.dicom_file_paths``.

        Args:
            inference_pipeline (InferencePipeline): Inference pipeline that
                receives the ``dicom_file_paths`` attribute.

        Returns:
            Dict[str, Path]: Always an empty dictionary.
        """
        # glob() yields paths in a filesystem-dependent order; sorting makes
        # the stored list reproducible from run to run.
        inference_pipeline.dicom_file_paths = sorted(
            self.input_path.glob("**/*.dcm")
        )
        return {}
class DicomToNifti(InferenceClass):
    """Pipeline step that puts a NIfTI version of the input under ``segmentations``.

    A directory is converted with ``dicom_series_to_nifti``; a ``.nii`` or
    ``.nii.gz`` file is copied unchanged. Any other input is left untouched.
    """

    def __init__(self, input_path: Union[str, Path], save=True):
        """Store the input location.

        Args:
            input_path: DICOM directory or NIfTI file to process.
            save: Stored on the instance; not read anywhere in this class.
        """
        super().__init__()
        self.input_path = Path(input_path)
        self.save = save

    def __call__(self, inference_pipeline):
        """Create ``<output_dir>/segmentations/converted_dcm.nii[.gz]`` if needed.

        Nothing is done when ``converted_dcm.nii.gz`` already exists or when
        the pipeline already has a ``medical_volume`` attribute.

        For a directory input, the pipeline additionally receives
        ``dicom_series_path`` (the input path as a string) and ``dicom_ds``
        (the value returned by ``dicom_series_to_nifti``).

        Args:
            inference_pipeline: Pipeline object providing ``output_dir``.

        Returns:
            dict: Always an empty dictionary.
        """
        seg_dir = os.path.join(inference_pipeline.output_dir, "segmentations")
        converted_gz = os.path.join(seg_dir, "converted_dcm.nii.gz")

        # Skip when a previous run already produced the file, or when an
        # earlier step attached a volume to the pipeline.
        if os.path.exists(converted_gz) or hasattr(
            inference_pipeline, "medical_volume"
        ):
            return {}

        os.makedirs(seg_dir, exist_ok=True)

        # Directory input: treat it as a DICOM series and convert it.
        if self.input_path.is_dir():
            header = dicom_series_to_nifti(
                self.input_path,
                output_file=converted_gz,
                reorient_nifti=False,
            )
            inference_pipeline.dicom_series_path = str(self.input_path)
            inference_pipeline.dicom_ds = header
            return {}

        # File input: copy NIfTI files, keeping their compression suffix.
        source_name = str(self.input_path)
        copy_targets = (
            (".nii", "converted_dcm.nii"),
            (".nii.gz", "converted_dcm.nii.gz"),
        )
        for suffix, target_name in copy_targets:
            if source_name.endswith(suffix):
                shutil.copy(self.input_path, os.path.join(seg_dir, target_name))
                break
        return {}
def series_selector(dicom_path):
    """Read a DICOM file and reject it unless it passes three checks.

    The file's ``ImageType`` entries must contain "primary" and "original"
    (case-insensitive substring match), and ``ImageOrientationPatient`` must
    equal ``[1, 0, 0, 0, 1, 0]`` exactly.

    Args:
        dicom_path: Path of the DICOM file to read.

    Returns:
        The dataset returned by ``pydicom.filereader.dcmread``.

    Raises:
        ValueError: If any of the three checks fails.
    """
    dataset = pydicom.filereader.dcmread(dicom_path)
    image_types = list(dataset.ImageType)

    def mentions(keyword):
        # True when any ImageType entry contains the keyword, ignoring case.
        return any(keyword in entry.lower() for entry in image_types)

    if not mentions("primary"):
        raise ValueError("Not primary image type")
    if not mentions("original"):
        raise ValueError("Not original image type")
    # Disabled check, kept for reference:
    # if any("gsi" in s.lower() for s in image_type_list):
    #     raise ValueError("GSI image type")

    axial_orientation = [1, 0, 0, 0, 1, 0]
    if dataset.ImageOrientationPatient != axial_orientation:
        raise ValueError("Image orientation is not axial")
    return dataset
def dicom_series_to_nifti(input_path, output_file, reorient_nifti):
    """Read the DICOM series found in a directory and write it as one image.

    The first file of the series is validated with ``series_selector`` before
    the series is read.

    Args:
        input_path: Directory passed to
            ``ImageSeriesReader.GetGDCMSeriesFileNames``.
        output_file: Destination path passed to ``sitk.WriteImage``.
        reorient_nifti: Accepted for interface compatibility; not used by
            this function.

    Returns:
        The dataset returned by ``series_selector`` for the first file of the
        series.

    Raises:
        IndexError: If no series files are found in ``input_path``.
        ValueError: Propagated from ``series_selector`` when the first file is
            rejected.
    """
    reader = sitk.ImageSeriesReader()
    dicom_names = reader.GetGDCMSeriesFileNames(str(input_path))
    if not dicom_names:
        # Previously this surfaced as a bare "tuple index out of range" from
        # the indexing below; the exception type is kept so existing handlers
        # still match, but the message now names the offending directory.
        raise IndexError(f"No DICOM series files found in {input_path}")

    ds = series_selector(dicom_names[0])
    reader.SetFileNames(dicom_names)
    image = reader.Execute()
    sitk.WriteImage(image, output_file)
    return ds
|