# Copyright 2020 Division of Medical Image Computing, German Cancer Research Center (DKFZ), Heidelberg, Germany
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import numpy as np
from batchgenerators.utilities.file_and_folder_operations import join, maybe_mkdir_p
from nnunet.network_architecture.neural_network import SegmentationNetwork
from nnunet.training.data_augmentation.data_augmentation_noDA import get_no_augmentation
from nnunet.training.dataloading.dataset_loading import unpack_dataset, DataLoader3D, DataLoader2D
from nnunet.training.loss_functions.deep_supervision import MultipleOutputLoss2
from nnunet.training.network_training.nnUNetTrainerV2 import nnUNetTrainerV2
from torch import nn


class nnUNetTrainerV2_noDataAugmentation(nnUNetTrainerV2):
    def setup_DA_params(self):
        super().setup_DA_params()
        # important: validation and inference need to know that we did not mirror during training
        self.data_aug_params["do_mirror"] = False
        self.data_aug_params["mirror_axes"] = tuple()

    def get_basic_generators(self):
        self.load_dataset()
        self.do_split()

        # without augmentation no spatial transform needs extra context, so the sampled patch size
        # equals the final patch size (hence self.patch_size is passed twice)
        if self.threeD:
            dl_tr = DataLoader3D(self.dataset_tr, self.patch_size, self.patch_size, self.batch_size, False,
                                 oversample_foreground_percent=self.oversample_foreground_percent,
                                 pad_mode="constant", pad_sides=self.pad_all_sides)
            dl_val = DataLoader3D(self.dataset_val, self.patch_size, self.patch_size, self.batch_size, False,
                                  oversample_foreground_percent=self.oversample_foreground_percent,
                                  pad_mode="constant", pad_sides=self.pad_all_sides)
        else:
            dl_tr = DataLoader2D(self.dataset_tr, self.patch_size, self.patch_size, self.batch_size,
                                 transpose=self.plans.get('transpose_forward'),
                                 oversample_foreground_percent=self.oversample_foreground_percent,
                                 pad_mode="constant", pad_sides=self.pad_all_sides)
            dl_val = DataLoader2D(self.dataset_val, self.patch_size, self.patch_size, self.batch_size,
                                  transpose=self.plans.get('transpose_forward'),
                                  oversample_foreground_percent=self.oversample_foreground_percent,
                                  pad_mode="constant", pad_sides=self.pad_all_sides)
        return dl_tr, dl_val

    def initialize(self, training=True, force_load_plans=False):
        if not self.was_initialized:
            maybe_mkdir_p(self.output_folder)

            if force_load_plans or (self.plans is None):
                self.load_plans_file()

            self.process_plans(self.plans)

            self.setup_DA_params()

            ################# Here we wrap the loss for deep supervision ############
            # we need to know the number of outputs of the network
            net_numpool = len(self.net_num_pool_op_kernel_sizes)

            # we give each output a weight which decreases exponentially (division by 2) as the
            # resolution decreases; this gives higher resolution outputs more weight in the loss
            weights = np.array([1 / (2 ** i) for i in range(net_numpool)])
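
            # A worked example with illustrative numbers (net_numpool is read from the plans
            # file above, it is not fixed at 5): for net_numpool == 5 the raw weights are
            # [1, 0.5, 0.25, 0.125, 0.0625]; after the masking and normalization below they
            # become [8/15, 4/15, 2/15, 1/15, 0] ≈ [0.533, 0.267, 0.133, 0.067, 0].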
            # the lowest resolution output is not used; normalize the remaining weights so that
            # they sum to 1
            mask = np.array([True if i < net_numpool - 1 else False for i in range(net_numpool)])
            weights[~mask] = 0
            weights = weights / weights.sum()
            # now wrap the loss
            self.loss = MultipleOutputLoss2(self.loss, weights)
            ################# END ###################

            self.folder_with_preprocessed_data = join(self.dataset_directory, self.plans['data_identifier'] +
                                                      "_stage%d" % self.stage)
            if training:
                self.dl_tr, self.dl_val = self.get_basic_generators()

                if self.unpack_data:
                    print("unpacking dataset")
                    unpack_dataset(self.folder_with_preprocessed_data)
                    print("done")
                else:
                    print("INFO: Not unpacking data! Training may be slow due to that. Pray you are not using 2d or "
                          "you will wait all winter for your model to finish!")

                self.tr_gen, self.val_gen = get_no_augmentation(self.dl_tr, self.dl_val,
                                                                params=self.data_aug_params,
                                                                deep_supervision_scales=self.deep_supervision_scales,
                                                                pin_memory=self.pin_memory)
                self.print_to_log_file("TRAINING KEYS:\n %s" % (str(self.dataset_tr.keys())),
                                       also_print_to_console=False)
                self.print_to_log_file("VALIDATION KEYS:\n %s" % (str(self.dataset_val.keys())),
                                       also_print_to_console=False)

            self.initialize_network()
            self.initialize_optimizer_and_scheduler()

            assert isinstance(self.network, (SegmentationNetwork, nn.DataParallel))
        else:
            self.print_to_log_file('self.was_initialized is True, not running self.initialize again')
        self.was_initialized = True

    def validate(self, do_mirroring: bool = True, use_sliding_window: bool = True, step_size: float = 0.5,
                 save_softmax: bool = True, use_gaussian: bool = True, overwrite: bool = True,
                 validation_folder_name: str = 'validation_raw', debug: bool = False, all_in_gpu: bool = False,
                 segmentation_export_kwargs: dict = None, run_postprocessing_on_folds: bool = True):
        """
        We need to wrap this because we need to enforce self.network.do_ds = False for prediction
        """
        ds = self.network.do_ds
        if do_mirroring:
            print("WARNING! do_mirroring was True but we cannot do that because we trained without mirroring. "
                  "do_mirroring was set to False")
            do_mirroring = False
        self.network.do_ds = False
        ret = super().validate(do_mirroring=do_mirroring, use_sliding_window=use_sliding_window, step_size=step_size,
                               save_softmax=save_softmax, use_gaussian=use_gaussian, overwrite=overwrite,
                               validation_folder_name=validation_folder_name, debug=debug, all_in_gpu=all_in_gpu,
                               segmentation_export_kwargs=segmentation_export_kwargs,
                               run_postprocessing_on_folds=run_postprocessing_on_folds)
        self.network.do_ds = ds
        return ret


# aliases: the same trainer registered under several names, so that the identical configuration
# can be trained multiple times into separate output folders (output folders are named after
# the trainer class)
nnUNetTrainerV2_noDataAugmentation_copy1 = nnUNetTrainerV2_noDataAugmentation
nnUNetTrainerV2_noDataAugmentation_copy2 = nnUNetTrainerV2_noDataAugmentation
nnUNetTrainerV2_noDataAugmentation_copy3 = nnUNetTrainerV2_noDataAugmentation
nnUNetTrainerV2_noDataAugmentation_copy4 = nnUNetTrainerV2_noDataAugmentation
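

# Minimal usage sketch (an illustration, not part of this module: it assumes the standard
# nnU-Net v1 command-line entry point, and "Task100_Example" and fold 0 are placeholders):
#
#     nnUNet_train 3d_fullres nnUNetTrainerV2_noDataAugmentation Task100_Example 0
#
# To train a second, independent run of the same configuration, use one of the aliases:
#
#     nnUNet_train 3d_fullres nnUNetTrainerV2_noDataAugmentation_copy1 Task100_Example 0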