|
import os |
|
from tqdm import tqdm |
|
import torch |
|
from torch.utils.data import DataLoader |
|
from logger import Logger, Visualizer |
|
import numpy as np |
|
import imageio |
|
from sync_batchnorm import DataParallelWithCallback |
|
|
|
|
|
def reconstruction(config, generator, kp_detector, checkpoint, log_dir, dataset): |
|
png_dir = os.path.join(log_dir, 'reconstruction/png') |
|
log_dir = os.path.join(log_dir, 'reconstruction') |
|
|
|
if checkpoint is not None: |
|
Logger.load_cpk(checkpoint, generator=generator, kp_detector=kp_detector) |
|
else: |
|
raise AttributeError("Checkpoint should be specified for mode='reconstruction'.") |
|
dataloader = DataLoader(dataset, batch_size=1, shuffle=False, num_workers=1) |
|
|
|
if not os.path.exists(log_dir): |
|
os.makedirs(log_dir) |
|
|
|
if not os.path.exists(png_dir): |
|
os.makedirs(png_dir) |
|
|
|
loss_list = [] |
|
if torch.cuda.is_available(): |
|
generator = DataParallelWithCallback(generator) |
|
kp_detector = DataParallelWithCallback(kp_detector) |
|
|
|
generator.eval() |
|
kp_detector.eval() |
|
|
|
for it, x in tqdm(enumerate(dataloader)): |
|
if config['reconstruction_params']['num_videos'] is not None: |
|
if it > config['reconstruction_params']['num_videos']: |
|
break |
|
with torch.no_grad(): |
|
predictions = [] |
|
visualizations = [] |
|
if torch.cuda.is_available(): |
|
x['video'] = x['video'].cuda() |
|
kp_source = kp_detector(x['video'][:, :, 0]) |
|
for frame_idx in range(x['video'].shape[2]): |
|
source = x['video'][:, :, 0] |
|
driving = x['video'][:, :, frame_idx] |
|
kp_driving = kp_detector(driving) |
|
out = generator(source, kp_source=kp_source, kp_driving=kp_driving) |
|
out['kp_source'] = kp_source |
|
out['kp_driving'] = kp_driving |
|
del out['sparse_deformed'] |
|
predictions.append(np.transpose(out['prediction'].data.cpu().numpy(), [0, 2, 3, 1])[0]) |
|
|
|
visualization = Visualizer(**config['visualizer_params']).visualize(source=source, |
|
driving=driving, out=out) |
|
visualizations.append(visualization) |
|
|
|
loss_list.append(torch.abs(out['prediction'] - driving).mean().cpu().numpy()) |
|
|
|
predictions = np.concatenate(predictions, axis=1) |
|
imageio.imsave(os.path.join(png_dir, x['name'][0] + '.png'), (255 * predictions).astype(np.uint8)) |
|
|
|
image_name = x['name'][0] + config['reconstruction_params']['format'] |
|
imageio.mimsave(os.path.join(log_dir, image_name), visualizations) |
|
|
|
print("Reconstruction loss: %s" % np.mean(loss_list)) |
|
|