import copy
import math
import sys
import warnings
from dataclasses import dataclass
from functools import partial
from typing import Any, Callable, List, Optional, Sequence, Tuple, Union

import torch
from torch import Tensor, nn
from torchvision.models._utils import _make_divisible
from torchvision.ops import StochasticDepth

from transformers import PretrainedConfig, PreTrainedModel

# make the repository-level utils package importable
sys.path.insert(1, "../")
from utils.vision_modifications import Conv2dNormActivation, SqueezeExcitation


@dataclass
class _MBConvConfig:
    expand_ratio: float
    kernel: int
    stride: int
    input_channels: int
    out_channels: int
    num_layers: int
    block: Callable[..., nn.Module]

    @staticmethod
    def adjust_channels(
        channels: int, width_mult: float, min_value: Optional[int] = None
    ) -> int:
        return _make_divisible(channels * width_mult, 8, min_value)
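
# Width-scaling sketch (illustrative values): adjust_channels(32, width_mult=1.2)
# computes _make_divisible(38.4, 8) == 40, keeping scaled widths divisible by 8.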


class MBConvConfig(_MBConvConfig):
    # Stores information listed at Table 1 of the EfficientNet paper & Table 4 of the EfficientNetV2 paper
    def __init__(
        self,
        expand_ratio: float,
        kernel: int,
        stride: int,
        input_channels: int,
        out_channels: int,
        num_layers: int,
        width_mult: float = 1.0,
        depth_mult: float = 1.0,
        block: Optional[Callable[..., nn.Module]] = None,
    ) -> None:
        input_channels = self.adjust_channels(input_channels, width_mult)
        out_channels = self.adjust_channels(out_channels, width_mult)
        num_layers = self.adjust_depth(num_layers, depth_mult)
        if block is None:
            block = MBConv
        super().__init__(
            expand_ratio,
            kernel,
            stride,
            input_channels,
            out_channels,
            num_layers,
            block,
        )

    @staticmethod
    def adjust_depth(num_layers: int, depth_mult: float):
        return int(math.ceil(num_layers * depth_mult))
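
# Depth-scaling sketch: adjust_depth rounds up, e.g. adjust_depth(4, 1.1) ==
# math.ceil(4.4) == 5, so a depth multiplier >= 1 never truncates a stage's layers.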


class FusedMBConvConfig(_MBConvConfig):
    # Stores information listed at Table 4 of the EfficientNetV2 paper
    def __init__(
        self,
        expand_ratio: float,
        kernel: int,
        stride: int,
        input_channels: int,
        out_channels: int,
        num_layers: int,
        block: Optional[Callable[..., nn.Module]] = None,
    ) -> None:
        if block is None:
            block = FusedMBConv
        super().__init__(
            expand_ratio,
            kernel,
            stride,
            input_channels,
            out_channels,
            num_layers,
            block,
        )


class MBConv(nn.Module):
    def __init__(
        self,
        cnf: MBConvConfig,
        stochastic_depth_prob: float,
        norm_layer: Callable[..., nn.Module],
        se_layer: Callable[..., nn.Module] = SqueezeExcitation,
    ) -> None:
        super().__init__()

        if not (1 <= cnf.stride <= 2):
            raise ValueError("illegal stride value")

        self.use_res_connect = (
            cnf.stride == 1 and cnf.input_channels == cnf.out_channels
        )

        layers: List[nn.Module] = []
        activation_layer = nn.SiLU

        # expand
        expanded_channels = cnf.adjust_channels(cnf.input_channels, cnf.expand_ratio)
        if expanded_channels != cnf.input_channels:
            layers.append(
                Conv2dNormActivation(
                    cnf.input_channels,
                    expanded_channels,
                    kernel_size=1,
                    norm_layer=norm_layer,
                    activation_layer=activation_layer,
                )
            )

        # depthwise
        layers.append(
            Conv2dNormActivation(
                expanded_channels,
                expanded_channels,
                kernel_size=cnf.kernel,
                stride=cnf.stride,
                groups=expanded_channels,
                norm_layer=norm_layer,
                activation_layer=activation_layer,
            )
        )

        # squeeze and excitation
        squeeze_channels = max(1, cnf.input_channels // 4)
        layers.append(
            se_layer(
                expanded_channels,
                squeeze_channels,
                activation=partial(nn.SiLU, inplace=True),
            )
        )

        # project
        layers.append(
            Conv2dNormActivation(
                expanded_channels,
                cnf.out_channels,
                kernel_size=1,
                norm_layer=norm_layer,
                activation_layer=None,
            )
        )

        self.block = nn.Sequential(*layers)
        self.stochastic_depth = StochasticDepth(stochastic_depth_prob, "row")
        self.out_channels = cnf.out_channels

    def forward(self, input: Tensor) -> Tensor:
        result = self.block(input)
        if self.use_res_connect:
            result = self.stochastic_depth(result)
            result += input
        return result
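
# MBConv shape sketch (illustrative, not executed at import time):
#   cnf = MBConvConfig(4, 3, 1, 64, 64, 1)  # stride 1, in == out -> residual path
#   blk = MBConv(cnf, stochastic_depth_prob=0.0, norm_layer=nn.BatchNorm2d)
#   blk(torch.randn(1, 64, 32, 32)).shape  # torch.Size([1, 64, 32, 32])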


class FusedMBConv(nn.Module):
    def __init__(
        self,
        cnf: FusedMBConvConfig,
        stochastic_depth_prob: float,
        norm_layer: Callable[..., nn.Module],
    ) -> None:
        super().__init__()

        if not (1 <= cnf.stride <= 2):
            raise ValueError("illegal stride value")

        self.use_res_connect = (
            cnf.stride == 1 and cnf.input_channels == cnf.out_channels
        )

        layers: List[nn.Module] = []
        activation_layer = nn.SiLU

        expanded_channels = cnf.adjust_channels(cnf.input_channels, cnf.expand_ratio)
        if expanded_channels != cnf.input_channels:
            # fused expand
            layers.append(
                Conv2dNormActivation(
                    cnf.input_channels,
                    expanded_channels,
                    kernel_size=cnf.kernel,
                    stride=cnf.stride,
                    norm_layer=norm_layer,
                    activation_layer=activation_layer,
                )
            )

            # project
            layers.append(
                Conv2dNormActivation(
                    expanded_channels,
                    cnf.out_channels,
                    kernel_size=1,
                    norm_layer=norm_layer,
                    activation_layer=None,
                )
            )
        else:
            layers.append(
                Conv2dNormActivation(
                    cnf.input_channels,
                    cnf.out_channels,
                    kernel_size=cnf.kernel,
                    stride=cnf.stride,
                    norm_layer=norm_layer,
                    activation_layer=activation_layer,
                )
            )

        self.block = nn.Sequential(*layers)
        self.stochastic_depth = StochasticDepth(stochastic_depth_prob, "row")
        self.out_channels = cnf.out_channels

    def forward(self, input: Tensor) -> Tensor:
        result = self.block(input)
        if self.use_res_connect:
            result = self.stochastic_depth(result)
            result += input
        return result
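
# Fused variant note: with expand_ratio == 1 (e.g. FusedMBConvConfig(1, 3, 1, 24, 24, 2)),
# expanded_channels equals input_channels and the block reduces to a single fused conv.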


class EfficientNetConfig(PretrainedConfig):
    model_type = "efficientnet_61_planet_detection"

    def __init__(
        self,
        dropout: float = 0.25,
        num_channels: int = 61,
        stochastic_depth_prob: float = 0.2,
        num_classes: int = 2,
        norm_layer: Optional[Callable[..., nn.Module]] = None,
        size: str = "v2_s",
        width_mult: float = 1.0,
        depth_mult: float = 1.0,
        **kwargs: Any,
    ) -> None:
        """
        Configuration for the EfficientNet V1 and V2 models below.

        Args:
            dropout (float): The dropout probability of the classifier head
            num_channels (int): Number of input channels
            stochastic_depth_prob (float): The stochastic depth probability
            num_classes (int): Number of classes
            norm_layer (Optional[Callable[..., nn.Module]]): Module specifying the normalization layer to use
            size (str): Architecture variant, e.g. "v2_s", "v2_m", or "v2_l"
            width_mult (float): Width multiplier applied to channel counts
            depth_mult (float): Depth multiplier applied to layer counts
        """
        self.dropout = dropout
        self.num_channels = num_channels
        self.num_classes = num_classes
        self.size = size
        self.stochastic_depth_prob = stochastic_depth_prob
        self.width_mult = width_mult
        self.depth_mult = depth_mult
        super().__init__(**kwargs)
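
# Typical construction (a sketch; the directory name is hypothetical):
#   config = EfficientNetConfig(num_channels=61, num_classes=2, size="v2_s")
#   config.save_pretrained("./planet_config")  # standard PretrainedConfig serialization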


class EfficientNetPreTrained(PreTrainedModel):
    config_class = EfficientNetConfig

    def __init__(self, config):
        super().__init__(config)
        self.model = EfficientNet(
            dropout=config.dropout,
            num_channels=config.num_channels,
            num_classes=config.num_classes,
            size=config.size,
            stochastic_depth_prob=config.stochastic_depth_prob,
            width_mult=config.width_mult,
            depth_mult=config.depth_mult,
        )

    def forward(self, tensor):
        return self.model(tensor)

    def _init_weights(self, module):
        # Initialize weights before loading; not every module has weights and biases,
        # so fall through silently when an attribute or fill is unavailable.
        try:
            module.weight.data.normal_(mean=0.0)
        except Exception:
            try:
                module.weight.data.fill_(1.0)
            except Exception:
                pass
        try:
            module.bias.data.zero_()
        except AttributeError:
            pass
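
# Hugging Face round-trip sketch (directory name is hypothetical):
#   model = EfficientNetPreTrained(EfficientNetConfig())
#   model.save_pretrained("./planet_model")
#   reloaded = EfficientNetPreTrained.from_pretrained("./planet_model")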


class EfficientNet(nn.Module):
    def __init__(
        self,
        dropout: float = 0.25,
        num_channels: int = 61,
        stochastic_depth_prob: float = 0.2,
        num_classes: int = 2,
        norm_layer: Optional[Callable[..., nn.Module]] = None,
        size: str = "v2_s",
        width_mult: float = 1.0,
        depth_mult: float = 1.0,
        **kwargs: Any,
    ) -> None:
        """
        EfficientNet V1 and V2 main class

        Args:
            dropout (float): The dropout probability of the classifier head
            num_channels (int): Number of input channels
            stochastic_depth_prob (float): The stochastic depth probability
            num_classes (int): Number of classes
            norm_layer (Optional[Callable[..., nn.Module]]): Module specifying the normalization layer to use
            size (str): Architecture variant passed to _efficientnet_conf, e.g. "v2_s"
            width_mult (float): Width multiplier applied to channel counts
            depth_mult (float): Depth multiplier applied to layer counts
        """
        super().__init__()

        inverted_residual_setting, last_channel = _efficientnet_conf(
            "efficientnet_%s" % (size), width_mult=width_mult, depth_mult=depth_mult
        )

        if not inverted_residual_setting:
            raise ValueError("The inverted_residual_setting should not be empty")
        elif not (
            isinstance(inverted_residual_setting, Sequence)
            and all([isinstance(s, _MBConvConfig) for s in inverted_residual_setting])
        ):
            raise TypeError(
                "The inverted_residual_setting should be List[MBConvConfig]"
            )

        if "block" in kwargs:
            warnings.warn(
                "The parameter 'block' is deprecated since 0.13 and will be removed in 0.15. "
                "Please pass this information on 'MBConvConfig.block' instead."
            )
            if kwargs["block"] is not None:
                for s in inverted_residual_setting:
                    if isinstance(s, MBConvConfig):
                        s.block = kwargs["block"]

        if norm_layer is None:
            norm_layer = nn.BatchNorm2d

        layers: List[nn.Module] = []

        # building first layer
        firstconv_output_channels = inverted_residual_setting[0].input_channels
        layers.append(
            Conv2dNormActivation(
                num_channels,
                firstconv_output_channels,
                kernel_size=3,
                stride=2,
                norm_layer=norm_layer,
                activation_layer=nn.SiLU,
            )
        )

        # building inverted residual blocks
        total_stage_blocks = sum(cnf.num_layers for cnf in inverted_residual_setting)
        stage_block_id = 0
        for cnf in inverted_residual_setting:
            stage: List[nn.Module] = []
            for _ in range(cnf.num_layers):
                # copy to avoid modifications. shallow copy is enough
                block_cnf = copy.copy(cnf)

                # overwrite info if not the first conv in the stage
                if stage:
                    block_cnf.input_channels = block_cnf.out_channels
                    block_cnf.stride = 1

                # adjust stochastic depth probability based on the depth of the stage block
                sd_prob = (
                    stochastic_depth_prob * float(stage_block_id) / total_stage_blocks
                )
                stage.append(block_cnf.block(block_cnf, sd_prob, norm_layer))
                stage_block_id += 1

            layers.append(nn.Sequential(*stage))

        # building last several layers
        lastconv_input_channels = inverted_residual_setting[-1].out_channels
        lastconv_output_channels = (
            last_channel if last_channel is not None else 4 * lastconv_input_channels
        )
        layers.append(
            Conv2dNormActivation(
                lastconv_input_channels,
                lastconv_output_channels,
                kernel_size=1,
                norm_layer=norm_layer,
                activation_layer=nn.SiLU,
            )
        )

        self.features = nn.Sequential(*layers)
        self.avgpool = nn.AdaptiveAvgPool2d(1)
        self.classifier = nn.Sequential(
            nn.Dropout(p=dropout, inplace=True),
            nn.Linear(lastconv_output_channels, num_classes),
        )

        # default weight initialization
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode="fan_out")
                if m.bias is not None:
                    nn.init.zeros_(m.bias)
            elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.ones_(m.weight)
                nn.init.zeros_(m.bias)
            elif isinstance(m, nn.Linear):
                init_range = 1.0 / math.sqrt(m.out_features)
                nn.init.uniform_(m.weight, -init_range, init_range)
                nn.init.zeros_(m.bias)

    def _forward_impl(self, x: Tensor) -> Tensor:
        x = self.features(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x

    def forward(self, x: Tensor) -> Tensor:
        return self._forward_impl(x)
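
# Shape note: for the "v2_s" setting the stem (stride 2) plus the strided stages
# downsample by a total factor of 32; AdaptiveAvgPool2d(1) then collapses whatever
# spatial extent remains before the linear classifier.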


def _efficientnet_conf(
    arch: str,
    **kwargs: Any,
) -> Tuple[Sequence[Union[MBConvConfig, FusedMBConvConfig]], Optional[int]]:
    inverted_residual_setting: Sequence[Union[MBConvConfig, FusedMBConvConfig]]
    if arch.startswith("efficientnet_b"):
        bneck_conf = partial(
            MBConvConfig,
            width_mult=kwargs.pop("width_mult"),
            depth_mult=kwargs.pop("depth_mult"),
        )
        inverted_residual_setting = [
            bneck_conf(1, 3, 1, 32, 16, 1),
            bneck_conf(6, 3, 2, 16, 24, 2),
            bneck_conf(6, 5, 2, 24, 40, 2),
            bneck_conf(6, 3, 2, 40, 80, 3),
            bneck_conf(6, 5, 1, 80, 112, 3),
            bneck_conf(6, 5, 2, 112, 192, 4),
            bneck_conf(6, 3, 1, 192, 320, 1),
        ]
        last_channel = None
    elif arch.startswith("efficientnet_v2_s"):
        inverted_residual_setting = [
            FusedMBConvConfig(1, 3, 1, 24, 24, 2),
            FusedMBConvConfig(4, 3, 2, 24, 48, 4),
            FusedMBConvConfig(4, 3, 2, 48, 64, 4),
            MBConvConfig(4, 3, 2, 64, 128, 6),
            MBConvConfig(6, 3, 1, 128, 160, 9),
            MBConvConfig(6, 3, 2, 160, 256, 15),
        ]
        last_channel = 1280
    elif arch.startswith("efficientnet_v2_m"):
        inverted_residual_setting = [
            FusedMBConvConfig(1, 3, 1, 24, 24, 3),
            FusedMBConvConfig(4, 3, 2, 24, 48, 5),
            FusedMBConvConfig(4, 3, 2, 48, 80, 5),
            MBConvConfig(4, 3, 2, 80, 160, 7),
            MBConvConfig(6, 3, 1, 160, 176, 14),
            MBConvConfig(6, 3, 2, 176, 304, 18),
            MBConvConfig(6, 3, 1, 304, 512, 5),
        ]
        last_channel = 1280
    elif arch.startswith("efficientnet_v2_l"):
        inverted_residual_setting = [
            FusedMBConvConfig(1, 3, 1, 32, 32, 4),
            FusedMBConvConfig(4, 3, 2, 32, 64, 7),
            FusedMBConvConfig(4, 3, 2, 64, 96, 7),
            MBConvConfig(4, 3, 2, 96, 192, 10),
            MBConvConfig(6, 3, 1, 192, 224, 19),
            MBConvConfig(6, 3, 2, 224, 384, 25),
            MBConvConfig(6, 3, 1, 384, 640, 7),
        ]
        last_channel = 1280
    else:
        raise ValueError(f"Unsupported model type {arch}")

    return inverted_residual_setting, last_channel
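
# For example, _efficientnet_conf("efficientnet_v2_s") returns the six stage configs
# listed above (three fused stages followed by three MBConv stages) and last_channel=1280.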
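

if __name__ == "__main__":
    # Smoke test (a sketch; the batch size and 128x128 spatial size are illustrative).
    model = EfficientNet(num_channels=61, num_classes=2, size="v2_s")
    x = torch.randn(2, 61, 128, 128)  # two 61-channel inputs
    assert model(x).shape == (2, 2)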