| """A set of utility functions to support outlier detection. | |
| """ | |
| from __future__ import division | |
| from __future__ import print_function | |
| from joblib.parallel import cpu_count | |
| import numpy as np | |
| from numpy import percentile | |
| import numbers | |
| import sklearn | |
| from sklearn.metrics import precision_score | |
| from sklearn.preprocessing import StandardScaler | |
| from sklearn.utils import column_or_1d | |
| from sklearn.utils import check_array | |
| from sklearn.utils import check_consistent_length | |
| from sklearn.utils import check_random_state | |
| from sklearn.utils.random import sample_without_replacement | |
| import torch.nn as nn | |
| MAX_INT = np.iinfo(np.int32).max | |
| MIN_INT = -1 * MAX_INT | |


def zscore(a, axis=0, ddof=0):
    """Compute the z-score of each value in the sample, relative to the
    sample mean and standard deviation along the given axis. NaNs produced
    by zero standard deviations are replaced with zeros.
    """
    a = np.asanyarray(a)
    mns = a.mean(axis=axis)
    sstd = a.std(axis=axis, ddof=ddof)
    if axis and mns.ndim < a.ndim:
        res = ((a - np.expand_dims(mns, axis=axis)) /
               np.expand_dims(sstd, axis=axis))
    else:
        res = (a - mns) / sstd
    return np.nan_to_num(res)
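
# A minimal usage sketch (illustrative values, not part of the original
# module): column-wise z-scoring of a small matrix.
# >>> zscore(np.array([[1., 2.], [3., 4.], [5., 6.]]))
# array([[-1.22474487, -1.22474487],
#        [ 0.        ,  0.        ],
#        [ 1.22474487,  1.22474487]])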


def pairwise_distances_no_broadcast(X, Y):
    """Utility function to calculate the row-wise Euclidean distance
    between two matrices. Unlike the pairwise calculation, this function
    does not broadcast: if X and Y are both (4, 3) matrices, it returns
    a distance vector of shape (4,) instead of (4, 4).

    Parameters
    ----------
    X : array of shape (n_samples, n_features)
        First input samples

    Y : array of shape (n_samples, n_features)
        Second input samples

    Returns
    -------
    distance : array of shape (n_samples,)
        Row-wise Euclidean distance of X and Y
    """
    X = check_array(X)
    Y = check_array(Y)
    if X.shape[0] != Y.shape[0] or X.shape[1] != Y.shape[1]:
        raise ValueError("pairwise_distances_no_broadcast received "
                         "matrices with different shapes {0} and {1}".format(
                             X.shape, Y.shape))
    euclidean_sq = np.square(Y - X)
    return np.sqrt(np.sum(euclidean_sq, axis=1)).ravel()
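
# Hedged usage sketch with made-up inputs: two equally shaped matrices
# yield one distance per row.
# >>> X = np.array([[0., 0.], [1., 1.]])
# >>> Y = np.array([[3., 4.], [1., 1.]])
# >>> pairwise_distances_no_broadcast(X, Y)
# array([5., 0.])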


def getSplit(X):
    """Randomly select a split value, drawn uniformly between the minimum
    and maximum of the scalar data 'X'.

    Parameters
    ----------
    X : array
        Array of scalar values

    Returns
    -------
    float
        The split value
    """
    xmin = X.min()
    xmax = X.max()
    return np.random.uniform(xmin, xmax)
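
# Illustrative sketch: the split is drawn uniformly from [X.min(), X.max()),
# so with a fixed global seed the draw is reproducible.
# >>> np.random.seed(0)
# >>> round(float(getSplit(np.array([0., 10.]))), 3)
# 5.488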


def similarityScore(S, node, alpha):
    """Given a set of instances S falling into a node and a value
    alpha >= 0, return, for each element x in S, the weighted similarity
    score between x and the centroid M of S (node.M).

    Parameters
    ----------
    S : array of instances
        Array of instances that fall into a node

    node : a DiFF tree node
        S is the set of instances "falling" into the node

    alpha : float
        The distance scaling hyper-parameter

    Returns
    -------
    array
        The array of similarity values between the instances in S and the
        mean of the training instances falling into the node
    """
    if len(S) > 0:
        d = np.shape(S)[1]
        # normalize using the standard deviation vector around the mean
        U = (S - node.M) / node.Mstd
        U = (2) ** (-alpha * (np.sum(U * U / d, axis=1)))
    else:
        U = 0
    return U
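
# Hedged sketch: `node` only needs `M` (centroid) and `Mstd` (per-feature
# standard deviation) here, so a hypothetical stand-in object suffices.
# >>> from types import SimpleNamespace
# >>> node = SimpleNamespace(M=np.zeros(2), Mstd=np.ones(2))
# >>> similarityScore(np.array([[0., 0.], [1., 1.]]), node, alpha=1.0)
# array([1. , 0.5])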


def EE(hist):
    """Given a list of positive values as a histogram drawn from any
    information source, return the empirical entropy of its discrete
    probability function.

    Parameters
    ----------
    hist : array
        The histogram

    Returns
    -------
    float
        Empirical entropy estimated from the histogram
    """
    h = np.asarray(hist, dtype=np.float64)
    if h.sum() <= 0 or (h < 0).any():
        return 0
    h = h / h.sum()
    # masked log2 skips empty bins, which contribute zero entropy
    return -(h * np.ma.log2(h)).sum()
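
# Quick illustrative checks: a uniform 4-bin histogram carries
# log2(4) = 2 bits of entropy; a skewed one carries less.
# >>> EE([1, 1, 1, 1])
# 2.0
# >>> EE([2, 1, 1])
# 1.5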


def weightFeature(s, nbins):
    """Given a list of values corresponding to a feature dimension, return
    a weight (in [0, 1]) that is one minus the normalized empirical
    entropy, a way to characterize the importance of the feature dimension.

    Parameters
    ----------
    s : array
        List of scalar values corresponding to a feature dimension

    nbins : int
        The number of bins used to discretize the feature dimension using
        a histogram.

    Returns
    -------
    float
        The importance weight for feature s.
    """
    if s.min() == s.max():
        return 0
    hist = np.histogram(s, bins=nbins, density=True)
    ent = EE(hist[0])
    # normalize by the maximum entropy of an nbins histogram
    ent = ent / np.log2(nbins)
    return 1 - ent
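
# Illustrative check (values chosen for the example only): a skewed
# feature keeps a non-zero weight, a constant feature gets weight 0.
# >>> round(float(weightFeature(np.array([0., 0., 0., 0., 1.]), nbins=2)), 2)
# 0.28
# >>> weightFeature(np.array([3., 3., 3.]), nbins=2)
# 0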


def check_parameter(param, low=MIN_INT, high=MAX_INT, param_name='',
                    include_left=False, include_right=False):
    """Check if an input is within the defined range.

    Parameters
    ----------
    param : int, float
        The input parameter to check.

    low : int, float
        The lower bound of the range.

    high : int, float
        The higher bound of the range.

    param_name : str, optional (default='')
        The name of the parameter.

    include_left : bool, optional (default=False)
        Whether to include the lower bound (lower bound <=).

    include_right : bool, optional (default=False)
        Whether to include the higher bound (<= higher bound).

    Returns
    -------
    within_range : bool or raise errors
        Whether the parameter is within the range of (low, high)
    """
    # param, low and high should all be numerical
    if not isinstance(param, (numbers.Integral, np.integer, np.floating,
                              float)):
        raise TypeError(
            '{param_name} is set to {param}. Not numerical'.format(
                param=param, param_name=param_name))
    if not isinstance(low, (numbers.Integral, np.integer, np.floating,
                            float)):
        raise TypeError('low is set to {low}. Not numerical'.format(low=low))
    if not isinstance(high, (numbers.Integral, np.integer, np.floating,
                             float)):
        raise TypeError('high is set to {high}. Not numerical'.format(
            high=high))

    # at least one of the bounds should be specified
    if low is MIN_INT and high is MAX_INT:
        raise ValueError('Neither low nor high bound is specified')

    # if wrong bound values are used
    if low > high:
        raise ValueError('Lower bound > Higher bound')

    # value check under different bound conditions
    if (include_left and include_right) and (param < low or param > high):
        raise ValueError(
            '{param_name} is set to {param}. '
            'Not in the range of [{low}, {high}].'.format(
                param=param, low=low, high=high, param_name=param_name))
    elif (include_left and not include_right) and (
            param < low or param >= high):
        raise ValueError(
            '{param_name} is set to {param}. '
            'Not in the range of [{low}, {high}).'.format(
                param=param, low=low, high=high, param_name=param_name))
    elif (not include_left and include_right) and (
            param <= low or param > high):
        raise ValueError(
            '{param_name} is set to {param}. '
            'Not in the range of ({low}, {high}].'.format(
                param=param, low=low, high=high, param_name=param_name))
    elif (not include_left and not include_right) and (
            param <= low or param >= high):
        raise ValueError(
            '{param_name} is set to {param}. '
            'Not in the range of ({low}, {high}).'.format(
                param=param, low=low, high=high, param_name=param_name))
    else:
        return True
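
# Illustrative behavior: in-range values return True, out-of-range values
# raise (the parameter name below is hypothetical).
# >>> check_parameter(0.5, low=0, high=1)
# True
# >>> check_parameter(1.5, low=0, high=1, param_name='contamination')
# Traceback (most recent call last):
#     ...
# ValueError: contamination is set to 1.5. Not in the range of (0, 1).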


def check_detector(detector):
    """Check whether the fit and decision_function methods exist for a
    given detector.

    Parameters
    ----------
    detector : pyod.models
        Detector instance for which the check is performed.
    """
    if not hasattr(detector, 'fit') or not hasattr(detector,
                                                   'decision_function'):
        raise AttributeError("%s is not a detector instance." % (detector))


def standardizer(X, X_t=None, keep_scalar=False):
    """Conduct Z-normalization on data so that the input samples become
    zero-mean and unit-variance.

    Parameters
    ----------
    X : numpy array of shape (n_samples, n_features)
        The training samples

    X_t : numpy array of shape (n_samples_new, n_features), optional (default=None)
        The data to be converted

    keep_scalar : bool, optional (default=False)
        The flag to indicate whether to return the fitted scaler

    Returns
    -------
    X_norm : numpy array of shape (n_samples, n_features)
        X after the Z-score normalization

    X_t_norm : numpy array of shape (n_samples, n_features)
        X_t after the Z-score normalization

    scalar : sklearn scaler object
        The scaler used in the conversion
    """
    X = check_array(X)
    scaler = StandardScaler().fit(X)

    if X_t is None:
        if keep_scalar:
            return scaler.transform(X), scaler
        else:
            return scaler.transform(X)
    else:
        X_t = check_array(X_t)
        if X.shape[1] != X_t.shape[1]:
            raise ValueError(
                "The number of input data features should be consistent: "
                "X has {0} features and X_t has {1} features.".format(
                    X.shape[1], X_t.shape[1]))
        if keep_scalar:
            return scaler.transform(X), scaler.transform(X_t), scaler
        else:
            return scaler.transform(X), scaler.transform(X_t)
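
# A minimal sketch with made-up data: fit the scaler on X; pass X_t as
# well to transform both with the statistics of X.
# >>> standardizer(np.array([[0., 10.], [2., 14.]]))
# array([[-1., -1.],
#        [ 1.,  1.]])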


def score_to_label(pred_scores, outliers_fraction=0.1):
    """Turn raw outlier scores into binary labels (0 or 1).

    Parameters
    ----------
    pred_scores : list or numpy array of shape (n_samples,)
        Raw outlier scores. Outliers are assumed to have larger values.

    outliers_fraction : float in (0,1)
        Percentage of outliers.

    Returns
    -------
    outlier_labels : numpy array of shape (n_samples,)
        For each observation, tells whether it should be considered an
        outlier according to the fitted model: 1 for outliers, 0 for
        inliers.
    """
    # check input values
    pred_scores = column_or_1d(pred_scores)
    check_parameter(outliers_fraction, 0, 1)

    threshold = percentile(pred_scores, 100 * (1 - outliers_fraction))
    pred_labels = (pred_scores > threshold).astype('int')
    return pred_labels
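
# Illustrative example: with outliers_fraction=0.25, only the top quarter
# of the scores is labeled 1.
# >>> score_to_label(np.array([0.1, 0.2, 0.9, 0.3]), outliers_fraction=0.25)
# array([0, 0, 1, 0])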


def precision_n_scores(y, y_pred, n=None):
    """Utility function to calculate precision @ rank n.

    Parameters
    ----------
    y : list or numpy array of shape (n_samples,)
        The ground truth. Binary (0: inliers, 1: outliers).

    y_pred : list or numpy array of shape (n_samples,)
        The raw outlier scores as returned by a fitted model.

    n : int, optional (default=None)
        The number of outliers. If not defined, infer using ground truth.

    Returns
    -------
    precision_at_rank_n : float
        Precision at rank n score.
    """
    # turn raw prediction decision scores into binary labels
    y_pred = get_label_n(y, y_pred, n)

    # enforce formats of y and labels_
    y = column_or_1d(y)
    y_pred = column_or_1d(y_pred)

    return precision_score(y, y_pred)


def get_label_n(y, y_pred, n=None):
    """Function to turn raw outlier scores into binary labels by assigning
    1 to the top n outlier scores.

    Parameters
    ----------
    y : list or numpy array of shape (n_samples,)
        The ground truth. Binary (0: inliers, 1: outliers).

    y_pred : list or numpy array of shape (n_samples,)
        The raw outlier scores as returned by a fitted model.

    n : int, optional (default=None)
        The number of outliers. If not defined, infer using ground truth.

    Returns
    -------
    labels : numpy array of shape (n_samples,)
        binary labels 0: normal points and 1: outliers

    Examples
    --------
    >>> from pyod.utils.utility import get_label_n
    >>> y = [0, 1, 1, 0, 0]
    >>> y_pred = [0.1, 0.5, 0.3, 0.2, 0.7]
    >>> get_label_n(y, y_pred)
    array([0, 1, 0, 0, 1])
    """
    # enforce formats of inputs
    y = column_or_1d(y)
    y_pred = column_or_1d(y_pred)
    check_consistent_length(y, y_pred)
    y_len = len(y)  # the length of targets

    # calculate the percentage of outliers
    if n is not None:
        outliers_fraction = n / y_len
    else:
        outliers_fraction = np.count_nonzero(y) / y_len

    threshold = percentile(y_pred, 100 * (1 - outliers_fraction))
    y_pred = (y_pred > threshold).astype('int')

    return y_pred


def get_intersection(lst1, lst2):
    """Get the overlap between two lists.

    Parameters
    ----------
    lst1 : list or numpy array
        Input list 1.

    lst2 : list or numpy array
        Input list 2.

    Returns
    -------
    intersection : list
        The overlap between lst1 and lst2.
    """
    return list(set(lst1) & set(lst2))


def get_list_diff(li1, li2):
    """Get the elements in li1 but not in li2, i.e. li1 - li2.

    Parameters
    ----------
    li1 : list or numpy array
        Input list 1.

    li2 : list or numpy array
        Input list 2.

    Returns
    -------
    difference : list
        The difference between li1 and li2.
    """
    return (list(set(li1) - set(li2)))


def get_diff_elements(li1, li2):
    """Get the elements in li1 but not in li2, and vice versa.

    Parameters
    ----------
    li1 : list or numpy array
        Input list 1.

    li2 : list or numpy array
        Input list 2.

    Returns
    -------
    difference : list
        The symmetric difference between li1 and li2.
    """
    return (list(set(li1) - set(li2)) + list(set(li2) - set(li1)))
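
# Illustrative checks for the three set helpers above (sorted() only makes
# the set-based output order deterministic).
# >>> sorted(get_intersection([1, 2, 3], [2, 3, 4]))
# [2, 3]
# >>> sorted(get_list_diff([1, 2, 3], [2, 3, 4]))
# [1]
# >>> sorted(get_diff_elements([1, 2, 3], [2, 3, 4]))
# [1, 4]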


def argmaxn(value_list, n, order='desc'):
    """Return the indices of the top n elements in the list if order is
    set to 'desc', otherwise return the indices of the n smallest ones.

    Parameters
    ----------
    value_list : list, array, numpy array of shape (n_samples,)
        A list containing all values.

    n : int
        The number of elements to select.

    order : str, optional (default='desc')
        The order to sort {'desc', 'asc'}:

        - 'desc': descending
        - 'asc': ascending

    Returns
    -------
    index_list : numpy array of shape (n,)
        The indices of the top n elements.
    """
    value_list = column_or_1d(value_list)
    length = len(value_list)

    # validate the choice of n
    check_parameter(n, 1, length, include_left=True, include_right=True,
                    param_name='n')

    # for the smallest n, flip the value
    if order != 'desc':
        n = length - n

    value_sorted = np.partition(value_list, length - n)
    threshold = value_sorted[int(length - n)]

    if order == 'desc':
        return np.where(np.greater_equal(value_list, threshold))[0]
    else:  # return the index of n smallest elements
        return np.where(np.less(value_list, threshold))[0]
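
# Illustrative example: indices of the two largest and the two smallest
# values.
# >>> argmaxn([1, 5, 3, 4, 2], n=2)
# array([1, 3])
# >>> argmaxn([1, 5, 3, 4, 2], n=2, order='asc')
# array([0, 4])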


def invert_order(scores, method='multiplication'):
    """Invert the order of a list of values. The smallest value becomes
    the largest in the inverted list. This is useful while combining
    multiple detectors since their score order could be different.

    Parameters
    ----------
    scores : list, array or numpy array with shape (n_samples,)
        The list of values to be inverted

    method : str, optional (default='multiplication')
        Methods used for order inversion. Valid methods are:

        - 'multiplication': multiply by -1
        - 'subtraction': max(scores) - scores

    Returns
    -------
    inverted_scores : numpy array of shape (n_samples,)
        The inverted list

    Examples
    --------
    >>> scores1 = [0.1, 0.3, 0.5, 0.7, 0.2, 0.1]
    >>> invert_order(scores1)
    array([-0.1, -0.3, -0.5, -0.7, -0.2, -0.1])
    >>> invert_order(scores1, method='subtraction')
    array([0.6, 0.4, 0.2, 0. , 0.5, 0.6])
    """
    scores = column_or_1d(scores)

    if method == 'multiplication':
        return scores.ravel() * -1

    if method == 'subtraction':
        return (scores.max() - scores).ravel()


def _get_sklearn_version():  # pragma: no cover
    """Utility function to decide the version of sklearn.
    PyOD will result in different behaviors with different sklearn versions.

    Returns
    -------
    sk_learn version : int
    """
    sklearn_version = str(sklearn.__version__)
    if int(sklearn_version.split(".")[1]) < 19 or int(
            sklearn_version.split(".")[1]) > 23:
        raise ValueError("Sklearn version error")

    return int(sklearn_version.split(".")[1])


def _sklearn_version_21():  # pragma: no cover
    """Utility function to decide the version of sklearn.
    In sklearn 0.21.0, LOF is changed. Specifically, _decision_function
    is replaced by _score_samples.

    Returns
    -------
    sklearn_21_flag : bool
        True if sklearn.__version__ is newer than 0.21.0
    """
    sklearn_version = str(sklearn.__version__)
    if int(sklearn_version.split(".")[1]) > 20:
        return True
    else:
        return False


def generate_bagging_indices(random_state, bootstrap_features, n_features,
                             min_features, max_features):
    """Randomly draw feature indices. Internal use only.

    Modified from sklearn/ensemble/bagging.py

    Parameters
    ----------
    random_state : RandomState
        A random number generator instance to define the state of the random
        permutations generator.

    bootstrap_features : bool
        Specifies whether to bootstrap index generation

    n_features : int
        Specifies the population size when generating indices

    min_features : int
        Lower limit for the number of features to randomly sample

    max_features : int
        Upper limit (exclusive) for the number of features to randomly sample

    Returns
    -------
    feature_indices : numpy array, shape (n_samples,)
        Indices for features to bag
    """
    # Get valid random state
    random_state = check_random_state(random_state)

    # decide how many features to draw; randint's upper bound is exclusive
    random_n_features = random_state.randint(min_features, max_features)

    # Draw indices
    feature_indices = generate_indices(random_state, bootstrap_features,
                                       n_features, random_n_features)

    return feature_indices


def generate_indices(random_state, bootstrap, n_population, n_samples):
    """Draw randomly sampled indices. Internal use only.

    See sklearn/ensemble/bagging.py

    Parameters
    ----------
    random_state : RandomState
        A random number generator instance to define the state of the random
        permutations generator.

    bootstrap : bool
        Specifies whether to bootstrap index generation

    n_population : int
        Specifies the population size when generating indices

    n_samples : int
        Specifies the number of samples to draw

    Returns
    -------
    indices : numpy array, shape (n_samples,)
        randomly drawn indices
    """
    # Draw sample indices
    if bootstrap:
        # with replacement
        indices = random_state.randint(0, n_population, n_samples)
    else:
        # without replacement
        indices = sample_without_replacement(n_population, n_samples,
                                             random_state=random_state)

    return indices
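
# Hedged property check rather than exact values (the draw depends on the
# random state): without bootstrap, indices are sampled without
# replacement, so they are all distinct.
# >>> idx = generate_indices(check_random_state(42), False, 10, 5)
# >>> len(set(idx)) == 5
# True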


def EuclideanDist(x, y):
    """Euclidean distance between two vectors x and y."""
    return np.sqrt(np.sum((x - y) ** 2))


def dist2set(x, X):
    """Return the list of Euclidean distances between a vector x and each
    row of the matrix X."""
    ldist = []
    for i in range(len(X)):
        ldist.append(EuclideanDist(x, X[i]))
    return ldist


def c_factor(n):
    """Average path length of an unsuccessful search in a binary search
    tree built on n points; 0.5772156649 is the Euler-Mascheroni constant.
    """
    if n < 2:
        n = 2
    return 2.0 * (np.log(n - 1) + 0.5772156649) - (2.0 * (n - 1.) / (n * 1.0))
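
# Worked check (illustrative): for n = 256 the expected path length is
# 2*(ln(255) + 0.5772156649) - 2*255/256, roughly 10.24.
# >>> round(float(c_factor(256)), 2)
# 10.24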


def all_branches(node, current=None, branches=None):
    """Enumerate all root-to-leaf branches of a binary tree as lists of
    'L'/'R' moves."""
    if current is None:
        current = []
    current = current[:node.e]
    if branches is None:
        branches = []
    if node.ntype == 'inNode':
        current.append('L')
        all_branches(node.left, current=current, branches=branches)
        current = current[:-1]
        current.append('R')
        all_branches(node.right, current=current, branches=branches)
    else:
        branches.append(current)
    return branches


def branch2num(branch, init_root=0):
    """Convert a branch of 'L'/'R' moves into the heap-style node numbers
    visited along the way (children of node k are 2k+1 and 2k+2)."""
    num = [init_root]
    for b in branch:
        if b == 'L':
            num.append(num[-1] * 2 + 1)
        if b == 'R':
            num.append(num[-1] * 2 + 2)
    return num
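
# Illustrative example: starting from root 0, 'L' then 'R' visits nodes
# 0 -> 1 -> 4 under the heap numbering.
# >>> branch2num(['L', 'R'])
# [0, 1, 4]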


def _get_n_jobs(n_jobs):
    """Get the number of jobs for the computation.

    See sklearn/utils/__init__.py for more information.

    This function reimplements the logic of joblib to determine the actual
    number of jobs depending on the cpu count. If -1 all CPUs are used.
    If 1 is given, no parallel computing code is used at all, which is useful
    for debugging. For n_jobs below -1, (n_cpus + 1 + n_jobs) are used.
    Thus for n_jobs = -2, all CPUs but one are used.

    Parameters
    ----------
    n_jobs : int
        Number of jobs stated in joblib convention.

    Returns
    -------
    n_jobs : int
        The actual number of jobs as positive integer.
    """
    if n_jobs < 0:
        return max(cpu_count() + 1 + n_jobs, 1)
    elif n_jobs == 0:
        raise ValueError('Parameter n_jobs == 0 has no meaning.')
    else:
        return n_jobs


def _partition_estimators(n_estimators, n_jobs):
    """Private function used to partition estimators between jobs.

    See sklearn/ensemble/base.py for more information.
    """
    # Compute the number of jobs
    n_jobs = min(_get_n_jobs(n_jobs), n_estimators)

    # Partition estimators between jobs
    n_estimators_per_job = (n_estimators // n_jobs) * np.ones(n_jobs,
                                                              dtype=int)
    n_estimators_per_job[:n_estimators % n_jobs] += 1
    starts = np.cumsum(n_estimators_per_job)

    return n_jobs, n_estimators_per_job.tolist(), [0] + starts.tolist()
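
# Worked example: 10 estimators over 3 jobs are split 4/3/3, with start
# offsets marking each job's slice boundaries.
# >>> _partition_estimators(10, n_jobs=3)
# (3, [4, 3, 3], [0, 4, 7, 10])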


def _pprint(params, offset=0, printer=repr):
    # noinspection PyPep8
    """Pretty print the dictionary 'params'.

    See http://scikit-learn.org/stable/modules/generated/sklearn.base.BaseEstimator.html
    and sklearn/base.py for more information.

    :param params: The dictionary to pretty print
    :type params: dict

    :param offset: The offset in characters to add at the beginning of each
        line.
    :type offset: int

    :param printer: The function to convert entries to strings, typically
        the builtin str or repr
    :type printer: callable

    :return: The pretty-printed string representation of params
    :rtype: str
    """
    # Do a multi-line justified repr:
    options = np.get_printoptions()
    np.set_printoptions(precision=5, threshold=64, edgeitems=2)
    params_list = list()
    this_line_length = offset
    line_sep = ',\n' + (1 + offset // 2) * ' '
    for i, (k, v) in enumerate(sorted(params.items())):
        if type(v) is float:
            # use str for representing floating point numbers
            # this way we get consistent representation across
            # architectures and versions.
            this_repr = '%s=%s' % (k, str(v))
        else:
            # use repr of the rest
            this_repr = '%s=%s' % (k, printer(v))
        if len(this_repr) > 500:
            this_repr = this_repr[:300] + '...' + this_repr[-100:]
        if i > 0:
            if this_line_length + len(this_repr) >= 75 or '\n' in this_repr:
                params_list.append(line_sep)
                this_line_length = len(line_sep)
            else:
                params_list.append(', ')
                this_line_length += 2
        params_list.append(this_repr)
        this_line_length += len(this_repr)

    np.set_printoptions(**options)
    lines = ''.join(params_list)

    # Strip trailing space to avoid nightmare in doctests
    lines = '\n'.join(line.rstrip(' ') for line in lines.split('\n'))
    return lines


def get_activation_by_name(name):
    """Look up a torch activation module by its lowercase name."""
    activations = {
        'relu': nn.ReLU(),
        'sigmoid': nn.Sigmoid(),
        'tanh': nn.Tanh(),
        'leakyrelu': nn.LeakyReLU()
    }

    if name in activations:
        return activations[name]
    else:
        raise ValueError("%s is not a valid activation function" % name)
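
# Minimal sketch: map a configuration string to a torch activation module.
# >>> get_activation_by_name('relu')
# ReLU()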


def get_optimal_n_bins(X, upper_bound=None, epsilon=1):
    """Determine the optimal number of bins for a histogram using the
    Birge-Rozenblac method (see :cite:`birge2006many` for details).

    See https://doi.org/10.1051/ps:2006001

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        The samples to determine the optimal number of bins for.

    upper_bound : int, default=None
        The maximum value of n_bins to be considered.
        If set to None, np.sqrt(X.shape[0]) will be used as upper bound.

    epsilon : float, default=1
        A stabilizing term added to the logarithm to prevent taking the
        log of zero for empty bins.

    Returns
    -------
    optimal_n_bins : int
        The optimal value of n_bins according to the Birge-Rozenblac method
    """
    if upper_bound is None:
        upper_bound = int(np.sqrt(X.shape[0]))

    n = X.shape[0]
    maximum_likelihood = np.zeros((upper_bound - 1, 1))

    # evaluate the penalized log-likelihood for each candidate bin count
    for i, b in enumerate(range(1, upper_bound)):
        histogram, _ = np.histogram(X, bins=b)

        maximum_likelihood[i] = np.sum(
            histogram * np.log(b * histogram / n + epsilon) - (
                    b - 1 + np.power(np.log(b), 2.5)))

    return np.argmax(maximum_likelihood) + 1
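
# Hedged property check on random data (illustrative only): the result is
# always in [1, upper_bound), here upper_bound = int(sqrt(400)) = 20.
# >>> rng = np.random.RandomState(0)
# >>> n_bins = get_optimal_n_bins(rng.randn(400, 1))
# >>> 1 <= n_bins < 20
# True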