import os, cv2
import numpy as np
from network_configure import conf_unet
from network import *
from utils.predict_utils import get_coord, PercentileNormalizer, PadAndCropResizer
from utils.train_utils import augment_patch
from utils import train_utils
# UNet implementation inherited from GVTNets: https://github.com/zhengyang-wang/GVTNets
training_config = {'base_learning_rate': 0.0004,
class Noise2Same(object):
def __init__(self, base_dir, name,
dim=2, in_channels=1, lmbd=None,
masking='gaussian', mask_perc=0.5,
opt_config=training_config, **kwargs):
self.base_dir = base_dir # model direction
self.name = name # model name
self.dim = dim # image dimension
self.in_channels = in_channels # image channels
self.lmbd = lmbd # lambda in loss fn
self.masking = masking
self.mask_perc = mask_perc
self.opt_config = opt_config
conf_unet['dimension'] = '%dD'%dim
self.net = UNet(conf_unet)
def _model_fn(self, features, labels, mode):
conv_op = convolution_2D if self.dim==2 else convolution_3D
axis = {3:[1,2,3,4], 2:[1,2,3]}[self.dim]
def image_summary(img):
return tf.reduce_max(img, axis=1) if self.dim == 3 else img
# Local average excluding the center pixel (donut)
def mask_kernel(features):
kernel = (np.array([[0.5, 1.0, 0.5], [1.0, 0.0, 1.0], [0.5, 1.0, 0.5]])
if self.dim == 2 else
np.array([[[0, 0.5, 0], [0.5, 1.0, 0.5], [0, 0.5, 0]],
[[0.5, 1.0, 0.5], [1.0, 0.0, 1.0], [0.5, 1.0, 0.5]],
[[0, 0.5, 0], [0.5, 1.0, 0.5], [0, 0.5, 0]]]))
kernel = (kernel/kernel.sum())
kernels = np.empty([3, 3, self.in_channels, self.in_channels])
for i in range(self.in_channels):
kernels[:,:,i,i] = kernel
nn_conv_op = tf.nn.conv2d if self.dim == 2 else tf.nn.conv3d
return nn_conv_op(features, tf.constant(kernels.astype('float32')),
[1]*self.dim+[1,1], padding='SAME')
if not mode == tf.estimator.ModeKeys.PREDICT:
noise, mask = tf.split(labels, [self.in_channels, self.in_channels], -1)
if self.masking == 'gaussian':
masked_features = (1 - mask) * features + mask * noise
elif self.masking == 'donut':
masked_features = (1 - mask) * features + mask * mask_kernel(features)
raise NotImplementedError
# Prediction from masked input
with tf.variable_scope('main_unet', reuse=tf.compat.v1.AUTO_REUSE):
out = self.net(masked_features, mode == tf.estimator.ModeKeys.TRAIN)
out = batch_norm(out, mode == tf.estimator.ModeKeys.TRAIN, 'unet_out')
out = relu(out)
preds = conv_op(out, self.in_channels, 1, 1, False, name = 'out_conv')
# Prediction from full input
with tf.variable_scope('main_unet', reuse=tf.compat.v1.AUTO_REUSE):
rawout = self.net(features, mode == tf.estimator.ModeKeys.TRAIN)
rawout = batch_norm(rawout, mode == tf.estimator.ModeKeys.TRAIN, 'unet_out')
rawout = relu(rawout)
rawpreds = conv_op(rawout, self.in_channels, 1, 1, False, name = 'out_conv')
# Loss components
rec_mse = tf.reduce_mean(tf.square(rawpreds - features), axis=None)
inv_mse = tf.reduce_sum(tf.square(rawpreds - preds) * mask) / tf.reduce_sum(mask)
bsp_mse = tf.reduce_sum(tf.square(features - preds) * mask) / tf.reduce_sum(mask)
# Tensorboard display
tf.summary.image('1_inputs', image_summary(features), max_outputs=3)
tf.summary.image('2_raw_predictions', image_summary(rawpreds), max_outputs=3)
tf.summary.image('3_mask', image_summary(mask), max_outputs=3)
tf.summary.image('4_masked_predictions', image_summary(preds), max_outputs=3)
tf.summary.image('5_difference', image_summary(rawpreds-preds), max_outputs=3)
tf.summary.image('6_rec_error', image_summary(preds-features), max_outputs=3)
tf.summary.scalar('reconstruction', rec_mse, family='loss_metric')
tf.summary.scalar('invariance', inv_mse, family='loss_metric')
tf.summary.scalar('blind_spot', bsp_mse, family='loss_metric')
with tf.variable_scope('main_unet'):
out = self.net(features, mode == tf.estimator.ModeKeys.TRAIN)
out = batch_norm(out, mode == tf.estimator.ModeKeys.TRAIN, 'unet_out')
out = relu(out)
preds = conv_op(out, self.in_channels, 1, 1, False, name = 'out_conv')
return tf.estimator.EstimatorSpec(mode=mode, predictions=preds)
lmbd = 2 if self.lmbd is None else self.lmbd
loss = rec_mse + lmbd*tf.sqrt(inv_mse)
if mode == tf.estimator.ModeKeys.TRAIN:
global_step = tf.train.get_or_create_global_step()
learning_rate = tf.train.exponential_decay(self.opt_config['base_learning_rate'],
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)
update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS, scope='main_unet')
with tf.control_dependencies(update_ops):
train_op = optimizer.minimize(loss, global_step)
train_op = None
metrics = {'loss_metric/invariance':tf.metrics.mean(inv_mse),
return tf.estimator.EstimatorSpec(mode=mode, predictions=preds, loss=loss, train_op=train_op,
def _input_fn(self, sources, patch_size, batch_size, is_train=True):
# Stratified sampling inherited from Noise2Void: https://github.com/juglab/n2v
get_stratified_coords = getattr(train_utils, 'get_stratified_coords%dD'%self.dim)
rand_float_coords = getattr(train_utils, 'rand_float_coords%dD'%self.dim)
def generator():
source = sources[np.random.randint(len(sources))]
valid_shape = source.shape[:-1] - np.array(patch_size)
if any([s<=0 for s in valid_shape]):
source_patch = augment_patch(source)
coords = [np.random.randint(0, shape_i+1) for shape_i in valid_shape]
s = tuple([slice(coord, coord+size) for coord, size in zip(coords, patch_size)])
source_patch = augment_patch(source[s])
mask = np.zeros_like(source_patch)
for c in range(self.in_channels):
boxsize = np.round(np.sqrt(100/self.mask_perc)).astype(np.int)
maskcoords = get_stratified_coords(rand_float_coords(boxsize),
box_size=boxsize, shape=tuple(patch_size))
indexing = maskcoords + (c,)
mask[indexing] = 1.0
noise_patch = np.concatenate([np.random.normal(0, 0.2, source_patch.shape), mask], axis=-1)
yield source_patch, noise_patch
def generator_val():
for idx in range(len(sources)):
source_patch = sources[idx]
patch_size = source_patch.shape[:-1]
boxsize = np.round(np.sqrt(100/self.mask_perc)).astype(np.int)
maskcoords = get_stratified_coords(rand_float_coords(boxsize),
box_size=boxsize, shape=tuple(patch_size))
indexing = maskcoords + (0,)
mask = np.zeros_like(source_patch)
mask[indexing] = 1.0
noise_patch = np.concatenate([np.random.normal(0, 0.2, source_patch.shape), mask], axis=-1)
yield source_patch, noise_patch
output_types = (tf.float32, tf.float32)
output_shapes = (tf.TensorShape(list(patch_size) + [self.in_channels]),
tf.TensorShape(list(patch_size) + [self.in_channels*2]))
gen = generator if is_train else generator_val
dataset = tf.data.Dataset.from_generator(gen, output_types=output_types, output_shapes=output_shapes)
dataset = dataset.batch(batch_size).prefetch(tf.data.experimental.AUTOTUNE)
return dataset
def train(self, source_lst, patch_size, validation=None, batch_size=64, save_steps=1000, log_steps=200, steps=50000):
assert len(patch_size)==self.dim
assert len(source_lst[0].shape)==self.dim + 1
assert source_lst[0].shape[-1]==self.in_channels
ses_config = tf.ConfigProto()
ses_config.gpu_options.allow_growth = True
run_config = tf.estimator.RunConfig(model_dir=self.base_dir+'/'+self.name,
estimator = tf.estimator.Estimator(model_fn=self._model_fn,
input_fn = lambda: self._input_fn(source_lst, patch_size, batch_size=batch_size)
if validation is not None:
train_spec = tf.estimator.TrainSpec(input_fn=input_fn, max_steps=steps)
val_input_fn = lambda: self._input_fn(validation.astype('float32'),
eval_spec = tf.estimator.EvalSpec(input_fn=val_input_fn, throttle_secs=120)
tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
estimator.train(input_fn=input_fn, steps=steps)
# Used for single image prediction
def predict(self, image, resizer=PadAndCropResizer(), checkpoint_path=None,
im_mean=None, im_std=None):
estimator = tf.estimator.Estimator(model_fn=self._model_fn,
im_mean, im_std = ((image.mean(), image.std()) if im_mean is None or im_std is None else (im_mean, im_std))
image = (image - im_mean)/im_std
if self.in_channels == 1:
image = resizer.before(image, 2 ** (self.net.depth), exclude=None)
input_fn = tf.estimator.inputs.numpy_input_fn(x=image[None, ..., None], batch_size=1, num_epochs=1, shuffle=False)
image = list(estimator.predict(input_fn=input_fn, checkpoint_path=checkpoint_path))[0][..., 0]
image = resizer.after(image, exclude=None)
image = resizer.before(image, 2 ** (self.net.depth), exclude=-1)
input_fn = tf.estimator.inputs.numpy_input_fn(x=image[None], batch_size=1, num_epochs=1, shuffle=False)
image = list(estimator.predict(input_fn=input_fn, checkpoint_path=checkpoint_path))[0]
image = resizer.after(image, exclude=-1)
image = image*im_std + im_mean
return image
# Used for batch images prediction
def batch_predict(self, images, resizer=PadAndCropResizer(), checkpoint_path=None,
im_mean=None, im_std=None, batch_size=32):
estimator = tf.estimator.Estimator(model_fn=self._model_fn,
im_mean, im_std = ((images.mean(), images.std()) if im_mean is None or im_std is None else (im_mean, im_std))
images = (images - im_mean)/im_std
images = resizer.before(images, 2 ** (self.net.depth), exclude=0)
input_fn = tf.estimator.inputs.numpy_input_fn(x=images[ ..., None], batch_size=batch_size, num_epochs=1, shuffle=False)
images = np.stack(list(estimator.predict(input_fn=input_fn, checkpoint_path=checkpoint_path)))[..., 0]
images = resizer.after(images, exclude=0)
images = images*im_std + im_mean
return images
# Used for extremely large input images
def crop_predict(self, image, size, margin, resizer=PadAndCropResizer(), checkpoint_path=None,
im_mean=None, im_std=None):
estimator = tf.estimator.Estimator(model_fn=self._model_fn,
im_mean, im_std = ((image.mean(), image.std()) if im_mean is None or im_std is None else (im_mean, im_std))
image = (image - im_mean)/im_std
out_image = np.empty(image.shape, dtype='float32')
for src_s, trg_s, mrg_s in get_coord(image.shape, size, margin):
patch = resizer.before(image[src_s], 2 ** (self.net.depth), exclude=None)
input_fn = tf.estimator.inputs.numpy_input_fn(x=patch[None, ..., None], batch_size=1, num_epochs=1, shuffle=False)
patch = list(estimator.predict(input_fn=input_fn, checkpoint_path=checkpoint_path))[0][..., 0]
patch = resizer.after(patch, exclude=None)
out_image[trg_s] = patch[mrg_s]
image = out_image*im_std + im_mean
return image