# This file contains modules common to various models import math import numpy as np import requests import torch import torch.nn as nn from PIL import Image, ImageDraw from utils.datasets import letterbox from utils.general import non_max_suppression, make_divisible, scale_coords, xyxy2xywh from utils.plots import color_list def autopad(k, p=None): # kernel, padding # Pad to 'same' if p is None: p = k // 2 if isinstance(k, int) else [x // 2 for x in k] # auto-pad return p def channel_shuffle(x, groups): batchsize, num_channels, height, width = x.data.size() channels_per_group = num_channels // groups # reshape x = x.view(batchsize, groups, channels_per_group, height, width) x = torch.transpose(x, 1, 2).contiguous() # flatten x = x.view(batchsize, -1, height, width) return x def DWConv(c1, c2, k=1, s=1, act=True): # Depthwise convolution return Conv(c1, c2, k, s, g=math.gcd(c1, c2), act=act) class Conv(nn.Module): # Standard convolution def __init__(self, c1, c2, k=1, s=1, p=None, g=1, act=True): # ch_in, ch_out, kernel, stride, padding, groups super(Conv, self).__init__() self.conv = nn.Conv2d(c1, c2, k, s, autopad(k, p), groups=g, bias=False) self.bn = nn.BatchNorm2d(c2) self.act = nn.SiLU() if act is True else (act if isinstance(act, nn.Module) else nn.Identity()) #self.act = self.act = nn.LeakyReLU(0.1, inplace=True) if act is True else (act if isinstance(act, nn.Module) else nn.Identity()) def forward(self, x): return self.act(self.bn(self.conv(x))) def fuseforward(self, x): return self.act(self.conv(x)) class StemBlock(nn.Module): def __init__(self, c1, c2, k=3, s=2, p=None, g=1, act=True): super(StemBlock, self).__init__() self.stem_1 = Conv(c1, c2, k, s, p, g, act) self.stem_2a = Conv(c2, c2 // 2, 1, 1, 0) self.stem_2b = Conv(c2 // 2, c2, 3, 2, 1) self.stem_2p = nn.MaxPool2d(kernel_size=2,stride=2,ceil_mode=True) self.stem_3 = Conv(c2 * 2, c2, 1, 1, 0) def forward(self, x): stem_1_out = self.stem_1(x) stem_2a_out = self.stem_2a(stem_1_out) stem_2b_out = self.stem_2b(stem_2a_out) stem_2p_out = self.stem_2p(stem_1_out) out = self.stem_3(torch.cat((stem_2b_out,stem_2p_out),1)) return out class Bottleneck(nn.Module): # Standard bottleneck def __init__(self, c1, c2, shortcut=True, g=1, e=0.5): # ch_in, ch_out, shortcut, groups, expansion super(Bottleneck, self).__init__() c_ = int(c2 * e) # hidden channels self.cv1 = Conv(c1, c_, 1, 1) self.cv2 = Conv(c_, c2, 3, 1, g=g) self.add = shortcut and c1 == c2 def forward(self, x): return x + self.cv2(self.cv1(x)) if self.add else self.cv2(self.cv1(x)) class BottleneckCSP(nn.Module): # CSP Bottleneck https://github.com/WongKinYiu/CrossStagePartialNetworks def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5): # ch_in, ch_out, number, shortcut, groups, expansion super(BottleneckCSP, self).__init__() c_ = int(c2 * e) # hidden channels self.cv1 = Conv(c1, c_, 1, 1) self.cv2 = nn.Conv2d(c1, c_, 1, 1, bias=False) self.cv3 = nn.Conv2d(c_, c_, 1, 1, bias=False) self.cv4 = Conv(2 * c_, c2, 1, 1) self.bn = nn.BatchNorm2d(2 * c_) # applied to cat(cv2, cv3) self.act = nn.LeakyReLU(0.1, inplace=True) self.m = nn.Sequential(*[Bottleneck(c_, c_, shortcut, g, e=1.0) for _ in range(n)]) def forward(self, x): y1 = self.cv3(self.m(self.cv1(x))) y2 = self.cv2(x) return self.cv4(self.act(self.bn(torch.cat((y1, y2), dim=1)))) class C3(nn.Module): # CSP Bottleneck with 3 convolutions def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5): # ch_in, ch_out, number, shortcut, groups, expansion super(C3, self).__init__() c_ = int(c2 * e) # hidden channels self.cv1 = Conv(c1, c_, 1, 1) self.cv2 = Conv(c1, c_, 1, 1) self.cv3 = Conv(2 * c_, c2, 1) # act=FReLU(c2) self.m = nn.Sequential(*[Bottleneck(c_, c_, shortcut, g, e=1.0) for _ in range(n)]) def forward(self, x): return self.cv3(torch.cat((self.m(self.cv1(x)), self.cv2(x)), dim=1)) class ShuffleV2Block(nn.Module): def __init__(self, inp, oup, stride): super(ShuffleV2Block, self).__init__() if not (1 <= stride <= 3): raise ValueError('illegal stride value') self.stride = stride branch_features = oup // 2 assert (self.stride != 1) or (inp == branch_features << 1) if self.stride > 1: self.branch1 = nn.Sequential( self.depthwise_conv(inp, inp, kernel_size=3, stride=self.stride, padding=1), nn.BatchNorm2d(inp), nn.Conv2d(inp, branch_features, kernel_size=1, stride=1, padding=0, bias=False), nn.BatchNorm2d(branch_features), nn.SiLU(), ) else: self.branch1 = nn.Sequential() self.branch2 = nn.Sequential( nn.Conv2d(inp if (self.stride > 1) else branch_features, branch_features, kernel_size=1, stride=1, padding=0, bias=False), nn.BatchNorm2d(branch_features), nn.SiLU(), self.depthwise_conv(branch_features, branch_features, kernel_size=3, stride=self.stride, padding=1), nn.BatchNorm2d(branch_features), nn.Conv2d(branch_features, branch_features, kernel_size=1, stride=1, padding=0, bias=False), nn.BatchNorm2d(branch_features), nn.SiLU(), ) @staticmethod def depthwise_conv(i, o, kernel_size, stride=1, padding=0, bias=False): return nn.Conv2d(i, o, kernel_size, stride, padding, bias=bias, groups=i) def forward(self, x): if self.stride == 1: x1, x2 = x.chunk(2, dim=1) out = torch.cat((x1, self.branch2(x2)), dim=1) else: out = torch.cat((self.branch1(x), self.branch2(x)), dim=1) out = channel_shuffle(out, 2) return out class BlazeBlock(nn.Module): def __init__(self, in_channels,out_channels,mid_channels=None,stride=1): super(BlazeBlock, self).__init__() mid_channels = mid_channels or in_channels assert stride in [1, 2] if stride>1: self.use_pool = True else: self.use_pool = False self.branch1 = nn.Sequential( nn.Conv2d(in_channels=in_channels,out_channels=mid_channels,kernel_size=5,stride=stride,padding=2,groups=in_channels), nn.BatchNorm2d(mid_channels), nn.Conv2d(in_channels=mid_channels,out_channels=out_channels,kernel_size=1,stride=1), nn.BatchNorm2d(out_channels), ) if self.use_pool: self.shortcut = nn.Sequential( nn.MaxPool2d(kernel_size=stride, stride=stride), nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=1, stride=1), nn.BatchNorm2d(out_channels), ) self.relu = nn.SiLU(inplace=True) def forward(self, x): branch1 = self.branch1(x) out = (branch1+self.shortcut(x)) if self.use_pool else (branch1+x) return self.relu(out) class DoubleBlazeBlock(nn.Module): def __init__(self,in_channels,out_channels,mid_channels=None,stride=1): super(DoubleBlazeBlock, self).__init__() mid_channels = mid_channels or in_channels assert stride in [1, 2] if stride > 1: self.use_pool = True else: self.use_pool = False self.branch1 = nn.Sequential( nn.Conv2d(in_channels=in_channels, out_channels=in_channels, kernel_size=5, stride=stride,padding=2,groups=in_channels), nn.BatchNorm2d(in_channels), nn.Conv2d(in_channels=in_channels, out_channels=mid_channels, kernel_size=1, stride=1), nn.BatchNorm2d(mid_channels), nn.SiLU(inplace=True), nn.Conv2d(in_channels=mid_channels, out_channels=mid_channels, kernel_size=5, stride=1,padding=2), nn.BatchNorm2d(mid_channels), nn.Conv2d(in_channels=mid_channels, out_channels=out_channels, kernel_size=1, stride=1), nn.BatchNorm2d(out_channels), ) if self.use_pool: self.shortcut = nn.Sequential( nn.MaxPool2d(kernel_size=stride, stride=stride), nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=1, stride=1), nn.BatchNorm2d(out_channels), ) self.relu = nn.SiLU(inplace=True) def forward(self, x): branch1 = self.branch1(x) out = (branch1 + self.shortcut(x)) if self.use_pool else (branch1 + x) return self.relu(out) class SPP(nn.Module): # Spatial pyramid pooling layer used in YOLOv3-SPP def __init__(self, c1, c2, k=(5, 9, 13)): super(SPP, self).__init__() c_ = c1 // 2 # hidden channels self.cv1 = Conv(c1, c_, 1, 1) self.cv2 = Conv(c_ * (len(k) + 1), c2, 1, 1) self.m = nn.ModuleList([nn.MaxPool2d(kernel_size=x, stride=1, padding=x // 2) for x in k]) def forward(self, x): x = self.cv1(x) return self.cv2(torch.cat([x] + [m(x) for m in self.m], 1)) class SPPF(nn.Module): # Spatial Pyramid Pooling - Fast (SPPF) layer for YOLOv5 by Glenn Jocher def __init__(self, c1, c2, k=5): # equivalent to SPP(k=(5, 9, 13)) super().__init__() c_ = c1 // 2 # hidden channels self.cv1 = Conv(c1, c_, 1, 1) self.cv2 = Conv(c_ * 4, c2, 1, 1) self.m = nn.MaxPool2d(kernel_size=k, stride=1, padding=k // 2) def forward(self, x): x = self.cv1(x) with warnings.catch_warnings(): warnings.simplefilter('ignore') # suppress torch 1.9.0 max_pool2d() warning y1 = self.m(x) y2 = self.m(y1) return self.cv2(torch.cat((x, y1, y2, self.m(y2)), 1)) class Focus(nn.Module): # Focus wh information into c-space def __init__(self, c1, c2, k=1, s=1, p=None, g=1, act=True): # ch_in, ch_out, kernel, stride, padding, groups super(Focus, self).__init__() self.conv = Conv(c1 * 4, c2, k, s, p, g, act) # self.contract = Contract(gain=2) def forward(self, x): # x(b,c,w,h) -> y(b,4c,w/2,h/2) return self.conv(torch.cat([x[..., ::2, ::2], x[..., 1::2, ::2], x[..., ::2, 1::2], x[..., 1::2, 1::2]], 1)) # return self.conv(self.contract(x)) class Contract(nn.Module): # Contract width-height into channels, i.e. x(1,64,80,80) to x(1,256,40,40) def __init__(self, gain=2): super().__init__() self.gain = gain def forward(self, x): N, C, H, W = x.size() # assert (H / s == 0) and (W / s == 0), 'Indivisible gain' s = self.gain x = x.view(N, C, H // s, s, W // s, s) # x(1,64,40,2,40,2) x = x.permute(0, 3, 5, 1, 2, 4).contiguous() # x(1,2,2,64,40,40) return x.view(N, C * s * s, H // s, W // s) # x(1,256,40,40) class Expand(nn.Module): # Expand channels into width-height, i.e. x(1,64,80,80) to x(1,16,160,160) def __init__(self, gain=2): super().__init__() self.gain = gain def forward(self, x): N, C, H, W = x.size() # assert C / s ** 2 == 0, 'Indivisible gain' s = self.gain x = x.view(N, s, s, C // s ** 2, H, W) # x(1,2,2,16,80,80) x = x.permute(0, 3, 4, 1, 5, 2).contiguous() # x(1,16,80,2,80,2) return x.view(N, C // s ** 2, H * s, W * s) # x(1,16,160,160) class Concat(nn.Module): # Concatenate a list of tensors along dimension def __init__(self, dimension=1): super(Concat, self).__init__() self.d = dimension def forward(self, x): return torch.cat(x, self.d) class NMS(nn.Module): # Non-Maximum Suppression (NMS) module conf = 0.25 # confidence threshold iou = 0.45 # IoU threshold classes = None # (optional list) filter by class def __init__(self): super(NMS, self).__init__() def forward(self, x): return non_max_suppression(x[0], conf_thres=self.conf, iou_thres=self.iou, classes=self.classes) class autoShape(nn.Module): # input-robust model wrapper for passing cv2/np/PIL/torch inputs. Includes preprocessing, inference and NMS img_size = 640 # inference size (pixels) conf = 0.25 # NMS confidence threshold iou = 0.45 # NMS IoU threshold classes = None # (optional list) filter by class def __init__(self, model): super(autoShape, self).__init__() self.model = model.eval() def autoshape(self): print('autoShape already enabled, skipping... ') # model already converted to model.autoshape() return self def forward(self, imgs, size=640, augment=False, profile=False): # Inference from various sources. For height=720, width=1280, RGB images example inputs are: # filename: imgs = 'data/samples/zidane.jpg' # URI: = 'https://github.com/ultralytics/yolov5/releases/download/v1.0/zidane.jpg' # OpenCV: = cv2.imread('image.jpg')[:,:,::-1] # HWC BGR to RGB x(720,1280,3) # PIL: = Image.open('image.jpg') # HWC x(720,1280,3) # numpy: = np.zeros((720,1280,3)) # HWC # torch: = torch.zeros(16,3,720,1280) # BCHW # multiple: = [Image.open('image1.jpg'), Image.open('image2.jpg'), ...] # list of images p = next(self.model.parameters()) # for device and type if isinstance(imgs, torch.Tensor): # torch return self.model(imgs.to(p.device).type_as(p), augment, profile) # inference # Pre-process n, imgs = (len(imgs), imgs) if isinstance(imgs, list) else (1, [imgs]) # number of images, list of images shape0, shape1 = [], [] # image and inference shapes for i, im in enumerate(imgs): if isinstance(im, str): # filename or uri im = Image.open(requests.get(im, stream=True).raw if im.startswith('http') else im) # open im = np.array(im) # to numpy if im.shape[0] < 5: # image in CHW im = im.transpose((1, 2, 0)) # reverse dataloader .transpose(2, 0, 1) im = im[:, :, :3] if im.ndim == 3 else np.tile(im[:, :, None], 3) # enforce 3ch input s = im.shape[:2] # HWC shape0.append(s) # image shape g = (size / max(s)) # gain shape1.append([y * g for y in s]) imgs[i] = im # update shape1 = [make_divisible(x, int(self.stride.max())) for x in np.stack(shape1, 0).max(0)] # inference shape x = [letterbox(im, new_shape=shape1, auto=False)[0] for im in imgs] # pad x = np.stack(x, 0) if n > 1 else x[0][None] # stack x = np.ascontiguousarray(x.transpose((0, 3, 1, 2))) # BHWC to BCHW x = torch.from_numpy(x).to(p.device).type_as(p) / 255. # uint8 to fp16/32 # Inference with torch.no_grad(): y = self.model(x, augment, profile)[0] # forward y = non_max_suppression(y, conf_thres=self.conf, iou_thres=self.iou, classes=self.classes) # NMS # Post-process for i in range(n): scale_coords(shape1, y[i][:, :4], shape0[i]) return Detections(imgs, y, self.names) class Detections: # detections class for YOLOv5 inference results def __init__(self, imgs, pred, names=None): super(Detections, self).__init__() d = pred[0].device # device gn = [torch.tensor([*[im.shape[i] for i in [1, 0, 1, 0]], 1., 1.], device=d) for im in imgs] # normalizations self.imgs = imgs # list of images as numpy arrays self.pred = pred # list of tensors pred[0] = (xyxy, conf, cls) self.names = names # class names self.xyxy = pred # xyxy pixels self.xywh = [xyxy2xywh(x) for x in pred] # xywh pixels self.xyxyn = [x / g for x, g in zip(self.xyxy, gn)] # xyxy normalized self.xywhn = [x / g for x, g in zip(self.xywh, gn)] # xywh normalized self.n = len(self.pred) def display(self, pprint=False, show=False, save=False, render=False): colors = color_list() for i, (img, pred) in enumerate(zip(self.imgs, self.pred)): str = f'Image {i + 1}/{len(self.pred)}: {img.shape[0]}x{img.shape[1]} ' if pred is not None: for c in pred[:, -1].unique(): n = (pred[:, -1] == c).sum() # detections per class if len(self.names) > int(c): str += f'{n} {self.names[int(c)]}s, ' # add to string if show or save or render: img = Image.fromarray(img.astype(np.uint8)) if isinstance(img, np.ndarray) else img # from np for *box, conf, cls in pred: # xyxy, confidence, class # str += '%s %.2f, ' % (names[int(cls)], conf) # label ImageDraw.Draw(img).rectangle(box, width=4, outline=colors[int(cls) % 10]) # plot if pprint: print(str) if show: img.show(f'Image {i}') # show if save: f = f'results{i}.jpg' str += f"saved to '{f}'" img.save(f) # save if render: self.imgs[i] = np.asarray(img) def print(self): self.display(pprint=True) # print results def show(self): self.display(show=True) # show results def save(self): self.display(save=True) # save results def render(self): self.display(render=True) # render results return self.imgs def __len__(self): return self.n def tolist(self): # return a list of Detections objects, i.e. 'for result in results.tolist():' x = [Detections([self.imgs[i]], [self.pred[i]], self.names) for i in range(self.n)] for d in x: for k in ['imgs', 'pred', 'xyxy', 'xyxyn', 'xywh', 'xywhn']: setattr(d, k, getattr(d, k)[0]) # pop out of list return x class Classify(nn.Module): # Classification head, i.e. x(b,c1,20,20) to x(b,c2) def __init__(self, c1, c2, k=1, s=1, p=None, g=1): # ch_in, ch_out, kernel, stride, padding, groups super(Classify, self).__init__() self.aap = nn.AdaptiveAvgPool2d(1) # to x(b,c1,1,1) self.conv = nn.Conv2d(c1, c2, k, s, autopad(k, p), groups=g) # to x(b,c2,1,1) self.flat = nn.Flatten() def forward(self, x): z = torch.cat([self.aap(y) for y in (x if isinstance(x, list) else [x])], 1) # cat if list return self.flat(self.conv(z)) # flatten to x(b,c2)