# coding=utf-8
# Copyright 2021 The Deeplab2 Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""This file contains code to create a Trainer for training and validation."""

from typing import Dict, Any, Text

import orbit
import tensorflow as tf

from deeplab2 import common
from deeplab2 import config_pb2
from deeplab2.model import utils
from deeplab2.trainer import runner_utils


class WarmUp(tf.keras.optimizers.schedules.LearningRateSchedule):
  """Applies a warmup schedule on a given learning rate decay schedule."""

  def __init__(self,
               initial_learning_rate,
               decay_schedule_fn,
               warmup_steps,
               name=None):
    super(WarmUp, self).__init__()
    self.initial_learning_rate = initial_learning_rate
    self.warmup_steps = warmup_steps
    self.decay_schedule_fn = decay_schedule_fn
    self.name = name

  def __call__(self, step):
    with tf.name_scope(self.name or 'WarmUp') as name:
      # Implements linear warmup. i.e., if global_step < warmup_steps, the
      # learning rate will be `global_step/num_warmup_steps * init_lr`.
      global_step_float = tf.cast(step, tf.float32)
      warmup_steps_float = tf.cast(self.warmup_steps, tf.float32)
      warmup_percent_done = global_step_float / warmup_steps_float
      warmup_learning_rate = self.initial_learning_rate * warmup_percent_done
      return tf.cond(
          global_step_float < warmup_steps_float,
          lambda: warmup_learning_rate,
          lambda: self.decay_schedule_fn(step),
          name=name)

  def get_config(self):
    return {
        'initial_learning_rate': self.initial_learning_rate,
        'decay_schedule_fn': self.decay_schedule_fn,
        'warmup_steps': self.warmup_steps,
        'name': self.name
    }
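
# Illustrative sketch with hypothetical values: WarmUp ramps the learning rate
# linearly up to the base rate over `warmup_steps`, then defers to the wrapped
# decay schedule.
#
#   base_schedule = tf.keras.optimizers.schedules.PolynomialDecay(
#       initial_learning_rate=1e-3, decay_steps=10000,
#       end_learning_rate=0.0, power=0.9)
#   warmup_schedule = WarmUp(
#       initial_learning_rate=1e-3,
#       decay_schedule_fn=base_schedule,
#       warmup_steps=1000,
#       name='linear_warmup')
#   warmup_schedule(500)   # 500 / 1000 * 1e-3 = 5e-4 (warmup region).
#   warmup_schedule(5000)  # base_schedule(5000)      (decay region).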


def _create_optimizer(
    solver_config: config_pb2.SolverOptions,
    learning_rate_multiplier: float = 1.0) -> tf.keras.optimizers.Optimizer:
  """Creates an Optimizer based on the configuration.

  Args:
    solver_config: A trainer_pb2.SolverOptions configuration.
    learning_rate_multiplier: A float, the learning rate multiplier applied on
      top of the base learning rate. Defaults to 1.0.

  Returns:
    A tf.keras.optimizers.Optimizer.

  Raises:
    ValueError: An error occurs when the desired optimizer or learning rate
      scheduler is not supported.
  """
  learning_rate = (solver_config.base_learning_rate * learning_rate_multiplier)
  if solver_config.learning_policy == 'poly':
    lr_scheduler = tf.keras.optimizers.schedules.PolynomialDecay(
        initial_learning_rate=learning_rate,
        decay_steps=solver_config.training_number_of_steps,
        end_learning_rate=solver_config.poly_end_learning_rate,
        power=solver_config.poly_learning_power,
        cycle=False)
  elif solver_config.learning_policy == 'cosine':
    lr_scheduler = tf.keras.experimental.CosineDecay(
        initial_learning_rate=learning_rate,
        decay_steps=solver_config.training_number_of_steps,
        alpha=0.0)
  else:
    raise ValueError('Learning rate policy %s is not supported.' %
                     solver_config.learning_policy)

  if solver_config.warmup_steps:
    lr_scheduler = WarmUp(
        initial_learning_rate=learning_rate,
        decay_schedule_fn=lr_scheduler,
        warmup_steps=solver_config.warmup_steps,
        name='linear_warmup')

  if solver_config.optimizer == 'adam':
    return tf.keras.optimizers.Adam(learning_rate=lr_scheduler)
  elif solver_config.optimizer == 'sgd':
    # We use momentum = 0.9, the most frequently used case.
    return tf.keras.optimizers.SGD(learning_rate=lr_scheduler,
                                   momentum=0.9)

  raise ValueError('Optimizer %s is not supported.' % solver_config.optimizer)
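
# Illustrative sketch with hypothetical values: a SolverOptions message that
# makes _create_optimizer build an Adam optimizer on a cosine schedule with
# 2000 linear warmup steps. The field values below are assumptions, not
# recommended defaults; the field names come from the function above.
#
#   solver_options = config_pb2.SolverOptions(
#       base_learning_rate=1e-3,
#       learning_policy='cosine',
#       training_number_of_steps=60000,
#       warmup_steps=2000,
#       optimizer='adam')
#   optimizer = _create_optimizer(solver_options)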


class Trainer(orbit.StandardTrainer):
  """Implements a Trainer for training DeepLab models."""

  def __init__(self, config: config_pb2.ExperimentOptions,
               model: tf.keras.Model, loss: tf.keras.losses.Loss,
               global_step: tf.Variable):
    """Initializes the trainer.

    Args:
      config: A config_pb2.ExperimentOptions configuration.
      model: A tf.keras.Model.
      loss: A tf.keras.losses.Loss.
      global_step: A tf.Variable that records the global training step.
    """
    self._strategy = tf.distribute.get_strategy()

    support_panoptic = (common.TASK_PANOPTIC_SEGMENTATION in
                        utils.get_supported_tasks(config))
    train_dataset = runner_utils.create_dataset(
        config.train_dataset_options,
        is_training=True,
        only_semantic_annotations=not support_panoptic)
    train_dataset = orbit.utils.make_distributed_dataset(
        self.strategy, train_dataset)
    super(Trainer, self).__init__(train_dataset)

    self._config = config
    self._model = model
    self._loss = loss

    solver_options = config.trainer_options.solver_options
    self._optimizer = _create_optimizer(solver_options)
    self._backbone_optimizer = None
    if solver_options.HasField('backbone_learning_rate_multiplier'):
      self._backbone_optimizer = _create_optimizer(
          solver_options, learning_rate_multiplier=(
              solver_options.backbone_learning_rate_multiplier))

    self._global_step = global_step
    self._use_gradient_clipping = solver_options.use_gradient_clipping
    self._clip_gradient_norm = solver_options.clip_gradient_norm

    self._train_loss_metric_dict = runner_utils.create_loss_metric_dict(
        loss.get_loss_names(), prefix='train_')

  def train_loop_begin(self):
    """Called once at the beginning of the training loop.

    This method is called before dataset iterators creation.
    """
    for metric in self._train_loss_metric_dict.values():
      metric.reset_states()

  def _apply_gradients_to_optimizers(self, gradients_and_variables):
    """Applies gradients to their optimizers.

    This function divides all trainable variables (and their gradients) into
    two groups. One group contains backbone variables that have been
    pretrained, e.g., on ImageNet classification. The other group contains all
    other variables that are added specifically for the dense prediction task,
    e.g., panoptic segmentation. Then, we apply two optimizers, optionally with
    two learning rates, to the variables and gradients.

    Args:
      gradients_and_variables: A list of (gradient, variable) tuples.
    """
    if self._backbone_optimizer is None:
      self._optimizer.apply_gradients(gradients_and_variables)
    else:
      optimizer_inputs = []
      backbone_optimizer_inputs = []

      encoder = self._model.checkpoint_items['encoder']
      encoder_variable_names = [x.name for x in encoder.trainable_variables]
      encoder_name = self._config.model_options.backbone.name
      for gradient, variable in gradients_and_variables:
        if runner_utils.check_if_variable_in_backbone(variable, encoder_name,
                                                      encoder_variable_names):
          backbone_optimizer_inputs.append((gradient, variable))
        else:
          optimizer_inputs.append((gradient, variable))
      self._optimizer.apply_gradients(optimizer_inputs)
      self._backbone_optimizer.apply_gradients(backbone_optimizer_inputs)
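
  # Illustrative sketch with hypothetical variable names: under the
  # two-optimizer setup, gradients are routed by variable ownership, e.g.
  #
  #   'resnet50/conv1_bn/gamma:0'          -> self._backbone_optimizer
  #   'semantic_decoder/.../kernel:0'      -> self._optimizer
  #
  # so the pretrained encoder can be trained with a scaled learning rate
  # (backbone_learning_rate_multiplier) while the task heads use the base rate.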

  def train_step(self, iterator):
    """Implements one step of training.

    Runs one step of training with respect to the chosen strategy. In case of
    a distributed strategy, the replica results are gathered and returned.

    Note that all operations within `_train_step` are tf.function compatible,
    as they will be traced with tf.function. Any other/numpy operations are
    put in `train_loop_begin` or `train_loop_end` functions.

    Args:
      iterator: A tf.nest-compatible structure of tf.data Iterator or
        DistributedIterator.
    """

    def step_fn(inputs):
      self._train_step(inputs)
      self._global_step.assign_add(1)

    self._strategy.run(step_fn, args=(next(iterator),))

  def _train_step(self, inputs: Dict[Text, Any]):
    """Performs a forward and backward pass.

    Args:
      inputs: A dictionary to be consumed by the model.
    """
    with tf.GradientTape() as tape:
      outputs = self._model(inputs[common.IMAGE], training=True)
      # Get the average per-batch loss and scale it down by the number of
      # replicas. This ensures that we don't end up multiplying our loss by the
      # number of workers - gradients are summed, not averaged, across replicas
      # during the apply_gradients call.
      loss_dict = self._loss(inputs, outputs)
      # Average over the batch.
      average_loss_dict = {
          key: tf.reduce_mean(value) for key, value in loss_dict.items()}
      total_loss = average_loss_dict[common.TOTAL_LOSS]
      scaled_loss = total_loss / self.strategy.num_replicas_in_sync

    training_vars = self._model.trainable_variables
    gradients = tape.gradient(scaled_loss, training_vars)

    # Apply gradient clipping.
    if self._clip_gradient_norm > 0.0 and self._use_gradient_clipping:
      gradients, _ = tf.clip_by_global_norm(gradients, self._clip_gradient_norm)

    self._apply_gradients_to_optimizers(list(zip(gradients, training_vars)))

    for name, value in average_loss_dict.items():
      self._train_loss_metric_dict[name].update_state(value)
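
  # Illustrative arithmetic with a hypothetical replica count: with 8 replicas
  # each computing scaled_loss = total_loss / 8, the sum of per-replica
  # gradients formed inside apply_gradients equals the gradient of the
  # cross-replica mean loss, i.e.
  #
  #   sum_r d(total_loss_r / 8) / dw == d(mean_r(total_loss_r)) / dw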

  def train_loop_end(self) -> Dict[Text, tf.Tensor]:
    """Called at the end of the training loop.

    The value returned from this function will be returned as-is from the
    train() method.

    Returns:
      A dictionary of `Tensors`, which will be written to logs and as
      TensorBoard summaries.
    """
    train_logs = {}
    for loss_metric in self._train_loss_metric_dict.values():
      train_logs['losses/' + loss_metric.name] = loss_metric.result()

    if callable(self._optimizer.learning_rate):
      train_logs['learning_rate'] = self._optimizer.learning_rate(
          self._global_step)
    else:
      train_logs['learning_rate'] = self._optimizer.learning_rate
    return train_logs

  @property
  def optimizer(self):
    return self._optimizer

  @property
  def backbone_optimizer(self):
    return self._backbone_optimizer

  @property
  def strategy(self):
    return self._strategy

  @property
  def global_step(self):
    return self._global_step

  @property
  def model(self):
    return self._model
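
# Illustrative usage sketch with hypothetical `config`, `model`, and `loss`
# objects; the driver loop shown is an assumption about how the hooks above
# are typically called (e.g., by an orbit-style controller), not part of this
# module's API. It assumes orbit.StandardTrainer exposes the distributed
# dataset via its `train_dataset` property.
#
#   global_step = tf.Variable(0, dtype=tf.int64, trainable=False)
#   trainer = Trainer(config, model, loss, global_step)
#   iterator = iter(trainer.train_dataset)
#   trainer.train_loop_begin()
#   for _ in range(100):
#     trainer.train_step(iterator)
#   logs = trainer.train_loop_end()  # Loss metrics plus the learning rate.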