Spaces:
Sleeping
Sleeping
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. | |
from bisect import bisect_right | |
import math | |
import torch | |
# FIXME: ideally this would be achieved with a CombinedLRScheduler that
# composes MultiStepLR with a separate WarmupLR, but the current
# LRScheduler design doesn't allow it.
class WarmupMultiStepLR(torch.optim.lr_scheduler._LRScheduler):
    """Multi-step learning-rate decay combined with a warmup phase.

    While ``last_epoch < warmup_iters`` each base learning rate is scaled by a
    warmup factor: the fixed ``warmup_factor`` for the ``"constant"`` method,
    or a value interpolated linearly from ``warmup_factor`` towards 1 for the
    ``"linear"`` method. Independently of warmup, the rate is multiplied by
    ``gamma`` once for every milestone that ``last_epoch`` has reached.

    Args:
        optimizer: Optimizer forwarded to the base scheduler.
        milestones: Step indices in sorted order at which the rate is
            multiplied by ``gamma``.
        gamma: Multiplicative decay applied per reached milestone.
        warmup_factor: Scale applied at the start of warmup.
        warmup_iters: Number of steps for which warmup scaling is applied.
        warmup_method: Either ``"constant"`` or ``"linear"``.
        last_epoch: Forwarded to the base scheduler.

    Raises:
        ValueError: If ``milestones`` is not sorted, or ``warmup_method`` is
            not one of the accepted values.
    """

    def __init__(
        self,
        optimizer,
        milestones,
        gamma=0.1,
        warmup_factor=1.0 / 3,
        warmup_iters=500,
        warmup_method="linear",
        last_epoch=-1,
    ):
        if list(milestones) != sorted(milestones):
            # Format the offending value into the message instead of passing
            # it as a second exception argument.
            raise ValueError(
                "Milestones should be a list of increasing integers. Got {}".format(milestones)
            )
        if warmup_method not in ("constant", "linear"):
            raise ValueError(
                "Only 'constant' or 'linear' warmup_method accepted, got {}".format(warmup_method)
            )

        # These attributes must exist before the base constructor runs,
        # because it is expected to invoke get_lr() during initialisation.
        self.milestones = milestones
        self.gamma = gamma
        self.warmup_factor = warmup_factor
        self.warmup_iters = warmup_iters
        self.warmup_method = warmup_method
        super().__init__(optimizer, last_epoch)

    def get_lr(self):
        """Return one learning rate per base rate for the current ``last_epoch``."""
        warmup_factor = 1
        if self.last_epoch < self.warmup_iters:
            if self.warmup_method == "constant":
                warmup_factor = self.warmup_factor
            elif self.warmup_method == "linear":
                # alpha runs from 0 towards 1 across the warmup window.
                alpha = float(self.last_epoch) / self.warmup_iters
                warmup_factor = self.warmup_factor * (1 - alpha) + alpha

        # Number of milestones already reached determines the decay exponent.
        decay = self.gamma ** bisect_right(self.milestones, self.last_epoch)
        return [base_lr * warmup_factor * decay for base_lr in self.base_lrs]
class WarmupCosineAnnealingLR(torch.optim.lr_scheduler._LRScheduler):
    """Cosine-annealed learning rate preceded by a warmup phase.

    While ``last_epoch < warmup_iters`` each base learning rate is scaled by a
    warmup factor (fixed for ``"constant"``, linearly interpolated from
    ``warmup_factor`` towards 1 for ``"linear"``). Afterwards the rate follows
    a half-cosine between the base rate and ``eta_min``, using
    ``(last_epoch - warmup_iters) / max_iters`` as the phase.

    Args:
        optimizer: Optimizer forwarded to the base scheduler.
        max_iters: Denominator of the cosine phase.
        gamma: Stored on the instance but not used by ``get_lr``.
        warmup_factor: Scale applied at the start of warmup.
        warmup_iters: Number of steps for which warmup scaling is applied.
        warmup_method: Either ``"constant"`` or ``"linear"``.
        eta_min: Lower bound of the cosine curve.
        last_epoch: Forwarded to the base scheduler.

    Raises:
        ValueError: If ``warmup_method`` is not one of the accepted values.
    """

    def __init__(
        self,
        optimizer,
        max_iters,
        gamma=0.1,
        warmup_factor=1.0 / 3,
        warmup_iters=500,
        warmup_method="linear",
        eta_min=0,
        last_epoch=-1,
    ):
        if warmup_method not in ("constant", "linear"):
            raise ValueError(
                "Only 'constant' or 'linear' warmup_method accepted, got {}".format(warmup_method)
            )

        # These attributes must exist before the base constructor runs,
        # because it is expected to invoke get_lr() during initialisation.
        self.max_iters = max_iters
        self.gamma = gamma
        self.warmup_factor = warmup_factor
        self.warmup_iters = warmup_iters
        self.warmup_method = warmup_method
        self.eta_min = eta_min
        super().__init__(optimizer, last_epoch)

    def get_lr(self):
        """Return one learning rate per base rate for the current ``last_epoch``."""
        if self.last_epoch < self.warmup_iters:
            warmup_factor = 1
            if self.warmup_method == "constant":
                warmup_factor = self.warmup_factor
            elif self.warmup_method == "linear":
                # alpha runs from 0 towards 1 across the warmup window.
                alpha = float(self.last_epoch) / self.warmup_iters
                warmup_factor = self.warmup_factor * (1 - alpha) + alpha
            return [base_lr * warmup_factor for base_lr in self.base_lrs]

        # Past warmup: half-cosine from base_lr down towards eta_min.
        return [
            self.eta_min
            + (base_lr - self.eta_min)
            * (1 + math.cos(math.pi * (self.last_epoch - self.warmup_iters) / self.max_iters))
            / 2
            for base_lr in self.base_lrs
        ]
class WarmupReduceLROnPlateau(torch.optim.lr_scheduler.ReduceLROnPlateau):
    """Reduce-on-plateau scheduling preceded by a warmup phase.

    During the first ``warmup_iters`` calls to ``step`` the optimizer's
    learning rates are set directly from the base rates and a warmup factor;
    any ``metrics`` passed in that phase are ignored. After warmup, ``step``
    delegates to the parent plateau scheduler whenever a metric is supplied.
    The parent is configured with ``mode="max"``, ``factor=gamma`` and
    ``min_lr=eta_min``.

    Args:
        optimizer: Optimizer whose parameter groups are updated.
        max_iters: Accepted for signature parity; not used by this class.
        gamma: Reduction factor forwarded to the parent as ``factor``.
        warmup_factor: Scale applied at the start of warmup.
        warmup_iters: Number of ``step`` calls handled by the warmup branch.
        warmup_method: Either ``"constant"`` or ``"linear"``.
        eta_min: Forwarded to the parent as ``min_lr``.
        last_epoch: ``-1`` for a fresh run (``initial_lr`` is recorded on each
            parameter group); any other value requires ``initial_lr`` to be
            present already.
        patience: Forwarded to the parent.
        verbose: Forwarded to the parent only when truthy.

    Raises:
        ValueError: If ``warmup_method`` is not one of the accepted values.
        KeyError: If resuming (``last_epoch != -1``) and a parameter group
            lacks ``initial_lr``.
    """

    def __init__(
        self,
        optimizer,
        max_iters,
        gamma=0.1,
        warmup_factor=1.0 / 3,
        warmup_iters=500,
        warmup_method="linear",
        eta_min=0,
        last_epoch=-1,
        patience=5,
        verbose=False,
    ):
        if warmup_method not in ("constant", "linear"):
            raise ValueError(
                "Only 'constant' or 'linear' warmup_method accepted, got {}".format(warmup_method)
            )
        self.warmup_factor = warmup_factor
        self.warmup_iters = warmup_iters
        self.warmup_method = warmup_method
        self.eta_min = eta_min

        if last_epoch == -1:
            for group in optimizer.param_groups:
                group.setdefault("initial_lr", group["lr"])
        else:
            for i, group in enumerate(optimizer.param_groups):
                if "initial_lr" not in group:
                    raise KeyError(
                        "param 'initial_lr' is not specified "
                        "in param_groups[{}] when resuming an optimizer".format(i)
                    )
        self.base_lrs = [group["initial_lr"] for group in optimizer.param_groups]

        plateau_kwargs = {
            "factor": gamma,
            "patience": patience,
            "mode": "max",
            "min_lr": eta_min,
        }
        # NOTE(review): recent PyTorch releases appear to deprecate the
        # `verbose` argument; forward it only when explicitly requested so
        # the default path does not depend on it.
        if verbose:
            plateau_kwargs["verbose"] = verbose
        super().__init__(optimizer, **plateau_kwargs)

    def step(self, metrics=None):
        """Advance the schedule by one step.

        Args:
            metrics: Monitored value. Ignored during warmup; after warmup the
                parent's ``step`` is invoked only when this is not ``None``.
        """
        if self.last_epoch < self.warmup_iters:
            warmup_factor = 1
            if self.warmup_method == "constant":
                warmup_factor = self.warmup_factor
            elif self.warmup_method == "linear":
                # alpha runs from 0 towards 1 across the warmup window.
                alpha = float(self.last_epoch) / self.warmup_iters
                warmup_factor = self.warmup_factor * (1 - alpha) + alpha
            # The final warmup step lands exactly on the base learning rate.
            if self.last_epoch >= self.warmup_iters - 1:
                warmup_factor = 1.0
            for param_group, base_lr in zip(self.optimizer.param_groups, self.base_lrs):
                param_group["lr"] = base_lr * warmup_factor
            self.last_epoch += 1
        elif metrics is not None:
            # Compare against None so a legitimate metric of 0 is not dropped.
            super().step(metrics)