import torch import torch.nn as nn import torch.optim as optim from import DataLoader, Dataset import torchaudio import torchvision.transforms as transforms import os import numpy as np from sklearn.metrics import confusion_matrix, classification_report import matplotlib.pyplot as plt import seaborn as sns import pandas as pd import torchvision.models as models import torch.nn.functional as F import time import librosa # from skopt import gp_minimize # from import Real, Categorical # from skopt.utils import use_named_args from torch.quantization import QuantStub, DeQuantStub # Check for CUDA availability device = torch.device("cuda" if torch.cuda.is_available() else "cpu") def get_tfeb_pool_size_component(length): # print(length); c = []; index = 1; while index <= 6: if length >= 2: if index == 6: c.append(length); else: c.append(2); length = length // 2; else: c.append(1); index += 1; return c def get_tfeb_pool_sizes(conv2_ch, width): h = get_tfeb_pool_size_component(conv2_ch); w = get_tfeb_pool_size_component(width); # print(w); pool_size = []; for (h1, w1) in zip(h, w): pool_size.append((h1, w1)); return pool_size class ElephantCallerNet(nn.Module): def __init__(self, input_length, n_class, sr, ch_conf=None, quantize=False): super(ElephantCallerNet, self).__init__() self.input_length = input_length self.ch_config = ch_conf self.quantize = quantize stride1 = 2 stride2 = 2 channels = 8 k_size = (3, 3) n_frames = (sr / 1000) * 10 # No of frames per 10ms sfeb_pool_size = int(n_frames / (stride1 * stride2)) if self.ch_config is None: self.ch_config = [channels, channels * 8, channels * 4, channels * 8, channels * 8, channels * 16, channels * 16, channels * 32, channels * 32, channels * 64, channels * 64, n_class] fcn_no_of_inputs = self.ch_config[-1] self.conv1, self.bn1 = self.make_layers(1, self.ch_config[0], (1, 9), (1, stride1)) self.conv2, self.bn2 = self.make_layers(self.ch_config[0], self.ch_config[1], (1, 5), (1, stride2)) self.conv3, self.bn3 = self.make_layers(1, self.ch_config[2], k_size, padding=1) self.conv4, self.bn4 = self.make_layers(self.ch_config[2], self.ch_config[3], k_size, padding=1) self.conv5, self.bn5 = self.make_layers(self.ch_config[3], self.ch_config[4], k_size, padding=1) self.conv6, self.bn6 = self.make_layers(self.ch_config[4], self.ch_config[5], k_size, padding=1) self.conv7, self.bn7 = self.make_layers(self.ch_config[5], self.ch_config[6], k_size, padding=1) self.conv8, self.bn8 = self.make_layers(self.ch_config[6], self.ch_config[7], k_size, padding=1) self.conv9, self.bn9 = self.make_layers(self.ch_config[7], self.ch_config[8], k_size, padding=1) self.conv10, self.bn10 = self.make_layers(self.ch_config[8], self.ch_config[9], k_size, padding=1) self.conv11, self.bn11 = self.make_layers(self.ch_config[9], self.ch_config[10], k_size, padding=1) self.conv12, self.bn12 = self.make_layers(self.ch_config[10], self.ch_config[11], (1, 1)) self.fcn = nn.Linear(fcn_no_of_inputs, n_class) nn.init.kaiming_normal_(self.fcn.weight, nonlinearity='sigmoid') self.sfeb = nn.Sequential( self.conv1, self.bn1, nn.ReLU(), self.conv2, self.bn2, nn.ReLU(), nn.MaxPool2d(kernel_size=(1, sfeb_pool_size)) ) tfeb_modules = [] self.tfeb_width = int(((self.input_length / sr) * 1000) / 10) # 10ms frames of audio length in seconds tfeb_pool_sizes = get_tfeb_pool_sizes(self.ch_config[1], self.tfeb_width) p_index = 0 for i in [3, 4, 6, 8, 10]: tfeb_modules.extend([eval('self.conv{}'.format(i)), eval('{}'.format(i)), nn.ReLU()]) if i != 3: tfeb_modules.extend([eval('self.conv{}'.format(i + 1)), eval('{}'.format(i + 1)), nn.ReLU()]) h, w = tfeb_pool_sizes[p_index] if h > 1 or w > 1: tfeb_modules.append(nn.MaxPool2d(kernel_size=(h, w))) p_index += 1 tfeb_modules.append(nn.Dropout(0.2)) tfeb_modules.extend([self.conv12, self.bn12, nn.ReLU()]) h, w = tfeb_pool_sizes[-1] if h > 1 or w > 1: tfeb_modules.append(nn.AvgPool2d(kernel_size=(h, w))) tfeb_modules.extend([nn.Flatten(), self.fcn]) self.tfeb = nn.Sequential(*tfeb_modules) self.output = nn.Sequential( nn.Softmax(dim=1) ) if self.quantize: self.quant = QuantStub() self.dequant = DeQuantStub() def forward(self, x): x = x.unsqueeze(0).unsqueeze(0) if self.quantize: x = self.quant(x) x = self.sfeb(x) x = x.permute((2, 0, 1, 3)) x = self.tfeb(x) if self.quantize: x = self.dequant(x) y = self.output(x) y = F.softmax(y, dim=1) return y def make_layers(self, in_channels, out_channels, kernel_size, stride=(1, 1), padding=0, bias=False): conv = nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size, stride=stride, padding=padding, bias=bias) nn.init.kaiming_normal_(conv.weight, nonlinearity='relu') bn = nn.BatchNorm2d(out_channels) return conv, bn class Residual_block(nn.Module): def __init__(self, nb_filts, first = False): super(Residual_block, self).__init__() self.first = first if not self.first: self.bn1 = nn.BatchNorm1d(num_features = nb_filts[0]) self.lrelu = nn.LeakyReLU() self.lrelu_keras = nn.LeakyReLU(negative_slope=0.3) self.conv1 = nn.Conv1d(in_channels = nb_filts[0], out_channels = nb_filts[1], kernel_size = 3, padding = 1, stride = 1) self.bn2 = nn.BatchNorm1d(num_features = nb_filts[1]) self.conv2 = nn.Conv1d(in_channels = nb_filts[1], out_channels = nb_filts[1], padding = 1, kernel_size = 3, stride = 1) if nb_filts[0] != nb_filts[1]: self.downsample = True self.conv_downsample = nn.Conv1d(in_channels = nb_filts[0], out_channels = nb_filts[1], padding = 0, kernel_size = 1, stride = 1) else: self.downsample = False = nn.MaxPool1d(3) def forward(self, x): identity = x if not self.first: out = self.bn1(x) out = self.lrelu_keras(out) else: out = x out = self.conv1(x) out = self.bn2(out) out = self.lrelu_keras(out) out = self.conv2(out) if self.downsample: identity = self.conv_downsample(identity) out += identity out = return out class RawNet(nn.Module): def __init__(self): super(RawNet, self).__init__() #self.negative_k = d_args['negative_k'] self.first_conv = nn.Conv1d(in_channels = 1, out_channels = 128, kernel_size = 3, padding = 0, stride = 3) self.first_bn = nn.BatchNorm1d(num_features = 128) self.lrelu = nn.LeakyReLU() self.lrelu_keras = nn.LeakyReLU(negative_slope = 0.3) self.block0 = self._make_layer(nb_blocks = 2, nb_filts = [128,128], first = True) self.block1 = self._make_layer(nb_blocks = 4, nb_filts = [128, 256]) self.bn_before_gru = nn.BatchNorm1d(num_features = 256) self.gru = nn.GRU(input_size = 256, hidden_size = 1024, num_layers = 1, batch_first = True) self.fc1_gru = nn.Linear(in_features = 1024, out_features = 1024) self.fc2_gru = nn.Linear(in_features =1024, out_features = 3, bias = True) def forward(self, x, y = 0, is_test=False): # print(x.shape) x = x.unsqueeze(1) # print(x.shape) x = self.first_conv(x) x = self.first_bn(x) x = self.lrelu_keras(x) x = self.block0(x) x = self.block1(x) x = self.bn_before_gru(x) x = self.lrelu_keras(x) x = x.permute(0, 2, 1)#(batch, filt, time) >> (batch, time, filt) x, _ = self.gru(x) x = x[:,-1,:] code = self.fc1_gru(x) if is_test: return code code_norm = code.norm(p=2,dim=1, keepdim=True) / 10. code = torch.div(code, code_norm) out = self.fc2_gru(code) return out ''' #for future updates, bc_loss, h_loss #h_loss norm = torch.norm(self.fc2_gru.weight, dim = 1, keepdim = True) normed_weight = torch.div(self.fc2_gru.weight, norm) cos_output_tmp =, normed_weight.t()) cos_impo = cos_output_tmp.gather(1, y2) cos_target = cos_output_tmp.gather(1, y.view(-1, 1)) hard_negatives, _ = torch.topk(cos_impo, self.negative_k, dim=1, sorted=False) hard_negatives = F.relu(hard_negatives, inplace=True) trg_score = cos_target*-1. h_loss = torch.log(1.+torch.exp(hard_negatives+trg_score).sum(dim=1)) return out, h_loss ''' def _make_layer(self, nb_blocks, nb_filts, first = False): layers = [] #def __init__(self, nb_filts, first = False): for i in range(nb_blocks): first = first if i == 0 else False layers.append(Residual_block(nb_filts = nb_filts, first = first)) if i == 0: nb_filts[0] = nb_filts[1] return nn.Sequential(*layers) def summary(self, input_size, batch_size=-1, device="cuda", print_fn = None): if print_fn == None: printfn = print model = self def register_hook(module): def hook(module, input, output): class_name = str(module.__class__).split(".")[-1].split("'")[0] module_idx = len(summary) m_key = "%s-%i" % (class_name, module_idx + 1) summary[m_key] = OrderedDict() summary[m_key]["input_shape"] = list(input[0].size()) summary[m_key]["input_shape"][0] = batch_size if isinstance(output, (list, tuple)): summary[m_key]["output_shape"] = [ [-1] + list(o.size())[1:] for o in output ] else: summary[m_key]["output_shape"] = list(output.size()) if len(summary[m_key]["output_shape"]) != 0: summary[m_key]["output_shape"][0] = batch_size params = 0 if hasattr(module, "weight") and hasattr(module.weight, "size"): params += summary[m_key]["trainable"] = module.weight.requires_grad if hasattr(module, "bias") and hasattr(module.bias, "size"): params += summary[m_key]["nb_params"] = params if ( not isinstance(module, nn.Sequential) and not isinstance(module, nn.ModuleList) and not (module == model) ): hooks.append(module.register_forward_hook(hook)) device = device.lower() assert device in [ "cuda", "cpu", ], "Input device is not valid, please specify 'cuda' or 'cpu'" if device == "cuda" and torch.cuda.is_available(): dtype = torch.cuda.FloatTensor else: dtype = torch.FloatTensor if isinstance(input_size, tuple): input_size = [input_size] x = [torch.rand(2, *in_size).type(dtype) for in_size in input_size] summary = OrderedDict() hooks = [] model.apply(register_hook) model(*x) for h in hooks: h.remove() print_fn("----------------------------------------------------------------") line_new = "{:>20} {:>25} {:>15}".format("Layer (type)", "Output Shape", "Param #") print_fn(line_new) print_fn("================================================================") total_params = 0 total_output = 0 trainable_params = 0 for layer in summary: # input_shape, output_shape, trainable, nb_params line_new = "{:>20} {:>25} {:>15}".format( layer, str(summary[layer]["output_shape"]), "{0:,}".format(summary[layer]["nb_params"]), ) total_params += summary[layer]["nb_params"] total_output +=[layer]["output_shape"]) if "trainable" in summary[layer]: if summary[layer]["trainable"] == True: trainable_params += summary[layer]["nb_params"] print_fn(line_new) # assume 4 bytes/number (float on cuda). total_input_size = abs( * batch_size * 4. / (1024 ** 2.)) total_output_size = abs(2. * total_output * 4. / (1024 ** 2.)) # x2 for gradients total_params_size = abs(total_params.numpy() * 4. / (1024 ** 2.)) total_size = total_params_size + total_output_size + total_input_size print_fn("================================================================") print_fn("Total params: {0:,}".format(total_params)) print_fn("Trainable params: {0:,}".format(trainable_params)) print_fn("Non-trainable params: {0:,}".format(total_params - trainable_params)) print_fn("----------------------------------------------------------------") print_fn("Input size (MB): %0.2f" % total_input_size) print_fn("Forward/backward pass size (MB): %0.2f" % total_output_size) print_fn("Params size (MB): %0.2f" % total_params_size) print_fn("Estimated Total Size (MB): %0.2f" % total_size) print_fn("----------------------------------------------------------------") return class MobileNetV2RawAudio(nn.Module): def __init__(self, num_classes, num_samples, dropout_rate=0.0, activation='ReLU'): super(MobileNetV2RawAudio, self).__init__() # Load MobileNetV2 model without the fully connected layer self.num_classes=num_classes self.num_samples = num_samples self.features = models.mobilenet_v2(pretrained=True).features # self.model.classifier[1] = nn.Linear(in_features=self.model.classifier[1].in_features, out_features=num_classes) # # Replace the first convolutional layer to accept 1-channel input self.features[0][0] = nn.Conv2d(1, 32, kernel_size=(3,3), stride=(2,2), padding=(1, 1), bias=False) # # Modify the classifier for the new input shape # num_features = self.model.classifier[1].in_features self.classifier = nn.Sequential( nn.Dropout(p=dropout_rate), nn.Linear(1280, num_classes), self.get_activation(activation) ) def forward(self, x): # if self.num_channels == 1: # x = x.unsqueeze(1) # Convert 1-channel input to 3-channel input expected by MobileNetV2 # x =[x, x, x], dim=1) x = x.view(-1, 1, self.num_samples // 60, 60) # Apply the feature extractor layers x = self.features(x) # Apply global average pooling x = nn.functional.adaptive_avg_pool2d(x, (1, 1)) # Flatten the tensor for the fully connected layers x = torch.flatten(x, 1) # Apply the classifier layers x = self.classifier(x) return x def get_activation(self, activation): if activation == 'ReLU': return nn.ReLU(inplace=True) elif activation == 'LeakyReLU': return nn.LeakyReLU(inplace=True) elif activation == 'Sigmoid': return nn.Sigmoid() else: raise ValueError("Invalid activation function. Please choose from 'ReLU', 'LeakyReLU', or 'Sigmoid'.") class YAMNet(nn.Module): def __init__(self, num_classes, num_samples, kernel_size): super(YAMNet, self).__init__() self.num_classes = num_classes self.num_samples = num_samples # Define the layers self.conv1 = nn.Sequential( nn.Conv1d(1, 32, kernel_size=kernel_size, stride=2, padding=1), nn.BatchNorm1d(32, eps=1e-4), nn.LeakyReLU(inplace=True) ) self.separable_conv1 = nn.Sequential( nn.Conv1d(32, 64, kernel_size=3, stride=1, padding=1, groups=32), # nn.BatchNorm1d(32, eps=1e-4), nn.Conv1d(64, 64, kernel_size=1), nn.BatchNorm1d(64, eps=1e-4), nn.LeakyReLU(inplace=True) ) self.separable_conv2 = nn.Sequential( nn.Conv1d(64, 128, kernel_size=3, stride=2, padding=1, groups=64), # nn.BatchNorm1d(64, eps=1e-4), nn.Conv1d(128, 128, kernel_size=1), nn.BatchNorm1d(128, eps=1e-4), nn.LeakyReLU(inplace=True) ) self.separable_conv3 = nn.Sequential( nn.Conv1d(128, 128, kernel_size=3, stride=1, padding=1, groups=128), nn.Conv1d(128, 128, kernel_size=1), nn.BatchNorm1d(128, eps=1e-4), nn.LeakyReLU(inplace=True) ) self.separable_conv4 = nn.Sequential( nn.Conv1d(128, 256, kernel_size=3, stride=2, padding=1, groups=128), nn.Conv1d(256, 256, kernel_size=1), nn.BatchNorm1d(256, eps=1e-4), nn.LeakyReLU(inplace=True) ) self.separable_conv5 = nn.Sequential( nn.Conv1d(256, 256, kernel_size=3, stride=1, padding=1, groups=256), nn.Conv1d(256, 256, kernel_size=1), nn.BatchNorm1d(256, eps=1e-4), nn.LeakyReLU(inplace=True) ) self.separable_conv6 = nn.Sequential( nn.Conv1d(256, 512, kernel_size=3, stride=2, padding=1, groups=256), # nn.BatchNorm1d(256), nn.Conv1d(512, 512, kernel_size=1), nn.BatchNorm1d(512), nn.LeakyReLU(inplace=True) ) self.separable_conv7 = nn.Sequential( nn.Conv1d(512, 512, kernel_size=3, stride=1, padding=1, groups=512), # nn.BatchNorm1d(512), nn.Conv1d(512, 512, kernel_size=1), nn.BatchNorm1d(512), nn.LeakyReLU(inplace=True) ) self.separable_conv8 = nn.Sequential( nn.Conv1d(512, 512, kernel_size=3, stride=1, padding=1, groups=512), # nn.BatchNorm1d(512), nn.Conv1d(512, 512, kernel_size=1), nn.BatchNorm1d(512), nn.LeakyReLU(inplace=True) ) self.separable_conv9 = nn.Sequential( nn.Conv1d(512, 512, kernel_size=3, stride=1, padding=1, groups=512), # nn.BatchNorm1d(512), nn.Conv1d(512, 512, kernel_size=1), nn.BatchNorm1d(512), nn.LeakyReLU(inplace=True) ) self.separable_conv10 = nn.Sequential( nn.Conv1d(512, 1024, kernel_size=3, stride=2, padding=1, groups=512), nn.Conv1d(1024, 1024, kernel_size=1), nn.BatchNorm1d(1024), nn.LeakyReLU(inplace=True) ) self.separable_conv11 = nn.Sequential( nn.Conv1d(1024, 1024, kernel_size=3, stride=2, padding=1, groups=1024), nn.Conv1d(1024, 1024, kernel_size=1), nn.BatchNorm1d(1024), nn.LeakyReLU(inplace=True) ) self.global_pool = nn.AdaptiveAvgPool1d(1) self.classifier = nn.Linear(1024, num_classes) self.apply(init_weights_he) def forward(self, x): # print(x.shape) # x = x.view(-1, 1, self.num_samples) x=x.unsqueeze(1) # print("after", x.shape) # Apply the convolutional layers x = F.relu(self.conv1(x)) x = self.separable_conv1(x) x = self.separable_conv2(x) x = self.separable_conv3(x) x = self.separable_conv4(x) x = self.separable_conv5(x) x = self.separable_conv6(x) x = self.separable_conv7(x) x = self.separable_conv8(x) x = self.separable_conv9(x) x = self.separable_conv10(x) x = self.separable_conv11(x) # Global average pooling x = self.global_pool(x) # Flatten the output x = x.view(x.size(0), -1) # Classifier x = self.classifier(x) return x