unet_code / src /lr_scheduler.py
weatherforecast1024's picture
Upload folder using huggingface_hub
f3b050a verified
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
import math
import warnings
from typing import List
from torch.optim import Optimizer
from torch.optim.lr_scheduler import _LRScheduler
class LinearWarmupCosineAnnealingLR(_LRScheduler):
"""Sets the learning rate of each parameter group to follow a linear warmup schedule between
warmup_start_lr and base_lr followed by a cosine annealing schedule between base_lr and
eta_min."""
def __init__(
self,
optimizer: Optimizer,
warmup_epochs: int,
max_epochs: int,
warmup_start_lr: float = 0.0,
eta_min: float = 0.0,
last_epoch: int = -1,
) -> None:
"""
Args:
optimizer (Optimizer): Wrapped optimizer.
warmup_epochs (int): Maximum number of iterations for linear warmup
max_epochs (int): Maximum number of iterations
warmup_start_lr (float): Learning rate to start the linear warmup. Default: 0.
eta_min (float): Minimum learning rate. Default: 0.
last_epoch (int): The index of last epoch. Default: -1.
"""
self.warmup_epochs = warmup_epochs
self.max_epochs = max_epochs
self.warmup_start_lr = warmup_start_lr
self.eta_min = eta_min
super().__init__(optimizer, last_epoch)
def get_lr(self) -> List[float]:
"""Compute learning rate using chainable form of the scheduler."""
if not self._get_lr_called_within_step:
warnings.warn(
"To get the last learning rate computed by the scheduler, " "please use `get_last_lr()`.",
UserWarning,
)
if self.last_epoch == self.warmup_epochs:
return self.base_lrs
if self.last_epoch == 0:
return [self.warmup_start_lr] * len(self.base_lrs)
if self.last_epoch < self.warmup_epochs:
return [
group["lr"] + (base_lr - self.warmup_start_lr) / (self.warmup_epochs - 1)
for base_lr, group in zip(self.base_lrs, self.optimizer.param_groups)
]
if (self.last_epoch - 1 - self.max_epochs) % (2 * (self.max_epochs - self.warmup_epochs)) == 0:
return [
group["lr"]
+ (base_lr - self.eta_min) * (1 - math.cos(math.pi / (self.max_epochs - self.warmup_epochs))) / 2
for base_lr, group in zip(self.base_lrs, self.optimizer.param_groups)
]
return [
(1 + math.cos(math.pi * (self.last_epoch - self.warmup_epochs) / (self.max_epochs - self.warmup_epochs)))
/ (
1
+ math.cos(
math.pi * (self.last_epoch - self.warmup_epochs - 1) / (self.max_epochs - self.warmup_epochs)
)
)
* (group["lr"] - self.eta_min)
+ self.eta_min
for group in self.optimizer.param_groups
]
def _get_closed_form_lr(self) -> List[float]:
"""Called when epoch is passed as a param to the `step` function of the scheduler."""
if self.last_epoch < self.warmup_epochs:
return [
self.warmup_start_lr
+ self.last_epoch * (base_lr - self.warmup_start_lr) / max(1, self.warmup_epochs - 1)
for base_lr in self.base_lrs
]
return [
self.eta_min
+ 0.5
* (base_lr - self.eta_min)
* (1 + math.cos(math.pi * (self.last_epoch - self.warmup_epochs) / (self.max_epochs - self.warmup_epochs)))
for base_lr in self.base_lrs
]