# TestingViscosity / functions.py
import os
import numpy as np
from PIL import Image
from torch.utils import data
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
import torchvision.transforms as transforms
from tqdm import tqdm
## ------------------- label conversion tools ------------------ ##
def labels2cat(label_encoder, labels_list):
    return label_encoder.transform(labels_list)
def labels2onehot(OneHotEncoder, label_encoder, labels_list):
    return OneHotEncoder.transform(label_encoder.transform(labels_list).reshape(-1, 1)).toarray()
def onehot2labels(label_encoder, y_onehot):
return label_encoder.inverse_transform(np.where(y_onehot == 1)[1]).tolist()
def cat2labels(label_encoder, y_cat):
return label_encoder.inverse_transform(y_cat).tolist()
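# Example (sketch): these helpers are written against scikit-learn's
# LabelEncoder / OneHotEncoder; the class names below are illustrative only.
#
#   from sklearn.preprocessing import LabelEncoder, OneHotEncoder
#   le = LabelEncoder().fit(['glycerol', 'honey', 'water'])
#   ohe = OneHotEncoder().fit(le.transform(le.classes_).reshape(-1, 1))
#   cats = labels2cat(le, ['honey', 'water'])             # array([1, 2])
#   onehots = labels2onehot(ohe, le, ['honey', 'water'])  # 2 x 3 one-hot array
#   assert cat2labels(le, cats) == ['honey', 'water']
#   assert onehot2labels(le, onehots) == ['honey', 'water']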
## ---------------------- Dataloaders ---------------------- ##
# for 3DCNN
class Dataset_3DCNN(data.Dataset):
"Characterizes a dataset for PyTorch"
def __init__(self, data_path, folders, labels, frames, transform=None):
"Initialization"
self.data_path = data_path
self.labels = labels
self.folders = folders
self.transform = transform
self.frames = frames
def __len__(self):
"Denotes the total number of samples"
return len(self.folders)
def read_images(self, path, selected_folder, use_transform):
X = []
for i in self.frames:
image = Image.open(os.path.join(path, selected_folder, 'frame_{:01d}.jpg'.format(i))).convert('L')
if use_transform is not None:
image = use_transform(image)
X.append(image.squeeze_(0))
X = torch.stack(X, dim=0)
return X
def __getitem__(self, index):
"Generates one sample of data"
# Select sample
folder = self.folders[index]
# Load data
X = self.read_images(self.data_path, folder, self.transform).unsqueeze_(0) # (input) spatial images
        y = torch.LongTensor([self.labels[index]])                             # (label) LongTensor holds int64 class indices
# print(X.shape)
return X, y
# for CRNN
class Dataset_CRNN(data.Dataset):
"Characterizes a dataset for PyTorch"
def __init__(self, data_path, folders, labels, frames, transform=None):
"Initialization"
self.data_path = data_path
self.labels = labels
self.folders = folders
self.transform = transform
self.frames = frames
def __len__(self):
"Denotes the total number of samples"
return len(self.folders)
def read_images(self, path, selected_folder, use_transform):
X = []
for i in self.frames:
image = Image.open(os.path.join(path, selected_folder, 'frame{:01d}.jpg'.format(i)))
if use_transform is not None:
image = use_transform(image)
X.append(image)
X = torch.stack(X, dim=0)
return X
def __getitem__(self, index):
"Generates one sample of data"
# Select sample
folder = self.folders[index]
# Load data
X = self.read_images(self.data_path, folder, self.transform) # (input) spatial images
        y = torch.LongTensor([self.labels[index]])                             # (label) LongTensor holds int64 class indices
# print(X.shape)
return X, y
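# Example (sketch): typical way to wrap these datasets in a DataLoader.
# The data path, folder names, labels and frame indices are placeholders.
#
#   transform = transforms.Compose([transforms.Resize([256, 342]),
#                                   transforms.ToTensor()])
#   train_set = Dataset_CRNN(data_path='./jpegs/', folders=['vid_001', 'vid_002'],
#                            labels=[0, 1], frames=np.arange(1, 29),
#                            transform=transform)
#   train_loader = data.DataLoader(train_set, batch_size=2, shuffle=True)
#   X, y = next(iter(train_loader))   # X: (batch, time, C, H, W), y: (batch, 1)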
## ---------------------- end of Dataloaders ---------------------- ##
## -------------------- (reload) model prediction ---------------------- ##
def Conv3d_final_prediction(model, device, loader):
model.eval()
all_y_pred = []
with torch.no_grad():
for batch_idx, (X, y) in enumerate(tqdm(loader)):
# distribute data to device
X = X.to(device)
output = model(X)
y_pred = output.max(1, keepdim=True)[1] # location of max log-probability as prediction
            all_y_pred.extend(y_pred.cpu().data.squeeze().numpy().tolist())  # move to CPU before converting to numpy
return all_y_pred
def CRNN_final_prediction(model, device, loader):
cnn_encoder, rnn_decoder = model
cnn_encoder.eval()
rnn_decoder.eval()
all_y_pred = []
with torch.no_grad():
for batch_idx, (X, y) in enumerate(tqdm(loader)):
# distribute data to device
X = X.to(device)
output = rnn_decoder(cnn_encoder(X))
y_pred = output.max(1, keepdim=True)[1] # location of max log-probability as prediction
all_y_pred.extend(y_pred.cpu().data.squeeze().numpy().tolist())
return all_y_pred
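# Example (sketch): reloading trained weights and running the predictors.
# Checkpoint names, the test loader and the class count are placeholders.
#
#   device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
#   cnn_encoder = ResCNNEncoder().to(device)
#   rnn_decoder = DecoderRNN(num_classes=2).to(device)
#   cnn_encoder.load_state_dict(torch.load('cnn_encoder.pth', map_location=device))
#   rnn_decoder.load_state_dict(torch.load('rnn_decoder.pth', map_location=device))
#   y_pred = CRNN_final_prediction([cnn_encoder, rnn_decoder], device, test_loader)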
## -------------------- end of model prediction ---------------------- ##
## ------------------------ 3D CNN module ---------------------- ##
def conv3D_output_size(img_size, padding, kernel_size, stride):
# compute output shape of conv3D
outshape = (np.floor((img_size[0] + 2 * padding[0] - (kernel_size[0] - 1) - 1) / stride[0] + 1).astype(int),
np.floor((img_size[1] + 2 * padding[1] - (kernel_size[1] - 1) - 1) / stride[1] + 1).astype(int),
np.floor((img_size[2] + 2 * padding[2] - (kernel_size[2] - 1) - 1) / stride[2] + 1).astype(int))
return outshape
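# Worked example: with a (28, 256, 342) input, kernel (5, 5, 5), stride (2, 2, 2)
# and no padding, each axis gives floor((n - 5) / 2 + 1), so
# conv3D_output_size((28, 256, 342), (0, 0, 0), (5, 5, 5), (2, 2, 2)) == (12, 126, 169).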
class CNN3D(nn.Module):
def __init__(self, t_dim=120, img_x=90, img_y=120, drop_p=0.2, fc_hidden1=256, fc_hidden2=128, num_classes=50):
super(CNN3D, self).__init__()
# set video dimension
self.t_dim = t_dim
self.img_x = img_x
self.img_y = img_y
# fully connected layer hidden nodes
self.fc_hidden1, self.fc_hidden2 = fc_hidden1, fc_hidden2
self.drop_p = drop_p
self.num_classes = num_classes
self.ch1, self.ch2 = 32, 48
self.k1, self.k2 = (5, 5, 5), (3, 3, 3) # 3d kernel size
self.s1, self.s2 = (2, 2, 2), (2, 2, 2) # 3d strides
self.pd1, self.pd2 = (0, 0, 0), (0, 0, 0) # 3d padding
# compute conv1 & conv2 output shape
self.conv1_outshape = conv3D_output_size((self.t_dim, self.img_x, self.img_y), self.pd1, self.k1, self.s1)
self.conv2_outshape = conv3D_output_size(self.conv1_outshape, self.pd2, self.k2, self.s2)
self.conv1 = nn.Conv3d(in_channels=1, out_channels=self.ch1, kernel_size=self.k1, stride=self.s1,
padding=self.pd1)
self.bn1 = nn.BatchNorm3d(self.ch1)
self.conv2 = nn.Conv3d(in_channels=self.ch1, out_channels=self.ch2, kernel_size=self.k2, stride=self.s2,
padding=self.pd2)
self.bn2 = nn.BatchNorm3d(self.ch2)
self.relu = nn.ReLU(inplace=True)
self.drop = nn.Dropout3d(self.drop_p)
self.pool = nn.MaxPool3d(2)
self.fc1 = nn.Linear(self.ch2 * self.conv2_outshape[0] * self.conv2_outshape[1] * self.conv2_outshape[2],
self.fc_hidden1) # fully connected hidden layer
self.fc2 = nn.Linear(self.fc_hidden1, self.fc_hidden2)
self.fc3 = nn.Linear(self.fc_hidden2, self.num_classes) # fully connected layer, output = multi-classes
def forward(self, x_3d):
# Conv 1
x = self.conv1(x_3d)
x = self.bn1(x)
x = self.relu(x)
x = self.drop(x)
# Conv 2
x = self.conv2(x)
x = self.bn2(x)
x = self.relu(x)
x = self.drop(x)
# FC 1 and 2
x = x.view(x.size(0), -1)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = F.dropout(x, p=self.drop_p, training=self.training)
x = self.fc3(x)
return x
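# Example (sketch): smoke-testing CNN3D with made-up dimensions.  The clip size
# must match what Dataset_3DCNN produces (grayscale frames, hence 1 channel).
#
#   cnn3d = CNN3D(t_dim=28, img_x=90, img_y=120, num_classes=2)
#   clip = torch.randn(4, 1, 28, 90, 120)   # (batch, channel, time, H, W)
#   logits = cnn3d(clip)                    # (4, num_classes)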
## --------------------- end of 3D CNN module ---------------- ##
## ------------------------ CRNN module ---------------------- ##
def conv2D_output_size(img_size, padding, kernel_size, stride):
# compute output shape of conv2D
outshape = (np.floor((img_size[0] + 2 * padding[0] - (kernel_size[0] - 1) - 1) / stride[0] + 1).astype(int),
np.floor((img_size[1] + 2 * padding[1] - (kernel_size[1] - 1) - 1) / stride[1] + 1).astype(int))
return outshape
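# Worked example: a 90 x 120 frame through a (5, 5) kernel with stride (2, 2)
# and no padding gives conv2D_output_size((90, 120), (0, 0), (5, 5), (2, 2)) == (43, 58).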
# 2D CNN encoder train from scratch (no transfer learning)
class EncoderCNN(nn.Module):
def __init__(self, img_x=90, img_y=120, fc_hidden1=512, fc_hidden2=512, drop_p=0.3, CNN_embed_dim=300):
super(EncoderCNN, self).__init__()
self.img_x = img_x
self.img_y = img_y
self.CNN_embed_dim = CNN_embed_dim
        # CNN architectures
self.ch1, self.ch2, self.ch3, self.ch4 = 32, 64, 128, 256
        self.k1, self.k2, self.k3, self.k4 = (5, 5), (3, 3), (3, 3), (3, 3)      # 2d kernel size
self.s1, self.s2, self.s3, self.s4 = (2, 2), (2, 2), (2, 2), (2, 2) # 2d strides
self.pd1, self.pd2, self.pd3, self.pd4 = (0, 0), (0, 0), (0, 0), (0, 0) # 2d padding
# conv2D output shapes
self.conv1_outshape = conv2D_output_size((self.img_x, self.img_y), self.pd1, self.k1, self.s1) # Conv1 output shape
self.conv2_outshape = conv2D_output_size(self.conv1_outshape, self.pd2, self.k2, self.s2)
self.conv3_outshape = conv2D_output_size(self.conv2_outshape, self.pd3, self.k3, self.s3)
self.conv4_outshape = conv2D_output_size(self.conv3_outshape, self.pd4, self.k4, self.s4)
# fully connected layer hidden nodes
self.fc_hidden1, self.fc_hidden2 = fc_hidden1, fc_hidden2
self.drop_p = drop_p
self.conv1 = nn.Sequential(
nn.Conv2d(in_channels=3, out_channels=self.ch1, kernel_size=self.k1, stride=self.s1, padding=self.pd1),
nn.BatchNorm2d(self.ch1, momentum=0.01),
nn.ReLU(inplace=True),
# nn.MaxPool2d(kernel_size=2),
)
self.conv2 = nn.Sequential(
nn.Conv2d(in_channels=self.ch1, out_channels=self.ch2, kernel_size=self.k2, stride=self.s2, padding=self.pd2),
nn.BatchNorm2d(self.ch2, momentum=0.01),
nn.ReLU(inplace=True),
# nn.MaxPool2d(kernel_size=2),
)
self.conv3 = nn.Sequential(
nn.Conv2d(in_channels=self.ch2, out_channels=self.ch3, kernel_size=self.k3, stride=self.s3, padding=self.pd3),
nn.BatchNorm2d(self.ch3, momentum=0.01),
nn.ReLU(inplace=True),
# nn.MaxPool2d(kernel_size=2),
)
self.conv4 = nn.Sequential(
nn.Conv2d(in_channels=self.ch3, out_channels=self.ch4, kernel_size=self.k4, stride=self.s4, padding=self.pd4),
nn.BatchNorm2d(self.ch4, momentum=0.01),
nn.ReLU(inplace=True),
# nn.MaxPool2d(kernel_size=2),
)
self.drop = nn.Dropout2d(self.drop_p)
self.pool = nn.MaxPool2d(2)
        self.fc1 = nn.Linear(self.ch4 * self.conv4_outshape[0] * self.conv4_outshape[1], self.fc_hidden1)   # fully connected hidden layer
self.fc2 = nn.Linear(self.fc_hidden1, self.fc_hidden2)
self.fc3 = nn.Linear(self.fc_hidden2, self.CNN_embed_dim) # output = CNN embedding latent variables
def forward(self, x_3d):
cnn_embed_seq = []
for t in range(x_3d.size(1)):
# CNNs
x = self.conv1(x_3d[:, t, :, :, :])
x = self.conv2(x)
x = self.conv3(x)
x = self.conv4(x)
x = x.view(x.size(0), -1) # flatten the output of conv
# FC layers
x = F.relu(self.fc1(x))
# x = F.dropout(x, p=self.drop_p, training=self.training)
x = F.relu(self.fc2(x))
x = F.dropout(x, p=self.drop_p, training=self.training)
x = self.fc3(x)
cnn_embed_seq.append(x)
# swap time and sample dim such that (sample dim, time dim, CNN latent dim)
cnn_embed_seq = torch.stack(cnn_embed_seq, dim=0).transpose_(0, 1)
# cnn_embed_seq: shape=(batch, time_step, input_size)
return cnn_embed_seq
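# Example (sketch): the from-scratch encoder consumes RGB frame sequences;
# the batch/time/frame sizes below are illustrative only.
#
#   encoder = EncoderCNN(img_x=90, img_y=120, CNN_embed_dim=300)
#   frames = torch.randn(4, 28, 3, 90, 120)   # (batch, time, C, H, W)
#   feats = encoder(frames)                   # (4, 28, 300)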
# 2D CNN encoder using ResNet-152 pretrained
class ResCNNEncoder(nn.Module):
def __init__(self, fc_hidden1=512, fc_hidden2=512, drop_p=0.3, CNN_embed_dim=300):
"""Load the pretrained ResNet-152 and replace top fc layer."""
super(ResCNNEncoder, self).__init__()
self.fc_hidden1, self.fc_hidden2 = fc_hidden1, fc_hidden2
self.drop_p = drop_p
resnet = models.resnet152(pretrained=True)
modules = list(resnet.children())[:-1] # delete the last fc layer.
self.resnet = nn.Sequential(*modules)
self.fc1 = nn.Linear(resnet.fc.in_features, fc_hidden1)
self.bn1 = nn.BatchNorm1d(fc_hidden1, momentum=0.01)
self.fc2 = nn.Linear(fc_hidden1, fc_hidden2)
self.bn2 = nn.BatchNorm1d(fc_hidden2, momentum=0.01)
self.fc3 = nn.Linear(fc_hidden2, CNN_embed_dim)
def forward(self, x_3d):
cnn_embed_seq = []
for t in range(x_3d.size(1)):
            # ResNet CNN
with torch.no_grad():
x = self.resnet(x_3d[:, t, :, :, :]) # ResNet
x = x.view(x.size(0), -1) # flatten output of conv
# FC layers
x = self.bn1(self.fc1(x))
x = F.relu(x)
x = self.bn2(self.fc2(x))
x = F.relu(x)
x = F.dropout(x, p=self.drop_p, training=self.training)
x = self.fc3(x)
cnn_embed_seq.append(x)
# swap time and sample dim such that (sample dim, time dim, CNN latent dim)
cnn_embed_seq = torch.stack(cnn_embed_seq, dim=0).transpose_(0, 1)
# cnn_embed_seq: shape=(batch, time_step, input_size)
return cnn_embed_seq
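# Example (sketch): a transform commonly paired with the pretrained ResNet-152
# backbone (3-channel input, ImageNet normalization).  This is an assumption
# about the upstream preprocessing, not something this file enforces.
#
#   res_transform = transforms.Compose([
#       transforms.Resize([224, 224]),
#       transforms.ToTensor(),
#       transforms.Normalize(mean=[0.485, 0.456, 0.406],
#                            std=[0.229, 0.224, 0.225])])
#   # pass res_transform to Dataset_CRNN so each frame matches the backbone input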
class DecoderRNN(nn.Module):
def __init__(self, CNN_embed_dim=300, h_RNN_layers=3, h_RNN=256, h_FC_dim=128, drop_p=0.3, num_classes=50):
super(DecoderRNN, self).__init__()
self.RNN_input_size = CNN_embed_dim
self.h_RNN_layers = h_RNN_layers # RNN hidden layers
self.h_RNN = h_RNN # RNN hidden nodes
self.h_FC_dim = h_FC_dim
self.drop_p = drop_p
self.num_classes = num_classes
self.LSTM = nn.LSTM(
input_size=self.RNN_input_size,
hidden_size=self.h_RNN,
num_layers=h_RNN_layers,
            batch_first=True,       # input & output tensors have batch size as the first dimension, e.g. (batch, time_step, input_size)
)
self.fc1 = nn.Linear(self.h_RNN, self.h_FC_dim)
self.fc2 = nn.Linear(self.h_FC_dim, self.num_classes)
def forward(self, x_RNN):
self.LSTM.flatten_parameters()
        # Passing None uses a zero initial hidden state.
        # RNN_out: (batch, time_step, hidden_size); h_n, c_n: (n_layers, batch, hidden_size)
        RNN_out, (h_n, c_n) = self.LSTM(x_RNN, None)
# FC layers
x = self.fc1(RNN_out[:, -1, :]) # choose RNN_out at the last time step
x = F.relu(x)
x = F.dropout(x, p=self.drop_p, training=self.training)
x = self.fc2(x)
return x
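# Example (sketch): chaining the encoder and decoder end to end.  Sizes and the
# class count are placeholders; training setup (optimizer, loss) is omitted.
#
#   encoder = ResCNNEncoder(CNN_embed_dim=300)
#   decoder = DecoderRNN(CNN_embed_dim=300, num_classes=2)
#   frames = torch.randn(4, 28, 3, 224, 224)   # (batch, time, C, H, W)
#   logits = decoder(encoder(frames))          # (4, num_classes)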
## ---------------------- end of CRNN module ---------------------- ##