ECON / apps /Normal.py
Yuliang's picture
Support TEXTure
487ee6d
import numpy as np
import pytorch_lightning as pl
import torch
from skimage.transform import resize
from lib.common.train_util import batch_mean
from lib.net import NormalNet
class Normal(pl.LightningModule):
def __init__(self, cfg):
super(Normal, self).__init__()
self.cfg = cfg
self.batch_size = self.cfg.batch_size
self.lr_F = self.cfg.lr_netF
self.lr_B = self.cfg.lr_netB
self.lr_D = self.cfg.lr_netD
self.overfit = cfg.overfit
self.F_losses = [item[0] for item in self.cfg.net.front_losses]
self.B_losses = [item[0] for item in self.cfg.net.back_losses]
self.ALL_losses = self.F_losses + self.B_losses
self.automatic_optimization = False
self.schedulers = []
self.netG = NormalNet(self.cfg)
self.in_nml = [item[0] for item in cfg.net.in_nml]
# Training related
def configure_optimizers(self):
optim_params_N_D = None
optimizer_N_D = None
scheduler_N_D = None
# set optimizer
optim_params_N_F = [{"params": self.netG.netF.parameters(), "lr": self.lr_F}]
optim_params_N_B = [{"params": self.netG.netB.parameters(), "lr": self.lr_B}]
optimizer_N_F = torch.optim.Adam(optim_params_N_F, lr=self.lr_F, betas=(0.5, 0.999))
optimizer_N_B = torch.optim.Adam(optim_params_N_B, lr=self.lr_B, betas=(0.5, 0.999))
scheduler_N_F = torch.optim.lr_scheduler.MultiStepLR(
optimizer_N_F, milestones=self.cfg.schedule, gamma=self.cfg.gamma
)
scheduler_N_B = torch.optim.lr_scheduler.MultiStepLR(
optimizer_N_B, milestones=self.cfg.schedule, gamma=self.cfg.gamma
)
if 'gan' in self.ALL_losses:
optim_params_N_D = [{"params": self.netG.netD.parameters(), "lr": self.lr_D}]
optimizer_N_D = torch.optim.Adam(optim_params_N_D, lr=self.lr_D, betas=(0.5, 0.999))
scheduler_N_D = torch.optim.lr_scheduler.MultiStepLR(
optimizer_N_D, milestones=self.cfg.schedule, gamma=self.cfg.gamma
)
self.schedulers = [scheduler_N_F, scheduler_N_B, scheduler_N_D]
optims = [optimizer_N_F, optimizer_N_B, optimizer_N_D]
else:
self.schedulers = [scheduler_N_F, scheduler_N_B]
optims = [optimizer_N_F, optimizer_N_B]
return optims, self.schedulers
def render_func(self, render_tensor, dataset, idx):
height = render_tensor["image"].shape[2]
result_list = []
for name in render_tensor.keys():
result_list.append(
resize(
((render_tensor[name].cpu().numpy()[0] + 1.0) / 2.0).transpose(1, 2, 0),
(height, height),
anti_aliasing=True,
)
)
self.logger.log_image(
key=f"Normal/{dataset}/{idx if not self.overfit else 1}",
images=[(np.concatenate(result_list, axis=1) * 255.0).astype(np.uint8)]
)
def training_step(self, batch, batch_idx):
self.netG.train()
# retrieve the data
in_tensor = {}
for name in self.in_nml:
in_tensor[name] = batch[name]
FB_tensor = {"normal_F": batch["normal_F"], "normal_B": batch["normal_B"]}
in_tensor.update(FB_tensor)
preds_F, preds_B = self.netG(in_tensor)
error_dict = self.netG.get_norm_error(preds_F, preds_B, FB_tensor)
if 'gan' in self.ALL_losses:
(opt_F, opt_B, opt_D) = self.optimizers()
opt_F.zero_grad()
self.manual_backward(error_dict["netF"])
opt_B.zero_grad()
self.manual_backward(error_dict["netB"], retain_graph=True)
opt_D.zero_grad()
self.manual_backward(error_dict["netD"])
opt_F.step()
opt_B.step()
opt_D.step()
else:
(opt_F, opt_B) = self.optimizers()
opt_F.zero_grad()
self.manual_backward(error_dict["netF"])
opt_B.zero_grad()
self.manual_backward(error_dict["netB"])
opt_F.step()
opt_B.step()
if batch_idx > 0 and batch_idx % int(
self.cfg.freq_show_train
) == 0 and self.cfg.devices == 1:
self.netG.eval()
with torch.no_grad():
nmlF, nmlB = self.netG(in_tensor)
in_tensor.update({"nmlF": nmlF, "nmlB": nmlB})
self.render_func(in_tensor, "train", self.global_step)
# metrics processing
metrics_log = {"loss": error_dict["netF"] + error_dict["netB"]}
if "gan" in self.ALL_losses:
metrics_log["loss"] += error_dict["netD"]
for key in error_dict.keys():
metrics_log["train/loss_" + key] = error_dict[key].item()
self.log_dict(
metrics_log, prog_bar=True, logger=True, on_step=True, on_epoch=False, sync_dist=True
)
return metrics_log
def training_epoch_end(self, outputs):
# metrics processing
metrics_log = {}
for key in outputs[0].keys():
if "/" in key:
[stage, loss_name] = key.split("/")
else:
stage = "train"
loss_name = key
metrics_log[f"{stage}/avg-{loss_name}"] = batch_mean(outputs, key)
self.log_dict(
metrics_log,
prog_bar=False,
logger=True,
on_step=False,
on_epoch=True,
rank_zero_only=True
)
def validation_step(self, batch, batch_idx):
self.netG.eval()
self.netG.training = False
# retrieve the data
in_tensor = {}
for name in self.in_nml:
in_tensor[name] = batch[name]
FB_tensor = {"normal_F": batch["normal_F"], "normal_B": batch["normal_B"]}
in_tensor.update(FB_tensor)
preds_F, preds_B = self.netG(in_tensor)
error_dict = self.netG.get_norm_error(preds_F, preds_B, FB_tensor)
if batch_idx % int(self.cfg.freq_show_train) == 0 and self.cfg.devices == 1:
with torch.no_grad():
nmlF, nmlB = self.netG(in_tensor)
in_tensor.update({"nmlF": nmlF, "nmlB": nmlB})
self.render_func(in_tensor, "val", batch_idx)
# metrics processing
metrics_log = {"val/loss": error_dict["netF"] + error_dict["netB"]}
if "gan" in self.ALL_losses:
metrics_log["val/loss"] += error_dict["netD"]
for key in error_dict.keys():
metrics_log["val/" + key] = error_dict[key].item()
return metrics_log
def validation_epoch_end(self, outputs):
# metrics processing
metrics_log = {}
for key in outputs[0].keys():
[stage, loss_name] = key.split("/")
metrics_log[f"{stage}/avg-{loss_name}"] = batch_mean(outputs, key)
self.log_dict(
metrics_log,
prog_bar=False,
logger=True,
on_step=False,
on_epoch=True,
rank_zero_only=True
)