"""
ResNet for 1-D signal data, PyTorch version.

Shenda Hong, Oct 2019
"""
import numpy as np
from collections import Counter
from tqdm import tqdm
from matplotlib import pyplot as plt
from sklearn.metrics import classification_report

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader


class MyDataset(Dataset):
    """
    Wrap signal arrays and labels for use with torch.utils.data.DataLoader.
    """
    def __init__(self, data, label):
        self.data = data
        self.label = label

    def __getitem__(self, index):
        return (torch.tensor(self.data[index], dtype=torch.float), torch.tensor(self.label[index], dtype=torch.long))

    def __len__(self):
        return len(self.data)
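
# Usage sketch (the shapes here are assumptions, not from the source): `data`
# is expected to be (n_samples, n_channel, n_length) and `label` (n_samples,):
#
#     dataset = MyDataset(np.random.randn(32, 1, 1000), np.random.randint(0, 2, 32))
#     loader = DataLoader(dataset, batch_size=8, shuffle=True)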


class MyConv1dPadSame(nn.Module):
    """
    Extend nn.Conv1d to support SAME padding: the output length is
    ceil(input length / stride), matching TensorFlow's "same" convolutions.
    """
    def __init__(self, in_channels, out_channels, kernel_size, stride, groups=1):
        super(MyConv1dPadSame, self).__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = kernel_size
        self.stride = stride
        self.groups = groups
        self.conv = torch.nn.Conv1d(
            in_channels=self.in_channels,
            out_channels=self.out_channels,
            kernel_size=self.kernel_size,
            stride=self.stride,
            groups=self.groups)

    def forward(self, x):
        net = x

        # compute SAME padding: total padding p makes out_dim = ceil(in_dim / stride)
        in_dim = net.shape[-1]
        out_dim = (in_dim + self.stride - 1) // self.stride
        p = max(0, (out_dim - 1) * self.stride + self.kernel_size - in_dim)
        pad_left = p // 2
        pad_right = p - pad_left
        net = F.pad(net, (pad_left, pad_right), "constant", 0)

        net = self.conv(net)

        return net
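
# Shape check (values are illustrative): SAME padding keeps
# output length = ceil(input length / stride).
#
#     conv = MyConv1dPadSame(in_channels=1, out_channels=4, kernel_size=16, stride=2)
#     conv(torch.randn(8, 1, 1000)).shape   # torch.Size([8, 4, 500])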


class MyMaxPool1dPadSame(nn.Module):
    """
    Extend nn.MaxPool1d to support SAME padding. Note that nn.MaxPool1d
    defaults its stride to kernel_size, so this layer downsamples by a
    factor of kernel_size; the padding below is computed for that stride.
    """
    def __init__(self, kernel_size):
        super(MyMaxPool1dPadSame, self).__init__()
        self.kernel_size = kernel_size
        # nn.MaxPool1d's default stride equals kernel_size; track it so the
        # SAME-padding formula below matches the actual pooling stride
        self.stride = kernel_size
        self.max_pool = torch.nn.MaxPool1d(kernel_size=self.kernel_size)

    def forward(self, x):
        net = x

        # compute SAME padding: total padding p makes out_dim = ceil(in_dim / stride)
        in_dim = net.shape[-1]
        out_dim = (in_dim + self.stride - 1) // self.stride
        p = max(0, (out_dim - 1) * self.stride + self.kernel_size - in_dim)
        pad_left = p // 2
        pad_right = p - pad_left
        net = F.pad(net, (pad_left, pad_right), "constant", 0)

        net = self.max_pool(net)

        return net
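
# Shape check (values are illustrative): the pool downsamples by kernel_size.
#
#     pool = MyMaxPool1dPadSame(kernel_size=2)
#     pool(torch.randn(8, 4, 501)).shape   # torch.Size([8, 4, 251])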


class BasicBlock(nn.Module):
    """
    ResNet Basic Block: two convolutions in a pre-activation arrangement
    (BN -> ReLU -> Dropout -> Conv), plus a shortcut connection.
    """
    def __init__(self, in_channels, out_channels, kernel_size, stride, groups, downsample, use_bn, use_do, is_first_block=False):
        super(BasicBlock, self).__init__()

        self.in_channels = in_channels
        self.kernel_size = kernel_size
        self.out_channels = out_channels
        self.groups = groups
        self.downsample = downsample
        # only downsampling blocks actually stride; all others use stride 1
        if self.downsample:
            self.stride = stride
        else:
            self.stride = 1
        self.is_first_block = is_first_block
        self.use_bn = use_bn
        self.use_do = use_do

        # the first conv (may stride to downsample)
        self.bn1 = nn.BatchNorm1d(in_channels)
        self.relu1 = nn.ReLU()
        self.do1 = nn.Dropout(p=0.5)
        self.conv1 = MyConv1dPadSame(
            in_channels=in_channels,
            out_channels=out_channels,
            kernel_size=kernel_size,
            stride=self.stride,
            groups=self.groups)

        # the second conv (always stride 1)
        self.bn2 = nn.BatchNorm1d(out_channels)
        self.relu2 = nn.ReLU()
        self.do2 = nn.Dropout(p=0.5)
        self.conv2 = MyConv1dPadSame(
            in_channels=out_channels,
            out_channels=out_channels,
            kernel_size=kernel_size,
            stride=1,
            groups=self.groups)

        # pooling used to downsample the identity branch to match conv1's stride
        self.max_pool = MyMaxPool1dPadSame(kernel_size=self.stride)

    def forward(self, x):

        identity = x

        # the first conv (the very first block skips the leading BN/ReLU,
        # since the input has just passed through the stem conv)
        out = x
        if not self.is_first_block:
            if self.use_bn:
                out = self.bn1(out)
            out = self.relu1(out)
            if self.use_do:
                out = self.do1(out)
        out = self.conv1(out)

        # the second conv
        if self.use_bn:
            out = self.bn2(out)
        out = self.relu2(out)
        if self.use_do:
            out = self.do2(out)
        out = self.conv2(out)

        # if downsampling, also downsample the identity with max pooling
        if self.downsample:
            identity = self.max_pool(identity)

        # if expanding channels, zero-pad the identity's channel dimension
        if self.out_channels != self.in_channels:
            identity = identity.transpose(-1, -2)
            ch1 = (self.out_channels - self.in_channels) // 2
            ch2 = self.out_channels - self.in_channels - ch1
            identity = F.pad(identity, (ch1, ch2), "constant", 0)
            identity = identity.transpose(-1, -2)

        # shortcut connection
        out += identity

        return out
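
# Shape check (values are illustrative): a downsampling block halves the
# length, and the shortcut is zero-padded when the channel count grows.
#
#     block = BasicBlock(in_channels=64, out_channels=128, kernel_size=16, stride=2,
#                        groups=1, downsample=True, use_bn=True, use_do=True)
#     block(torch.randn(8, 64, 500)).shape   # torch.Size([8, 128, 250])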


class ResNet1D(nn.Module):
    """

    Input:
        X: (n_samples, n_channel, n_length)
        Y: (n_samples)

    Output:
        out: (n_samples, n_classes) raw logits (no softmax applied)

    Parameters:
        in_channels: dim of input, the same as n_channel
        base_filters: number of filters in the first few Conv layers; doubled
            every increasefilter_gap blocks
        kernel_size: width of kernel
        stride: stride of kernel moving
        groups: set larger than 1 for ResNeXt-style grouped convolutions
        n_block: number of blocks
        n_classes: number of classes
        downsample_gap: downsample once every this many blocks
        increasefilter_gap: double the filters once every this many blocks

    """

    def __init__(self, in_channels, base_filters, kernel_size, stride, groups, n_block, n_classes, downsample_gap=2, increasefilter_gap=4, use_bn=True, use_do=True, verbose=False):
        super(ResNet1D, self).__init__()

        self.verbose = verbose
        self.n_block = n_block
        self.kernel_size = kernel_size
        self.stride = stride
        self.groups = groups
        self.use_bn = use_bn
        self.use_do = use_do

        self.downsample_gap = downsample_gap
        self.increasefilter_gap = increasefilter_gap

        # the stem: first conv + BN + ReLU
        self.first_block_conv = MyConv1dPadSame(in_channels=in_channels, out_channels=base_filters, kernel_size=self.kernel_size, stride=1)
        self.first_block_bn = nn.BatchNorm1d(base_filters)
        self.first_block_relu = nn.ReLU()
        out_channels = base_filters

        # the residual blocks
        self.basicblock_list = nn.ModuleList()
        for i_block in range(self.n_block):

            # the first block skips the leading BN/ReLU (handled by the stem)
            if i_block == 0:
                is_first_block = True
            else:
                is_first_block = False

            # downsample once every downsample_gap blocks
            if i_block % self.downsample_gap == 1:
                downsample = True
            else:
                downsample = False

            # double the filters once every increasefilter_gap blocks;
            # e.g. with base_filters=64 and increasefilter_gap=4, blocks
            # 0-3 output 64 channels, blocks 4-7 output 128, blocks 8-11
            # output 256, and so on
            if is_first_block:
                in_channels = base_filters
                out_channels = in_channels
            else:
                in_channels = int(base_filters*2**((i_block-1)//self.increasefilter_gap))
                if (i_block % self.increasefilter_gap == 0) and (i_block != 0):
                    out_channels = in_channels * 2
                else:
                    out_channels = in_channels

            tmp_block = BasicBlock(
                in_channels=in_channels,
                out_channels=out_channels,
                kernel_size=self.kernel_size,
                stride=self.stride,
                groups=self.groups,
                downsample=downsample,
                use_bn=self.use_bn,
                use_do=self.use_do,
                is_first_block=is_first_block)
            self.basicblock_list.append(tmp_block)

        # final prediction head
        self.final_bn = nn.BatchNorm1d(out_channels)
        self.final_relu = nn.ReLU(inplace=True)
        self.do = nn.Dropout(p=0.3)  # defined but not applied in forward
        self.dense = nn.Linear(out_channels, n_classes)

    def forward(self, x):

        out = x

        # the stem conv
        if self.verbose:
            print('input shape', out.shape)
        out = self.first_block_conv(out)
        if self.verbose:
            print('after first conv', out.shape)
        if self.use_bn:
            out = self.first_block_bn(out)
        out = self.first_block_relu(out)

        # the residual blocks
        for i_block in range(self.n_block):
            net = self.basicblock_list[i_block]
            if self.verbose:
                print('i_block: {0}, in_channels: {1}, out_channels: {2}, downsample: {3}'.format(i_block, net.in_channels, net.out_channels, net.downsample))
            out = net(out)
            if self.verbose:
                print(out.shape)

        # final BN/ReLU, then global average pooling over the length dimension
        if self.use_bn:
            out = self.final_bn(out)
        out = self.final_relu(out)
        out = out.mean(-1)
        if self.verbose:
            print('final pooling', out.shape)

        # classification head; raw logits are returned (apply softmax or
        # nn.CrossEntropyLoss outside the model)
        out = self.dense(out)
        if self.verbose:
            print('dense', out.shape)

        return out
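

if __name__ == "__main__":
    # Minimal smoke test; every hyperparameter value here is illustrative,
    # not prescribed by the source.
    model = ResNet1D(
        in_channels=1,
        base_filters=64,
        kernel_size=16,
        stride=2,
        groups=1,
        n_block=8,
        n_classes=2,
        verbose=True)
    x = torch.randn(8, 1, 1000)           # (n_samples, n_channel, n_length)
    logits = model(x)                     # (8, 2) raw logits
    loss = nn.CrossEntropyLoss()(logits, torch.randint(0, 2, (8,)))
    print(logits.shape, float(loss))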