Diogo-V's picture
Duplicate from Diogo-V/phi-nlr-5eec68bb37ef
4eef383 verified
from __future__ import annotations
import torch
from torch import amin # Necessary for arcsin
import copy
import torch.nn as nn
import numpy as np
from scipy.optimize import curve_fit
from typing import Dict, Any, Tuple, List, Callable
def quantization(x, **params):
return (torch.div(1, replace_num(params['_s'], num=0, to=10000)) * torch.asin(domain_guard((params['_0'] * x), min=-0.99999, max=0.99999, nan=0)))
def dequantization(x, **params):
return (torch.div(1, replace_num(params['_0'], num=0, to=10000)) * torch.sin((params['_s'] * x)))
def init_params(x: torch.Tensor, **kwargs: Dict[str, Any]) -> Dict[str, nn.Parameter]:
base_p0 = {
'_0': init_space_search(x, qtz_func=quantization, deqtz_func=dequantization, params_list=['_0', '_s'], param='_0', **kwargs),
}
base_p0['_s'] = init_linear_scale(x, qtz_func=quantization, params=base_p0, **kwargs)
if 'post_init_hook' in kwargs:
kwargs['post_init_hook'](parameters=base_p0)
params = init_non_linear_regression_fit(x, p0=base_p0, np_fit_func=fit_func, qtz_func=quantization, deqtz_func=dequantization, params_list=['_0', '_s'], **kwargs)
params = {k: nn.Parameter(v, requires_grad=False) for k, v in params.items()}
if 'post_method_hook' in kwargs:
kwargs['post_method_hook'](parameters=params)
params = learn_parameters(x, params,
qtz_func=quantization,
deqtz_func=dequantization,
bits=kwargs['bits'],
target_dtype=torch.int8,
epochs=500,
early_stop=False,
)
if 'post_train_hook' in kwargs:
kwargs['post_train_hook'](parameters=params)
return params
############### Numpy Qtz ###############
def np_quantization(x, _0, _s):
return (np.divide(1, np_replace_num(_s, num=0, to=10000)) * np.arcsin(np_domain_guard((_0 * x), min=-0.99999, max=0.99999, nan=0)))
def np_dequantization(x, _0, _s):
return (np.divide(1, np_replace_num(_0, num=0, to=10000)) * np.sin((_s * x)))
def fit_func(x, _0, _s):
x_ = np_quantization(x, _0, _s)
x_ = np_dequantization(x_, _0, _s)
return x_
############### HELPERS ###############
def domain_guard(
x: torch.Tensor,
min: float = None,
max: float = None,
posinf: float = None,
neginf: float = None,
nan: float = None
) -> torch.Tensor:
"""Guard a tensor to a valid domain."""
x = torch.nan_to_num(x, posinf=posinf, neginf=neginf, nan=nan)
if min is not None or max is not None:
x = torch.clamp(x, min=min, max=max)
return x
def replace_num(x: torch.Tensor, num: float, to: float) -> torch.Tensor:
"""Replace a number in a tensor with another number.
Args:
x (torch.Tensor): The input tensor.
num (float): The number to replace.
to (float): The number to replace with.
Returns:
torch.Tensor: The tensor with the number replaced.
"""
return torch.where(x == num, to, x)
def guarded_torch_power(x: torch.Tensor, exp: float) -> torch.Tensor:
"""Guard the power operation to a valid domain."""
return torch.pow(x, exp) if exp >= 1 else torch.pow(torch.relu(x), exp)
def init_ones(x: torch.Tensor, **kwargs: Dict[str, Any]) -> torch.Tensor:
val = torch.amin(x, dim=1)
return torch.ones_like(val, dtype=torch.float32, device=x.device)
def init_rand(x: torch.Tensor, **kwargs: Dict[str, Any]) -> torch.Tensor:
val = torch.amin(x, dim=1)
return torch.randn_like(val, dtype=torch.float32, device=x.device)
def init_space_search(
x: torch.Tensor,
**kwargs: Dict[str, Any],
) -> torch.Tensor:
def _build_initial_param(tensor: torch.Tensor, max_initial: int, n_params: int):
"""Generates the initial set of parameters. The first iteration generates 10 times more parameters."""
for _ in range(n_params * 10): # The first iteration generates 10 times more parameters
yield init_rand(tensor) * max_initial # Generates n_params in range [-max_initial, max_initial]
def _search_param(tensors: List[torch.tensor], n_params):
"""Takes the best parameters and generates new parameters around the mean of the best parameters."""
torch_tensors = torch.stack(tensors)
min_vals, max_vals = torch.aminmax(torch_tensors, dim=0)
abs_max_val_per_ch = torch.max(-min_vals, max_vals)
mean = torch.mean(torch_tensors, dim=0)
for _ in range(n_params): # Generates n_params around the mean of the tensors
yield torch.randn_like(min_vals) * abs_max_val_per_ch + mean
def _calc(x, qtz_func, deqtz_func, **params):
x_ = x.transpose(0, 1)
x_ = qtz_func(x=x_, **params)
x_ = deqtz_func(x=x_, **params)
x_ = x_.transpose(0, 1)
return x_
assert "qtz_func" in kwargs, "qtz_func must be provided."
assert "deqtz_func" in kwargs, "deqtz_func must be provided."
assert "params_list" in kwargs, "params list must be provided."
assert "param" in kwargs, "param must be provided."
qtz_func = kwargs.get('qtz_func')
deqtz_func = kwargs.get('deqtz_func')
params_list = kwargs.get('params_list')
param = kwargs.get('param')
n_runs = 50 # Number of runs to try to find the best parameters
n_random_params = 50 # Number of random parameters to generate
n_best_to_pick = 5 # Number of best parameters to pick after each run
max_initial = 10000 # Maximum value to initialize the parameters
# Initializes the parameters
base_params = { p: init_ones(x, **kwargs) for p in params_list if p != param }
params = _build_initial_param(x, max_initial, n_random_params)
# Performs the search
for _ in range(n_runs):
best_params = []
for param_ in params:
try:
x_ = _calc(x, qtz_func, deqtz_func, **base_params, **{param: param_})
loss_ones = nn.MSELoss()(x, x_)
if len(best_params) < n_best_to_pick:
best_params.append((param_, loss_ones.item()))
best_params = sorted(best_params, key=lambda x: x[1])
elif loss_ones < best_params[-1][1]:
best_params[-1] = (param_, loss_ones.item())
best_params = sorted(best_params, key=lambda x: x[1])
except Exception: # The parameters might not be valid for the function's domain
continue
# Generates new parameters around the mean
params = _search_param([p for p, _ in best_params], n_random_params)
# Checks if the best parameter is better than the init_ones
p_ones = init_ones(x, **kwargs)
x_ = _calc(x, qtz_func, deqtz_func, **base_params, **{param: p_ones})
loss_ones = nn.MSELoss()(x, x_)
# Checks if the best parameter is better than the init_rand
p_rand = init_rand(x, **kwargs)
x_ = _calc(x, qtz_func, deqtz_func, **base_params, **{param: p_rand})
loss_rand = nn.MSELoss()(x, x_)
if loss_rand < best_params[0][1] and loss_rand < loss_ones:
return p_rand
elif loss_ones < best_params[0][1] and loss_ones < loss_rand:
return p_ones
else:
return best_params[0][0]
def init_linear_scale( # Symmetric scale. From the study folder
x: torch.Tensor,
**kwargs: Dict[str, Any],
) -> torch.Tensor:
assert "bits" in kwargs, "bits must be provided."
assert "params" in kwargs, "params must be provided."
assert "qtz_func" in kwargs, "qtz_func must be provided."
bits = kwargs.get('bits')
params = kwargs.get('params')
qtz_func = kwargs.get('qtz_func')
x_ = x.transpose(0, 1)
x_ = qtz_func(x=x_, **params, _s=init_ones(x, **kwargs))
x_ = x_.transpose(0, 1)
quant_min, quant_max = get_min_max_from_bits_signed(bits)
min_vals, max_vals = torch.aminmax(x_, dim=1)
min_vals = torch.min(min_vals, torch.zeros_like(min_vals))
max_vals = torch.max(max_vals, torch.zeros_like(max_vals))
eps = torch.finfo(torch.float32).eps
abs_max_val_per_ch = torch.max(-min_vals, max_vals)
scale = abs_max_val_per_ch / (float(quant_max - quant_min) / 2)
scale = torch.clamp(scale, min=eps).to(dtype=torch.float32, device=min_vals.device)
# Introduces some noise in scale
# If I don't introduce noise, the accuracy is going to be 0.0 and not learn anything
# If I don't introduce noise, the accuracy is going to be 0.0 and not learn anything.
# NOTE(diogo): This has been disproven. The noise does not help the learning process but I still
# left it here for future reference. Will be removed later.
# scale = scale + 0.01 * torch.randn_like(scale)
return scale
def init_non_linear_regression_fit(
x: torch.Tensor,
**kwargs: Dict[str, Any],
) -> torch.Tensor:
assert "params_list" in kwargs, "params list must be provided."
assert "np_fit_func" in kwargs, "np_fit_func must be provided."
assert "p0" in kwargs, "p0 must be provided."
np_fit_func = kwargs.get('np_fit_func')
params_list = kwargs.get('params_list')
p0 = kwargs.get('p0')
def _fit(xdata: np.ndarray, ydata: np.ndarray, func: Callable, p0: List[float]):
popt, _ = curve_fit(
func,
xdata,
ydata,
maxfev=1000,
p0=p0,
method='lm'
)
return popt
# 1. Needs to convert the torch tensor to numpy tensor
xdata = x.cpu().numpy()
# 2. Sorts the data so that it makes it easier to fit to it
sorted_xdata = np.sort(xdata, axis=-1)
p0 = {k: v.cpu().numpy() for k, v in p0.items()}
params_list = sorted(params_list) # We need to make sure that it matches the numpy fit func arg order
# 3. Finds the best parameters for each channel
try:
params = []
for i in range(sorted_xdata.shape[0]):
xdata_ = sorted_xdata[i]
p0_ = [p0[p][i] for p in params_list]
ch_params = _fit(xdata_, xdata_, np_fit_func, p0_)
params.append(ch_params)
# 4. Builds the parameters
result = {}
for i, p in enumerate(params_list):
result[p] = torch.tensor([p_[i] for p_ in params], dtype=torch.float32).to(x.device)
return result
except ValueError as e:
print(f"Could not fit the function with error: {e}")
print(f"Using fallback result...")
return {
k: torch.tensor(v, dtype=torch.float32).to(x.device) for k, v in p0.items()
}
def init_zeros(x: torch.Tensor, **kwargs: Dict[str, Any]) -> torch.Tensor:
val = torch.amin(x, dim=1)
return torch.zeros_like(val, dtype=torch.float32, device=x.device)
def init_inner_scale(tensor: torch.Tensor, _min: float = torch.inf, _max: float = torch.inf) -> torch.Tensor:
# Calculate the original minimum and maximum values
min_vals, max_vals = torch.aminmax(tensor, dim=-1)
x_min = torch.min(min_vals, torch.zeros_like(min_vals))
x_max = torch.max(max_vals, torch.zeros_like(max_vals))
if _max is torch.inf: # We do not need to scale the tensor. Just need to move it
return torch.ones_like(x_min)
# Calculate the scale factor
scale = (_max - _min) / (x_max - x_min)
return scale
############## Quant ###############
@torch.enable_grad()
def learn_parameters(
x: torch.Tensor,
params: Dict[str, nn.Parameter],
qtz_func: nn.Module,
deqtz_func: nn.Module,
bits: int,
target_dtype: torch.dtype,
epochs: int = 1000,
early_stop: bool = True,
do_report: bool = False
) -> Tuple[Dict[str, nn.Parameter], torch.Tensor]:
loss_fn = nn.MSELoss()
# Determines the initial learning rate by computing the initial loss and multiplying it by
# the order of magnitude of the loss divided by 2
quant = quantize(x, params, qtz_func, bits, target_dtype)
dequant = dequantize(quant, params, deqtz_func, bits, x.dtype)
loss = loss_fn(x, dequant)
base_lr = 0.1
exponent = int(np.floor(np.log10(loss.item())))
lr = base_lr * (10 ** (exponent // 2))
# Requires gradients in the parameters
for p in params.values():
p.requires_grad = True
p.grad = None
param_keys = list(params.keys())
param_values = list(params.values())
# Defines optimizer and loss function
optimizer = torch.optim.Adam(param_values, lr=lr)
scheduler = torch.optim.lr_scheduler.LinearLR(optimizer, start_factor=1.0, end_factor=0.1, total_iters=epochs // 10)
# Contains the best loss and the best parameters
best_loss = float("inf")
best_params = None
# Used to stop the search early
min_delta = 1e-7
acc_loss = []
percent_epochs_before_stop = 0.1
for i in range(epochs):
optimizer.zero_grad()
quant = quantize(x, params, qtz_func, bits, target_dtype)
dequant = dequantize(quant, params, deqtz_func, bits, x.dtype)
loss = loss_fn(x, dequant)
if loss.isnan() or loss.isinf():
raise Exception("Loss is NaN or Inf. Stopping the search.")
loss.backward()
optimizer.step()
scheduler.step()
acc_loss.append(loss.item())
# Reports loss every 10 steps
if i % 10 == 0 and do_report:
print(f"Epoch {i}: Loss {loss.item()}")
# Optimizes the parameter search by storing the best loss and the parameters
if loss.item() < best_loss:
best_loss = loss.item()
best_params = copy.deepcopy({
k: v for k, v in params.items() if k in param_keys
})
# We also stop the search if the loss has not considerably during the last 10% epochs
if early_stop:
epochs_before_stop = int(epochs * percent_epochs_before_stop)
if i > epochs_before_stop and abs(acc_loss[i - epochs_before_stop] - acc_loss[i]) < min_delta:
break
# No longer requires gradients in the parameters
for p in best_params.values():
p.requires_grad = False
p.grad = None
if do_report:
print(f"Best loss: {best_loss}")
return best_params, acc_loss
else:
return best_params
def quantize(
x: torch.Tensor,
params: Dict[str, nn.Parameter],
func: nn.Module,
bits: int,
target_dtype: torch.dtype = torch.int8
) -> torch.Tensor:
quant_min, quant_max = get_min_max_from_bits_signed(bits)
x = x.transpose(0, 1) # Aligns shapes
x = func(x=x, **params)
x = x.transpose(0, 1)
x = torch.clamp(round_func_BPDA(x), quant_min, quant_max).to(target_dtype)
return x
def dequantize(
x: torch.Tensor,
params: Dict[str, nn.Parameter],
func: nn.Module,
bits: int,
out_dtype: torch.dtype
) -> torch.Tensor:
x = x.to(dtype=out_dtype)
x = x.transpose(0, 1)
x = func(x=x, **params)
x = x.transpose(0, 1)
return x
def round_func_BPDA(input):
# This is equivalent to replacing round function (non-differentiable) with
# an identity function (differentiable) only when backward.
forward_value = torch.round(input)
out = input.clone()
out.data = forward_value.data
return out
def get_min_max_from_bits_signed(bit_width: int) -> Tuple[int, int]:
return -2 ** (bit_width - 1), 2 ** (bit_width - 1) - 1
############## Numpy ###############
def np_domain_guard(
x: np.ndarray,
min: float = None,
max: float = None,
posinf: float = None,
neginf: float = None,
nan: float = None
) -> np.ndarray:
"""Guard a tensor to a valid domain."""
x = np.nan_to_num(x, posinf=posinf, neginf=neginf, nan=nan)
if min is not None or max is not None:
x = np.clip(x, min, max)
return x
def np_replace_num(x: np.ndarray, num: float, to: float) -> np.ndarray:
"""Replace a number in a tensor with another number.
Args:
x (np.ndarray): The input tensor.
num (float): The number to replace.
to (float): The number to replace with.
Returns:
np.ndarray: The tensor with the number replaced.
"""
return np.where(x == num, to, x)
def np_guarded_power(x: np.ndarray, exp: float) -> np.ndarray:
"""Guard the power operation to a valid domain."""
return np.power(x, exp) if exp >= 1 else np.power(np.maximum(x, 0), exp)