|
import pydicom |
|
import sys |
|
import os |
|
import numpy as np |
|
import nibabel as nib |
|
import scipy |
|
|
|
def convert_ct_dicom_to_nii(dir_dicom, dir_nii, outputname, newvoxelsize=None): |
|
Patients = PatientList() |
|
|
|
Patients.list_dicom_files(dir_dicom, 1) |
|
patient = Patients.list[0] |
|
patient_name = patient.PatientInfo.PatientName |
|
patient.import_patient_data(newvoxelsize) |
|
CT = patient.CTimages[0] |
|
image_position_patient = CT.ImagePositionPatient |
|
voxelsize = np.array(CT.PixelSpacing) |
|
save_images(dst_dir=os.path.join(dir_nii), voxelsize=voxelsize, |
|
image_position_patient=image_position_patient, image=CT.Image, outputname=outputname) |
|
return CT |
|
|
|
|
|
def save_images(dst_dir, voxelsize, image_position_patient, image, outputname): |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
image_shape = image.shape |
|
|
|
|
|
|
|
image_nii = nib.Nifti1Image(image, affine=np.eye(4)) |
|
|
|
image_nii = set_header_info(image_nii, voxelsize, image_position_patient) |
|
|
|
|
|
nib.save(image_nii, os.path.join(dst_dir, outputname)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def set_header_info(nii_file, voxelsize, image_position_patient, contours_exist=None): |
|
nii_file.header['pixdim'][1] = voxelsize[0] |
|
nii_file.header['pixdim'][2] = voxelsize[1] |
|
nii_file.header['pixdim'][3] = voxelsize[2] |
|
|
|
|
|
nii_file.affine[0][0] = voxelsize[0] |
|
nii_file.affine[1][1] = voxelsize[1] |
|
nii_file.affine[2][2] = voxelsize[2] |
|
|
|
nii_file.affine[0][3] = image_position_patient[0] |
|
nii_file.affine[1][3] = image_position_patient[1] |
|
nii_file.affine[2][3] = image_position_patient[2] |
|
if contours_exist: |
|
nii_file.header.extensions.append( |
|
nib.nifti1.Nifti1Extension(0, bytearray(contours_exist))) |
|
return nii_file |
|
|
|
|
|
class PatientList: |
|
|
|
def __init__(self): |
|
self.list = [] |
|
|
|
def find_CT_image(self, display_id): |
|
count = -1 |
|
for patient_id in range(len(self.list)): |
|
for ct_id in range(len(self.list[patient_id].CTimages)): |
|
if (self.list[patient_id].CTimages[ct_id].isLoaded == 1): |
|
count += 1 |
|
if (count == display_id): |
|
break |
|
if (count == display_id): |
|
break |
|
|
|
return patient_id, ct_id |
|
|
|
def find_dose_image(self, display_id): |
|
count = -1 |
|
for patient_id in range(len(self.list)): |
|
for dose_id in range(len(self.list[patient_id].RTdoses)): |
|
if (self.list[patient_id].RTdoses[dose_id].isLoaded == 1): |
|
count += 1 |
|
if (count == display_id): |
|
break |
|
if (count == display_id): |
|
break |
|
|
|
return patient_id, dose_id |
|
|
|
def find_contour(self, ROIName): |
|
for patient_id in range(len(self.list)): |
|
for struct_id in range(len(self.list[patient_id].RTstructs)): |
|
if (self.list[patient_id].RTstructs[struct_id].isLoaded == 1): |
|
for contour_id in range(len(self.list[patient_id].RTstructs[struct_id].Contours)): |
|
if (self.list[patient_id].RTstructs[struct_id].Contours[contour_id].ROIName == ROIName): |
|
return patient_id, struct_id, contour_id |
|
|
|
def list_dicom_files(self, folder_path, recursive): |
|
file_list = os.listdir(folder_path) |
|
|
|
for file_name in file_list: |
|
file_path = os.path.join(folder_path, file_name) |
|
|
|
|
|
if os.path.isdir(file_path): |
|
if recursive == True: |
|
subfolder_list = self.list_dicom_files(file_path, True) |
|
|
|
|
|
|
|
elif os.path.isfile(file_path): |
|
|
|
try: |
|
dcm = pydicom.dcmread(file_path) |
|
except: |
|
print("Invalid Dicom file: " + file_path) |
|
continue |
|
|
|
patient_id = next((x for x, val in enumerate( |
|
self.list) if val.PatientInfo.PatientID == dcm.PatientID), -1) |
|
|
|
if patient_id == -1: |
|
Patient = PatientData() |
|
Patient.PatientInfo.PatientID = dcm.PatientID |
|
Patient.PatientInfo.PatientName = str(dcm.PatientName) |
|
Patient.PatientInfo.PatientBirthDate = dcm.PatientBirthDate |
|
Patient.PatientInfo.PatientSex = dcm.PatientSex |
|
self.list.append(Patient) |
|
patient_id = len(self.list) - 1 |
|
|
|
|
|
if dcm.SOPClassUID == "1.2.840.10008.5.1.4.1.1.2": |
|
ct_id = next((x for x, val in enumerate( |
|
self.list[patient_id].CTimages) if val.SeriesInstanceUID == dcm.SeriesInstanceUID), -1) |
|
if ct_id == -1: |
|
CT = CTimage() |
|
CT.SeriesInstanceUID = dcm.SeriesInstanceUID |
|
CT.SOPClassUID == "1.2.840.10008.5.1.4.1.1.2" |
|
CT.PatientInfo = self.list[patient_id].PatientInfo |
|
CT.StudyInfo = StudyInfo() |
|
CT.StudyInfo.StudyInstanceUID = dcm.StudyInstanceUID |
|
CT.StudyInfo.StudyID = dcm.StudyID |
|
CT.StudyInfo.StudyDate = dcm.StudyDate |
|
CT.StudyInfo.StudyTime = dcm.StudyTime |
|
if (hasattr(dcm, 'SeriesDescription') and dcm.SeriesDescription != ""): |
|
CT.ImgName = dcm.SeriesDescription |
|
else: |
|
CT.ImgName = dcm.SeriesInstanceUID |
|
self.list[patient_id].CTimages.append(CT) |
|
ct_id = len(self.list[patient_id].CTimages) - 1 |
|
|
|
self.list[patient_id].CTimages[ct_id].DcmFiles.append( |
|
file_path) |
|
|
|
else: |
|
print("Unknown SOPClassUID " + |
|
dcm.SOPClassUID + " for file " + file_path) |
|
|
|
|
|
else: |
|
print("Unknown file type " + file_path) |
|
|
|
def print_patient_list(self): |
|
print("") |
|
for patient in self.list: |
|
patient.print_patient_info() |
|
|
|
print("") |
|
|
|
|
|
class PatientData: |
|
|
|
def __init__(self): |
|
self.PatientInfo = PatientInfo() |
|
self.CTimages = [] |
|
|
|
def print_patient_info(self, prefix=""): |
|
print("") |
|
print(prefix + "PatientName: " + self.PatientInfo.PatientName) |
|
print(prefix + "PatientID: " + self.PatientInfo.PatientID) |
|
|
|
for ct in self.CTimages: |
|
print("") |
|
ct.print_CT_info(prefix + " ") |
|
|
|
def import_patient_data(self, newvoxelsize=None): |
|
|
|
for i, ct in enumerate(self.CTimages): |
|
if (ct.isLoaded == 1): |
|
continue |
|
ct.import_Dicom_CT() |
|
|
|
for i, ct in enumerate(self.CTimages): |
|
ct.resample_CT(newvoxelsize) |
|
|
|
|
|
class PatientInfo: |
|
|
|
def __init__(self): |
|
self.PatientID = '' |
|
self.PatientName = '' |
|
self.PatientBirthDate = '' |
|
self.PatientSex = '' |
|
|
|
|
|
class StudyInfo: |
|
|
|
def __init__(self): |
|
self.StudyInstanceUID = '' |
|
self.StudyID = '' |
|
self.StudyDate = '' |
|
self.StudyTime = '' |
|
|
|
|
|
class CTimage: |
|
|
|
def __init__(self): |
|
self.SeriesInstanceUID = "" |
|
self.PatientInfo = {} |
|
self.StudyInfo = {} |
|
self.FrameOfReferenceUID = "" |
|
self.ImgName = "" |
|
self.SOPClassUID = "" |
|
self.DcmFiles = [] |
|
self.isLoaded = 0 |
|
|
|
def print_CT_info(self, prefix=""): |
|
print(prefix + "CT series: " + self.SeriesInstanceUID) |
|
for ct_slice in self.DcmFiles: |
|
print(prefix + " " + ct_slice) |
|
|
|
def resample_CT(self, newvoxelsize): |
|
ct = self.Image |
|
|
|
if newvoxelsize is not None: |
|
source_shape = self.GridSize |
|
voxelsize = self.PixelSpacing |
|
|
|
VoxelX_source = self.ImagePositionPatient[0] + \ |
|
np.arange(source_shape[0])*voxelsize[0] |
|
VoxelY_source = self.ImagePositionPatient[1] + \ |
|
np.arange(source_shape[1])*voxelsize[1] |
|
VoxelZ_source = self.ImagePositionPatient[2] + \ |
|
np.arange(source_shape[2])*voxelsize[2] |
|
|
|
target_shape = np.ceil(np.array(source_shape).astype( |
|
float)*np.array(voxelsize).astype(float)/newvoxelsize).astype(int) |
|
VoxelX_target = self.ImagePositionPatient[0] + \ |
|
np.arange(target_shape[0])*newvoxelsize[0] |
|
VoxelY_target = self.ImagePositionPatient[1] + \ |
|
np.arange(target_shape[1])*newvoxelsize[1] |
|
VoxelZ_target = self.ImagePositionPatient[2] + \ |
|
np.arange(target_shape[2])*newvoxelsize[2] |
|
|
|
if (all(source_shape == target_shape) and np.linalg.norm(np.subtract(voxelsize, newvoxelsize) < 0.001)): |
|
print("Image does not need filtering") |
|
else: |
|
|
|
sigma = [0, 0, 0] |
|
if (newvoxelsize[0] > voxelsize[0]): |
|
sigma[0] = 0.4 * (newvoxelsize[0]/voxelsize[0]) |
|
if (newvoxelsize[1] > voxelsize[1]): |
|
sigma[1] = 0.4 * (newvoxelsize[1]/voxelsize[1]) |
|
if (newvoxelsize[2] > voxelsize[2]): |
|
sigma[2] = 0.4 * (newvoxelsize[2]/voxelsize[2]) |
|
|
|
if (sigma != [0, 0, 0]): |
|
print("Image is filtered before downsampling") |
|
ct = scipy.ndimage.gaussian_filter(ct, sigma) |
|
|
|
xi = np.array(np.meshgrid( |
|
VoxelX_target, VoxelY_target, VoxelZ_target)) |
|
xi = np.rollaxis(xi, 0, 4) |
|
xi = xi.reshape((xi.size // 3, 3)) |
|
|
|
|
|
ct = scipy.interpolate.interpn((VoxelX_source, VoxelY_source, VoxelZ_source), ct, xi, method='linear', |
|
fill_value=-1000, bounds_error=False).reshape(target_shape).transpose(1, 0, 2) |
|
|
|
self.PixelSpacing = newvoxelsize |
|
self.GridSize = list(ct.shape) |
|
self.NumVoxels = self.GridSize[0] * self.GridSize[1] * self.GridSize[2] |
|
self.Image = ct |
|
|
|
|
|
self.VoxelX = self.ImagePositionPatient[0] + \ |
|
np.arange(self.GridSize[0])*self.PixelSpacing[0] |
|
self.VoxelY = self.ImagePositionPatient[1] + \ |
|
np.arange(self.GridSize[1])*self.PixelSpacing[1] |
|
self.VoxelZ = self.ImagePositionPatient[2] + \ |
|
np.arange(self.GridSize[2])*self.PixelSpacing[2] |
|
self.isLoaded = 1 |
|
|
|
def import_Dicom_CT(self): |
|
|
|
if (self.isLoaded == 1): |
|
print("Warning: CT serries " + |
|
self.SeriesInstanceUID + " is already loaded") |
|
return |
|
|
|
images = [] |
|
SOPInstanceUIDs = [] |
|
SliceLocation = np.zeros(len(self.DcmFiles), dtype='float') |
|
|
|
for i in range(len(self.DcmFiles)): |
|
file_path = self.DcmFiles[i] |
|
dcm = pydicom.dcmread(file_path) |
|
|
|
if (hasattr(dcm, 'SliceLocation') and abs(dcm.SliceLocation - dcm.ImagePositionPatient[2]) > 0.001): |
|
print("WARNING: SliceLocation (" + str(dcm.SliceLocation) + |
|
") is different than ImagePositionPatient[2] (" + str(dcm.ImagePositionPatient[2]) + ") for " + file_path) |
|
|
|
SliceLocation[i] = float(dcm.ImagePositionPatient[2]) |
|
images.append(dcm.pixel_array * dcm.RescaleSlope + |
|
dcm.RescaleIntercept) |
|
SOPInstanceUIDs.append(dcm.SOPInstanceUID) |
|
|
|
|
|
sort_index = np.argsort(SliceLocation) |
|
SliceLocation = SliceLocation[sort_index] |
|
SOPInstanceUIDs = [SOPInstanceUIDs[n] for n in sort_index] |
|
images = [images[n] for n in sort_index] |
|
ct = np.dstack(images).astype("float32") |
|
|
|
if ct.shape[0:2] != (dcm.Rows, dcm.Columns): |
|
print("WARNING: GridSize " + str(ct.shape[0:2]) + " different from Dicom Rows (" + str( |
|
dcm.Rows) + ") and Columns (" + str(dcm.Columns) + ")") |
|
|
|
MeanSliceDistance = ( |
|
SliceLocation[-1] - SliceLocation[0]) / (len(images)-1) |
|
if (abs(MeanSliceDistance - dcm.SliceThickness) > 0.001): |
|
print("WARNING: MeanSliceDistance (" + str(MeanSliceDistance) + |
|
") is different from SliceThickness (" + str(dcm.SliceThickness) + ")") |
|
|
|
self.FrameOfReferenceUID = dcm.FrameOfReferenceUID |
|
self.ImagePositionPatient = [float(dcm.ImagePositionPatient[0]), float( |
|
dcm.ImagePositionPatient[1]), SliceLocation[0]] |
|
self.PixelSpacing = [float(dcm.PixelSpacing[0]), float( |
|
dcm.PixelSpacing[1]), MeanSliceDistance] |
|
self.GridSize = list(ct.shape) |
|
self.NumVoxels = self.GridSize[0] * self.GridSize[1] * self.GridSize[2] |
|
self.Image = ct |
|
self.SOPInstanceUIDs = SOPInstanceUIDs |
|
self.VoxelX = self.ImagePositionPatient[0] + \ |
|
np.arange(self.GridSize[0])*self.PixelSpacing[0] |
|
self.VoxelY = self.ImagePositionPatient[1] + \ |
|
np.arange(self.GridSize[1])*self.PixelSpacing[1] |
|
self.VoxelZ = self.ImagePositionPatient[2] + \ |
|
np.arange(self.GridSize[2])*self.PixelSpacing[2] |
|
self.isLoaded = 1 |
|
|
|
print("Convert CT dicom to nii done") |
|
|