import math
import random

import omegaconf
import pytest
import pytorch_lightning as pl
import torch
import torch.optim
from pytorch_lightning.utilities import rank_zero_only

from nemo.core import config, optim
from nemo.core.optim.lr_scheduler import AVAILABLE_SCHEDULERS
from nemo.core.optim.optimizers import AVAILABLE_OPTIMIZERS
from nemo.utils import logging

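
# Minimal torch module used to instantiate optimizers in the registry tests below.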
class TempModel(torch.nn.Module):
    def __init__(self):
        super(TempModel, self).__init__()
        self.layer = torch.nn.Linear(5, 1)

    def forward(self, x):
        x = self.layer(x)
        return x

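
# SGD subclass that records how many optimizer steps were taken in each param group.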
class OptCounter(torch.optim.SGD):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        for group in self.param_groups:
            group.setdefault('count', 0)

    def step(self, closure=None):
        for group in self.param_groups:
            group['count'] += 1
        super().step(closure)

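
# Dataset that yields random 2-dim tensors, with a configurable length.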
class RandomDataset(torch.utils.data.Dataset):
    def __init__(self, dataset_len):
        super().__init__()
        self.__dataset_len = dataset_len

    def __getitem__(self, *args):
        return torch.randn(2)

    def __len__(self):
        return self.__dataset_len

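
# Tiny LightningModule (one linear layer, L1 loss against a tensor of ones) used to drive real Trainer runs.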
class ExampleModel(pl.LightningModule):
    def __init__(self, batch_size, dataset_len, drop_last, max_steps):
        super().__init__()
        self.l1 = torch.nn.modules.Linear(in_features=2, out_features=1)
        self.batch_size = batch_size
        self.dataset_len = dataset_len
        self.drop_last = drop_last
        self.max_steps = max_steps

    def train_dataloader(self):
        dataset = RandomDataset(self.dataset_len)
        return torch.utils.data.DataLoader(dataset, batch_size=self.batch_size, drop_last=self.drop_last)

    def training_step(self, batch, batch_idx):
        output = self.l1(batch)
        output = torch.nn.functional.l1_loss(output, torch.ones(output.size()).to(output.device))
        return {"loss": output}

    def configure_optimizers(self):
        self.my_opt = OptCounter(self.parameters(), lr=0.02)
        return self.my_opt

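
# Checks at the end of training that the optimizer step count matches trainer.global_step
# and the max_steps value precomputed for the run.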
class Callback(pl.callbacks.Callback):
    @rank_zero_only
    def on_train_end(self, trainer, module):
        count = module.my_opt.param_groups[0]['count']
        if trainer.global_step != count or trainer.global_step != module.max_steps:
            logging.debug(f"max_epochs: {trainer.max_epochs}")
            logging.debug(f"accumulate_grad_batches: {trainer.accumulate_grad_batches}")
            logging.debug(f"limit_train_batches: {trainer.limit_train_batches}")
            logging.debug(f"num_devices: {trainer.num_devices}")
            logging.debug(f"batch_size: {module.batch_size}")
            logging.debug(f"dataset_len: {module.dataset_len}")
            logging.debug(f"drop_last: {module.drop_last}")
            logging.debug(f"{len(trainer.train_dataloader)}")
            logging.debug(f"{trainer.num_training_batches}")

        self.assert_counts(trainer, module, count)

    def assert_counts(self, trainer, module, count):
        assert trainer.global_step == count, f"{trainer.global_step} != {count} != {module.max_steps}"
        assert trainer.global_step == module.max_steps, f"{trainer.global_step} != {count} != {module.max_steps}"

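
# Every third batch, rewinds the LR scheduler so that its step is effectively a no-op and
# extends trainer.fit_loop.max_steps by one to compensate for the skipped update.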
class SchedulerNoOpCallback(Callback):
    def on_train_batch_end(self, trainer: pl.Trainer, pl_module, outputs, batch, batch_idx):
        if (trainer.global_step + 1) % 3 == 0 and (trainer.global_step + 1) < pl_module.max_steps:
            schedulers = trainer.lr_scheduler_configs

            for scheduler in schedulers:
                # Rewind the scheduler so that this update is effectively a no-op
                scheduler.scheduler.last_epoch -= 2
                scheduler.scheduler.step()

            # Increase the max step count by 1 to compensate for the skipped scheduler update
            trainer.fit_loop.max_steps = trainer.fit_loop.max_steps + 1

    def assert_counts(self, trainer, module, count):
        num_skips = module.max_steps // 3
        extra_steps = module.max_steps + num_skips
        assert trainer.global_step == count, f"{trainer.global_step} != {count} != {extra_steps}"
        assert trainer.global_step == extra_steps, f"{trainer.global_step} != {count} != {extra_steps}"

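
# Unit tests for the optimizer/scheduler registries, config parsing, and the LR schedule
# policies provided by nemo.core.optim.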
class TestOptimizersSchedulers:
    INITIAL_LR = 0.1
    MIN_LR = 1e-3
    MAX_STEPS = 10
    D_MODEL = 16

    @pytest.mark.unit
    def test_get_optimizer(self):
        model = TempModel()
        if torch.cuda.is_available():
            model.cuda()

        for opt_name in AVAILABLE_OPTIMIZERS.keys():
            if opt_name == 'fused_adam':
                if not torch.cuda.is_available():
                    continue
            if opt_name == 'distributed_fused_adam':
                # Requires CUDA and an initialized distributed process group; skip it in this unit test.
                continue

            opt_cls = optim.get_optimizer(opt_name)
            if opt_name == 'adafactor':
                # Adafactor does not require an explicit learning rate.
                opt = opt_cls(model.parameters())
            else:
                opt = opt_cls(model.parameters(), lr=self.INITIAL_LR)

            assert isinstance(opt, AVAILABLE_OPTIMIZERS[opt_name])

    @pytest.mark.unit
    def test_register_optimizer(self):
        class TempOpt(torch.optim.SGD):
            pass

        class TempOptParams(config.optimizers.SGDParams):
            pass

        optim.register_optimizer('TempOpt', TempOpt, TempOptParams)

        model = TempModel()
        opt_cls = optim.get_optimizer('TempOpt')
        opt = opt_cls(model.parameters(), lr=self.INITIAL_LR)

        assert isinstance(opt, TempOpt)
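
    # parse_optimizer_args is exercised in three forms below: a plain kwargs dict, a dict
    # carrying a 'name' field, and a Hydra-style '_target_' config.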
    @pytest.mark.unit
    def test_optim_config_parse_bypass(self):
        basic_optim_config = {'weight_decay': 0.001, 'betas': [0.8, 0.5]}
        parsed_params = optim.parse_optimizer_args('novograd', basic_optim_config)
        assert parsed_params['weight_decay'] == basic_optim_config['weight_decay']
        assert parsed_params['betas'][0] == basic_optim_config['betas'][0]
        assert parsed_params['betas'][1] == basic_optim_config['betas'][1]

        dict_config = omegaconf.OmegaConf.create(basic_optim_config)
        parsed_params = optim.parse_optimizer_args('novograd', dict_config)
        assert parsed_params['weight_decay'] == dict_config['weight_decay']
        assert parsed_params['betas'][0] == dict_config['betas'][0]
        assert parsed_params['betas'][1] == dict_config['betas'][1]

    @pytest.mark.unit
    def test_optim_config_parse_arg_by_name(self):
        basic_optim_config = {'name': 'auto', 'weight_decay': 0.001, 'betas': [0.8, 0.5]}
        parsed_params = optim.parse_optimizer_args('novograd', basic_optim_config)
        assert parsed_params['weight_decay'] == basic_optim_config['weight_decay']
        assert parsed_params['betas'][0] == basic_optim_config['betas'][0]
        assert parsed_params['betas'][1] == basic_optim_config['betas'][1]

        dict_config = omegaconf.OmegaConf.create(basic_optim_config)
        parsed_params = optim.parse_optimizer_args('novograd', dict_config)
        assert parsed_params['weight_decay'] == dict_config['weight_decay']
        assert parsed_params['betas'][0] == dict_config['betas'][0]
        assert parsed_params['betas'][1] == dict_config['betas'][1]

        with pytest.raises(omegaconf.errors.ConfigKeyError):
            optim.parse_optimizer_args('sgd', dict_config)

    @pytest.mark.unit
    def test_optim_config_parse_arg_by_target(self):
        basic_optim_config = {
            '_target_': 'nemo.core.config.NovogradParams',
            'params': {'weight_decay': 0.001, 'betas': [0.8, 0.5]},
        }
        basic_optim_config = omegaconf.OmegaConf.create(basic_optim_config)
        parsed_params = optim.parse_optimizer_args('novograd', basic_optim_config)
        assert parsed_params['weight_decay'] == basic_optim_config['params']['weight_decay']
        assert parsed_params['betas'][0] == basic_optim_config['params']['betas'][0]
        assert parsed_params['betas'][1] == basic_optim_config['params']['betas'][1]

        dict_config = omegaconf.OmegaConf.create(basic_optim_config)
        parsed_params = optim.parse_optimizer_args('novograd', dict_config)
        assert parsed_params['weight_decay'] == dict_config['params']['weight_decay']
        assert parsed_params['betas'][0] == dict_config['params']['betas'][0]
        assert parsed_params['betas'][1] == dict_config['params']['betas'][1]

        # When an explicit `_target_` class path is given, the optimizer name is ignored,
        # so parsing with 'sgd' still yields the Novograd parameter set.
        output_config = optim.parse_optimizer_args('sgd', dict_config)
        sgd_config = vars(config.SGDParams())
        novograd_config = vars(config.NovogradParams())

        assert set(output_config.keys()) != set(sgd_config.keys())
        assert set(output_config.keys()) == set(novograd_config)

    @pytest.mark.unit
    def test_get_scheduler(self):
        model = TempModel()
        optimizer = optim.Novograd(model.parameters(), lr=self.INITIAL_LR)

        for sched_name in AVAILABLE_SCHEDULERS.keys():
            sched_cls = optim.lr_scheduler.get_scheduler(sched_name)

            # Some schedulers can be constructed from the optimizer alone ...
            try:
                sched = sched_cls(optimizer)
                assert isinstance(sched, AVAILABLE_SCHEDULERS[sched_name])
                continue
            except Exception:
                pass

            # ... others additionally require max_steps.
            try:
                sched = sched_cls(optimizer, max_steps=self.MAX_STEPS)
                assert isinstance(sched, AVAILABLE_SCHEDULERS[sched_name])
                continue
            except Exception:
                pass

    @pytest.mark.unit
    def test_register_scheduler(self):
        class TempSched(optim.lr_scheduler.CosineAnnealing):
            pass

        class TempSchedParams(config.schedulers.CosineAnnealingParams):
            pass

        optim.lr_scheduler.register_scheduler('TempSched', TempSched, TempSchedParams)

        model = TempModel()
        opt_cls = optim.get_optimizer('novograd')
        opt = opt_cls(model.parameters(), lr=self.INITIAL_LR)
        sched_cls = optim.lr_scheduler.get_scheduler('TempSched')
        sched = sched_cls(opt, max_steps=self.MAX_STEPS)

        assert isinstance(sched, TempSched)

    @pytest.mark.unit
    def test_sched_config_parse_simple(self):
        model = TempModel()
        opt_cls = optim.get_optimizer('novograd')
        opt = opt_cls(model.parameters(), lr=self.INITIAL_LR)

        basic_sched_config = {'name': 'CosineAnnealing', 'max_steps': 10}
        scheduler_setup = optim.lr_scheduler.prepare_lr_scheduler(opt, basic_sched_config)
        assert isinstance(scheduler_setup['scheduler'], optim.lr_scheduler.CosineAnnealing)

        dict_config = omegaconf.OmegaConf.create(basic_sched_config)
        scheduler_setup = optim.lr_scheduler.prepare_lr_scheduler(opt, dict_config)
        assert isinstance(scheduler_setup['scheduler'], optim.lr_scheduler.CosineAnnealing)

    @pytest.mark.unit
    def test_sched_config_parse_from_cls(self):
        model = TempModel()
        opt_cls = optim.get_optimizer('novograd')
        opt = opt_cls(model.parameters(), lr=self.INITIAL_LR)

        basic_sched_config = {
            '_target_': 'nemo.core.config.CosineAnnealingParams',
            'params': {'min_lr': 0.1},
            'max_steps': self.MAX_STEPS,
        }
        scheduler_setup = optim.lr_scheduler.prepare_lr_scheduler(opt, basic_sched_config)
        assert isinstance(scheduler_setup['scheduler'], optim.lr_scheduler.CosineAnnealing)

        dict_config = omegaconf.OmegaConf.create(basic_sched_config)
        scheduler_setup = optim.lr_scheduler.prepare_lr_scheduler(opt, dict_config)
        assert isinstance(scheduler_setup['scheduler'], optim.lr_scheduler.CosineAnnealing)

    @pytest.mark.unit
    def test_sched_config_parse_reduce_on_plateau(self):
        model = TempModel()
        opt_cls = optim.get_optimizer('novograd')
        opt = opt_cls(model.parameters(), lr=self.INITIAL_LR)
        reduce_on_plateau_parameters = {
            'mode': 'min',
            'factor': 0.5,
            'patience': 1,
            'threshold': 1e-4,
            'threshold_mode': 'rel',
            'min_lr': 1e-6,
            'eps': 1e-7,
            'verbose': True,
            'cooldown': 1,
        }
        basic_sched_config = {
            'name': 'ReduceLROnPlateau',
            'monitor': 'val_loss',
            'reduce_on_plateau': True,
            'max_steps': self.MAX_STEPS,
        }
        basic_sched_config.update(reduce_on_plateau_parameters)
        scheduler_setup = optim.lr_scheduler.prepare_lr_scheduler(opt, basic_sched_config)
        assert isinstance(scheduler_setup['scheduler'], torch.optim.lr_scheduler.ReduceLROnPlateau)
        for k, v in reduce_on_plateau_parameters.items():
            if k == 'min_lr':
                # ReduceLROnPlateau stores min_lr as a per-param-group list under `min_lrs`.
                k += 's'
                v = [v]
            found_v = getattr(scheduler_setup['scheduler'], k)
            assert (
                found_v == v
            ), f"Wrong value `{repr(found_v)}` for `ReduceLROnPlateau` parameter `{k}`. Expected `{repr(v)}`."
        dict_config = omegaconf.OmegaConf.create(basic_sched_config)
        scheduler_setup = optim.lr_scheduler.prepare_lr_scheduler(opt, dict_config)
        assert isinstance(scheduler_setup['scheduler'], torch.optim.lr_scheduler.ReduceLROnPlateau)
        for k, v in reduce_on_plateau_parameters.items():
            if k == 'min_lr':
                k += 's'
                v = [v]
            found_v = getattr(scheduler_setup['scheduler'], k)
            assert (
                found_v == v
            ), f"Wrong value `{repr(found_v)}` for `ReduceLROnPlateau` parameter `{k}`. Expected `{repr(v)}`."
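
    # The per-policy tests below step each LR schedule through MAX_STEPS (calling opt.step()
    # before policy.step()) and check the warmup, hold, and final min_lr behaviour.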
    @pytest.mark.unit
    def test_WarmupPolicy(self):
        model = TempModel()
        opt_cls = optim.get_optimizer('novograd')
        opt = opt_cls(model.parameters(), lr=self.INITIAL_LR)

        # No warmup case
        policy = optim.lr_scheduler.WarmupPolicy(opt, max_steps=self.MAX_STEPS, min_lr=self.MIN_LR)
        initial_lr = policy.get_last_lr()[0]

        assert initial_lr == self.INITIAL_LR

        for i in range(self.MAX_STEPS):
            assert policy.get_last_lr()[0] == self.INITIAL_LR
            opt.step()
            policy.step()

        policy.step()
        final_lr = policy.get_last_lr()[0]

        assert final_lr == self.MIN_LR

        # Warmup steps available
        policy = optim.lr_scheduler.WarmupPolicy(opt, warmup_steps=5, max_steps=self.MAX_STEPS, min_lr=self.MIN_LR)
        initial_lr = policy.get_last_lr()[0]

        assert initial_lr < self.INITIAL_LR

        for i in range(self.MAX_STEPS):
            if i <= 4:
                assert policy.get_last_lr()[0] <= self.INITIAL_LR
            else:
                assert policy.get_last_lr()[0] == self.INITIAL_LR
            opt.step()
            policy.step()

        policy.step()
        final_lr = policy.get_last_lr()[0]

        assert final_lr == self.MIN_LR

    @pytest.mark.unit
    def test_WarmupHoldPolicy(self):
        model = TempModel()
        opt_cls = optim.get_optimizer('novograd')
        opt = opt_cls(model.parameters(), lr=self.INITIAL_LR)

        # No warmup case
        policy = optim.lr_scheduler.WarmupHoldPolicy(opt, max_steps=self.MAX_STEPS, min_lr=self.MIN_LR)
        initial_lr = policy.get_last_lr()[0]

        assert initial_lr == self.INITIAL_LR

        for i in range(self.MAX_STEPS):
            assert policy.get_last_lr()[0] == self.INITIAL_LR
            opt.step()
            policy.step()

        policy.step()
        final_lr = policy.get_last_lr()[0]

        assert final_lr == self.MIN_LR

        # Warmup steps available
        policy = optim.lr_scheduler.WarmupHoldPolicy(opt, warmup_steps=5, max_steps=self.MAX_STEPS, min_lr=self.MIN_LR)
        initial_lr = policy.get_last_lr()[0]

        assert initial_lr < self.INITIAL_LR

        for i in range(self.MAX_STEPS):
            if i <= 4:
                assert policy.get_last_lr()[0] <= self.INITIAL_LR
            else:
                assert policy.get_last_lr()[0] == self.INITIAL_LR

            opt.step()
            policy.step()

        policy.step()
        final_lr = policy.get_last_lr()[0]

        assert final_lr == self.MIN_LR

        # Warmup + Hold steps available
        policy = optim.lr_scheduler.WarmupHoldPolicy(
            opt, warmup_steps=5, hold_steps=3, max_steps=self.MAX_STEPS, min_lr=self.MIN_LR
        )
        initial_lr = policy.get_last_lr()[0]

        assert initial_lr < self.INITIAL_LR

        for i in range(self.MAX_STEPS):
            if i <= 4:
                assert policy.get_last_lr()[0] <= self.INITIAL_LR
            else:
                assert policy.get_last_lr()[0] == self.INITIAL_LR
            opt.step()
            policy.step()

        policy.step()
        final_lr = policy.get_last_lr()[0]

        assert final_lr == self.MIN_LR

    @pytest.mark.unit
    def test_WarmupAnnealing(self):
        model = TempModel()
        opt_cls = optim.get_optimizer('novograd')
        opt = opt_cls(model.parameters(), lr=self.INITIAL_LR)

        # No warmup case
        policy = optim.lr_scheduler.WarmupAnnealing(opt, max_steps=self.MAX_STEPS, min_lr=self.MIN_LR)
        initial_lr = policy.get_last_lr()[0]

        assert initial_lr == self.INITIAL_LR

        for i in range(self.MAX_STEPS):
            assert policy.get_last_lr()[0] <= self.INITIAL_LR
            opt.step()
            policy.step()

        policy.step()
        final_lr = policy.get_last_lr()[0]

        assert final_lr == self.MIN_LR

        # Warmup steps available
        policy = optim.lr_scheduler.WarmupAnnealing(opt, warmup_steps=5, max_steps=self.MAX_STEPS, min_lr=self.MIN_LR)
        initial_lr = policy.get_last_lr()[0]

        assert initial_lr < self.INITIAL_LR

        for i in range(self.MAX_STEPS):
            if i <= 5:
                assert policy.get_last_lr()[0] <= self.INITIAL_LR
            else:
                assert policy.get_last_lr()[0] < self.INITIAL_LR

            opt.step()
            policy.step()

        policy.step()
        final_lr = policy.get_last_lr()[0]

        assert final_lr == self.MIN_LR

        # Warmup + Hold steps available
        policy = optim.lr_scheduler.WarmupHoldPolicy(
            opt, warmup_steps=5, hold_steps=3, max_steps=self.MAX_STEPS, min_lr=self.MIN_LR
        )
        initial_lr = policy.get_last_lr()[0]

        assert initial_lr < self.INITIAL_LR

        for i in range(self.MAX_STEPS):
            if i <= 4:
                assert policy.get_last_lr()[0] <= self.INITIAL_LR
            else:
                assert policy.get_last_lr()[0] == self.INITIAL_LR
            opt.step()
            policy.step()

        policy.step()
        final_lr = policy.get_last_lr()[0]

        assert final_lr == self.MIN_LR

    @pytest.mark.unit
    def test_SquareAnnealing(self):
        model = TempModel()
        opt_cls = optim.get_optimizer('novograd')
        opt = opt_cls(model.parameters(), lr=self.INITIAL_LR)

        # No warmup case
        policy = optim.lr_scheduler.SquareAnnealing(opt, max_steps=self.MAX_STEPS, min_lr=self.MIN_LR)
        initial_lr = policy.get_last_lr()[0]

        assert initial_lr == self.INITIAL_LR

        for i in range(self.MAX_STEPS):
            assert policy.get_last_lr()[0] <= self.INITIAL_LR
            opt.step()
            policy.step()

        policy.step()
        final_lr = policy.get_last_lr()[0]

        assert final_lr == self.MIN_LR

        # Warmup steps available
        policy = optim.lr_scheduler.SquareAnnealing(opt, warmup_steps=5, max_steps=self.MAX_STEPS, min_lr=self.MIN_LR)
        initial_lr = policy.get_last_lr()[0]

        assert initial_lr < self.INITIAL_LR

        for i in range(self.MAX_STEPS):
            if i <= 5:
                assert policy.get_last_lr()[0] <= self.INITIAL_LR
            else:
                assert policy.get_last_lr()[0] < self.INITIAL_LR

            opt.step()
            policy.step()

        policy.step()
        final_lr = policy.get_last_lr()[0]

        assert final_lr == self.MIN_LR

    @pytest.mark.unit
    def test_SquareRootAnnealing(self):
        model = TempModel()
        opt_cls = optim.get_optimizer('novograd')
        opt = opt_cls(model.parameters(), lr=self.INITIAL_LR)

        # No warmup case
        policy = optim.lr_scheduler.SquareRootAnnealing(opt, max_steps=self.MAX_STEPS, min_lr=self.MIN_LR)
        initial_lr = policy.get_last_lr()[0]

        assert initial_lr == self.INITIAL_LR

        for i in range(self.MAX_STEPS):
            assert policy.get_last_lr()[0] <= self.INITIAL_LR
            opt.step()
            policy.step()

        policy.step()
        final_lr = policy.get_last_lr()[0]

        assert final_lr == self.MIN_LR

        # Warmup steps available
        policy = optim.lr_scheduler.SquareRootAnnealing(
            opt, warmup_steps=5, max_steps=self.MAX_STEPS, min_lr=self.MIN_LR
        )
        initial_lr = policy.get_last_lr()[0]

        assert initial_lr < self.INITIAL_LR

        for i in range(self.MAX_STEPS):
            if i <= 5:
                assert policy.get_last_lr()[0] <= self.INITIAL_LR
            else:
                assert policy.get_last_lr()[0] < self.INITIAL_LR

            opt.step()
            policy.step()

        policy.step()
        final_lr = policy.get_last_lr()[0]

        assert final_lr == self.MIN_LR

    @pytest.mark.unit
    def test_CosineAnnealing(self):
        model = TempModel()
        opt_cls = optim.get_optimizer('novograd')
        opt = opt_cls(model.parameters(), lr=self.INITIAL_LR)

        # No warmup case
        policy = optim.lr_scheduler.CosineAnnealing(opt, max_steps=self.MAX_STEPS, min_lr=self.MIN_LR)
        initial_lr = policy.get_last_lr()[0]

        assert initial_lr == self.INITIAL_LR

        for i in range(self.MAX_STEPS):
            assert policy.get_last_lr()[0] <= self.INITIAL_LR
            opt.step()
            policy.step()

        policy.step()
        final_lr = policy.get_last_lr()[0]

        assert final_lr == self.MIN_LR

        # Warmup steps available
        policy = optim.lr_scheduler.CosineAnnealing(opt, warmup_steps=5, max_steps=self.MAX_STEPS, min_lr=self.MIN_LR)
        initial_lr = policy.get_last_lr()[0]

        assert initial_lr < self.INITIAL_LR

        for i in range(self.MAX_STEPS):
            if i <= 5:
                assert policy.get_last_lr()[0] <= self.INITIAL_LR
            else:
                assert policy.get_last_lr()[0] < self.INITIAL_LR

            opt.step()
            policy.step()

        policy.step()
        final_lr = policy.get_last_lr()[0]

        assert final_lr == self.MIN_LR

        # Warmup + Constant steps available
        policy = optim.lr_scheduler.CosineAnnealing(
            opt, warmup_steps=3, constant_steps=2, max_steps=self.MAX_STEPS, min_lr=self.MIN_LR
        )
        initial_lr = policy.get_last_lr()[0]

        assert initial_lr < self.INITIAL_LR

        for i in range(self.MAX_STEPS):
            if i <= 3:
                assert policy.get_last_lr()[0] <= self.INITIAL_LR + 1e-5
            elif i > 3 and i <= 8:
                assert policy.get_last_lr()[0] == policy._get_lr(i)[0]
            else:
                assert policy.get_last_lr()[0] == self.MIN_LR

            opt.step()
            policy.step()

        policy.step()
        final_lr = policy.get_last_lr()[0]

        assert final_lr == self.MIN_LR

    @pytest.mark.unit
    def test_NoamAnnealing(self):
        model = TempModel()
        opt_cls = optim.get_optimizer('novograd')
        opt1 = opt_cls(model.parameters(), lr=self.INITIAL_LR)
        opt2 = opt_cls(model.parameters(), lr=self.INITIAL_LR)

        # No warmup case
        policy1 = optim.lr_scheduler.NoamAnnealing(
            opt1, d_model=self.D_MODEL, max_steps=self.MAX_STEPS, min_lr=self.MIN_LR
        )
        policy2 = optim.lr_scheduler.NoamAnnealing(
            opt2, d_model=self.D_MODEL, max_steps=self.MAX_STEPS * 2, min_lr=self.MIN_LR
        )
        initial_lr = policy1.get_last_lr()[0]

        assert initial_lr == self.D_MODEL ** (-0.5) * self.INITIAL_LR

        for i in range(self.MAX_STEPS * 2):
            assert self.MIN_LR < policy1.get_last_lr()[0] <= self.INITIAL_LR
            assert policy1.get_last_lr()[0] == policy2.get_last_lr()[0]
            opt1.step()
            opt2.step()
            policy1.step()
            policy2.step()

        # Warmup steps available
        policy1 = optim.lr_scheduler.NoamAnnealing(
            opt1, d_model=self.D_MODEL, warmup_steps=5, max_steps=self.MAX_STEPS, min_lr=self.MIN_LR
        )
        policy2 = optim.lr_scheduler.NoamAnnealing(
            opt2, d_model=self.D_MODEL, warmup_steps=5, max_steps=self.MAX_STEPS * 2, min_lr=self.MIN_LR
        )
        initial_lr = policy1.get_last_lr()[0]

        assert initial_lr < self.INITIAL_LR

        for i in range(self.MAX_STEPS * 2):
            if i <= 5:
                assert policy1.get_last_lr()[0] <= self.INITIAL_LR
            else:
                assert self.MIN_LR < policy1.get_last_lr()[0] < self.INITIAL_LR
                assert policy1.get_last_lr()[0] == policy2.get_last_lr()[0]

            opt1.step()
            opt2.step()
            policy1.step()
            policy2.step()

    @pytest.mark.unit
    def test_PolynomialDecayAnnealing(self):
        model = TempModel()
        opt_cls = optim.get_optimizer('novograd')
        opt = opt_cls(model.parameters(), lr=self.INITIAL_LR)

        # No warmup case
        policy = optim.lr_scheduler.PolynomialDecayAnnealing(
            opt, power=2, max_steps=self.MAX_STEPS, min_lr=self.MIN_LR
        )
        initial_lr = policy.get_last_lr()[0]

        assert initial_lr == self.INITIAL_LR

        for i in range(self.MAX_STEPS):
            assert policy.get_last_lr()[0] <= self.INITIAL_LR
            opt.step()
            policy.step()

        policy.step()
        final_lr = policy.get_last_lr()[0]

        assert final_lr == self.MIN_LR

        # Warmup steps available
        policy = optim.lr_scheduler.PolynomialDecayAnnealing(
            opt, warmup_steps=5, max_steps=self.MAX_STEPS, min_lr=self.MIN_LR
        )
        initial_lr = policy.get_last_lr()[0]

        assert initial_lr < self.INITIAL_LR

        for i in range(self.MAX_STEPS):
            if i <= 5:
                assert policy.get_last_lr()[0] <= self.INITIAL_LR
            else:
                assert policy.get_last_lr()[0] < self.INITIAL_LR

            opt.step()
            policy.step()

        policy.step()
        final_lr = policy.get_last_lr()[0]

        assert final_lr == self.MIN_LR

    @pytest.mark.unit
    def test_PolynomialHoldDecayAnnealing(self):
        model = TempModel()
        opt_cls = optim.get_optimizer('novograd')
        opt = opt_cls(model.parameters(), lr=self.INITIAL_LR)

        # No warmup case
        policy = optim.lr_scheduler.PolynomialHoldDecayAnnealing(
            opt, power=2, max_steps=self.MAX_STEPS, min_lr=self.MIN_LR
        )
        initial_lr = policy.get_last_lr()[0]

        assert initial_lr == self.INITIAL_LR

        for i in range(self.MAX_STEPS):
            assert policy.get_last_lr()[0] <= self.INITIAL_LR
            opt.step()
            policy.step()

        policy.step()
        final_lr = policy.get_last_lr()[0]

        assert final_lr == self.MIN_LR

        # Warmup steps available
        policy = optim.lr_scheduler.PolynomialHoldDecayAnnealing(
            opt, power=2, warmup_steps=5, max_steps=self.MAX_STEPS, min_lr=self.MIN_LR
        )
        initial_lr = policy.get_last_lr()[0]

        assert initial_lr < self.INITIAL_LR

        for i in range(self.MAX_STEPS):
            if i <= 5:
                assert policy.get_last_lr()[0] <= self.INITIAL_LR
            else:
                assert policy.get_last_lr()[0] < self.INITIAL_LR

            opt.step()
            policy.step()

        policy.step()
        final_lr = policy.get_last_lr()[0]

        assert final_lr == self.MIN_LR

        # Warmup + Hold steps available
        policy = optim.lr_scheduler.PolynomialHoldDecayAnnealing(
            opt, warmup_steps=5, hold_steps=3, max_steps=self.MAX_STEPS, min_lr=self.MIN_LR, power=2
        )
        initial_lr = policy.get_last_lr()[0]

        assert initial_lr < self.INITIAL_LR

        for i in range(self.MAX_STEPS):
            if i <= 4:
                assert policy.get_last_lr()[0] <= self.INITIAL_LR
            elif i <= 8:
                assert policy.get_last_lr()[0] == self.INITIAL_LR
            else:
                assert policy.get_last_lr()[0] < self.INITIAL_LR
            opt.step()
            policy.step()

        policy.step()
        final_lr = policy.get_last_lr()[0]

        assert final_lr == self.MIN_LR

    @pytest.mark.unit
    def test_InverseSquareRootAnnealing(self):
        model = TempModel()
        opt_cls = optim.get_optimizer('novograd')
        opt = opt_cls(model.parameters(), lr=self.INITIAL_LR)

        # No warmup case
        policy = optim.lr_scheduler.InverseSquareRootAnnealing(opt, max_steps=self.MAX_STEPS, min_lr=self.MIN_LR)
        initial_lr = policy.get_last_lr()[0]

        assert initial_lr == self.INITIAL_LR

        for i in range(self.MAX_STEPS):
            assert policy.get_last_lr()[0] <= self.INITIAL_LR
            opt.step()
            policy.step()

        policy.step()
        final_lr = policy.get_last_lr()[0]

        assert final_lr == self.MIN_LR

        # Warmup steps available
        policy = optim.lr_scheduler.InverseSquareRootAnnealing(
            opt, warmup_steps=5, max_steps=self.MAX_STEPS, min_lr=self.MIN_LR
        )
        initial_lr = policy.get_last_lr()[0]

        assert initial_lr < self.INITIAL_LR

        for i in range(self.MAX_STEPS):
            if i <= 5:
                assert policy.get_last_lr()[0] <= self.INITIAL_LR
            else:
                assert policy.get_last_lr()[0] < self.INITIAL_LR

            opt.step()
            policy.step()

        policy.step()
        final_lr = policy.get_last_lr()[0]

        assert final_lr == self.MIN_LR

    @pytest.mark.unit
    def test_CosineAnnealing_with_noop_steps(self):
        model = TempModel()
        opt_cls = optim.get_optimizer('novograd')
        opt = opt_cls(model.parameters(), lr=self.INITIAL_LR)

        # No warmup case
        policy = optim.lr_scheduler.CosineAnnealing(opt, max_steps=self.MAX_STEPS, min_lr=self.MIN_LR)
        initial_lr = policy.get_last_lr()[0]

        assert initial_lr == self.INITIAL_LR

        update_steps = 0
        for i in range(self.MAX_STEPS):
            assert policy.get_last_lr()[0] <= self.INITIAL_LR
            opt.step()
            policy.step()

            # Perform a no-op scheduler step on every even iteration by rewinding last_epoch.
            if i % 2 == 0:
                policy.last_epoch -= 1
            else:
                update_steps += 1

        policy.step()
        update_steps += 1

        assert update_steps < self.MAX_STEPS

        final_lr = policy.get_last_lr()[0]
        assert final_lr > self.MIN_LR

        # Compute the true LR at the number of effective updates and compare.
        true_end_lr = policy._get_lr(step=update_steps)[0]
        assert final_lr == true_end_lr
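
    # End-to-end checks that compute_max_steps agrees with the number of optimizer steps an
    # actual Trainer run performs, across devices, grad accumulation, and dataloader settings.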
    @pytest.mark.unit
    @pytest.mark.run_only_on('CPU')
    def test_max_step_computation(self):
        def train(
            max_epochs, accumulate_grad_batches, limit_train_batches, devices, batch_size, dataset_len, drop_last
        ):
            trainer = pl.Trainer(
                max_epochs=max_epochs,
                strategy="ddp_spawn",
                accelerator="cpu",
                devices=devices,
                accumulate_grad_batches=accumulate_grad_batches,
                limit_train_batches=limit_train_batches,
                enable_checkpointing=False,
                enable_progress_bar=False,
            )
            max_steps = optim.lr_scheduler.compute_max_steps(
                max_epochs, accumulate_grad_batches, limit_train_batches, devices, dataset_len, batch_size, drop_last,
            )
            model = ExampleModel(batch_size, dataset_len, drop_last, max_steps)
            trainer.callbacks.append(Callback())
            trainer.fit(model)

        train(
            31,
            accumulate_grad_batches=1,
            limit_train_batches=1.0,
            devices=9,
            batch_size=60,
            dataset_len=1613,
            drop_last=True,
        )
        train(
            5,
            accumulate_grad_batches=1,
            limit_train_batches=0.5,
            devices=4,
            batch_size=97,
            dataset_len=498,
            drop_last=False,
        )
        train(
            5,
            accumulate_grad_batches=8,
            limit_train_batches=0.5,
            devices=4,
            batch_size=54,
            dataset_len=629,
            drop_last=True,
        )
        train(
            5,
            accumulate_grad_batches=1,
            limit_train_batches=0.5,
            devices=1,
            batch_size=68,
            dataset_len=488,
            drop_last=False,
        )
        for _ in range(5):
            drop_last = bool(random.randint(0, 1))
            accumulate_grad_batches = random.randint(1, 10)

            limit_train_batches_int = random.randint(1, 10)
            limit_train_batches_float = random.uniform(0.5, 1)
            limit_train_batches = random.choice([limit_train_batches_int, limit_train_batches_float])
            max_epochs = random.randint(4, 20)
            devices = random.randint(1, 5)
            dataset_len = random.randint(20, devices * 500)
            batch_size = random.randint(math.ceil(5.0 / devices), min(dataset_len // devices, 128))
            train(
                max_epochs, accumulate_grad_batches, limit_train_batches, devices, batch_size, dataset_len, drop_last,
            )

    @pytest.mark.unit
    @pytest.mark.run_only_on('CPU')
    def test_max_step_computation_with_sched_no_ops(self):
        def train(
            max_steps, accumulate_grad_batches, limit_train_batches, devices, batch_size, dataset_len, drop_last
        ):
            trainer = pl.Trainer(
                max_steps=max_steps,
                strategy="ddp_spawn",
                accelerator="cpu",
                devices=devices,
                accumulate_grad_batches=accumulate_grad_batches,
                limit_train_batches=limit_train_batches,
                enable_checkpointing=False,
                enable_progress_bar=False,
            )
            model = ExampleModel(batch_size, dataset_len, drop_last, max_steps)
            trainer.callbacks.append(SchedulerNoOpCallback())
            trainer.fit(model)

        train(
            max_steps=20,
            accumulate_grad_batches=1,
            limit_train_batches=1.0,
            devices=4,
            batch_size=60,
            dataset_len=2000,
            drop_last=True,
        )