Spaces:
Build error
Build error
# Ultralytics YOLO 🚀, GPL-3.0 license | |
import contextlib | |
from copy import deepcopy | |
import thop | |
import torch | |
import torch.nn as nn | |
from ultralytics.nn.modules import (C1, C2, C3, C3TR, SPP, SPPF, Bottleneck, BottleneckCSP, C2f, C3Ghost, C3x, Classify, | |
Concat, Conv, ConvTranspose, Detect, DWConv, DWConvTranspose2d, Ensemble, Focus, | |
GhostBottleneck, GhostConv, Segment) | |
from ultralytics.yolo.utils import DEFAULT_CONFIG_DICT, DEFAULT_CONFIG_KEYS, LOGGER, colorstr, yaml_load | |
from ultralytics.yolo.utils.checks import check_yaml | |
from ultralytics.yolo.utils.torch_utils import (fuse_conv_and_bn, initialize_weights, intersect_dicts, make_divisible, | |
model_info, scale_img, time_sync) | |
class BaseModel(nn.Module): | |
''' | |
The BaseModel class is a base class for all the models in the Ultralytics YOLO family. | |
''' | |
def forward(self, x, profile=False, visualize=False): | |
""" | |
> `forward` is a wrapper for `_forward_once` that runs the model on a single scale | |
Args: | |
x: the input image | |
profile: whether to profile the model. Defaults to False | |
visualize: if True, will return the intermediate feature maps. Defaults to False | |
Returns: | |
The output of the network. | |
""" | |
return self._forward_once(x, profile, visualize) | |
def _forward_once(self, x, profile=False, visualize=False): | |
""" | |
> Forward pass of the network | |
Args: | |
x: input to the model | |
profile: if True, the time taken for each layer will be printed. Defaults to False | |
visualize: If True, it will save the feature maps of the model. Defaults to False | |
Returns: | |
The last layer of the model. | |
""" | |
y, dt = [], [] # outputs | |
for m in self.model: | |
if m.f != -1: # if not from previous layer | |
x = y[m.f] if isinstance(m.f, int) else [x if j == -1 else y[j] for j in m.f] # from earlier layers | |
if profile: | |
self._profile_one_layer(m, x, dt) | |
x = m(x) # run | |
y.append(x if m.i in self.save else None) # save output | |
if visualize: | |
pass | |
# TODO: feature_visualization(x, m.type, m.i, save_dir=visualize) | |
return x | |
def _profile_one_layer(self, m, x, dt): | |
""" | |
It takes a model, an input, and a list of times, and it profiles the model on the input, appending | |
the time to the list | |
Args: | |
m: the model | |
x: the input image | |
dt: list of time taken for each layer | |
""" | |
c = m == self.model[-1] # is final layer, copy input as inplace fix | |
o = thop.profile(m, inputs=(x.copy() if c else x,), verbose=False)[0] / 1E9 * 2 if thop else 0 # FLOPs | |
t = time_sync() | |
for _ in range(10): | |
m(x.copy() if c else x) | |
dt.append((time_sync() - t) * 100) | |
if m == self.model[0]: | |
LOGGER.info(f"{'time (ms)':>10s} {'GFLOPs':>10s} {'params':>10s} module") | |
LOGGER.info(f'{dt[-1]:10.2f} {o:10.2f} {m.np:10.0f} {m.type}') | |
if c: | |
LOGGER.info(f"{sum(dt):10.2f} {'-':>10s} {'-':>10s} Total") | |
def fuse(self): | |
""" | |
> It takes a model and fuses the Conv2d() and BatchNorm2d() layers into a single layer | |
Returns: | |
The model is being returned. | |
""" | |
LOGGER.info('Fusing layers... ') | |
for m in self.model.modules(): | |
if isinstance(m, (Conv, DWConv)) and hasattr(m, 'bn'): | |
m.conv = fuse_conv_and_bn(m.conv, m.bn) # update conv | |
delattr(m, 'bn') # remove batchnorm | |
m.forward = m.forward_fuse # update forward | |
self.info() | |
return self | |
def info(self, verbose=False, imgsz=640): | |
""" | |
Prints model information | |
Args: | |
verbose: if True, prints out the model information. Defaults to False | |
imgsz: the size of the image that the model will be trained on. Defaults to 640 | |
""" | |
model_info(self, verbose, imgsz) | |
def _apply(self, fn): | |
""" | |
`_apply()` is a function that applies a function to all the tensors in the model that are not | |
parameters or registered buffers | |
Args: | |
fn: the function to apply to the model | |
Returns: | |
A model that is a Detect() object. | |
""" | |
self = super()._apply(fn) | |
m = self.model[-1] # Detect() | |
if isinstance(m, (Detect, Segment)): | |
m.stride = fn(m.stride) | |
m.anchors = fn(m.anchors) | |
m.strides = fn(m.strides) | |
return self | |
def load(self, weights): | |
""" | |
> This function loads the weights of the model from a file | |
Args: | |
weights: The weights to load into the model. | |
""" | |
# Force all tasks to implement this function | |
raise NotImplementedError("This function needs to be implemented by derived classes!") | |
class DetectionModel(BaseModel): | |
# YOLOv5 detection model | |
def __init__(self, cfg='yolov8n.yaml', ch=3, nc=None, verbose=True): # model, input channels, number of classes | |
super().__init__() | |
self.yaml = cfg if isinstance(cfg, dict) else yaml_load(check_yaml(cfg), append_filename=True) # cfg dict | |
# Define model | |
ch = self.yaml['ch'] = self.yaml.get('ch', ch) # input channels | |
if nc and nc != self.yaml['nc']: | |
LOGGER.info(f"Overriding model.yaml nc={self.yaml['nc']} with nc={nc}") | |
self.yaml['nc'] = nc # override yaml value | |
self.model, self.save = parse_model(deepcopy(self.yaml), ch=[ch], verbose=verbose) # model, savelist | |
self.names = {i: f'{i}' for i in range(self.yaml['nc'])} # default names dict | |
self.inplace = self.yaml.get('inplace', True) | |
# Build strides | |
m = self.model[-1] # Detect() | |
if isinstance(m, (Detect, Segment)): | |
s = 256 # 2x min stride | |
m.inplace = self.inplace | |
forward = lambda x: self.forward(x)[0] if isinstance(m, Segment) else self.forward(x) | |
m.stride = torch.tensor([s / x.shape[-2] for x in forward(torch.zeros(1, ch, s, s))]) # forward | |
self.stride = m.stride | |
m.bias_init() # only run once | |
# Init weights, biases | |
initialize_weights(self) | |
if verbose: | |
self.info() | |
LOGGER.info('') | |
def forward(self, x, augment=False, profile=False, visualize=False): | |
if augment: | |
return self._forward_augment(x) # augmented inference, None | |
return self._forward_once(x, profile, visualize) # single-scale inference, train | |
def _forward_augment(self, x): | |
img_size = x.shape[-2:] # height, width | |
s = [1, 0.83, 0.67] # scales | |
f = [None, 3, None] # flips (2-ud, 3-lr) | |
y = [] # outputs | |
for si, fi in zip(s, f): | |
xi = scale_img(x.flip(fi) if fi else x, si, gs=int(self.stride.max())) | |
yi = self._forward_once(xi)[0] # forward | |
# cv2.imwrite(f'img_{si}.jpg', 255 * xi[0].cpu().numpy().transpose((1, 2, 0))[:, :, ::-1]) # save | |
yi = self._descale_pred(yi, fi, si, img_size) | |
y.append(yi) | |
y = self._clip_augmented(y) # clip augmented tails | |
return torch.cat(y, -1), None # augmented inference, train | |
def _descale_pred(p, flips, scale, img_size, dim=1): | |
# de-scale predictions following augmented inference (inverse operation) | |
p[:, :4] /= scale # de-scale | |
x, y, wh, cls = p.split((1, 1, 2, p.shape[dim] - 4), dim) | |
if flips == 2: | |
y = img_size[0] - y # de-flip ud | |
elif flips == 3: | |
x = img_size[1] - x # de-flip lr | |
return torch.cat((x, y, wh, cls), dim) | |
def _clip_augmented(self, y): | |
# Clip YOLOv5 augmented inference tails | |
nl = self.model[-1].nl # number of detection layers (P3-P5) | |
g = sum(4 ** x for x in range(nl)) # grid points | |
e = 1 # exclude layer count | |
i = (y[0].shape[-1] // g) * sum(4 ** x for x in range(e)) # indices | |
y[0] = y[0][..., :-i] # large | |
i = (y[-1].shape[-1] // g) * sum(4 ** (nl - 1 - x) for x in range(e)) # indices | |
y[-1] = y[-1][..., i:] # small | |
return y | |
def load(self, weights, verbose=True): | |
csd = weights.float().state_dict() # checkpoint state_dict as FP32 | |
csd = intersect_dicts(csd, self.state_dict()) # intersect | |
self.load_state_dict(csd, strict=False) # load | |
if verbose: | |
LOGGER.info(f'Transferred {len(csd)}/{len(self.model.state_dict())} items from pretrained weights') | |
class SegmentationModel(DetectionModel): | |
# YOLOv5 segmentation model | |
def __init__(self, cfg='yolov8n-seg.yaml', ch=3, nc=None, verbose=True): | |
super().__init__(cfg, ch, nc, verbose) | |
class ClassificationModel(BaseModel): | |
# YOLOv5 classification model | |
def __init__(self, | |
cfg=None, | |
model=None, | |
ch=3, | |
nc=1000, | |
cutoff=10, | |
verbose=True): # yaml, model, number of classes, cutoff index | |
super().__init__() | |
self._from_detection_model(model, nc, cutoff) if model is not None else self._from_yaml(cfg, ch, nc, verbose) | |
def _from_detection_model(self, model, nc=1000, cutoff=10): | |
# Create a YOLOv5 classification model from a YOLOv5 detection model | |
from ultralytics.nn.autobackend import AutoBackend | |
if isinstance(model, AutoBackend): | |
model = model.model # unwrap DetectMultiBackend | |
model.model = model.model[:cutoff] # backbone | |
m = model.model[-1] # last layer | |
ch = m.conv.in_channels if hasattr(m, 'conv') else m.cv1.conv.in_channels # ch into module | |
c = Classify(ch, nc) # Classify() | |
c.i, c.f, c.type = m.i, m.f, 'models.common.Classify' # index, from, type | |
model.model[-1] = c # replace | |
self.model = model.model | |
self.stride = model.stride | |
self.save = [] | |
self.nc = nc | |
def _from_yaml(self, cfg, ch, nc, verbose): | |
self.yaml = cfg if isinstance(cfg, dict) else yaml_load(check_yaml(cfg), append_filename=True) # cfg dict | |
# Define model | |
ch = self.yaml['ch'] = self.yaml.get('ch', ch) # input channels | |
if nc and nc != self.yaml['nc']: | |
LOGGER.info(f"Overriding model.yaml nc={self.yaml['nc']} with nc={nc}") | |
self.yaml['nc'] = nc # override yaml value | |
self.model, self.save = parse_model(deepcopy(self.yaml), ch=[ch], verbose=verbose) # model, savelist | |
self.names = {i: f'{i}' for i in range(self.yaml['nc'])} # default names dict | |
self.info() | |
def load(self, weights): | |
model = weights["model"] if isinstance(weights, dict) else weights # torchvision models are not dicts | |
csd = model.float().state_dict() | |
csd = intersect_dicts(csd, self.state_dict()) # intersect | |
self.load_state_dict(csd, strict=False) # load | |
def reshape_outputs(model, nc): | |
# Update a TorchVision classification model to class count 'n' if required | |
name, m = list((model.model if hasattr(model, 'model') else model).named_children())[-1] # last module | |
if isinstance(m, Classify): # YOLO Classify() head | |
if m.linear.out_features != nc: | |
m.linear = nn.Linear(m.linear.in_features, nc) | |
elif isinstance(m, nn.Linear): # ResNet, EfficientNet | |
if m.out_features != nc: | |
setattr(model, name, nn.Linear(m.in_features, nc)) | |
elif isinstance(m, nn.Sequential): | |
types = [type(x) for x in m] | |
if nn.Linear in types: | |
i = types.index(nn.Linear) # nn.Linear index | |
if m[i].out_features != nc: | |
m[i] = nn.Linear(m[i].in_features, nc) | |
elif nn.Conv2d in types: | |
i = types.index(nn.Conv2d) # nn.Conv2d index | |
if m[i].out_channels != nc: | |
m[i] = nn.Conv2d(m[i].in_channels, nc, m[i].kernel_size, m[i].stride, bias=m[i].bias is not None) | |
# Functions ------------------------------------------------------------------------------------------------------------ | |
def attempt_load_weights(weights, device=None, inplace=True, fuse=False): | |
# Loads an ensemble of models weights=[a,b,c] or a single model weights=[a] or weights=a | |
from ultralytics.yolo.utils.downloads import attempt_download | |
model = Ensemble() | |
for w in weights if isinstance(weights, list) else [weights]: | |
ckpt = torch.load(attempt_download(w), map_location='cpu') # load | |
args = {**DEFAULT_CONFIG_DICT, **ckpt['train_args']} # combine model and default args, preferring model args | |
ckpt = (ckpt.get('ema') or ckpt['model']).to(device).float() # FP32 model | |
# Model compatibility updates | |
ckpt.args = {k: v for k, v in args.items() if k in DEFAULT_CONFIG_KEYS} # attach args to model | |
ckpt.pt_path = weights # attach *.pt file path to model | |
if not hasattr(ckpt, 'stride'): | |
ckpt.stride = torch.tensor([32.]) | |
# Append | |
model.append(ckpt.fuse().eval() if fuse and hasattr(ckpt, 'fuse') else ckpt.eval()) # model in eval mode | |
# Module compatibility updates | |
for m in model.modules(): | |
t = type(m) | |
if t in (nn.Hardswish, nn.LeakyReLU, nn.ReLU, nn.ReLU6, nn.SiLU, Detect, Segment): | |
m.inplace = inplace # torch 1.7.0 compatibility | |
elif t is nn.Upsample and not hasattr(m, 'recompute_scale_factor'): | |
m.recompute_scale_factor = None # torch 1.11.0 compatibility | |
# Return model | |
if len(model) == 1: | |
return model[-1] | |
# Return ensemble | |
print(f'Ensemble created with {weights}\n') | |
for k in 'names', 'nc', 'yaml': | |
setattr(model, k, getattr(model[0], k)) | |
model.stride = model[torch.argmax(torch.tensor([m.stride.max() for m in model])).int()].stride # max stride | |
assert all(model[0].nc == m.nc for m in model), f'Models have different class counts: {[m.nc for m in model]}' | |
return model | |
def attempt_load_one_weight(weight, device=None, inplace=True, fuse=False): | |
# Loads a single model weights | |
from ultralytics.yolo.utils.downloads import attempt_download | |
ckpt = torch.load(attempt_download(weight), map_location='cpu') # load | |
args = {**DEFAULT_CONFIG_DICT, **ckpt['train_args']} # combine model and default args, preferring model args | |
model = (ckpt.get('ema') or ckpt['model']).to(device).float() # FP32 model | |
# Model compatibility updates | |
model.args = {k: v for k, v in args.items() if k in DEFAULT_CONFIG_KEYS} # attach args to model | |
model.pt_path = weight # attach *.pt file path to model | |
if not hasattr(model, 'stride'): | |
model.stride = torch.tensor([32.]) | |
model = model.fuse().eval() if fuse and hasattr(model, 'fuse') else model.eval() # model in eval mode | |
# Module compatibility updates | |
for m in model.modules(): | |
t = type(m) | |
if t in (nn.Hardswish, nn.LeakyReLU, nn.ReLU, nn.ReLU6, nn.SiLU, Detect, Segment): | |
m.inplace = inplace # torch 1.7.0 compatibility | |
elif t is nn.Upsample and not hasattr(m, 'recompute_scale_factor'): | |
m.recompute_scale_factor = None # torch 1.11.0 compatibility | |
# Return model and ckpt | |
return model, ckpt | |
def parse_model(d, ch, verbose=True): # model_dict, input_channels(3) | |
# Parse a YOLO model.yaml dictionary | |
if verbose: | |
LOGGER.info(f"\n{'':>3}{'from':>20}{'n':>3}{'params':>10} {'module':<45}{'arguments':<30}") | |
nc, gd, gw, act = d['nc'], d['depth_multiple'], d['width_multiple'], d.get('activation') | |
if act: | |
Conv.default_act = eval(act) # redefine default activation, i.e. Conv.default_act = nn.SiLU() | |
if verbose: | |
LOGGER.info(f"{colorstr('activation:')} {act}") # print | |
layers, save, c2 = [], [], ch[-1] # layers, savelist, ch out | |
for i, (f, n, m, args) in enumerate(d['backbone'] + d['head']): # from, number, module, args | |
m = eval(m) if isinstance(m, str) else m # eval strings | |
for j, a in enumerate(args): | |
with contextlib.suppress(NameError): | |
args[j] = eval(a) if isinstance(a, str) else a # eval strings | |
n = n_ = max(round(n * gd), 1) if n > 1 else n # depth gain | |
if m in { | |
Classify, Conv, ConvTranspose, GhostConv, Bottleneck, GhostBottleneck, SPP, SPPF, DWConv, Focus, | |
BottleneckCSP, C1, C2, C2f, C3, C3TR, C3Ghost, nn.ConvTranspose2d, DWConvTranspose2d, C3x}: | |
c1, c2 = ch[f], args[0] | |
if c2 != nc: # if c2 not equal to number of classes (i.e. for Classify() output) | |
c2 = make_divisible(c2 * gw, 8) | |
args = [c1, c2, *args[1:]] | |
if m in {BottleneckCSP, C1, C2, C2f, C3, C3TR, C3Ghost, C3x}: | |
args.insert(2, n) # number of repeats | |
n = 1 | |
elif m is nn.BatchNorm2d: | |
args = [ch[f]] | |
elif m is Concat: | |
c2 = sum(ch[x] for x in f) | |
elif m in {Detect, Segment}: | |
args.append([ch[x] for x in f]) | |
if m is Segment: | |
args[2] = make_divisible(args[2] * gw, 8) | |
else: | |
c2 = ch[f] | |
m_ = nn.Sequential(*(m(*args) for _ in range(n))) if n > 1 else m(*args) # module | |
t = str(m)[8:-2].replace('__main__.', '') # module type | |
m.np = sum(x.numel() for x in m_.parameters()) # number params | |
m_.i, m_.f, m_.type = i, f, t # attach index, 'from' index, type | |
if verbose: | |
LOGGER.info(f'{i:>3}{str(f):>20}{n_:>3}{m.np:10.0f} {t:<45}{str(args):<30}') # print | |
save.extend(x % i for x in ([f] if isinstance(f, int) else f) if x != -1) # append to savelist | |
layers.append(m_) | |
if i == 0: | |
ch = [] | |
ch.append(c2) | |
return nn.Sequential(*layers), sorted(save) | |