Time_RCD / utils /utility.py
Oliver Le
Initial commit
d03866e
"""A set of utility functions to support outlier detection.
"""
from __future__ import division
from __future__ import print_function
from joblib.parallel import cpu_count
import numpy as np
from numpy import percentile
import numbers
import sklearn
from sklearn.metrics import precision_score
from sklearn.preprocessing import StandardScaler
from sklearn.utils import column_or_1d
from sklearn.utils import check_array
from sklearn.utils import check_consistent_length
from sklearn.utils import check_random_state
from sklearn.utils.random import sample_without_replacement
import torch.nn as nn
MAX_INT = np.iinfo(np.int32).max
MIN_INT = -1 * MAX_INT
def zscore(a, axis=0, ddof=0):
a = np.asanyarray(a)
mns = a.mean(axis=axis)
sstd = a.std(axis=axis, ddof=ddof)
if axis and mns.ndim < a.ndim:
res = ((a - np.expand_dims(mns, axis=axis)) /
np.expand_dims(sstd, axis=axis))
else:
res = (a - mns) / sstd
return np.nan_to_num(res)
def pairwise_distances_no_broadcast(X, Y):
"""Utility function to calculate row-wise euclidean distance of two matrix.
Different from pair-wise calculation, this function would not broadcast.
For instance, X and Y are both (4,3) matrices, the function would return
a distance vector with shape (4,), instead of (4,4).
Parameters
----------
X : array of shape (n_samples, n_features)
First input samples
Y : array of shape (n_samples, n_features)
Second input samples
Returns
-------
distance : array of shape (n_samples,)
Row-wise euclidean distance of X and Y
"""
X = check_array(X)
Y = check_array(Y)
if X.shape[0] != Y.shape[0] or X.shape[1] != Y.shape[1]:
raise ValueError("pairwise_distances_no_broadcast function receive"
"matrix with different shapes {0} and {1}".format(
X.shape, Y.shape))
euclidean_sq = np.square(Y - X)
return np.sqrt(np.sum(euclidean_sq, axis=1)).ravel()
def getSplit(X):
"""
Randomly selects a split value from set of scalar data 'X'.
Returns the split value.
Parameters
----------
X : array
Array of scalar values
Returns
-------
float
split value
"""
xmin = X.min()
xmax = X.max()
return np.random.uniform(xmin, xmax)
def similarityScore(S, node, alpha):
"""
Given a set of instances S falling into node and a value alpha >=0,
returns for all element x in S the weighted similarity score between x
and the centroid M of S (node.M)
Parameters
----------
S : array of instances
Array of instances that fall into a node
node: a DiFF tree node
S is the set of instances "falling" into the node
alpha: float
alpha is the distance scaling hyper-parameter
Returns
-------
array
the array of similarity values between the instances in S and the mean of training instances falling in node
"""
d = np.shape(S)[1]
if len(S) > 0:
d = np.shape(S)[1]
U = (S-node.M)/node.Mstd # normalize using the standard deviation vector to the mean
U = (2)**(-alpha*(np.sum(U*U/d, axis=1)))
else:
U = 0
return U
def EE(hist):
"""
given a list of positive values as a histogram drawn from any information source,
returns the empirical entropy of its discrete probability function.
Parameters
----------
hist: array
histogram
Returns
-------
float
empirical entropy estimated from the histogram
"""
h = np.asarray(hist, dtype=np.float64)
if h.sum() <= 0 or (h < 0).any():
return 0
h = h/h.sum()
return -(h*np.ma.log2(h)).sum()
def weightFeature(s, nbins):
'''
Given a list of values corresponding to a feature dimension, returns a weight (in [0,1]) that is
one minus the normalized empirical entropy, a way to characterize the importance of the feature dimension.
Parameters
----------
s: array
list of scalar values corresponding to a feature dimension
nbins: int
the number of bins used to discretize the feature dimension using an histogram.
Returns
-------
float
the importance weight for feature s.
'''
if s.min() == s.max():
return 0
hist = np.histogram(s, bins=nbins, density=True)
ent = EE(hist[0])
ent = ent/np.log2(nbins)
return 1-ent
def check_parameter(param, low=MIN_INT, high=MAX_INT, param_name='',
include_left=False, include_right=False):
"""Check if an input is within the defined range.
Parameters
----------
param : int, float
The input parameter to check.
low : int, float
The lower bound of the range.
high : int, float
The higher bound of the range.
param_name : str, optional (default='')
The name of the parameter.
include_left : bool, optional (default=False)
Whether includes the lower bound (lower bound <=).
include_right : bool, optional (default=False)
Whether includes the higher bound (<= higher bound).
Returns
-------
within_range : bool or raise errors
Whether the parameter is within the range of (low, high)
"""
# param, low and high should all be numerical
if not isinstance(param, (numbers.Integral, np.integer, float)):
raise TypeError('{param_name} is set to {param} Not numerical'.format(
param=param, param_name=param_name))
if not isinstance(low, (numbers.Integral, np.integer, float)):
raise TypeError('low is set to {low}. Not numerical'.format(low=low))
if not isinstance(high, (numbers.Integral, np.integer, float)):
raise TypeError('high is set to {high}. Not numerical'.format(
high=high))
# at least one of the bounds should be specified
if low is MIN_INT and high is MAX_INT:
raise ValueError('Neither low nor high bounds is undefined')
# if wrong bound values are used
if low > high:
raise ValueError(
'Lower bound > Higher bound')
# value check under different bound conditions
if (include_left and include_right) and (param < low or param > high):
raise ValueError(
'{param_name} is set to {param}. '
'Not in the range of [{low}, {high}].'.format(
param=param, low=low, high=high, param_name=param_name))
elif (include_left and not include_right) and (
param < low or param >= high):
raise ValueError(
'{param_name} is set to {param}. '
'Not in the range of [{low}, {high}).'.format(
param=param, low=low, high=high, param_name=param_name))
elif (not include_left and include_right) and (
param <= low or param > high):
raise ValueError(
'{param_name} is set to {param}. '
'Not in the range of ({low}, {high}].'.format(
param=param, low=low, high=high, param_name=param_name))
elif (not include_left and not include_right) and (
param <= low or param >= high):
raise ValueError(
'{param_name} is set to {param}. '
'Not in the range of ({low}, {high}).'.format(
param=param, low=low, high=high, param_name=param_name))
else:
return True
def check_detector(detector):
"""Checks if fit and decision_function methods exist for given detector
Parameters
----------
detector : pyod.models
Detector instance for which the check is performed.
"""
if not hasattr(detector, 'fit') or not hasattr(detector,
'decision_function'):
raise AttributeError("%s is not a detector instance." % (detector))
def standardizer(X, X_t=None, keep_scalar=False):
"""Conduct Z-normalization on data to turn input samples become zero-mean
and unit variance.
Parameters
----------
X : numpy array of shape (n_samples, n_features)
The training samples
X_t : numpy array of shape (n_samples_new, n_features), optional (default=None)
The data to be converted
keep_scalar : bool, optional (default=False)
The flag to indicate whether to return the scalar
Returns
-------
X_norm : numpy array of shape (n_samples, n_features)
X after the Z-score normalization
X_t_norm : numpy array of shape (n_samples, n_features)
X_t after the Z-score normalization
scalar : sklearn scalar object
The scalar used in conversion
"""
X = check_array(X)
scaler = StandardScaler().fit(X)
if X_t is None:
if keep_scalar:
return scaler.transform(X), scaler
else:
return scaler.transform(X)
else:
X_t = check_array(X_t)
if X.shape[1] != X_t.shape[1]:
raise ValueError(
"The number of input data feature should be consistent"
"X has {0} features and X_t has {1} features.".format(
X.shape[1], X_t.shape[1]))
if keep_scalar:
return scaler.transform(X), scaler.transform(X_t), scaler
else:
return scaler.transform(X), scaler.transform(X_t)
def score_to_label(pred_scores, outliers_fraction=0.1):
"""Turn raw outlier outlier scores to binary labels (0 or 1).
Parameters
----------
pred_scores : list or numpy array of shape (n_samples,)
Raw outlier scores. Outliers are assumed have larger values.
outliers_fraction : float in (0,1)
Percentage of outliers.
Returns
-------
outlier_labels : numpy array of shape (n_samples,)
For each observation, tells whether or not
it should be considered as an outlier according to the
fitted model. Return the outlier probability, ranging
in [0,1].
"""
# check input values
pred_scores = column_or_1d(pred_scores)
check_parameter(outliers_fraction, 0, 1)
threshold = percentile(pred_scores, 100 * (1 - outliers_fraction))
pred_labels = (pred_scores > threshold).astype('int')
return pred_labels
def precision_n_scores(y, y_pred, n=None):
"""Utility function to calculate precision @ rank n.
Parameters
----------
y : list or numpy array of shape (n_samples,)
The ground truth. Binary (0: inliers, 1: outliers).
y_pred : list or numpy array of shape (n_samples,)
The raw outlier scores as returned by a fitted model.
n : int, optional (default=None)
The number of outliers. if not defined, infer using ground truth.
Returns
-------
precision_at_rank_n : float
Precision at rank n score.
"""
# turn raw prediction decision scores into binary labels
y_pred = get_label_n(y, y_pred, n)
# enforce formats of y and labels_
y = column_or_1d(y)
y_pred = column_or_1d(y_pred)
return precision_score(y, y_pred)
def get_label_n(y, y_pred, n=None):
"""Function to turn raw outlier scores into binary labels by assign 1
to top n outlier scores.
Parameters
----------
y : list or numpy array of shape (n_samples,)
The ground truth. Binary (0: inliers, 1: outliers).
y_pred : list or numpy array of shape (n_samples,)
The raw outlier scores as returned by a fitted model.
n : int, optional (default=None)
The number of outliers. if not defined, infer using ground truth.
Returns
-------
labels : numpy array of shape (n_samples,)
binary labels 0: normal points and 1: outliers
Examples
--------
>>> from pyod.utils.utility import get_label_n
>>> y = [0, 1, 1, 0, 0]
>>> y_pred = [0.1, 0.5, 0.3, 0.2, 0.7]
>>> get_label_n(y, y_pred)
array([0, 1, 0, 0, 1])
"""
# enforce formats of inputs
y = column_or_1d(y)
y_pred = column_or_1d(y_pred)
check_consistent_length(y, y_pred)
y_len = len(y) # the length of targets
# calculate the percentage of outliers
if n is not None:
outliers_fraction = n / y_len
else:
outliers_fraction = np.count_nonzero(y) / y_len
threshold = percentile(y_pred, 100 * (1 - outliers_fraction))
y_pred = (y_pred > threshold).astype('int')
return y_pred
def get_intersection(lst1, lst2):
"""get the overlapping between two lists
Parameters
----------
li1 : list or numpy array
Input list 1.
li2 : list or numpy array
Input list 2.
Returns
-------
difference : list
The overlapping between li1 and li2.
"""
return list(set(lst1) & set(lst2))
def get_list_diff(li1, li2):
"""get the elements in li1 but not li2. li1-li2
Parameters
----------
li1 : list or numpy array
Input list 1.
li2 : list or numpy array
Input list 2.
Returns
-------
difference : list
The difference between li1 and li2.
"""
return (list(set(li1) - set(li2)))
def get_diff_elements(li1, li2):
"""get the elements in li1 but not li2, and vice versa
Parameters
----------
li1 : list or numpy array
Input list 1.
li2 : list or numpy array
Input list 2.
Returns
-------
difference : list
The difference between li1 and li2.
"""
return (list(set(li1) - set(li2)) + list(set(li2) - set(li1)))
def argmaxn(value_list, n, order='desc'):
"""Return the index of top n elements in the list
if order is set to 'desc', otherwise return the index of n smallest ones.
Parameters
----------
value_list : list, array, numpy array of shape (n_samples,)
A list containing all values.
n : int
The number of elements to select.
order : str, optional (default='desc')
The order to sort {'desc', 'asc'}:
- 'desc': descending
- 'asc': ascending
Returns
-------
index_list : numpy array of shape (n,)
The index of the top n elements.
"""
value_list = column_or_1d(value_list)
length = len(value_list)
# validate the choice of n
check_parameter(n, 1, length, include_left=True, include_right=True,
param_name='n')
# for the smallest n, flip the value
if order != 'desc':
n = length - n
value_sorted = np.partition(value_list, length - n)
threshold = value_sorted[int(length - n)]
if order == 'desc':
return np.where(np.greater_equal(value_list, threshold))[0]
else: # return the index of n smallest elements
return np.where(np.less(value_list, threshold))[0]
def invert_order(scores, method='multiplication'):
""" Invert the order of a list of values. The smallest value becomes
the largest in the inverted list. This is useful while combining
multiple detectors since their score order could be different.
Parameters
----------
scores : list, array or numpy array with shape (n_samples,)
The list of values to be inverted
method : str, optional (default='multiplication')
Methods used for order inversion. Valid methods are:
- 'multiplication': multiply by -1
- 'subtraction': max(scores) - scores
Returns
-------
inverted_scores : numpy array of shape (n_samples,)
The inverted list
Examples
--------
>>> scores1 = [0.1, 0.3, 0.5, 0.7, 0.2, 0.1]
>>> invert_order(scores1)
array([-0.1, -0.3, -0.5, -0.7, -0.2, -0.1])
>>> invert_order(scores1, method='subtraction')
array([0.6, 0.4, 0.2, 0. , 0.5, 0.6])
"""
scores = column_or_1d(scores)
if method == 'multiplication':
return scores.ravel() * -1
if method == 'subtraction':
return (scores.max() - scores).ravel()
def _get_sklearn_version(): # pragma: no cover
""" Utility function to decide the version of sklearn.
PyOD will result in different behaviors with different sklearn version
Returns
-------
sk_learn version : int
"""
sklearn_version = str(sklearn.__version__)
if int(sklearn_version.split(".")[1]) < 19 or int(
sklearn_version.split(".")[1]) > 23:
raise ValueError("Sklearn version error")
return int(sklearn_version.split(".")[1])
def _sklearn_version_21(): # pragma: no cover
""" Utility function to decide the version of sklearn
In sklearn 21.0, LOF is changed. Specifically, _decision_function
is replaced by _score_samples
Returns
-------
sklearn_21_flag : bool
True if sklearn.__version__ is newer than 0.21.0
"""
sklearn_version = str(sklearn.__version__)
if int(sklearn_version.split(".")[1]) > 20:
return True
else:
return False
def generate_bagging_indices(random_state, bootstrap_features, n_features,
min_features, max_features):
""" Randomly draw feature indices. Internal use only.
Modified from sklearn/ensemble/bagging.py
Parameters
----------
random_state : RandomState
A random number generator instance to define the state of the random
permutations generator.
bootstrap_features : bool
Specifies whether to bootstrap indice generation
n_features : int
Specifies the population size when generating indices
min_features : int
Lower limit for number of features to randomly sample
max_features : int
Upper limit for number of features to randomly sample
Returns
-------
feature_indices : numpy array, shape (n_samples,)
Indices for features to bag
"""
# Get valid random state
random_state = check_random_state(random_state)
# decide number of features to draw
random_n_features = random_state.randint(min_features, max_features)
# Draw indices
feature_indices = generate_indices(random_state, bootstrap_features,
n_features, random_n_features)
return feature_indices
def generate_indices(random_state, bootstrap, n_population, n_samples):
""" Draw randomly sampled indices. Internal use only.
See sklearn/ensemble/bagging.py
Parameters
----------
random_state : RandomState
A random number generator instance to define the state of the random
permutations generator.
bootstrap : bool
Specifies whether to bootstrap indice generation
n_population : int
Specifies the population size when generating indices
n_samples : int
Specifies number of samples to draw
Returns
-------
indices : numpy array, shape (n_samples,)
randomly drawn indices
"""
# Draw sample indices
if bootstrap:
indices = random_state.randint(0, n_population, n_samples)
else:
indices = sample_without_replacement(n_population, n_samples,
random_state=random_state)
return indices
def EuclideanDist(x,y):
return np.sqrt(np.sum((x - y) ** 2))
def dist2set(x, X):
l=len(X)
ldist=[]
for i in range(l):
ldist.append(EuclideanDist(x,X[i]))
return ldist
def c_factor(n) :
if(n<2):
n=2
return 2.0*(np.log(n-1)+0.5772156649) - (2.0*(n-1.)/(n*1.0))
def all_branches(node, current=[], branches = None):
current = current[:node.e]
if branches is None: branches = []
if node.ntype == 'inNode':
current.append('L')
all_branches(node.left, current=current, branches=branches)
current = current[:-1]
current.append('R')
all_branches(node.right, current=current, branches=branches)
else:
branches.append(current)
return branches
def branch2num(branch, init_root=0):
num = [init_root]
for b in branch:
if b == 'L':
num.append(num[-1] * 2 + 1)
if b == 'R':
num.append(num[-1] * 2 + 2)
return num
def _get_n_jobs(n_jobs):
"""Get number of jobs for the computation.
See sklearn/utils/__init__.py for more information.
This function reimplements the logic of joblib to determine the actual
number of jobs depending on the cpu count. If -1 all CPUs are used.
If 1 is given, no parallel computing code is used at all, which is useful
for debugging. For n_jobs below -1, (n_cpus + 1 + n_jobs) are used.
Thus for n_jobs = -2, all CPUs but one are used.
Parameters
----------
n_jobs : int
Number of jobs stated in joblib convention.
Returns
-------
n_jobs : int
The actual number of jobs as positive integer.
"""
if n_jobs < 0:
return max(cpu_count() + 1 + n_jobs, 1)
elif n_jobs == 0:
raise ValueError('Parameter n_jobs == 0 has no meaning.')
else:
return n_jobs
def _partition_estimators(n_estimators, n_jobs):
"""Private function used to partition estimators between jobs.
See sklearn/ensemble/base.py for more information.
"""
# Compute the number of jobs
n_jobs = min(_get_n_jobs(n_jobs), n_estimators)
# Partition estimators between jobs
n_estimators_per_job = (n_estimators // n_jobs) * np.ones(n_jobs, dtype=int)
n_estimators_per_job[:n_estimators % n_jobs] += 1
starts = np.cumsum(n_estimators_per_job)
return n_jobs, n_estimators_per_job.tolist(), [0] + starts.tolist()
def _pprint(params, offset=0, printer=repr):
# noinspection PyPep8
"""Pretty print the dictionary 'params'
See http://scikit-learn.org/stable/modules/generated/sklearn.base.BaseEstimator.html
and sklearn/base.py for more information.
:param params: The dictionary to pretty print
:type params: dict
:param offset: The offset in characters to add at the begin of each line.
:type offset: int
:param printer: The function to convert entries to strings, typically
the builtin str or repr
:type printer: callable
:return: None
"""
# Do a multi-line justified repr:
options = np.get_printoptions()
np.set_printoptions(precision=5, threshold=64, edgeitems=2)
params_list = list()
this_line_length = offset
line_sep = ',\n' + (1 + offset // 2) * ' '
for i, (k, v) in enumerate(sorted(params.items())):
if type(v) is float:
# use str for representing floating point numbers
# this way we get consistent representation across
# architectures and versions.
this_repr = '%s=%s' % (k, str(v))
else:
# use repr of the rest
this_repr = '%s=%s' % (k, printer(v))
if len(this_repr) > 500:
this_repr = this_repr[:300] + '...' + this_repr[-100:]
if i > 0:
if this_line_length + len(this_repr) >= 75 or '\n' in this_repr:
params_list.append(line_sep)
this_line_length = len(line_sep)
else:
params_list.append(', ')
this_line_length += 2
params_list.append(this_repr)
this_line_length += len(this_repr)
np.set_printoptions(**options)
lines = ''.join(params_list)
# Strip trailing space to avoid nightmare in doctests
lines = '\n'.join(l.rstrip(' ') for l in lines.split('\n'))
return lines
def get_activation_by_name(name):
activations = {
'relu': nn.ReLU(),
'sigmoid': nn.Sigmoid(),
'tanh': nn.Tanh(),
'leakyrelu':nn.LeakyReLU()
}
if name in activations.keys():
return activations[name]
else:
raise ValueError(name, "is not a valid activation function")
def get_optimal_n_bins(X, upper_bound=None, epsilon=1):
""" Determine optimal number of bins for a histogram using the Birge
Rozenblac method (see :cite:`birge2006many` for details.)
See https://doi.org/10.1051/ps:2006001
Parameters
----------
X : array-like of shape (n_samples, n_features)
The samples to determine the optimal number of bins for.
upper_bound : int, default=None
The maximum value of n_bins to be considered.
If set to None, np.sqrt(X.shape[0]) will be used as upper bound.
epsilon : float, default = 1
A stabilizing term added to the logarithm to prevent division by zero.
Returns
-------
optimal_n_bins : int
The optimal value of n_bins according to the Birge Rozenblac method
"""
if upper_bound is None:
upper_bound = int(np.sqrt(X.shape[0]))
n = X.shape[0]
maximum_likelihood = np.zeros((upper_bound - 1, 1))
for i, b in enumerate(range(1, upper_bound)):
histogram, _ = np.histogram(X, bins=b)
maximum_likelihood[i] = np.sum(
histogram * np.log(b * histogram / n + epsilon) - (
b - 1 + np.power(np.log(b), 2.5)))
return np.argmax(maximum_likelihood) + 1