Upload 2 files
Browse files- +1 -0
- +271 -0
@@ -0,0 +1 @@
1 |
from .face_deid_ct import *
@@ -0,0 +1,271 @@
1 |
import os
2 |
import pydicom
3 |
import numpy as np
4 |
import cv2
5 |
from matplotlib import pyplot as plt
6 |
import random
7 |
from tqdm import tqdm
8 |
import time
9 |
10 |
11 |
12 |
13 |
14 |
15 |
16 |
17 |
def is_dicom(file_path):
    """Return True if ``file_path`` can be parsed as a DICOM file, else False.

    Args:
        file_path: Path of the file to probe.

    Returns:
        bool: True when pydicom reads the file without raising, False otherwise.
    """
    # NOTE(review): the parse call was missing from the extracted source and
    # has been reconstructed as a pydicom read -- confirm against the original.
    try:
        # Only the header is needed to decide; skipping the pixel data keeps
        # directory scans cheap.
        pydicom.dcmread(file_path, stop_before_pixels=True)
    except Exception:
        # Deliberately broad: unreadable, non-DICOM and permission-denied
        # files are all simply "not DICOM" for the purposes of the scan.
        return False
    return True
24 |
def get_first_directory(path):
    """Return the last component of ``path``.

    Despite its name, this returns the final path component (the text after
    the last separator), not the first one. Both ``/`` and ``\\`` are treated
    as separators. Trailing separators are ignored so that ``"a/b/"`` yields
    ``"b"`` rather than an empty string.

    Args:
        path (str): A file-system path using either separator style.

    Returns:
        str: The last non-empty component, or ``""`` when ``path`` is empty or
        consists only of separators.
    """
    # Normalize to Unix-style separators, then drop trailing ones so the last
    # split element is a real component.
    normalized_path = path.replace("\\", "/").rstrip("/")
    return normalized_path.rsplit("/", 1)[-1]
31 |
def list_dicom_directories(root_dir):
    """Find every directory under ``root_dir`` that holds at least one DICOM file.

    Args:
        root_dir (str): Directory tree to walk (``root_dir`` itself included).

    Returns:
        list[str]: Sorted directory paths, as produced by ``os.walk``, in
        which ``is_dicom`` accepted at least one file.
    """
    # NOTE(review): the statements inside the ``if is_dicom(...)`` branch were
    # missing from the extracted source; recording the directory is the
    # reconstruction -- confirm against the original.
    dicom_dirs = set()

    for current_dir, _subdirs, filenames in os.walk(root_dir):
        # any() stops at the first DICOM file, so a directory is probed only
        # as far as needed.
        if any(is_dicom(os.path.join(current_dir, name)) for name in filenames):
            dicom_dirs.add(current_dir)

    # Sorted so that the processing order is reproducible between runs.
    return sorted(dicom_dirs)
43 |
def load_scan(path):
    """Read every file in ``path`` as DICOM and return the slices in z order.

    Slices are sorted by the third component of ``ImagePositionPatient``.
    When at least two slices exist, the distance between the first two is
    written to ``SliceThickness`` of every slice.

    Args:
        path (str): Directory containing one DICOM series. Every directory
            entry is read, so non-DICOM entries make pydicom raise.

    Returns:
        list: The pydicom datasets, sorted by z position.
    """
    # dcmread is the supported reader; read_file is a deprecated alias.
    slices = [pydicom.dcmread(os.path.join(path, name)) for name in os.listdir(path)]
    slices.sort(key=lambda ds: float(ds.ImagePositionPatient[2]))

    if len(slices) < 2:
        # Spacing cannot be derived from fewer than two slices; leave any
        # existing SliceThickness untouched instead of failing on slices[1].
        return slices

    # NOTE(review): the try/except lines were missing from the extracted
    # source; the SliceLocation computation is reconstructed as the fallback.
    try:
        slice_thickness = np.abs(
            slices[0].ImagePositionPatient[2] - slices[1].ImagePositionPatient[2]
        )
    except Exception:
        slice_thickness = np.abs(slices[0].SliceLocation - slices[1].SliceLocation)

    for ds in slices:
        ds.SliceThickness = slice_thickness

    return slices
56 |
def get_pixels_hu(slices):
    """Stack the slices' pixel data and apply each slice's rescale slope/intercept.

    Args:
        slices: Sequence of objects exposing ``pixel_array``,
            ``RescaleSlope`` and ``RescaleIntercept`` (e.g. the datasets
            returned by ``load_scan``).

    Returns:
        numpy.ndarray: ``int16`` array with one entry along axis 0 per slice,
        holding the rescaled values (Hounsfield units for CT data).

    Raises:
        ValueError: If ``slices`` is empty (raised by ``np.stack``).
    """
    # Work in int16 throughout; values are expected to fit (< 32k).
    image = np.stack([ds.pixel_array for ds in slices]).astype(np.int16)

    # Stored value -2000 marks outside-of-scan pixels; map it to 0 before
    # rescaling.
    image[image == -2000] = 0

    for index, ds in enumerate(slices):
        slope = ds.RescaleSlope
        intercept = ds.RescaleIntercept

        if slope != 1:
            # Scale in float64; assigning back into the int16 volume
            # truncates toward zero.
            image[index] = slope * image[index].astype(np.float64)

        image[index] += np.int16(intercept)

    return image
80 |
def binarize_volume(volume, air_hu=AIR_THRESHOLD):
    """Mark every voxel whose value is at or below ``air_hu``.

    Args:
        volume: Array-like of voxel values.
        air_hu: Threshold; voxels ``<= air_hu`` are marked. Defaults to the
            module-level ``AIR_THRESHOLD``.

    Returns:
        numpy.ndarray: ``uint8`` array shaped like ``volume`` with 1 where
        ``volume <= air_hu`` and 0 elsewhere.
    """
    return (np.asarray(volume) <= air_hu).astype(np.uint8)
85 |
def largest_connected_component(binary_image):
    """Keep only the largest 8-connected foreground component of a 2-D mask.

    Args:
        binary_image: Single-channel 8-bit image; non-zero pixels are
            foreground.

    Returns:
        numpy.ndarray: ``uint8`` image shaped like the label map, with 1 on
        the largest foreground component and 0 elsewhere. All zeros when the
        input has no foreground.
    """
    num_labels, labels, stats, _centroids = cv2.connectedComponentsWithStats(
        binary_image, connectivity=8
    )

    # Label 0 is the background. With no other label there is nothing to
    # keep, and argmax over an empty slice of stats would raise.
    if num_labels < 2:
        return np.zeros(labels.shape, dtype=np.uint8)

    # Skip the background row when ranking by area, then shift the index back.
    largest_label = int(np.argmax(stats[1:, cv2.CC_STAT_AREA])) + 1

    return (labels == largest_label).astype(np.uint8)
99 |
def get_largest_component_volume(volume):
    """Apply ``largest_connected_component`` to each slice along axis 0.

    Args:
        volume (numpy.ndarray): Stack of 2-D binary masks.

    Returns:
        numpy.ndarray: ``uint8`` array shaped like ``volume`` in which every
        slice holds only its own largest connected component.
    """
    processed_volume = np.empty_like(volume, dtype=np.uint8)

    # Components are found per slice (2-D), not across the 3-D volume.
    for index, binary_slice in enumerate(volume):
        processed_volume[index] = largest_connected_component(binary_slice)

    return processed_volume
110 |
111 |
112 |
def dilate_volume(volume, kernel_size=KERNEL_SIZE):
    """Dilate every slice of ``volume`` with an elliptical structuring element.

    Args:
        volume (numpy.ndarray): Stack of 2-D masks; each slice is cast to
            ``uint8`` before dilation.
        kernel_size (int): Width and height of the elliptical kernel.
            Defaults to the module-level ``KERNEL_SIZE``.

    Returns:
        numpy.ndarray: Array with the shape and dtype of ``volume`` holding
        the dilated slices.
    """
    # The kernel is the same for every slice, so build it once.
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (kernel_size, kernel_size))

    dilated_volume = np.empty_like(volume)

    # Dilation is applied per slice (2-D), not across slices.
    for index, mask_slice in enumerate(volume):
        dilated_volume[index] = cv2.dilate(mask_slice.astype(np.uint8), kernel)

    return dilated_volume
126 |
127 |
def apply_mask_and_get_values(image_volume, mask_volume):
    """Collect the distinct in-range values found under a mask.

    Only voxels where ``mask_volume`` is non-zero are considered. Of those,
    the values strictly between the module-level ``FACE_MIN_VALUE`` and
    ``FACE_MAX_VALUE`` are returned.

    Args:
        image_volume: Array-like of voxel values.
        mask_volume: Array-like of the same shape; non-zero selects a voxel.

    Returns:
        list: Sorted unique selected values, as plain Python numbers.
    """
    image = np.asarray(image_volume)

    # Index with the mask instead of multiplying by it: multiplication turns
    # every unmasked voxel into 0, which would then be reported as a value
    # found under the mask.
    selected = image[np.asarray(mask_volume).astype(bool)]

    in_range = selected[(selected > FACE_MIN_VALUE) & (selected < FACE_MAX_VALUE)]

    return np.unique(in_range).tolist()
141 |
142 |
def apply_random_values_optimized(pixels_hu, dilated_volume, unique_values_list):
    """Return a copy of ``pixels_hu`` with masked voxels replaced by random picks.

    Every voxel where ``dilated_volume == 1`` receives a value drawn uniformly
    (with replacement, from NumPy's global RNG) from ``unique_values_list``.
    The input volume is not modified.

    Args:
        pixels_hu (numpy.ndarray): Source volume.
        dilated_volume: Array-like mask of the same shape; voxels equal to 1
            are replaced.
        unique_values_list: Candidate replacement values.

    Returns:
        numpy.ndarray: The modified copy of ``pixels_hu``.

    Raises:
        ValueError: If voxels need replacing but ``unique_values_list`` is
            empty.
    """
    new_volume = np.copy(pixels_hu)

    # Use the same criterion for counting and for assignment; summing the
    # mask would disagree with ``== 1`` for any mask value other than 0/1.
    target = np.asarray(dilated_volume) == 1
    n_targets = int(np.count_nonzero(target))
    if n_targets == 0:
        return new_volume

    candidates = np.asarray(unique_values_list)
    if candidates.size == 0:
        raise ValueError(
            "unique_values_list is empty; there are no replacement values to sample from"
        )

    new_volume[target] = np.random.choice(candidates, size=n_targets)

    return new_volume
157 |
def save_new_dicom_files(new_volume, original_dir, out_path, app="_d"):
    """Write ``new_volume`` back to DICOM files, reusing the original headers.

    Each slice of ``new_volume`` is converted back to stored values with the
    matching original file's ``RescaleIntercept``/``RescaleSlope``, placed in
    that dataset's ``PixelData`` and saved as ``new_image_<i>.dcm``.

    Args:
        new_volume (numpy.ndarray): Volume to save; axis 0 indexes slices in
            the order produced by ``load_scan`` (ascending z position).
        original_dir (str): Directory holding the original DICOM series.
        out_path (str | None): Output directory. When None,
            ``original_dir + app`` is used.
        app (str): Suffix used to build the output directory when
            ``out_path`` is None.

    Raises:
        ValueError: If the number of original files differs from the number
            of slices in ``new_volume``.
    """
    # NOTE(review): the ``else`` branch, the directory creation and the final
    # save call were missing from the extracted source and are reconstructed
    # here -- confirm against the original.
    new_dir = original_dir + app if out_path is None else out_path

    os.makedirs(new_dir, exist_ok=True)

    # Read each original once. Regular files are taken regardless of their
    # extension (DICOM files often have none), which matches what load_scan
    # reads; directories such as a nested output folder are skipped.
    file_paths = [
        os.path.join(original_dir, name) for name in os.listdir(original_dir)
    ]
    datasets = [pydicom.dcmread(p) for p in file_paths if os.path.isfile(p)]

    # Same ordering key as load_scan, so slice i of the volume is paired with
    # the file it was read from.
    datasets.sort(key=lambda ds: float(ds.ImagePositionPatient[2]))

    if len(datasets) != new_volume.shape[0]:
        raise ValueError(
            f"{original_dir} holds {len(datasets)} DICOM files but the volume "
            f"has {new_volume.shape[0]} slices"
        )

    for index, ds in enumerate(datasets):
        # Invert the slope/intercept rescale applied when loading.
        stored = (new_volume[index] - ds.RescaleIntercept) / ds.RescaleSlope

        # Round before casting so float error (e.g. 99.9999) cannot knock a
        # value down by one.
        # NOTE(review): int16 assumes signed 16-bit stored pixels -- confirm
        # for series with PixelRepresentation 0.
        ds.PixelData = np.rint(stored).astype(np.int16).tobytes()

        ds.save_as(os.path.join(new_dir, f"new_image_{index}.dcm"))
194 |
195 |
196 |
def drown_volume(in_path, out_path=None, replacer='face'):
    """
    Processes DICOM files from the provided directory by binarizing, getting the largest connected component,
    dilating and applying mask. Then applies random values to the dilated volume based on a unique values list
    obtained from the masked volume (or air value). The results are saved as new DICOM files.

    Args:
        in_path (str): The path to the directory tree containing the input DICOM files. Every directory
            found by ``list_dicom_directories`` is processed as one series.
        out_path (str, optional): Text appended to each series directory path to form its output directory.
            If not provided, "_d" is used, i.e. the output is saved in the input directory appended by "_d".
        replacer (str, optional): Indicates what kind of pixels are going to be replaced. Default is 'face'.
            'face': replaces air and face with random values that are found in the skin and subcutaneous fat.
            'air': replaces air and face with -1000 HU.
            int: replaces air and face with int HU.

    Returns:
        None. The function saves new DICOM files and prints the total elapsed time of the operation.
    """
    # NOTE(review): the docstring delimiters, the progress-bar updates and the
    # else/try/except lines were missing from the extracted source and are
    # reconstructed here -- confirm against the original.
    start_time = time.time()

    if out_path is None:
        out_path = '_d'

    dirs = list_dicom_directories(in_path)

    for _d in tqdm(dirs):

        with tqdm(total=8, desc="Processing DICOM Files", ncols=80) as pbar:
            # Load the DICOM files
            slices = load_scan(_d)
            pbar.update()

            # Get the pixel values and convert them to Hounsfield Units (HU)
            pixels_hu = get_pixels_hu(slices)
            pbar.update()

            # Apply the binarization function on the HU volume
            binarized_volume = binarize_volume(pixels_hu)
            pbar.update()

            # Get the largest connected component from the binarized volume
            processed_volume = get_largest_component_volume(binarized_volume)
            pbar.update()

            # Dilate the processed volume
            dilated_volume = dilate_volume(processed_volume)
            pbar.update()

            if replacer == 'face':
                # Sample from the ring between the dilated mask and the mask itself
                unique_values_list = apply_mask_and_get_values(pixels_hu, dilated_volume - processed_volume)
            elif replacer == 'air':
                # The values are written into the HU volume, so air is -1000 (0 HU would be water).
                unique_values_list = [-1000]
            else:
                try:
                    unique_values_list = [int(replacer)]
                except (TypeError, ValueError):
                    print('replacer must be either air, face, or an integer number in Hounsfield units, but ' + str(replacer) + ' was provided.')
                    print('replacing with face')
                    unique_values_list = apply_mask_and_get_values(pixels_hu, dilated_volume - processed_volume)
            pbar.update()

            # Apply random values to the dilated volume based on the unique values list
            new_volume = apply_random_values_optimized(pixels_hu, dilated_volume, unique_values_list)
            pbar.update()

            # Save the new DICOM files. The suffix is appended to the end of the
            # path only; replacing the directory name with str.replace would also
            # rewrite every earlier occurrence of that name in the path.
            out_path_n = os.path.normpath(_d) + out_path
            save_new_dicom_files(new_volume, _d, out_path_n)
            pbar.update()

    elapsed_time = time.time() - start_time
    print(f"Total elapsed time for 1 study: {elapsed_time} seconds")