In [None]:
#Source: https://medium.com/dataseries/convolutional-autoencoder-in-pytorch-on-mnist-dataset-d65145c132ac

In [46]:
import matplotlib.pyplot as plt # plotting library
from sklearn.model_selection import train_test_split
import numpy as np # this module is useful to work with numerical arrays
import pandas as pd 
import random 
import os
import torch
import torchvision
from torchvision import transforms, datasets
from torch.utils.data import DataLoader,random_split
from torch import nn
import torch.nn.functional as F
import torch.optim as optim

In [3]:
def find_candidate_images(images_path):
 """
 Finds all candidate images in the given folder and its sub-folders.

 Returns:
 images: a list of absolute paths to the discovered images.
 """
 images = []
 for root, dirs, files in os.walk(images_path):
 for name in files:
 file_path = os.path.abspath(os.path.join(root, name))
 if ((os.path.splitext(name)[1]).lower() in ['.jpg','.png','.jpeg']):
 images.append(file_path)
 return images

In [49]:
class MyDataset(torch.utils.data.Dataset):
 def __init__(self, img_list, augmentations):
 super(MyDataset, self).__init__()
 self.img_list = img_list
 self.augmentations = augmentations

 def __len__(self):
 return len(self.img_list)

 def __getitem__(self, idx):
 img = self.img_list[idx]
 return self.augmentations(img)

In [51]:
images = find_candidate_images('../SD_sample_f_m_pt2')

In [43]:
transform = transforms.Compose([
transforms.ToTensor(),
])

In [55]:
data = MyDataset(images, transform)
dataset_iterator = DataLoader(data, batch_size=1)

In [56]:
train_images, test_images = train_test_split(data, test_size=0.33, random_state=42)
print(len(train_images))
print(len(test_images))

TypeError: pic should be PIL Image or ndarray. Got 

In [16]:
m=len(train_images)

In [23]:
train_data, val_data = random_split(train_images, [int(m-m*0.2), int(m*0.2)])
test_dataset = test_images

In [24]:
train_loader = torch.utils.data.DataLoader(train_data, batch_size=batch_size)
valid_loader = torch.utils.data.DataLoader(val_data, batch_size=batch_size)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size,shuffle=True)

In [25]:
class Encoder(nn.Module):
 
 def __init__(self, encoded_space_dim,fc2_input_dim):
 super().__init__()
 
 ### Convolutional section
 self.encoder_cnn = nn.Sequential(
 nn.Conv2d(1, 8, 3, stride=2, padding=1),
 nn.ReLU(True),
 nn.Conv2d(8, 16, 3, stride=2, padding=1),
 nn.BatchNorm2d(16),
 nn.ReLU(True),
 nn.Conv2d(16, 32, 3, stride=2, padding=0),
 nn.ReLU(True)
 )
 
 ### Flatten layer
 self.flatten = nn.Flatten(start_dim=1)
### Linear section
 self.encoder_lin = nn.Sequential(
 nn.Linear(3 * 3 * 32, 128),
 nn.ReLU(True),
 nn.Linear(128, encoded_space_dim)
 )
 
 def forward(self, x):
 x = self.encoder_cnn(x)
 x = self.flatten(x)
 x = self.encoder_lin(x)
 return x
class Decoder(nn.Module):
 
 def __init__(self, encoded_space_dim,fc2_input_dim):
 super().__init__()
 self.decoder_lin = nn.Sequential(
 nn.Linear(encoded_space_dim, 128),
 nn.ReLU(True),
 nn.Linear(128, 3 * 3 * 32),
 nn.ReLU(True)
 )

 self.unflatten = nn.Unflatten(dim=1, 
 unflattened_size=(32, 3, 3))

 self.decoder_conv = nn.Sequential(
 nn.ConvTranspose2d(32, 16, 3, 
 stride=2, output_padding=0),
 nn.BatchNorm2d(16),
 nn.ReLU(True),
 nn.ConvTranspose2d(16, 8, 3, stride=2, 
 padding=1, output_padding=1),
 nn.BatchNorm2d(8),
 nn.ReLU(True),
 nn.ConvTranspose2d(8, 1, 3, stride=2, 
 padding=1, output_padding=1)
 )
 
 def forward(self, x):
 x = self.decoder_lin(x)
 x = self.unflatten(x)
 x = self.decoder_conv(x)
 x = torch.sigmoid(x)
 return x

In [26]:
### Define the loss function
loss_fn = torch.nn.MSELoss()

### Define an optimizer (both for the encoder and the decoder!)
lr= 0.001

### Set the random seed for reproducible results
torch.manual_seed(0)

### Initialize the two networks
d = 4

#model = Autoencoder(encoded_space_dim=encoded_space_dim)
encoder = Encoder(encoded_space_dim=d,fc2_input_dim=128)
decoder = Decoder(encoded_space_dim=d,fc2_input_dim=128)
params_to_optimize = [
 {'params': encoder.parameters()},
 {'params': decoder.parameters()}
]

optim = torch.optim.Adam(params_to_optimize, lr=lr, weight_decay=1e-05)

# Check if the GPU is available
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f'Selected device: {device}')

# Move both the encoder and the decoder to the selected device
encoder.to(device)
decoder.to(device)

Selected device: cuda


Decoder(
 (decoder_lin): Sequential(
 (0): Linear(in_features=4, out_features=128, bias=True)
 (1): ReLU(inplace=True)
 (2): Linear(in_features=128, out_features=288, bias=True)
 (3): ReLU(inplace=True)
 )
 (unflatten): Unflatten(dim=1, unflattened_size=(32, 3, 3))
 (decoder_conv): Sequential(
 (0): ConvTranspose2d(32, 16, kernel_size=(3, 3), stride=(2, 2))
 (1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
 (2): ReLU(inplace=True)
 (3): ConvTranspose2d(16, 8, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), output_padding=(1, 1))
 (4): BatchNorm2d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
 (5): ReLU(inplace=True)
 (6): ConvTranspose2d(8, 1, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), output_padding=(1, 1))
 )
)

In [33]:
### Training function
def train_epoch(encoder, decoder, device, dataloader, loss_fn, optimizer):
 # Set train mode for both the encoder and the decoder
 encoder.train()
 decoder.train()
 train_loss = []
 # Iterate the dataloader (we do not need the label values, this is unsupervised learning)
 for image_batch, _ in dataloader: # with "_" we just ignore the labels (the second element of the dataloader tuple)
 # Move tensor to the proper device
 image_batch = image_batch.to(device)
 # Encode data
 encoded_data = encoder(image_batch)
 # Decode data
 decoded_data = decoder(encoded_data)
 # Evaluate loss
 loss = loss_fn(decoded_data, image_batch)
 # Backward pass
 optimizer.zero_grad()
 loss.backward()
 optimizer.step()
 # Print batch loss
 print('\t partial train loss (single batch): %f' % (loss.data))
 train_loss.append(loss.detach().cpu().numpy())

 return np.mean(train_loss)

In [28]:
### Testing function
def test_epoch(encoder, decoder, device, dataloader, loss_fn):
 # Set evaluation mode for encoder and decoder
 encoder.eval()
 decoder.eval()
 with torch.no_grad(): # No need to track the gradients
 # Define the lists to store the outputs for each batch
 conc_out = []
 conc_label = []
 for image_batch, _ in dataloader:
 # Move tensor to the proper device
 image_batch = image_batch.to(device)
 # Encode data
 encoded_data = encoder(image_batch)
 # Decode data
 decoded_data = decoder(encoded_data)
 # Append the network output and the original image to the lists
 conc_out.append(decoded_data.cpu())
 conc_label.append(image_batch.cpu())
 # Create a single tensor with all the values in the lists
 conc_out = torch.cat(conc_out)
 conc_label = torch.cat(conc_label) 
 # Evaluate global loss
 val_loss = loss_fn(conc_out, conc_label)
 return val_loss.data

In [29]:
def plot_ae_outputs(encoder,decoder,n=10):
 plt.figure(figsize=(16,4.5))
 targets = test_dataset.targets.numpy()
 t_idx = {i:np.where(targets==i)[0][0] for i in range(n)}
 for i in range(n):
 ax = plt.subplot(2,n,i+1)
 img = test_dataset[t_idx[i]][0].unsqueeze(0).to(device)
 encoder.eval()
 decoder.eval()
 with torch.no_grad():
 rec_img = decoder(encoder(img))
 plt.imshow(img.cpu().squeeze().numpy(), cmap='gist_gray')
 ax.get_xaxis().set_visible(False)
 ax.get_yaxis().set_visible(False) 
 if i == n//2:
 ax.set_title('Original images')
 ax = plt.subplot(2, n, i + 1 + n)
 plt.imshow(rec_img.cpu().squeeze().numpy(), cmap='gist_gray') 
 ax.get_xaxis().set_visible(False)
 ax.get_yaxis().set_visible(False) 
 if i == n//2:
 ax.set_title('Reconstructed images')
 plt.show() 

In [34]:
num_epochs = 30
diz_loss = {'train_loss':[],'val_loss':[]}
for epoch in range(num_epochs):
 train_loss =train_epoch(encoder,decoder,device,train_loader,loss_fn,optim)
 val_loss = test_epoch(encoder,decoder,device,test_loader,loss_fn)
 print('\n EPOCH {}/{} \t train loss {} \t val loss {}'.format(epoch + 1, num_epochs,train_loss,val_loss))
 diz_loss['train_loss'].append(train_loss)
 diz_loss['val_loss'].append(val_loss)
 plot_ae_outputs(encoder,decoder,n=10)

ValueError: too many values to unpack (expected 2)