Spaces:
Build error
Build error
import os, cv2 | |
import numpy as np | |
from network_configure import conf_unet | |
from network import * | |
from utils.predict_utils import get_coord, PercentileNormalizer, PadAndCropResizer | |
from utils.train_utils import augment_patch | |
from utils import train_utils | |
# UNet implementation inherited from GVTNets: https://github.com/zhengyang-wang/GVTNets | |
training_config = {'base_learning_rate': 0.0004, | |
'lr_decay_steps':5e3, | |
'lr_decay_rate':0.5, | |
'lr_staircase':True} | |
class Noise2Same(object): | |
def __init__(self, base_dir, name, | |
dim=2, in_channels=1, lmbd=None, | |
masking='gaussian', mask_perc=0.5, | |
opt_config=training_config, **kwargs): | |
self.base_dir = base_dir # model direction | |
self.name = name # model name | |
self.dim = dim # image dimension | |
self.in_channels = in_channels # image channels | |
self.lmbd = lmbd # lambda in loss fn | |
self.masking = masking | |
self.mask_perc = mask_perc | |
self.opt_config = opt_config | |
conf_unet['dimension'] = '%dD'%dim | |
self.net = UNet(conf_unet) | |
def _model_fn(self, features, labels, mode): | |
conv_op = convolution_2D if self.dim==2 else convolution_3D | |
axis = {3:[1,2,3,4], 2:[1,2,3]}[self.dim] | |
def image_summary(img): | |
return tf.reduce_max(img, axis=1) if self.dim == 3 else img | |
# Local average excluding the center pixel (donut) | |
def mask_kernel(features): | |
kernel = (np.array([[0.5, 1.0, 0.5], [1.0, 0.0, 1.0], [0.5, 1.0, 0.5]]) | |
if self.dim == 2 else | |
np.array([[[0, 0.5, 0], [0.5, 1.0, 0.5], [0, 0.5, 0]], | |
[[0.5, 1.0, 0.5], [1.0, 0.0, 1.0], [0.5, 1.0, 0.5]], | |
[[0, 0.5, 0], [0.5, 1.0, 0.5], [0, 0.5, 0]]])) | |
kernel = (kernel/kernel.sum()) | |
kernels = np.empty([3, 3, self.in_channels, self.in_channels]) | |
for i in range(self.in_channels): | |
kernels[:,:,i,i] = kernel | |
nn_conv_op = tf.nn.conv2d if self.dim == 2 else tf.nn.conv3d | |
return nn_conv_op(features, tf.constant(kernels.astype('float32')), | |
[1]*self.dim+[1,1], padding='SAME') | |
if not mode == tf.estimator.ModeKeys.PREDICT: | |
noise, mask = tf.split(labels, [self.in_channels, self.in_channels], -1) | |
if self.masking == 'gaussian': | |
masked_features = (1 - mask) * features + mask * noise | |
elif self.masking == 'donut': | |
masked_features = (1 - mask) * features + mask * mask_kernel(features) | |
else: | |
raise NotImplementedError | |
# Prediction from masked input | |
with tf.variable_scope('main_unet', reuse=tf.compat.v1.AUTO_REUSE): | |
out = self.net(masked_features, mode == tf.estimator.ModeKeys.TRAIN) | |
out = batch_norm(out, mode == tf.estimator.ModeKeys.TRAIN, 'unet_out') | |
out = relu(out) | |
preds = conv_op(out, self.in_channels, 1, 1, False, name = 'out_conv') | |
# Prediction from full input | |
with tf.variable_scope('main_unet', reuse=tf.compat.v1.AUTO_REUSE): | |
rawout = self.net(features, mode == tf.estimator.ModeKeys.TRAIN) | |
rawout = batch_norm(rawout, mode == tf.estimator.ModeKeys.TRAIN, 'unet_out') | |
rawout = relu(rawout) | |
rawpreds = conv_op(rawout, self.in_channels, 1, 1, False, name = 'out_conv') | |
# Loss components | |
rec_mse = tf.reduce_mean(tf.square(rawpreds - features), axis=None) | |
inv_mse = tf.reduce_sum(tf.square(rawpreds - preds) * mask) / tf.reduce_sum(mask) | |
bsp_mse = tf.reduce_sum(tf.square(features - preds) * mask) / tf.reduce_sum(mask) | |
# Tensorboard display | |
tf.summary.image('1_inputs', image_summary(features), max_outputs=3) | |
tf.summary.image('2_raw_predictions', image_summary(rawpreds), max_outputs=3) | |
tf.summary.image('3_mask', image_summary(mask), max_outputs=3) | |
tf.summary.image('4_masked_predictions', image_summary(preds), max_outputs=3) | |
tf.summary.image('5_difference', image_summary(rawpreds-preds), max_outputs=3) | |
tf.summary.image('6_rec_error', image_summary(preds-features), max_outputs=3) | |
tf.summary.scalar('reconstruction', rec_mse, family='loss_metric') | |
tf.summary.scalar('invariance', inv_mse, family='loss_metric') | |
tf.summary.scalar('blind_spot', bsp_mse, family='loss_metric') | |
else: | |
with tf.variable_scope('main_unet'): | |
out = self.net(features, mode == tf.estimator.ModeKeys.TRAIN) | |
out = batch_norm(out, mode == tf.estimator.ModeKeys.TRAIN, 'unet_out') | |
out = relu(out) | |
preds = conv_op(out, self.in_channels, 1, 1, False, name = 'out_conv') | |
return tf.estimator.EstimatorSpec(mode=mode, predictions=preds) | |
lmbd = 2 if self.lmbd is None else self.lmbd | |
loss = rec_mse + lmbd*tf.sqrt(inv_mse) | |
if mode == tf.estimator.ModeKeys.TRAIN: | |
global_step = tf.train.get_or_create_global_step() | |
learning_rate = tf.train.exponential_decay(self.opt_config['base_learning_rate'], | |
global_step, | |
self.opt_config['lr_decay_steps'], | |
self.opt_config['lr_decay_rate'], | |
self.opt_config['lr_staircase']) | |
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate) | |
update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS, scope='main_unet') | |
with tf.control_dependencies(update_ops): | |
train_op = optimizer.minimize(loss, global_step) | |
else: | |
train_op = None | |
metrics = {'loss_metric/invariance':tf.metrics.mean(inv_mse), | |
'loss_metric/blind_spot':tf.metrics.mean(bsp_mse), | |
'loss_metric/reconstruction':tf.metrics.mean(rec_mse)} | |
return tf.estimator.EstimatorSpec(mode=mode, predictions=preds, loss=loss, train_op=train_op, | |
eval_metric_ops=metrics) | |
def _input_fn(self, sources, patch_size, batch_size, is_train=True): | |
# Stratified sampling inherited from Noise2Void: https://github.com/juglab/n2v | |
get_stratified_coords = getattr(train_utils, 'get_stratified_coords%dD'%self.dim) | |
rand_float_coords = getattr(train_utils, 'rand_float_coords%dD'%self.dim) | |
def generator(): | |
while(True): | |
source = sources[np.random.randint(len(sources))] | |
valid_shape = source.shape[:-1] - np.array(patch_size) | |
if any([s<=0 for s in valid_shape]): | |
source_patch = augment_patch(source) | |
else: | |
coords = [np.random.randint(0, shape_i+1) for shape_i in valid_shape] | |
s = tuple([slice(coord, coord+size) for coord, size in zip(coords, patch_size)]) | |
source_patch = augment_patch(source[s]) | |
mask = np.zeros_like(source_patch) | |
for c in range(self.in_channels): | |
boxsize = np.round(np.sqrt(100/self.mask_perc)).astype(np.int) | |
maskcoords = get_stratified_coords(rand_float_coords(boxsize), | |
box_size=boxsize, shape=tuple(patch_size)) | |
indexing = maskcoords + (c,) | |
mask[indexing] = 1.0 | |
noise_patch = np.concatenate([np.random.normal(0, 0.2, source_patch.shape), mask], axis=-1) | |
yield source_patch, noise_patch | |
def generator_val(): | |
for idx in range(len(sources)): | |
source_patch = sources[idx] | |
patch_size = source_patch.shape[:-1] | |
boxsize = np.round(np.sqrt(100/self.mask_perc)).astype(np.int) | |
maskcoords = get_stratified_coords(rand_float_coords(boxsize), | |
box_size=boxsize, shape=tuple(patch_size)) | |
indexing = maskcoords + (0,) | |
mask = np.zeros_like(source_patch) | |
mask[indexing] = 1.0 | |
noise_patch = np.concatenate([np.random.normal(0, 0.2, source_patch.shape), mask], axis=-1) | |
yield source_patch, noise_patch | |
output_types = (tf.float32, tf.float32) | |
output_shapes = (tf.TensorShape(list(patch_size) + [self.in_channels]), | |
tf.TensorShape(list(patch_size) + [self.in_channels*2])) | |
gen = generator if is_train else generator_val | |
dataset = tf.data.Dataset.from_generator(gen, output_types=output_types, output_shapes=output_shapes) | |
dataset = dataset.batch(batch_size).prefetch(tf.data.experimental.AUTOTUNE) | |
return dataset | |
def train(self, source_lst, patch_size, validation=None, batch_size=64, save_steps=1000, log_steps=200, steps=50000): | |
assert len(patch_size)==self.dim | |
assert len(source_lst[0].shape)==self.dim + 1 | |
assert source_lst[0].shape[-1]==self.in_channels | |
ses_config = tf.ConfigProto() | |
ses_config.gpu_options.allow_growth = True | |
run_config = tf.estimator.RunConfig(model_dir=self.base_dir+'/'+self.name, | |
save_checkpoints_steps=save_steps, | |
session_config=ses_config, | |
log_step_count_steps=log_steps, | |
save_summary_steps=log_steps, | |
keep_checkpoint_max=2) | |
estimator = tf.estimator.Estimator(model_fn=self._model_fn, | |
model_dir=self.base_dir+'/'+self.name, | |
config=run_config) | |
input_fn = lambda: self._input_fn(source_lst, patch_size, batch_size=batch_size) | |
if validation is not None: | |
train_spec = tf.estimator.TrainSpec(input_fn=input_fn, max_steps=steps) | |
val_input_fn = lambda: self._input_fn(validation.astype('float32'), | |
validation.shape[1:-1], | |
batch_size=4, | |
is_train=False) | |
eval_spec = tf.estimator.EvalSpec(input_fn=val_input_fn, throttle_secs=120) | |
tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec) | |
else: | |
estimator.train(input_fn=input_fn, steps=steps) | |
# Used for single image prediction | |
def predict(self, image, resizer=PadAndCropResizer(), checkpoint_path=None, | |
im_mean=None, im_std=None): | |
tf.logging.set_verbosity(tf.logging.ERROR) | |
estimator = tf.estimator.Estimator(model_fn=self._model_fn, | |
model_dir=self.base_dir+'/'+self.name) | |
im_mean, im_std = ((image.mean(), image.std()) if im_mean is None or im_std is None else (im_mean, im_std)) | |
image = (image - im_mean)/im_std | |
if self.in_channels == 1: | |
image = resizer.before(image, 2 ** (self.net.depth), exclude=None) | |
input_fn = tf.estimator.inputs.numpy_input_fn(x=image[None, ..., None], batch_size=1, num_epochs=1, shuffle=False) | |
image = list(estimator.predict(input_fn=input_fn, checkpoint_path=checkpoint_path))[0][..., 0] | |
image = resizer.after(image, exclude=None) | |
else: | |
image = resizer.before(image, 2 ** (self.net.depth), exclude=-1) | |
input_fn = tf.estimator.inputs.numpy_input_fn(x=image[None], batch_size=1, num_epochs=1, shuffle=False) | |
image = list(estimator.predict(input_fn=input_fn, checkpoint_path=checkpoint_path))[0] | |
image = resizer.after(image, exclude=-1) | |
image = image*im_std + im_mean | |
return image | |
# Used for batch images prediction | |
def batch_predict(self, images, resizer=PadAndCropResizer(), checkpoint_path=None, | |
im_mean=None, im_std=None, batch_size=32): | |
tf.logging.set_verbosity(tf.logging.ERROR) | |
estimator = tf.estimator.Estimator(model_fn=self._model_fn, | |
model_dir=self.base_dir+'/'+self.name) | |
im_mean, im_std = ((images.mean(), images.std()) if im_mean is None or im_std is None else (im_mean, im_std)) | |
images = (images - im_mean)/im_std | |
images = resizer.before(images, 2 ** (self.net.depth), exclude=0) | |
input_fn = tf.estimator.inputs.numpy_input_fn(x=images[ ..., None], batch_size=batch_size, num_epochs=1, shuffle=False) | |
images = np.stack(list(estimator.predict(input_fn=input_fn, checkpoint_path=checkpoint_path)))[..., 0] | |
images = resizer.after(images, exclude=0) | |
images = images*im_std + im_mean | |
return images | |
# Used for extremely large input images | |
def crop_predict(self, image, size, margin, resizer=PadAndCropResizer(), checkpoint_path=None, | |
im_mean=None, im_std=None): | |
tf.logging.set_verbosity(tf.logging.ERROR) | |
estimator = tf.estimator.Estimator(model_fn=self._model_fn, | |
model_dir=self.base_dir+'/'+self.name) | |
im_mean, im_std = ((image.mean(), image.std()) if im_mean is None or im_std is None else (im_mean, im_std)) | |
image = (image - im_mean)/im_std | |
out_image = np.empty(image.shape, dtype='float32') | |
for src_s, trg_s, mrg_s in get_coord(image.shape, size, margin): | |
patch = resizer.before(image[src_s], 2 ** (self.net.depth), exclude=None) | |
input_fn = tf.estimator.inputs.numpy_input_fn(x=patch[None, ..., None], batch_size=1, num_epochs=1, shuffle=False) | |
patch = list(estimator.predict(input_fn=input_fn, checkpoint_path=checkpoint_path))[0][..., 0] | |
patch = resizer.after(patch, exclude=None) | |
out_image[trg_s] = patch[mrg_s] | |
image = out_image*im_std + im_mean | |
return image | |