# /*---------------------------------------------------------------------------------------------
# * Copyright (c) 2022-2023 STMicroelectronics.
# * All rights reserved.
# *
# * This software is licensed under terms that can be found in the LICENSE file in
# * the root directory of this software component.
# * If no LICENSE file comes with this software, it is provided AS-IS.
# *--------------------------------------------------------------------------------------------*/
import numpy as np
import onnx
from onnx import numpy_helper
from onnxruntime.quantization import QuantType, CalibrationMethod
from omegaconf import DictConfig
import tensorflow

dict_types = {"Int16": QuantType.QInt16,
"UInt16": QuantType.QUInt16,
"Int8": QuantType.QInt8,
"UInt8": QuantType.QUInt8,
"Int4": QuantType.QInt4,
"UInt4": QuantType.QUInt4
}
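
# Quick sanity check of the mapping (illustrative):
#   dict_types["Int4"]  -> QuantType.QInt4
#   dict_types["UInt8"] -> QuantType.QUInt8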

def get_weights_activations_quant_type(cfg: DictConfig):
    """
    Converts the bit-width strings from the config into ONNX quantization types
    Inputs:
        cfg: dict of input parameters
    Returns:
        weight type, activation type (both default to QuantType.QInt8)
    """
    # get the ONNX type for weights and activations: Int4, Int8, Int16, signed or unsigned
    if cfg.quantization.onnx_quant_parameters:
        if cfg.quantization.onnx_quant_parameters.weightType:
            weight_type = dict_types[cfg.quantization.onnx_quant_parameters.weightType]
        else:
            weight_type = QuantType.QInt8
        if cfg.quantization.onnx_quant_parameters.activType:
            activation_type = dict_types[cfg.quantization.onnx_quant_parameters.activType]
        else:
            activation_type = QuantType.QInt8
    else:
        weight_type = QuantType.QInt8
        activation_type = QuantType.QInt8
    return weight_type, activation_type
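
# Minimal usage sketch (assumes an OmegaConf config; the yaml keys mirror the ones read above,
# the values are illustrative):
#   from omegaconf import OmegaConf
#   cfg = OmegaConf.create({"quantization": {"onnx_quant_parameters":
#                                            {"weightType": "Int4", "activType": "Int8"}}})
#   w_type, a_type = get_weights_activations_quant_type(cfg)  # -> (QuantType.QInt4, QuantType.QInt8)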

def get_calibration_method(cfg: DictConfig):
    """
    Converts the calibration method string from the config into the ONNX CalibrationMethod class
    Inputs:
        cfg: dict of input parameters
    Returns:
        calibration_method
    """
    if cfg.quantization.onnx_quant_parameters:
        calibration_param = cfg.quantization.onnx_quant_parameters.calibrate_method
    else:
        calibration_param = None
    if calibration_param is None or calibration_param == "MinMax":
        calibration_method = CalibrationMethod.MinMax
    elif calibration_param == "Entropy":
        calibration_method = CalibrationMethod.Entropy
    else:
        raise ValueError(f"Unsupported calibration method: {calibration_param}. Review your config yaml file at "
                         f"section onnx_quant_parameters. Only MinMax or Entropy are supported so far.")
    return calibration_method

def update_bit_width(tensor_type: str = None, order: int = None, direction: str = None):
    """
    Updates the bit width 'order' times, increasing or decreasing it, clamped to the 4..16 bits range
    Inputs:
        tensor_type (str): Int4, UInt4, Int8, UInt8, Int16, or UInt16
        order (int): number of steps to move along the bit-width ladder, must be >= 1
        direction (str): 'up' to increase the bit width, 'down' to decrease it
    Returns:
        updated_type (QuantType): the ONNX quantization type after the update
    """
    # signed and unsigned bit-width ladders: a move of 'order' steps is clamped at both ends
    if tensor_type in ('Int4', 'Int8', 'Int16'):
        ladder = ['Int4', 'Int8', 'Int16']
    elif tensor_type in ('UInt4', 'UInt8', 'UInt16'):
        ladder = ['UInt4', 'UInt8', 'UInt16']
    else:
        raise ValueError(f"Unsupported 'update_bit_width' function parameters. Check tensor_type = {tensor_type}, "
                         f"order = {order} and direction = {direction}")
    step = -order if direction == 'down' else order
    new_index = min(max(ladder.index(tensor_type) + step, 0), len(ladder) - 1)
    return dict_types[ladder[new_index]]

def define_extra_options(cfg: dict = None, list_weights_tensors=None, list_activation_tensors=None, axis_list=None,
                         bit_width_w=None, bit_width_a=None):
    """
    Sets the ONNX quantizer extra options according to the config file
    Inputs:
        cfg (dict): dictionary of configuration parameters
        list_weights_tensors (list[str]): list of weights tensor names for which we want to override quantization
                                          parameters. If None, ignored
        list_activation_tensors (list[str]): list of activation tensor names for which we want to override
                                             quantization parameters. If None, ignored
        axis_list (list[int]): per-channel quantization axis of each tensor in list_weights_tensors. Only used
                               when the quantization granularity is 'per_channel'
        bit_width_w (QuantType): QuantType.QInt16 or QuantType.QUInt16 or QuantType.QInt8 or QuantType.QUInt8,
                                 or QuantType.QInt4 or QuantType.QUInt4, applied to all tensors in list_weights_tensors
        bit_width_a (QuantType): QuantType.QInt16 or QuantType.QUInt16 or QuantType.QInt8 or QuantType.QUInt8,
                                 or QuantType.QInt4 or QuantType.QUInt4, applied to all tensors in list_activation_tensors
    Returns:
        a dictionary with all extra options set
    """
    extra_options = dict()
    # options that are not set in the cfg (or set to None) fall back to the default values below
    default_options = {"WeightSymmetric": True,
                       "ActivationSymmetric": False,
                       "CalibMovingAverage": False,
                       "QuantizeBias": True,
                       "SmoothQuant": False,
                       "SmoothQuantAlpha": 0.5,
                       "SmoothQuantFolding": True}
    user_options = cfg.quantization.onnx_extra_options
    for key, default in default_options.items():
        value = user_options.get(key) if user_options else None
        extra_options[key] = value if value is not None else default
extra_options["TensorQuantOverrides"] = {}
    # Set a specific bit width for the selected weights tensors.
    # To keep per-channel quantization, we also need to set the "axis" field for the weights
if list_weights_tensors:
for idx, e in enumerate(list_weights_tensors):
extra_options["TensorQuantOverrides"][e] = [{'quant_type': bit_width_w}]
if cfg.quantization.granularity == 'per_channel':
extra_options["TensorQuantOverrides"][e][0]["axis"] = axis_list[idx]
    # Set a specific bit width for the selected activation tensors
if list_activation_tensors:
for e in list_activation_tensors:
extra_options["TensorQuantOverrides"][e] = [{'quant_type': bit_width_a}]
    # Case where the overrides are specified directly in the yaml.
    # We consider quant_type mandatory, and scale and zero_point optional. Per-channel scale and zero_point
    # overrides are not supported (yet?): writing the full list in the yaml would be impractical. Per-tensor is ok
if not list_weights_tensors and not list_activation_tensors:
list_override_tensor = []
if cfg.quantization.onnx_extra_options:
if cfg.quantization.onnx_extra_options.weights_tensor_override:
list_override_tensor = cfg.quantization.onnx_extra_options.weights_tensor_override
if cfg.quantization.onnx_extra_options.activations_tensor_override:
list_override_tensor = list_override_tensor + cfg.quantization.onnx_extra_options.activations_tensor_override
if list_override_tensor:
            for t in list_override_tensor:
                # convert the quant_type string to a QuantType, and the zero point and scale to np.arrays
                # (the scale as float32), as required by ONNX
                t[1].quant_type = dict_types[t[1].quant_type]
                if t[1].zero_point is not None:
                    t[1].zero_point = np.array(t[1].zero_point)
                if t[1].scale is not None:
                    t[1].scale = np.array(t[1].scale, dtype=np.float32)
                extra_options["TensorQuantOverrides"][t[0]] = [dict(t[1])]
return extra_options
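
# Illustrative yaml snippet consumed by the override branch above (tensor names and values are
# hypothetical; the structure is a list of [tensor_name, {parameters}] pairs):
#   onnx_extra_options:
#     weights_tensor_override:
#       - ["conv1_weights", {"quant_type": "Int4"}]
#     activations_tensor_override:
#       - ["relu1_out", {"quant_type": "Int16", "scale": 0.05, "zero_point": 0}]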

def count_weights(onnx_model_path_quant, w_tensor_name: list = None):
    """
    Counts the weights quantized in 4 bits and the weights quantized in 8 bits
    Args:
        onnx_model_path_quant: the ONNX quantized model path
        w_tensor_name (list[str]): names of the tensors that hold weights
    Returns:
        nb_weights_4bits, nb_weights_8bits, total_count_weights
    """
nb_weights_4bits = nb_weights_8bits = total_count_weights = 0
w_values_t_names = [w + '_quantized' for w in w_tensor_name] # extension needed to get the weights values
model_quant = onnx.load(onnx_model_path_quant)
initializers = [initializer for initializer in model_quant.graph.initializer if initializer.name in w_values_t_names]
for initializer in initializers:
tensor_values = numpy_helper.to_array(initializer)
n_weights = tensor_values.size
total_count_weights += n_weights
        # since np.int4 does not exist, we identify 4-bit tensors by their dynamic range, keeping in mind that
        # 4 bits can only encode 16 integer values (an 8-bit tensor with a tiny range would also match)
dyn_range = int(tensor_values.max()) - int(tensor_values.min())
if dyn_range <= 15: # 4 bits
nb_weights_4bits += n_weights
elif tensor_values.dtype == np.int8:
nb_weights_8bits += n_weights
return nb_weights_4bits, nb_weights_8bits, total_count_weights
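
# Minimal usage sketch (the model path and tensor names are illustrative):
#   nb4, nb8, total = count_weights("model_quant.onnx", w_tensor_name=["conv1.weight", "fc.weight"])
#   print(f"4-bit weights: {nb4}/{total}, 8-bit weights: {nb8}/{total}")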

def weights_based_layer_ranking(model, extension: str = None):
    """
    Counts the weights (not biases) per layer of a Keras or ONNX model and ranks the layers
    Args:
        model: the Keras or ONNX model
        extension (str): model backend, expected 'onnx' or 'keras'
    Returns:
        list of (layer name, weight count) tuples, sorted by weight count in descending order
    """
layer_params = []
if extension == 'keras':
for layer in model.layers:
if hasattr(layer, "kernel"): # Only include layers with weights
num_params = layer.get_weights()[0].size
layer_params.append((layer.name, num_params))
    elif extension == 'onnx':
        initializers = {init.name: init for init in model.graph.initializer}
        for node in model.graph.node:
            if node.op_type in ['Conv', 'ConvTranspose', 'Gemm', 'MatMul']:  # these op types carry weights
                # a MatMul may multiply two activation tensors, in which case there is no weight initializer
                if node.input[1] in initializers:
                    initializer = initializers[node.input[1]]
                    num_params = int(np.prod(list(initializer.dims)))
                    layer_params.append((initializer.name, num_params))
# Sort by number of parameters in descending order
layer_params_ranked = sorted(layer_params, key=lambda x: x[1], reverse=True)
return layer_params_ranked
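
# Minimal usage sketch (the model path and the resulting names/counts are illustrative):
#   ranking = weights_based_layer_ranking(onnx.load("model.onnx"), extension='onnx')
#   # -> [('fc.weight', 128000), ('conv2.weight', 73728), ...]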

def _get_initializer_tensor(model, name: str = None):
    """
    Returns the weights (not biases) stored in the initializer called 'name', if it exists in the ONNX model
    Args:
        model: the ONNX model
        name (str): tensor name
    Returns:
        weight tensor values as a numpy array, or None if no initializer has that name
    """
for tensor in model.graph.initializer:
if tensor.name == name:
return onnx.numpy_helper.to_array(tensor)
return None

def _onnx_node_identity_card(model, node):
    """
    Returns a list of node characteristics used to rank the nodes later on
    Args:
        model: the ONNX model
        node: node under consideration
    Returns:
        [node name, layer type, number of parameters, parameters per scale], or an empty list if the node
        carries no weights
    """
node_card_list = []
group = 1
if node.op_type == "Conv":
for attr in node.attribute:
if attr.name == "group":
group = attr.i
break
weight_name = node.input[1]
weight = _get_initializer_tensor(model, weight_name)
# onnx 'Conv' weight expected shape: [out_channels, in_channels_per_group, kH, kW]
out_channels, in_channels_per_group, kH, kW = weight.shape
nparams = int(np.prod([out_channels, in_channels_per_group, kH, kW]))
params_per_scale = int(np.prod([in_channels_per_group, kH, kW]))
# For depthwise, we expect in_channels_per_group == 1 and group == in_channels == out_channels
if in_channels_per_group == 1 and group == out_channels:
layer_type = 0 # DW
else:
layer_type = 1 # Conv2D
node_card_list = [node.name, layer_type, nparams, params_per_scale]
elif node.op_type in ["Gemm", "MatMul"]:
weight_name = node.input[1]
weight = _get_initializer_tensor(model, weight_name)
        # a MatMul may multiply two tensors of which neither is an 'initializer', hence it carries no weights:
        # in this case we report an empty list
if weight is not None:
out_channels, in_channels = weight.shape
nparams = out_channels * in_channels
params_per_scale = out_channels
node_card_list = [node.name, 2, nparams, params_per_scale]
else:
node_card_list = []
return node_card_list
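
# Illustrative card for a regular 3x3 Conv node with 8 input and 16 output channels (values hypothetical):
#   _onnx_node_identity_card(model, conv_node)  # -> ['conv1', 1, 1152, 72]
#   i.e. [name, layer type (0=DW, 1=Conv2D, 2=Gemm/MatMul), nparams, params per scale]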

def _keras_layer_identity_card(layer):
    """
    Returns a list of layer characteristics used to rank the layers later on
    Args:
        layer: layer under consideration
    Returns:
        [layer name, layer type, number of parameters, parameters per scale], or an empty list if the layer
        carries no weights
    """
layer_card_list = []
if hasattr(layer, "kernel"): # Only include layers with weights
weight = layer.get_weights()[0]
nparams = weight.size
if isinstance(layer, tensorflow.keras.layers.DepthwiseConv2D):
layer_type = 0 # DW
kH, kW, in_ch, depth_mult = weight.shape
params_per_scale = kH * kW
elif isinstance(layer, tensorflow.keras.layers.Conv2D):
layer_type = 1 # Conv2D
kH, kW, in_ch, out_ch = weight.shape
params_per_scale = int(np.prod([kH, kW, in_ch]))
elif isinstance(layer, tensorflow.keras.layers.Dense):
layer_type = 2 # Dense
in_dim, out_dim = weight.shape
params_per_scale = out_dim
layer_card_list = [layer.name, layer_type, nparams, params_per_scale]
return layer_card_list

def composite_score_layer_ranking(model, extension: str = None):
    """
    Ranks the layers of a Keras or ONNX model with a composite score based on their weight counts (not biases)
    Args:
        model: the model, either ONNX or Keras
        extension: model extension, expected '.onnx' or '.keras'
    Returns:
        list of layer cards [name, layer type, number of parameters, parameters per scale] after ranking
    """
layer_params = []
if extension == '.keras':
for layer in model.layers:
layer_card_list = _keras_layer_identity_card(layer)
if layer_card_list:
layer_params.append(layer_card_list)
elif extension == '.onnx':
for node in model.graph.node:
node_card_list = _onnx_node_identity_card(model, node)
if node_card_list:
layer_params.append(node_card_list)
    # Ranking of the layers for mixed-precision w4/w8 quantization: largest parameter count first, then smallest
    # number of parameters per scale, then layer name as a tie-breaker
    layer_params_ranked = sorted(layer_params, key=lambda x: (x[2], -x[3], x[0]), reverse=True)
return layer_params_ranked
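
# Minimal usage sketch (the model path is illustrative):
#   ranking = composite_score_layer_ranking(onnx.load("model.onnx"), extension='.onnx')
#   # the largest layers come first, e.g. candidates for 4-bit weights in a w4/w8 mixed-precision scheme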