aiface's picture
Upload 11 files
907b7f3
import torch
import torch.nn as nn
from torch.nn.utils import weight_norm
import pdb
"""Implements Temporal Convolutional Network (TCN)
__https://arxiv.org/pdf/1803.01271.pdf
"""
# Casual Conv1D
class Chomp1d(nn.Module):
def __init__(self, chomp_size, symm_chomp):
super(Chomp1d, self).__init__()
self.chomp_size = chomp_size
self.symm_chomp = symm_chomp
if self.symm_chomp:
assert self.chomp_size % 2 == 0, "If symmetric chomp, chomp size needs to be even"
# 모델이 학습데이터를 입력받아서 forward propagation 진행
def forward(self, x):
if self.chomp_size == 0:
return x
if self.symm_chomp:
return x[:, :, self.chomp_size//2:-self.chomp_size//2].contiguous()
else:
return x[:, :, :-self.chomp_size].contiguous()
# Conv1D + BatchNorm1D + Casual Conv1D + ReLU
class ConvBatchChompRelu(nn.Module):
def __init__(self, n_inputs, n_outputs, kernel_size, stride, dilation, padding, relu_type, dwpw=False):
super(ConvBatchChompRelu, self).__init__()
self.dwpw = dwpw
if dwpw:
self.conv = nn.Sequential(
# -- dw
nn.Conv1d( n_inputs, n_inputs, kernel_size, stride=stride, # Conv1D
padding=padding, dilation=dilation, groups=n_inputs, bias=False),
nn.BatchNorm1d(n_inputs), # BatchNorm1D
Chomp1d(padding, True), # Casual Conv1D
nn.PReLU(num_parameters=n_inputs) if relu_type == 'prelu' else nn.ReLU(inplace=True), # PReLU or ReLU
# -- pw
nn.Conv1d( n_inputs, n_outputs, 1, 1, 0, bias=False), # Conv1D
nn.BatchNorm1d(n_outputs), # BatchNorm1D
nn.PReLU(num_parameters=n_outputs) if relu_type == 'prelu' else nn.ReLU(inplace=True) # PReLU or ReLU
)
else:
self.conv = nn.Conv1d(n_inputs, n_outputs, kernel_size, # Conv1D
stride=stride, padding=padding, dilation=dilation)
self.batchnorm = nn.BatchNorm1d(n_outputs) # BatchNorm1D
self.chomp = Chomp1d(padding,True) # Casual Conv1D
self.non_lin = nn.PReLU(num_parameters=n_outputs) if relu_type == 'prelu' else nn.ReLU() # PReLU or ReLU
# 모델이 학습데이터를 입력받아서 forward propagation 진행
def forward(self, x):
if self.dwpw:
return self.conv(x)
else:
out = self.conv( x )
out = self.batchnorm( out )
out = self.chomp( out )
return self.non_lin( out )
# --------- MULTI-BRANCH VERSION ---------------
class MultibranchTemporalBlock(nn.Module):
def __init__(self, n_inputs, n_outputs, kernel_sizes, stride, dilation, padding, dropout=0.2,
relu_type = 'relu', dwpw=False):
super(MultibranchTemporalBlock, self).__init__()
self.kernel_sizes = kernel_sizes
self.num_kernels = len( kernel_sizes )
self.n_outputs_branch = n_outputs // self.num_kernels
assert n_outputs % self.num_kernels == 0, "Number of output channels needs to be divisible by number of kernels"
for k_idx,k in enumerate( self.kernel_sizes ):
cbcr = ConvBatchChompRelu( n_inputs, self.n_outputs_branch, k, stride, dilation, padding[k_idx], relu_type, dwpw=dwpw) # Conv1D + BatchNorm1D + Casual Conv1D + ReLU
setattr( self,'cbcr0_{}'.format(k_idx), cbcr ) # object 에 존재하는 속성의 값을 바꾸거나 새로운 속성을 생성하여 값을 부여함
self.dropout0 = nn.Dropout(dropout) # Dropout
for k_idx,k in enumerate( self.kernel_sizes ):
cbcr = ConvBatchChompRelu( n_outputs, self.n_outputs_branch, k, stride, dilation, padding[k_idx], relu_type, dwpw=dwpw) # Conv1D + BatchNorm1D + Casual Conv1D + ReLU
setattr( self,'cbcr1_{}'.format(k_idx), cbcr ) # object 에 존재하는 속성의 값을 바꾸거나 새로운 속성을 생성하여 값을 부여함
self.dropout1 = nn.Dropout(dropout) # Dropout
# downsample?
self.downsample = nn.Conv1d(n_inputs, n_outputs, 1) if (n_inputs//self.num_kernels) != n_outputs else None # Conv1D or None
# final relu
if relu_type == 'relu':
self.relu_final = nn.ReLU() # ReLU
elif relu_type == 'prelu':
self.relu_final = nn.PReLU(num_parameters=n_outputs) # PReLU
# 모델이 학습데이터를 입력받아서 forward propagation 진행
def forward(self, x):
# first multi-branch set of convolutions
outputs = []
for k_idx in range( self.num_kernels ):
branch_convs = getattr(self,'cbcr0_{}'.format(k_idx))
outputs.append( branch_convs(x) )
out0 = torch.cat(outputs, 1)
out0 = self.dropout0( out0 )
# second multi-branch set of convolutions
outputs = []
for k_idx in range( self.num_kernels ):
branch_convs = getattr(self,'cbcr1_{}'.format(k_idx))
outputs.append( branch_convs(out0) )
out1 = torch.cat(outputs, 1)
out1 = self.dropout1( out1 )
# downsample?
res = x if self.downsample is None else self.downsample(x)
return self.relu_final(out1 + res)
class MultibranchTemporalConvNet(nn.Module):
def __init__(self, num_inputs, num_channels, tcn_options, dropout=0.2, relu_type='relu', dwpw=False):
super(MultibranchTemporalConvNet, self).__init__()
self.ksizes = tcn_options['kernel_size']
layers = []
num_levels = len(num_channels)
for i in range(num_levels):
dilation_size = 2 ** i
in_channels = num_inputs if i == 0 else num_channels[i-1]
out_channels = num_channels[i]
padding = [ (s-1)*dilation_size for s in self.ksizes]
layers.append( MultibranchTemporalBlock( in_channels, out_channels, self.ksizes,
stride=1, dilation=dilation_size, padding = padding, dropout=dropout, relu_type = relu_type,
dwpw=dwpw) )
self.network = nn.Sequential(*layers) # 설정한 레이어 반환
# 모델이 학습데이터를 입력받아서 forward propagation 진행
def forward(self, x):
return self.network(x)
# --------------------------------
# --------------- STANDARD VERSION (SINGLE BRANCH) ------------------------
class TemporalBlock(nn.Module):
def __init__(self, n_inputs, n_outputs, kernel_size, stride, dilation, padding, dropout=0.2,
symm_chomp = False, no_padding = False, relu_type = 'relu', dwpw=False):
super(TemporalBlock, self).__init__()
self.no_padding = no_padding
if self.no_padding:
downsample_chomp_size = 2*padding-4
padding = 1 # hack-ish thing so that we can use 3 layers
if dwpw:
self.net = nn.Sequential(
# -- first conv set within block
# -- dw
nn.Conv1d( n_inputs, n_inputs, kernel_size, stride=stride, # Conv1D
padding=padding, dilation=dilation, groups=n_inputs, bias=False),
nn.BatchNorm1d(n_inputs), # BatchNorm1D
Chomp1d(padding, True), # Casual Conv1D
nn.PReLU(num_parameters=n_inputs) if relu_type == 'prelu' else nn.ReLU(inplace=True), # PReLU or ReLU
# -- pw
nn.Conv1d( n_inputs, n_outputs, 1, 1, 0, bias=False), # Conv1D (1,1)
nn.BatchNorm1d(n_outputs), # BatchNorm1D
nn.PReLU(num_parameters=n_outputs) if relu_type == 'prelu' else nn.ReLU(inplace=True), # PReLU or ReLU
nn.Dropout(dropout), # Dropout
# -- second conv set within block
# -- dw
nn.Conv1d( n_outputs, n_outputs, kernel_size, stride=stride, # Conv1D
padding=padding, dilation=dilation, groups=n_outputs, bias=False),
nn.BatchNorm1d(n_outputs), # BatchNorm1D
Chomp1d(padding, True), # Casual Conv1D
nn.PReLU(num_parameters=n_outputs) if relu_type == 'prelu' else nn.ReLU(inplace=True), # PReLU or ReLU
# -- pw
nn.Conv1d( n_outputs, n_outputs, 1, 1, 0, bias=False), # Conv1D
nn.BatchNorm1d(n_outputs), # BatchNorm1D
nn.PReLU(num_parameters=n_outputs) if relu_type == 'prelu' else nn.ReLU(inplace=True), # PReLU or ReLU
nn.Dropout(dropout), # Dropout
)
else:
self.conv1 = nn.Conv1d(n_inputs, n_outputs, kernel_size, # Conv1D
stride=stride, padding=padding, dilation=dilation)
self.batchnorm1 = nn.BatchNorm1d(n_outputs) # BatchNorm1D
self.chomp1 = Chomp1d(padding,symm_chomp) if not self.no_padding else None # Casual Conv1D or None
if relu_type == 'relu':
self.relu1 = nn.ReLU() # ReLU
elif relu_type == 'prelu':
self.relu1 = nn.PReLU(num_parameters=n_outputs) # PReLU
self.dropout1 = nn.Dropout(dropout) # Dropout
self.conv2 = nn.Conv1d(n_outputs, n_outputs, kernel_size, # Conv1D
stride=stride, padding=padding, dilation=dilation)
self.batchnorm2 = nn.BatchNorm1d(n_outputs) # BatchNorm1D
self.chomp2 = Chomp1d(padding,symm_chomp) if not self.no_padding else None # Casual Conv1D or None
if relu_type == 'relu':
self.relu2 = nn.ReLU() # ReLU
elif relu_type == 'prelu':
self.relu2 = nn.PReLU(num_parameters=n_outputs) # PReLU
self.dropout2 = nn.Dropout(dropout) # Dropout
if self.no_padding:
self.net = nn.Sequential(self.conv1, self.batchnorm1, self.relu1, self.dropout1,
self.conv2, self.batchnorm2, self.relu2, self.dropout2)
else:
self.net = nn.Sequential(self.conv1, self.batchnorm1, self.chomp1, self.relu1, self.dropout1,
self.conv2, self.batchnorm2, self.chomp2, self.relu2, self.dropout2)
self.downsample = nn.Conv1d(n_inputs, n_outputs, 1) if n_inputs != n_outputs else None # Conv1D or None
if self.no_padding:
self.downsample_chomp = Chomp1d(downsample_chomp_size,True) # Casual Conv1D
if relu_type == 'relu':
self.relu = nn.ReLU() # ReLU
elif relu_type == 'prelu':
self.relu = nn.PReLU(num_parameters=n_outputs) # PReLU
# 모델이 학습데이터를 입력받아서 forward propagation 진행
def forward(self, x):
out = self.net(x)
if self.no_padding:
x = self.downsample_chomp(x)
res = x if self.downsample is None else self.downsample(x)
return self.relu(out + res)
# TCN 모델
class TemporalConvNet(nn.Module):
def __init__(self, num_inputs, num_channels, tcn_options, dropout=0.2, relu_type='relu', dwpw=False):
super(TemporalConvNet, self).__init__()
self.ksize = tcn_options['kernel_size'][0] if isinstance(tcn_options['kernel_size'], list) else tcn_options['kernel_size']
layers = []
num_levels = len(num_channels)
for i in range(num_levels):
dilation_size = 2 ** i
in_channels = num_inputs if i == 0 else num_channels[i-1]
out_channels = num_channels[i]
layers.append( TemporalBlock(in_channels, out_channels, self.ksize, stride=1, dilation=dilation_size,
padding=(self.ksize-1) * dilation_size, dropout=dropout, symm_chomp = True,
no_padding = False, relu_type=relu_type, dwpw=dwpw) )
self.network = nn.Sequential(*layers) # 설정한 레이어 반환
# 모델이 학습데이터를 입력받아서 forward propagation 진행
def forward(self, x):
return self.network(x)
# --------------------------------