# Explicit stdlib/torch imports (some of these names may also arrive via the
# star imports below; listed here so the file stands alone)
import math
import os
from pathlib import Path

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

from utils.google_utils import *
from utils.layers import *
from utils.parse_config import *
from utils import torch_utils

ONNX_EXPORT = False
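

# Editorial addition (assumed fix, not upstream code): F.normalize is a plain
# function and cannot be registered inside nn.Sequential, so the 'emb'
# activation branch below crashed as originally written. This thin nn.Module
# wrapper is used there instead.
class FeatureNormalize(nn.Module):
    def __init__(self, dim=1):
        super(FeatureNormalize, self).__init__()
        self.dim = dim

    def forward(self, x):
        return F.normalize(x, dim=self.dim)  # L2-normalize embeddings along `dim`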


def create_modules(module_defs, img_size, cfg):
    # Constructs module list of layer blocks from module configuration in module_defs

    img_size = [img_size] * 2 if isinstance(img_size, int) else img_size  # expand if necessary
    _ = module_defs.pop(0)  # cfg training hyperparams (unused)
    output_filters = [3]  # input channels
    module_list = nn.ModuleList()
    routs = []  # list of layers which route to deeper layers
    yolo_index = -1
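
    # Illustrative example (assumed shape, produced by utils.parse_config): a
    # [convolutional] block from a *.cfg file arrives here as a dict along the
    # lines of
    #   {'type': 'convolutional', 'batch_normalize': 1, 'filters': 32,
    #    'size': 3, 'stride': 1, 'pad': 1, 'activation': 'leaky'}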
    for i, mdef in enumerate(module_defs):
        modules = nn.Sequential()

        if mdef['type'] == 'convolutional':
            bn = mdef['batch_normalize']
            filters = mdef['filters']
            k = mdef['size']  # kernel size
            stride = mdef['stride'] if 'stride' in mdef else (mdef['stride_y'], mdef['stride_x'])
            if isinstance(k, int):  # single-size conv
                modules.add_module('Conv2d', nn.Conv2d(in_channels=output_filters[-1],
                                                       out_channels=filters,
                                                       kernel_size=k,
                                                       stride=stride,
                                                       padding=k // 2 if mdef['pad'] else 0,
                                                       groups=mdef['groups'] if 'groups' in mdef else 1,
                                                       bias=not bn))
            else:  # multiple-size conv
                modules.add_module('MixConv2d', MixConv2d(in_ch=output_filters[-1],
                                                          out_ch=filters,
                                                          k=k,
                                                          stride=stride,
                                                          bias=not bn))

            if bn:
                modules.add_module('BatchNorm2d', nn.BatchNorm2d(filters, momentum=0.03, eps=1E-4))
            else:
                routs.append(i)  # detection output (goes into yolo layer)

            if mdef['activation'] == 'leaky':  # activation study https://github.com/ultralytics/yolov3/issues/441
                modules.add_module('activation', nn.LeakyReLU(0.1, inplace=True))
            elif mdef['activation'] == 'swish':
                modules.add_module('activation', Swish())
            elif mdef['activation'] == 'mish':
                modules.add_module('activation', Mish())
            elif mdef['activation'] == 'emb':
                # F.normalize() is a function, not an nn.Module, so the original
                # line raised a TypeError here; FeatureNormalize (assumed fix,
                # defined above) wraps it for use inside nn.Sequential
                modules.add_module('activation', FeatureNormalize())
            elif mdef['activation'] == 'logistic':
                modules.add_module('activation', nn.Sigmoid())
            elif mdef['activation'] == 'silu':
                modules.add_module('activation', nn.SiLU())
        elif mdef['type'] == 'deformableconvolutional':
            bn = mdef['batch_normalize']
            filters = mdef['filters']
            k = mdef['size']  # kernel size
            stride = mdef['stride'] if 'stride' in mdef else (mdef['stride_y'], mdef['stride_x'])
            if isinstance(k, int):  # single-size conv
                modules.add_module('DeformConv2d', DeformConv2d(output_filters[-1],
                                                                filters,
                                                                kernel_size=k,
                                                                padding=k // 2 if mdef['pad'] else 0,
                                                                stride=stride,
                                                                bias=not bn,
                                                                modulation=True))
            else:  # multiple-size conv
                modules.add_module('MixConv2d', MixConv2d(in_ch=output_filters[-1],
                                                          out_ch=filters,
                                                          k=k,
                                                          stride=stride,
                                                          bias=not bn))

            if bn:
                modules.add_module('BatchNorm2d', nn.BatchNorm2d(filters, momentum=0.03, eps=1E-4))
            else:
                routs.append(i)  # detection output (goes into yolo layer)

            if mdef['activation'] == 'leaky':  # activation study https://github.com/ultralytics/yolov3/issues/441
                modules.add_module('activation', nn.LeakyReLU(0.1, inplace=True))
            elif mdef['activation'] == 'swish':
                modules.add_module('activation', Swish())
            elif mdef['activation'] == 'mish':
                modules.add_module('activation', Mish())
            elif mdef['activation'] == 'silu':
                modules.add_module('activation', nn.SiLU())
        elif mdef['type'] == 'dropout':
            p = mdef['probability']
            modules = nn.Dropout(p)

        elif mdef['type'] == 'avgpool':
            modules = GAP()

        elif mdef['type'] == 'silence':
            filters = output_filters[-1]
            modules = Silence()
        # Channel/spatial modulation layers; each consumes features from its 'from' layers
        elif mdef['type'] == 'scale_channels':
            layers = mdef['from']
            filters = output_filters[-1]
            routs.extend([i + l if l < 0 else l for l in layers])
            modules = ScaleChannel(layers=layers)

        elif mdef['type'] == 'shift_channels':
            layers = mdef['from']
            filters = output_filters[-1]
            routs.extend([i + l if l < 0 else l for l in layers])
            modules = ShiftChannel(layers=layers)

        elif mdef['type'] == 'shift_channels_2d':
            layers = mdef['from']
            filters = output_filters[-1]
            routs.extend([i + l if l < 0 else l for l in layers])
            modules = ShiftChannel2D(layers=layers)

        elif mdef['type'] == 'control_channels':
            layers = mdef['from']
            filters = output_filters[-1]
            routs.extend([i + l if l < 0 else l for l in layers])
            modules = ControlChannel(layers=layers)

        elif mdef['type'] == 'control_channels_2d':
            layers = mdef['from']
            filters = output_filters[-1]
            routs.extend([i + l if l < 0 else l for l in layers])
            modules = ControlChannel2D(layers=layers)

        elif mdef['type'] == 'alternate_channels':
            layers = mdef['from']
            filters = output_filters[-1] * 2
            routs.extend([i + l if l < 0 else l for l in layers])
            modules = AlternateChannel(layers=layers)

        elif mdef['type'] == 'alternate_channels_2d':
            layers = mdef['from']
            filters = output_filters[-1] * 2
            routs.extend([i + l if l < 0 else l for l in layers])
            modules = AlternateChannel2D(layers=layers)

        elif mdef['type'] == 'select_channels':
            layers = mdef['from']
            filters = output_filters[-1]
            routs.extend([i + l if l < 0 else l for l in layers])
            modules = SelectChannel(layers=layers)

        elif mdef['type'] == 'select_channels_2d':
            layers = mdef['from']
            filters = output_filters[-1]
            routs.extend([i + l if l < 0 else l for l in layers])
            modules = SelectChannel2D(layers=layers)

        elif mdef['type'] == 'sam':  # spatial attention module
            layers = mdef['from']
            filters = output_filters[-1]
            routs.extend([i + l if l < 0 else l for l in layers])
            modules = ScaleSpatial(layers=layers)
        elif mdef['type'] == 'BatchNorm2d':
            filters = output_filters[-1]
            modules = nn.BatchNorm2d(filters, momentum=0.03, eps=1E-4)
            if i == 0 and filters == 3:  # normalize RGB image
                # imagenet mean and var (var = std ** 2) https://pytorch.org/docs/stable/torchvision/models.html#classification
                modules.running_mean = torch.tensor([0.485, 0.456, 0.406])
                modules.running_var = torch.tensor([0.0524, 0.0502, 0.0506])
        elif mdef['type'] == 'maxpool':
            k = mdef['size']  # kernel size
            stride = mdef['stride']
            maxpool = nn.MaxPool2d(kernel_size=k, stride=stride, padding=(k - 1) // 2)
            if k == 2 and stride == 1:  # yolov3-tiny
                modules.add_module('ZeroPad2d', nn.ZeroPad2d((0, 1, 0, 1)))
                modules.add_module('MaxPool2d', maxpool)
            else:
                modules = maxpool

        elif mdef['type'] == 'local_avgpool':
            k = mdef['size']  # kernel size
            stride = mdef['stride']
            avgpool = nn.AvgPool2d(kernel_size=k, stride=stride, padding=(k - 1) // 2)
            if k == 2 and stride == 1:  # yolov3-tiny
                modules.add_module('ZeroPad2d', nn.ZeroPad2d((0, 1, 0, 1)))
                modules.add_module('AvgPool2d', avgpool)
            else:
                modules = avgpool

        elif mdef['type'] == 'upsample':
            if ONNX_EXPORT:  # explicitly state size, avoid scale_factor
                g = (yolo_index + 1) * 2 / 32  # gain
                modules = nn.Upsample(size=tuple(int(x * g) for x in img_size))  # img_size = (320, 192)
            else:
                modules = nn.Upsample(scale_factor=mdef['stride'])
        elif mdef['type'] == 'route':  # nn.Sequential() placeholder for 'route' layer
            layers = mdef['layers']
            filters = sum([output_filters[l + 1 if l > 0 else l] for l in layers])
            routs.extend([i + l if l < 0 else l for l in layers])
            modules = FeatureConcat(layers=layers)

        elif mdef['type'] == 'route2':  # nn.Sequential() placeholder for 'route' layer
            layers = mdef['layers']
            filters = sum([output_filters[l + 1 if l > 0 else l] for l in layers])
            routs.extend([i + l if l < 0 else l for l in layers])
            modules = FeatureConcat2(layers=layers)

        elif mdef['type'] == 'route3':  # nn.Sequential() placeholder for 'route' layer
            layers = mdef['layers']
            filters = sum([output_filters[l + 1 if l > 0 else l] for l in layers])
            routs.extend([i + l if l < 0 else l for l in layers])
            modules = FeatureConcat3(layers=layers)

        elif mdef['type'] == 'route_lhalf':  # route keeping only the left half of the channels
            layers = mdef['layers']
            filters = sum([output_filters[l + 1 if l > 0 else l] for l in layers]) // 2
            routs.extend([i + l if l < 0 else l for l in layers])
            modules = FeatureConcat_l(layers=layers)

        elif mdef['type'] == 'shortcut':  # nn.Sequential() placeholder for 'shortcut' layer
            layers = mdef['from']
            filters = output_filters[-1]
            routs.extend([i + l if l < 0 else l for l in layers])
            modules = WeightedFeatureFusion(layers=layers, weight='weights_type' in mdef)
        elif mdef['type'] == 'reorg3d':  # yolov3-spp-pan-scale
            pass

        elif mdef['type'] == 'reorg':  # space-to-depth
            filters = 4 * output_filters[-1]
            modules.add_module('Reorg', Reorg())

        elif mdef['type'] == 'dwt':  # discrete wavelet transform downsample
            filters = 4 * output_filters[-1]
            modules.add_module('DWT', DWT())

        # Implicit knowledge modules (YOLOR)
        elif mdef['type'] == 'implicit_add':
            filters = mdef['filters']
            modules = ImplicitA(channel=filters)

        elif mdef['type'] == 'implicit_mul':
            filters = mdef['filters']
            modules = ImplicitM(channel=filters)

        elif mdef['type'] == 'implicit_cat':
            filters = mdef['filters']
            modules = ImplicitC(channel=filters)

        elif mdef['type'] == 'implicit_add_2d':
            channels = mdef['filters']
            filters = mdef['atoms']
            modules = Implicit2DA(atom=filters, channel=channels)

        elif mdef['type'] == 'implicit_mul_2d':
            channels = mdef['filters']
            filters = mdef['atoms']
            modules = Implicit2DM(atom=filters, channel=channels)

        elif mdef['type'] == 'implicit_cat_2d':
            channels = mdef['filters']
            filters = mdef['atoms']
            modules = Implicit2DC(atom=filters, channel=channels)
        elif mdef['type'] == 'yolo':
            yolo_index += 1
            stride = [8, 16, 32, 64, 128]  # P3, P4, P5, P6, P7 strides
            if any(x in cfg for x in ['yolov4-tiny', 'fpn', 'yolov3']):  # P5, P4, P3 strides
                stride = [32, 16, 8]
            layers = mdef['from'] if 'from' in mdef else []
            modules = YOLOLayer(anchors=mdef['anchors'][mdef['mask']],  # anchor list
                                nc=mdef['classes'],  # number of classes
                                img_size=img_size,  # (416, 416)
                                yolo_index=yolo_index,  # 0, 1, 2...
                                layers=layers,  # output layers
                                stride=stride[yolo_index])

            # Initialize preceding Conv2d() bias (https://arxiv.org/pdf/1708.02002.pdf section 3.3)
            try:
                j = layers[yolo_index] if 'from' in mdef else -2
                bias_ = module_list[j][0].bias  # shape(255,)
                bias = bias_[:modules.no * modules.na].view(modules.na, -1)  # shape(3,85)
                # bias[:, 4] += -4.5  # obj
                bias.data[:, 4] += math.log(8 / (640 / stride[yolo_index]) ** 2)  # obj (8 objects per 640 image)
                bias.data[:, 5:] += math.log(0.6 / (modules.nc - 0.99))  # cls (sigmoid(p) = 1/nc)
                module_list[j][0].bias = torch.nn.Parameter(bias_, requires_grad=bias_.requires_grad)

                # j = [-2, -5, -8]
                # for sj in j:
                #     bias_ = module_list[sj][0].bias
                #     bias = bias_[:modules.no * 1].view(1, -1)
                #     bias.data[:, 4] += math.log(8 / (640 / stride[yolo_index]) ** 2)
                #     bias.data[:, 5:] += math.log(0.6 / (modules.nc - 0.99))
                #     module_list[sj][0].bias = torch.nn.Parameter(bias_, requires_grad=bias_.requires_grad)
            except Exception as e:  # avoid bare except so KeyboardInterrupt still propagates
                print('WARNING: smart bias initialization failure: %s' % e)
        elif mdef['type'] == 'jde':
            yolo_index += 1
            stride = [8, 16, 32, 64, 128]  # P3, P4, P5, P6, P7 strides
            if any(x in cfg for x in ['yolov4-tiny', 'fpn', 'yolov3']):  # P5, P4, P3 strides
                stride = [32, 16, 8]
            layers = mdef['from'] if 'from' in mdef else []
            modules = JDELayer(anchors=mdef['anchors'][mdef['mask']],  # anchor list
                               nc=mdef['classes'],  # number of classes
                               img_size=img_size,  # (416, 416)
                               yolo_index=yolo_index,  # 0, 1, 2...
                               layers=layers,  # output layers
                               stride=stride[yolo_index])

            # Initialize preceding Conv2d() bias (https://arxiv.org/pdf/1708.02002.pdf section 3.3)
            try:
                j = layers[yolo_index] if 'from' in mdef else -1
                bias_ = module_list[j][0].bias  # shape(255,)
                bias = bias_[:modules.no * modules.na].view(modules.na, -1)  # shape(3,85)
                # bias[:, 4] += -4.5  # obj
                bias.data[:, 4] += math.log(8 / (640 / stride[yolo_index]) ** 2)  # obj (8 objects per 640 image)
                bias.data[:, 5:] += math.log(0.6 / (modules.nc - 0.99))  # cls (sigmoid(p) = 1/nc)
                module_list[j][0].bias = torch.nn.Parameter(bias_, requires_grad=bias_.requires_grad)
            except Exception as e:  # avoid bare except so KeyboardInterrupt still propagates
                print('WARNING: smart bias initialization failure: %s' % e)
        else:
            print('Warning: Unrecognized Layer Type: ' + mdef['type'])

        # Register module list and number of output filters
        module_list.append(modules)
        output_filters.append(filters)

    routs_binary = [False] * (i + 1)
    for i in routs:
        routs_binary[i] = True
    return module_list, routs_binary


class YOLOLayer(nn.Module):
    def __init__(self, anchors, nc, img_size, yolo_index, layers, stride):
        super(YOLOLayer, self).__init__()
        self.anchors = torch.Tensor(anchors)
        self.index = yolo_index  # index of this layer in layers
        self.layers = layers  # model output layer indices
        self.stride = stride  # layer stride
        self.nl = len(layers)  # number of output layers (3)
        self.na = len(anchors)  # number of anchors (3)
        self.nc = nc  # number of classes (80)
        self.no = nc + 5  # number of outputs (85)
        self.nx, self.ny, self.ng = 0, 0, 0  # initialize number of x, y gridpoints
        self.anchor_vec = self.anchors / self.stride
        self.anchor_wh = self.anchor_vec.view(1, self.na, 1, 1, 2)

        if ONNX_EXPORT:
            self.training = False
            self.create_grids((img_size[1] // stride, img_size[0] // stride))  # number x, y grid points

    def create_grids(self, ng=(13, 13), device='cpu'):
        self.nx, self.ny = ng  # x and y grid size
        self.ng = torch.tensor(ng, dtype=torch.float)

        # build xy offsets
        if not self.training:
            yv, xv = torch.meshgrid([torch.arange(self.ny, device=device), torch.arange(self.nx, device=device)])
            self.grid = torch.stack((xv, yv), 2).view((1, 1, self.ny, self.nx, 2)).float()

        if self.anchor_vec.device != device:
            self.anchor_vec = self.anchor_vec.to(device)
            self.anchor_wh = self.anchor_wh.to(device)

    def forward(self, p, out):
        ASFF = False  # https://arxiv.org/abs/1911.09516
        if ASFF:
            i, n = self.index, self.nl  # index in layers, number of layers
            p = out[self.layers[i]]
            bs, _, ny, nx = p.shape  # bs, 255, 13, 13
            if (self.nx, self.ny) != (nx, ny):
                self.create_grids((nx, ny), p.device)

            # outputs and weights
            # w = F.softmax(p[:, -n:], 1)  # normalized weights
            w = torch.sigmoid(p[:, -n:]) * (2 / n)  # sigmoid weights (faster)
            # w = w / w.sum(1).unsqueeze(1)  # normalize across layer dimension

            # weighted ASFF sum
            p = out[self.layers[i]][:, :-n] * w[:, i:i + 1]
            for j in range(n):
                if j != i:
                    p += w[:, j:j + 1] * \
                         F.interpolate(out[self.layers[j]][:, :-n], size=[ny, nx], mode='bilinear', align_corners=False)

        elif ONNX_EXPORT:
            bs = 1  # batch size
        else:
            bs, _, ny, nx = p.shape  # bs, 255, 13, 13
            if (self.nx, self.ny) != (nx, ny):
                self.create_grids((nx, ny), p.device)

        # p.view(bs, 255, 13, 13) -> (bs, 3, 13, 13, 85)  # (bs, anchors, grid, grid, classes + xywh)
        p = p.view(bs, self.na, self.no, self.ny, self.nx).permute(0, 1, 3, 4, 2).contiguous()  # prediction

        if self.training:
            return p

        elif ONNX_EXPORT:
            # Avoid broadcasting for ANE operations
            m = self.na * self.nx * self.ny
            ng = 1. / self.ng.repeat(m, 1)
            grid = self.grid.repeat(1, self.na, 1, 1, 1).view(m, 2)
            anchor_wh = self.anchor_wh.repeat(1, 1, self.nx, self.ny, 1).view(m, 2) * ng

            p = p.view(m, self.no)
            xy = torch.sigmoid(p[:, 0:2]) + grid  # x, y
            wh = torch.exp(p[:, 2:4]) * anchor_wh  # width, height
            p_cls = torch.sigmoid(p[:, 4:5]) if self.nc == 1 else \
                torch.sigmoid(p[:, 5:self.no]) * torch.sigmoid(p[:, 4:5])  # conf
            return p_cls, xy * ng, wh

        else:  # inference
            io = p.sigmoid()
            io[..., :2] = (io[..., :2] * 2. - 0.5 + self.grid)
            io[..., 2:4] = (io[..., 2:4] * 2) ** 2 * self.anchor_wh
            io[..., :4] *= self.stride
            # io = p.clone()  # inference output
            # io[..., :2] = torch.sigmoid(io[..., :2]) + self.grid  # xy
            # io[..., 2:4] = torch.exp(io[..., 2:4]) * self.anchor_wh  # wh yolo method
            # io[..., :4] *= self.stride
            # torch.sigmoid_(io[..., 4:])
            return io.view(bs, -1, self.no), p  # view [1, 3, 13, 13, 85] as [1, 507, 85]
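

# Editorial note: the inference branch above uses the scaled-YOLO box decoding
#     xy = (2 * sigmoid(t_xy) - 0.5 + grid) * stride
#     wh = (2 * sigmoid(t_wh)) ** 2 * anchor_wh * stride
# while the commented-out lines preserve the original YOLOv3 decoding
# (sigmoid xy offset, exp for wh) for comparison.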


class JDELayer(nn.Module):
    def __init__(self, anchors, nc, img_size, yolo_index, layers, stride):
        super(JDELayer, self).__init__()
        self.anchors = torch.Tensor(anchors)
        self.index = yolo_index  # index of this layer in layers
        self.layers = layers  # model output layer indices
        self.stride = stride  # layer stride
        self.nl = len(layers)  # number of output layers (3)
        self.na = len(anchors)  # number of anchors (3)
        self.nc = nc  # number of classes (80)
        self.no = nc + 5  # number of outputs (85)
        self.nx, self.ny, self.ng = 0, 0, 0  # initialize number of x, y gridpoints
        self.anchor_vec = self.anchors / self.stride
        self.anchor_wh = self.anchor_vec.view(1, self.na, 1, 1, 2)

        if ONNX_EXPORT:
            self.training = False
            self.create_grids((img_size[1] // stride, img_size[0] // stride))  # number x, y grid points

    def create_grids(self, ng=(13, 13), device='cpu'):
        self.nx, self.ny = ng  # x and y grid size
        self.ng = torch.tensor(ng, dtype=torch.float)

        # build xy offsets
        if not self.training:
            yv, xv = torch.meshgrid([torch.arange(self.ny, device=device), torch.arange(self.nx, device=device)])
            self.grid = torch.stack((xv, yv), 2).view((1, 1, self.ny, self.nx, 2)).float()

        if self.anchor_vec.device != device:
            self.anchor_vec = self.anchor_vec.to(device)
            self.anchor_wh = self.anchor_wh.to(device)

    def forward(self, p, out):
        ASFF = False  # https://arxiv.org/abs/1911.09516
        if ASFF:
            i, n = self.index, self.nl  # index in layers, number of layers
            p = out[self.layers[i]]
            bs, _, ny, nx = p.shape  # bs, 255, 13, 13
            if (self.nx, self.ny) != (nx, ny):
                self.create_grids((nx, ny), p.device)

            # outputs and weights
            # w = F.softmax(p[:, -n:], 1)  # normalized weights
            w = torch.sigmoid(p[:, -n:]) * (2 / n)  # sigmoid weights (faster)
            # w = w / w.sum(1).unsqueeze(1)  # normalize across layer dimension

            # weighted ASFF sum
            p = out[self.layers[i]][:, :-n] * w[:, i:i + 1]
            for j in range(n):
                if j != i:
                    p += w[:, j:j + 1] * \
                         F.interpolate(out[self.layers[j]][:, :-n], size=[ny, nx], mode='bilinear', align_corners=False)

        elif ONNX_EXPORT:
            bs = 1  # batch size
        else:
            bs, _, ny, nx = p.shape  # bs, 255, 13, 13
            if (self.nx, self.ny) != (nx, ny):
                self.create_grids((nx, ny), p.device)

        # p.view(bs, 255, 13, 13) -> (bs, 3, 13, 13, 85)  # (bs, anchors, grid, grid, classes + xywh)
        p = p.view(bs, self.na, self.no, self.ny, self.nx).permute(0, 1, 3, 4, 2).contiguous()  # prediction

        if self.training:
            return p

        elif ONNX_EXPORT:
            # Avoid broadcasting for ANE operations
            m = self.na * self.nx * self.ny
            ng = 1. / self.ng.repeat(m, 1)
            grid = self.grid.repeat(1, self.na, 1, 1, 1).view(m, 2)
            anchor_wh = self.anchor_wh.repeat(1, 1, self.nx, self.ny, 1).view(m, 2) * ng

            p = p.view(m, self.no)
            xy = torch.sigmoid(p[:, 0:2]) + grid  # x, y
            wh = torch.exp(p[:, 2:4]) * anchor_wh  # width, height
            p_cls = torch.sigmoid(p[:, 4:5]) if self.nc == 1 else \
                torch.sigmoid(p[:, 5:self.no]) * torch.sigmoid(p[:, 4:5])  # conf
            return p_cls, xy * ng, wh

        else:  # inference
            # io = p.sigmoid()
            # io[..., :2] = (io[..., :2] * 2. - 0.5 + self.grid)
            # io[..., 2:4] = (io[..., 2:4] * 2) ** 2 * self.anchor_wh
            # io[..., :4] *= self.stride
            io = p.clone()  # inference output
            io[..., :2] = torch.sigmoid(io[..., :2]) * 2. - 0.5 + self.grid  # xy
            io[..., 2:4] = (torch.sigmoid(io[..., 2:4]) * 2) ** 2 * self.anchor_wh  # wh yolo method
            io[..., :4] *= self.stride
            # dim=-1 is assumed here; the original call omitted dim, which is
            # deprecated and lets PyTorch pick an implicit dimension
            io[..., 4:] = F.softmax(io[..., 4:], dim=-1)
            return io.view(bs, -1, self.no), p  # view [1, 3, 13, 13, 85] as [1, 507, 85]


class Darknet(nn.Module):
    # YOLOv3 object detection model

    def __init__(self, cfg, img_size=(416, 416), verbose=False):
        super(Darknet, self).__init__()

        self.module_defs = parse_model_cfg(cfg)
        self.module_list, self.routs = create_modules(self.module_defs, img_size, cfg)
        self.yolo_layers = get_yolo_layers(self)
        # torch_utils.initialize_weights(self)

        # Darknet Header https://github.com/AlexeyAB/darknet/issues/2914#issuecomment-496675346
        self.version = np.array([0, 2, 5], dtype=np.int32)  # (int32) version info: major, minor, revision
        self.seen = np.array([0], dtype=np.int64)  # (int64) number of images seen during training
        self.info(verbose) if not ONNX_EXPORT else None  # print model description

    def forward(self, x, augment=False, verbose=False):
        if not augment:
            return self.forward_once(x)
        else:  # Augment images (inference and test only) https://github.com/ultralytics/yolov3/issues/931
            img_size = x.shape[-2:]  # height, width
            s = [0.83, 0.67]  # scales
            y = []
            for i, xi in enumerate((x,
                                    torch_utils.scale_img(x.flip(3), s[0], same_shape=False),  # flip-lr and scale
                                    torch_utils.scale_img(x, s[1], same_shape=False),  # scale
                                    )):
                # cv2.imwrite('img%g.jpg' % i, 255 * xi[0].numpy().transpose((1, 2, 0))[:, :, ::-1])
                y.append(self.forward_once(xi)[0])

            y[1][..., :4] /= s[0]  # scale
            y[1][..., 0] = img_size[1] - y[1][..., 0]  # flip lr
            y[2][..., :4] /= s[1]  # scale

            # for i, yi in enumerate(y):  # coco small, medium, large = < 32**2 < 96**2 <
            #     area = yi[..., 2:4].prod(2)[:, :, None]
            #     if i == 1:
            #         yi *= (area < 96. ** 2).float()
            #     elif i == 2:
            #         yi *= (area > 32. ** 2).float()
            #     y[i] = yi

            y = torch.cat(y, 1)
            return y, None

    def forward_once(self, x, augment=False, verbose=False):
        img_size = x.shape[-2:]  # height, width
        yolo_out, out = [], []
        if verbose:
            print('0', x.shape)
            msg = ''  # renamed from `str` to avoid shadowing the builtin

        # Augment images (inference and test only)
        if augment:  # https://github.com/ultralytics/yolov3/issues/931
            nb = x.shape[0]  # batch size
            s = [0.83, 0.67]  # scales
            x = torch.cat((x,
                           torch_utils.scale_img(x.flip(3), s[0]),  # flip-lr and scale
                           torch_utils.scale_img(x, s[1]),  # scale
                           ), 0)

        for i, module in enumerate(self.module_list):
            name = module.__class__.__name__
            # print(name)
            if name in ['WeightedFeatureFusion', 'FeatureConcat', 'FeatureConcat2', 'FeatureConcat3',
                        'FeatureConcat_l', 'ScaleChannel', 'ShiftChannel', 'ShiftChannel2D', 'ControlChannel',
                        'ControlChannel2D', 'AlternateChannel', 'AlternateChannel2D', 'SelectChannel',
                        'SelectChannel2D', 'ScaleSpatial']:  # sum, concat
                if verbose:
                    l = [i - 1] + module.layers  # layers
                    sh = [list(x.shape)] + [list(out[i].shape) for i in module.layers]  # shapes
                    msg = ' >> ' + ' + '.join(['layer %g %s' % x for x in zip(l, sh)])
                x = module(x, out)  # WeightedFeatureFusion(), FeatureConcat()
            elif name in ['ImplicitA', 'ImplicitM', 'ImplicitC', 'Implicit2DA', 'Implicit2DM', 'Implicit2DC']:
                x = module()
            elif name == 'YOLOLayer':
                yolo_out.append(module(x, out))
            elif name == 'JDELayer':
                yolo_out.append(module(x, out))
            else:  # run module directly, i.e. mtype = 'convolutional', 'upsample', 'maxpool', 'batchnorm2d' etc.
                # print(module)
                # print(x.shape)
                x = module(x)

            out.append(x if self.routs[i] else [])
            if verbose:
                print('%g/%g %s -' % (i, len(self.module_list), name), list(x.shape), msg)
                msg = ''

        if self.training:  # train
            return yolo_out
        elif ONNX_EXPORT:  # export
            x = [torch.cat(x, 0) for x in zip(*yolo_out)]
            return x[0], torch.cat(x[1:3], 1)  # scores, boxes: 3780x80, 3780x4
        else:  # inference or test
            x, p = zip(*yolo_out)  # inference output, training output
            x = torch.cat(x, 1)  # cat yolo outputs
            if augment:  # de-augment results
                x = torch.split(x, nb, dim=0)
                x[1][..., :4] /= s[0]  # scale
                x[1][..., 0] = img_size[1] - x[1][..., 0]  # flip lr
                x[2][..., :4] /= s[1]  # scale
                x = torch.cat(x, 1)
            return x, p

    def fuse(self):
        # Fuse Conv2d + BatchNorm2d layers throughout model
        print('Fusing layers...')
        fused_list = nn.ModuleList()
        for a in list(self.children())[0]:
            if isinstance(a, nn.Sequential):
                for i, b in enumerate(a):
                    if isinstance(b, nn.modules.batchnorm.BatchNorm2d):
                        # fuse this bn layer with the previous conv2d layer
                        conv = a[i - 1]
                        fused = torch_utils.fuse_conv_and_bn(conv, b)
                        a = nn.Sequential(fused, *list(a.children())[i + 1:])
                        break
            fused_list.append(a)
        self.module_list = fused_list
        self.info() if not ONNX_EXPORT else None  # yolov3-spp reduced from 225 to 152 layers

    def info(self, verbose=False):
        torch_utils.model_info(self, verbose)


def get_yolo_layers(model):
    return [i for i, m in enumerate(model.module_list) if m.__class__.__name__ in ['YOLOLayer', 'JDELayer']]  # [89, 101, 113]
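

# Darknet *.weights binary layout, as read by load_darknet_weights() and
# written by save_weights() below (derived from the code; see the AlexeyAB
# issue linked in both functions):
#   int32 * 3   -> version (major, minor, revision)
#   int64 * 1   -> number of images seen during training
#   float32 ... -> flat parameters, per convolutional layer:
#                  [bn bias, bn weight, bn running_mean, bn running_var] or
#                  [conv bias], followed by the conv weights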


def load_darknet_weights(self, weights, cutoff=-1):
    # Parses and loads the weights stored in 'weights'

    # Establish cutoffs (load layers between 0 and cutoff. if cutoff = -1 all are loaded)
    file = Path(weights).name
    if file == 'darknet53.conv.74':
        cutoff = 75
    elif file == 'yolov3-tiny.conv.15':
        cutoff = 15

    # Read weights file
    with open(weights, 'rb') as f:
        # Read Header https://github.com/AlexeyAB/darknet/issues/2914#issuecomment-496675346
        self.version = np.fromfile(f, dtype=np.int32, count=3)  # (int32) version info: major, minor, revision
        self.seen = np.fromfile(f, dtype=np.int64, count=1)  # (int64) number of images seen during training

        weights = np.fromfile(f, dtype=np.float32)  # the rest are weights

    ptr = 0
    for i, (mdef, module) in enumerate(zip(self.module_defs[:cutoff], self.module_list[:cutoff])):
        if mdef['type'] == 'convolutional':
            conv = module[0]
            if mdef['batch_normalize']:
                # Load BN bias, weights, running mean and running variance
                bn = module[1]
                nb = bn.bias.numel()  # number of biases
                # Bias
                bn.bias.data.copy_(torch.from_numpy(weights[ptr:ptr + nb]).view_as(bn.bias))
                ptr += nb
                # Weight
                bn.weight.data.copy_(torch.from_numpy(weights[ptr:ptr + nb]).view_as(bn.weight))
                ptr += nb
                # Running Mean
                bn.running_mean.data.copy_(torch.from_numpy(weights[ptr:ptr + nb]).view_as(bn.running_mean))
                ptr += nb
                # Running Var
                bn.running_var.data.copy_(torch.from_numpy(weights[ptr:ptr + nb]).view_as(bn.running_var))
                ptr += nb
            else:
                # Load conv. bias
                nb = conv.bias.numel()
                conv_b = torch.from_numpy(weights[ptr:ptr + nb]).view_as(conv.bias)
                conv.bias.data.copy_(conv_b)
                ptr += nb
            # Load conv. weights
            nw = conv.weight.numel()  # number of weights
            conv.weight.data.copy_(torch.from_numpy(weights[ptr:ptr + nw]).view_as(conv.weight))
            ptr += nw


def save_weights(self, path='model.weights', cutoff=-1):
    # Converts a PyTorch model to Darknet format (*.pt to *.weights)
    # Note: Does not work if model.fuse() is applied

    with open(path, 'wb') as f:
        # Write Header https://github.com/AlexeyAB/darknet/issues/2914#issuecomment-496675346
        self.version.tofile(f)  # (int32) version info: major, minor, revision
        self.seen.tofile(f)  # (int64) number of images seen during training

        # Iterate through layers
        for i, (mdef, module) in enumerate(zip(self.module_defs[:cutoff], self.module_list[:cutoff])):
            if mdef['type'] == 'convolutional':
                conv_layer = module[0]
                # If batch norm, write bn parameters first
                if mdef['batch_normalize']:
                    bn_layer = module[1]
                    bn_layer.bias.data.cpu().numpy().tofile(f)
                    bn_layer.weight.data.cpu().numpy().tofile(f)
                    bn_layer.running_mean.data.cpu().numpy().tofile(f)
                    bn_layer.running_var.data.cpu().numpy().tofile(f)
                # Write conv bias
                else:
                    conv_layer.bias.data.cpu().numpy().tofile(f)
                # Write conv weights
                conv_layer.weight.data.cpu().numpy().tofile(f)


def convert(cfg='cfg/yolov3-spp.cfg', weights='weights/yolov3-spp.pt', saveto='converted.weights'):
    # Converts a PyTorch checkpoint to Darknet format (*.pt to *.weights);
    # torch.load() below requires a PyTorch checkpoint, so the default weights
    # path must point at a *.pt file
    # from models import *; convert('cfg/yolov3-spp.cfg', 'weights/yolov3-spp.pt')

    # Initialize model
    model = Darknet(cfg)
    ckpt = torch.load(weights)  # load checkpoint
    try:
        ckpt['model'] = {k: v for k, v in ckpt['model'].items() if model.state_dict()[k].numel() == v.numel()}
        model.load_state_dict(ckpt['model'], strict=False)
        save_weights(model, path=saveto, cutoff=-1)
    except KeyError as e:
        print(e)


def attempt_download(weights):
    # Attempt to download pretrained weights if not found locally
    weights = weights.strip()
    msg = weights + ' missing, try downloading from https://drive.google.com/open?id=1LezFG5g3BCW6iYaV89B2i64cqEUZD7e0'

    if len(weights) > 0 and not os.path.isfile(weights):
        d = {}  # filename -> Google Drive id map (ids elided here; empty dict keeps the lookup below valid)
        file = Path(weights).name
        if file in d:
            r = gdrive_download(id=d[file], name=weights)
        else:  # download from pjreddie.com
            url = 'https://pjreddie.com/media/files/' + file
            print('Downloading ' + url)
            r = os.system('curl -f ' + url + ' -o ' + weights)

        # Error check
        if not (r == 0 and os.path.exists(weights) and os.path.getsize(weights) > 1E6):  # weights exist and > 1MB
            os.system('rm ' + weights)  # remove partial downloads
            raise Exception(msg)
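

if __name__ == '__main__':
    # Minimal usage sketch (illustrative, not part of the upstream file):
    # assumes a cfg such as cfg/yolov3-spp.cfg exists and the repo's utils
    # package is importable.
    model = Darknet('cfg/yolov3-spp.cfg', img_size=(416, 416))
    model.eval()  # inference mode: YOLO layers return decoded boxes
    img = torch.zeros((1, 3, 416, 416))  # dummy RGB batch
    with torch.no_grad():
        inf_out, _ = model(img)  # shape: [1, total_anchors, num_classes + 5]
    print(inf_out.shape)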