#!/usr/local/bin/python3 # Author: Pranab Ghosh # # Licensed under the Apache License, Version 2.0 (the "License"); you # may not use this file except in compliance with the License. You may # obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. See the License for the specific language governing # permissions and limitations under the License. # Package imports import os import sys import numpy as np import pandas as pd import sklearn as sk from sklearn import preprocessing from sklearn import metrics import random from math import * from decimal import Decimal import pprint from statsmodels.graphics import tsaplots from statsmodels.tsa import stattools as stt from statsmodels.stats import stattools as sstt from sklearn.linear_model import LinearRegression from matplotlib import pyplot as plt from scipy import stats as sta from statsmodels.tsa.seasonal import seasonal_decompose import statsmodels.api as sm from sklearn.ensemble import IsolationForest from sklearn.neighbors import LocalOutlierFactor from sklearn.svm import OneClassSVM from sklearn.covariance import EllipticEnvelope from sklearn.mixture import GaussianMixture from sklearn.cluster import KMeans from sklearn.decomposition import PCA import hurst from .util import * from .mlutil import * from .sampler import * from .stats import * """ Load data from a CSV file, data frame, numpy array or list Each data set (array like) is given a name while loading Perform various data exploration operation refering to the data sets by name Save and restore workspace if needed """ class DataSetMetaData: """ data set meta data """ dtypeNum = 1 dtypeCat = 2 dtypeBin = 3 def __init__(self, dtype): self.notes = list() self.dtype = dtype def addNote(self, note): """ add note """ self.notes.append(note) class DataExplorer: """ various data exploration functions """ def __init__(self, verbose=True): """ initialize Parameters verbose : True for verbosity """ self.dataSets = dict() self.metaData = dict() self.pp = pprint.PrettyPrinter(indent=4) self.verbose = verbose def setVerbose(self, verbose): """ sets verbose Parameters verbose : True for verbosity """ self.verbose = verbose def save(self, filePath): """ save checkpoint Parameters filePath : path of file where saved """ self.__printBanner("saving workspace") ws = dict() ws["data"] = self.dataSets ws["metaData"] = self.metaData saveObject(ws, filePath) self.__printDone() def restore(self, filePath): """ restore checkpoint Parameters filePath : path of file from where to store """ self.__printBanner("restoring workspace") ws = restoreObject(filePath) self.dataSets = ws["data"] self.metaData = ws["metaData"] self.__printDone() def queryFileData(self, filePath, *columns): """ query column data type from a data file Parameters filePath : path of file with data columns : indexes followed by column names or column names """ self.__printBanner("querying column data type from a data frame") lcolumns = list(columns) noHeader = type(lcolumns[0]) == int if noHeader: df = pd.read_csv(filePath, header=None) else: df = pd.read_csv(filePath, header=0) return self.queryDataFrameData(df, *columns) def queryDataFrameData(self, df, *columns): """ query column data type from a data frame Parameters df : data frame with data columns : indexes followed by column name or column names """ self.__printBanner("querying column data type from a data frame") columns = list(columns) noHeader = type(columns[0]) == int dtypes = list() if noHeader: nCols = int(len(columns) / 2) colIndexes = columns[:nCols] cnames = columns[nCols:] nColsDf = len(df.columns) for i in range(nCols): ci = colIndexes[i] assert ci < nColsDf, "col index {} outside range".format(ci) col = df.loc[ : , ci] dtypes.append(self.getDataType(col)) else: cnames = columns for c in columns: col = df[c] dtypes.append(self.getDataType(col)) nt = list(zip(cnames, dtypes)) result = self.__printResult("columns and data types", nt) return result def getDataType(self, col): """ get data type Parameters col : contains data array like """ if isBinary(col): dtype = "binary" elif isInteger(col): dtype = "integer" elif isFloat(col): dtype = "float" elif isCategorical(col): dtype = "categorical" else: dtype = "mixed" return dtype def addFileNumericData(self,filePath, *columns): """ add numeric columns from a file Parameters filePath : path of file with data columns : indexes followed by column names or column names """ self.__printBanner("adding numeric columns from a file") self.addFileData(filePath, True, *columns) self.__printDone() def addFileBinaryData(self,filePath, *columns): """ add binary columns from a file Parameters filePath : path of file with data columns : indexes followed by column names or column names """ self.__printBanner("adding binary columns from a file") self.addFileData(filePath, False, *columns) self.__printDone() def addFileData(self, filePath, numeric, *columns): """ add columns from a file Parameters filePath : path of file with data numeric : True if numeric False in binary columns : indexes followed by column names or column names """ columns = list(columns) noHeader = type(columns[0]) == int if noHeader: df = pd.read_csv(filePath, header=None) else: df = pd.read_csv(filePath, header=0) self.addDataFrameData(df, numeric, *columns) def addDataFrameNumericData(self,filePath, *columns): """ add numeric columns from a data frame Parameters filePath : path of file with data columns : indexes followed by column names or column names """ self.__printBanner("adding numeric columns from a data frame") self.addDataFrameData(filePath, True, *columns) def addDataFrameBinaryData(self,filePath, *columns): """ add binary columns from a data frame Parameters filePath : path of file with data columns : indexes followed by column names or column names """ self.__printBanner("adding binary columns from a data frame") self.addDataFrameData(filePath, False, *columns) def addDataFrameData(self, df, numeric, *columns): """ add columns from a data frame Parameters df : data frame with data numeric : True if numeric False in binary columns : indexes followed by column names or column names """ columns = list(columns) noHeader = type(columns[0]) == int if noHeader: nCols = int(len(columns) / 2) colIndexes = columns[:nCols] nColsDf = len(df.columns) for i in range(nCols): ci = colIndexes[i] assert ci < nColsDf, "col index {} outside range".format(ci) col = df.loc[ : , ci] if numeric: assert isNumeric(col), "data is not numeric" else: assert isBinary(col), "data is not binary" col = col.to_numpy() cn = columns[i + nCols] dtype = DataSetMetaData.dtypeNum if numeric else DataSetMetaData.dtypeBin self.__addDataSet(cn, col, dtype) else: for c in columns: col = df[c] if numeric: assert isNumeric(col), "data is not numeric" else: assert isBinary(col), "data is not binary" col = col.to_numpy() dtype = DataSetMetaData.dtypeNum if numeric else DataSetMetaData.dtypeBin self.__addDataSet(c, col, dtype) def __addDataSet(self, dsn, data, dtype): """ add dada set Parameters dsn: data set name data : numpy array data """ self.dataSets[dsn] = data self.metaData[dsn] = DataSetMetaData(dtype) def addListNumericData(self, ds, name): """ add numeric data from a list Parameters ds : list with data name : name of data set """ self.__printBanner("add numeric data from a list") self.addListData(ds, True, name) self.__printDone() def addListBinaryData(self, ds, name): """ add binary data from a list Parameters ds : list with data name : name of data set """ self.__printBanner("adding binary data from a list") self.addListData(ds, False, name) self.__printDone() def addListData(self, ds, numeric, name): """ adds list data Parameters ds : list with data numeric : True if numeric False in binary name : name of data set """ assert type(ds) == list, "data not a list" if numeric: assert isNumeric(ds), "data is not numeric" else: assert isBinary(ds), "data is not binary" dtype = DataSetMetaData.dtypeNum if numeric else DataSetMetaData.dtypeBin self.dataSets[name] = np.array(ds) self.metaData[name] = DataSetMetaData(dtype) def addFileCatData(self, filePath, *columns): """ add categorical columns from a file Parameters filePath : path of file with data columns : indexes followed by column names or column names """ self.__printBanner("adding categorical columns from a file") columns = list(columns) noHeader = type(columns[0]) == int if noHeader: df = pd.read_csv(filePath, header=None) else: df = pd.read_csv(filePath, header=0) self.addDataFrameCatData(df, *columns) self.__printDone() def addDataFrameCatData(self, df, *columns): """ add categorical columns from a data frame Parameters df : data frame with data columns : indexes followed by column names or column names """ self.__printBanner("adding categorical columns from a data frame") columns = list(columns) noHeader = type(columns[0]) == int if noHeader: nCols = int(len(columns) / 2) colIndexes = columns[:nCols] nColsDf = len(df.columns) for i in range(nCols): ci = colIndexes[i] assert ci < nColsDf, "col index {} outside range".format(ci) col = df.loc[ : , ci] assert isCategorical(col), "data is not categorical" col = col.tolist() cn = columns[i + nCols] self.__addDataSet(cn, col, DataSetMetaData.dtypeCat) else: for c in columns: col = df[c].tolist() self.__addDataSet(c, col, DataSetMetaData.dtypeCat) def addListCatData(self, ds, name): """ add categorical list data Parameters ds : list with data name : name of data set """ self.__printBanner("adding categorical list data") assert type(ds) == list, "data not a list" assert isCategorical(ds), "data is not categorical" self.__addDataSet(name, ds, DataSetMetaData.dtypeCat) self.__printDone() def remData(self, ds): """ removes data set Parameters ds : data set name """ self.__printBanner("removing data set", ds) assert ds in self.dataSets, "data set {} does not exist, please add it first".format(ds) self.dataSets.pop(ds) self.metaData.pop(ds) names = self.showNames() self.__printDone() return names def addNote(self, ds, note): """ get data Parameters ds : data set name or list or numpy array with data note: note text """ self.__printBanner("adding note") assert ds in self.dataSets, "data set {} does not exist, please add it first".format(ds) mdata = self.metaData[ds] mdata.addNote(note) self.__printDone() def getNotes(self, ds): """ get data Parameters ds : data set name or list or numpy array with data """ self.__printBanner("getting notes") assert ds in self.dataSets, "data set {} does not exist, please add it first".format(ds) mdata = self.metaData[ds] dnotes = mdata.notes if self.verbose: for dn in dnotes: print(dn) return dnotes def getNumericData(self, ds): """ get numeric data Parameters ds : data set name or list or numpy array with data """ if type(ds) == str: assert ds in self.dataSets, "data set {} does not exist, please add it first".format(ds) assert self.metaData[ds].dtype == DataSetMetaData.dtypeNum, "data set {} is expected to be numerical type for this operation".format(ds) data = self.dataSets[ds] elif type(ds) == list: assert isNumeric(ds), "data is not numeric" data = np.array(ds) elif type(ds) == np.ndarray: data = ds else: raise "invalid type, expecting data set name, list or ndarray" return data def getCatData(self, ds): """ get categorical data Parameters ds : data set name or list with data """ if type(ds) == str: assert ds in self.dataSets, "data set {} does not exist, please add it first".format(ds) assert self.metaData[ds].dtype == DataSetMetaData.dtypeCat, "data set {} is expected to be categorical type for this operation".format(ds) data = self.dataSets[ds] elif type(ds) == list: assert isCategorical(ds), "data is not categorical" data = ds else: raise "invalid type, expecting data set name or list" return data def getAnyData(self, ds): """ get any data Parameters ds : data set name or list with data """ if type(ds) == str: assert ds in self.dataSets, "data set {} does not exist, please add it first".format(ds) data = self.dataSets[ds] elif type(ds) == list: data = ds else: raise "invalid type, expecting data set name or list" return data def loadCatFloatDataFrame(self, ds1, ds2): """ loads float and cat data into data frame Parameters ds1: data set name or list ds2: data set name or list or numpy array """ data1 = self.getCatData(ds1) data2 = self.getNumericData(ds2) self.ensureSameSize([data1, data2]) df1 = pd.DataFrame(data=data1) df2 = pd.DataFrame(data=data2) df = pd.concat([df1,df2], axis=1) df.columns = range(df.shape[1]) return df def showNames(self): """ lists data set names """ self.__printBanner("listing data set names") names = self.dataSets.keys() if self.verbose: print("data sets") for ds in names: print(ds) self.__printDone() return names def plot(self, ds, yscale=None): """ plots data Parameters ds: data set name or list or numpy array yscale: y scale """ self.__printBanner("plotting data", ds) data = self.getNumericData(ds) drawLine(data, yscale) def plotZoomed(self, ds, beg, end, yscale=None): """ plots zoomed data Parameters ds: data set name or list or numpy array beg: begin offset end: end offset yscale: y scale """ self.__printBanner("plotting data", ds) data = self.getNumericData(ds) drawLine(data[beg:end], yscale) def scatterPlot(self, ds1, ds2): """ scatter plots data Parameters ds1: data set name or list or numpy array ds2: data set name or list or numpy array """ self.__printBanner("scatter plotting data", ds1, ds2) data1 = self.getNumericData(ds1) data2 = self.getNumericData(ds2) self.ensureSameSize([data1, data2]) x = np.arange(1, len(data1)+1, 1) plt.scatter(x, data1 ,color="red") plt.scatter(x, data2 ,color="blue") plt.show() def print(self, ds): """ prunt data Parameters ds: data set name or list or numpy array """ self.__printBanner("printing data", ds) assert ds in self.dataSets, "data set {} does not exist, please add it first".format(ds) data = self.dataSets[ds] if self.verbore: print(formatAny(len(data), "size")) print("showing first 50 elements" ) print(data[:50]) def plotHist(self, ds, cumulative, density, nbins=20): """ plots histogram Parameters ds: data set name or list or numpy array cumulative : True if cumulative density : True to normalize for probability density nbins : no of bins """ self.__printBanner("plotting histogram", ds) data = self.getNumericData(ds) plt.hist(data, bins=nbins, cumulative=cumulative, density=density) plt.show() def isMonotonicallyChanging(self, ds): """ checks if monotonically increasing or decreasing Parameters ds: data set name or list or numpy array """ self.__printBanner("checking monotonic change", ds) data = self.getNumericData(ds) monoIncreasing = all(list(map(lambda i : data[i] >= data[i-1], range(1, len(data), 1)))) monoDecreasing = all(list(map(lambda i : data[i] <= data[i-1], range(1, len(data), 1)))) result = self.__printResult("monoIncreasing", monoIncreasing, "monoDecreasing", monoDecreasing) return result def getFreqDistr(self, ds, nbins=20): """ get histogram Parameters ds: data set name or list or numpy array nbins: num of bins """ self.__printBanner("getting histogram", ds) data = self.getNumericData(ds) frequency, lowLimit, binsize, extraPoints = sta.relfreq(data, numbins=nbins) result = self.__printResult("frequency", frequency, "lowLimit", lowLimit, "binsize", binsize, "extraPoints", extraPoints) return result def getCumFreqDistr(self, ds, nbins=20): """ get cumulative freq distribution Parameters ds: data set name or list or numpy array nbins: num of bins """ self.__printBanner("getting cumulative freq distribution", ds) data = self.getNumericData(ds) cumFrequency, lowLimit, binsize, extraPoints = sta.cumfreq(data, numbins=nbins) result = self.__printResult("cumFrequency", cumFrequency, "lowLimit", lowLimit, "binsize", binsize, "extraPoints", extraPoints) return result def getExtremeValue(self, ds, ensamp, nsamp, polarity, doPlotDistr, nbins=20): """ get extreme values Parameters ds: data set name or list or numpy array ensamp: num of samples for extreme values nsamp: num of samples polarity: max or min doPlotDistr: plot distr nbins: num of bins """ self.__printBanner("getting extreme values", ds) data = self.getNumericData(ds) evalues = list() for _ in range(ensamp): values = selectRandomSubListFromListWithRepl(data, nsamp) if polarity == "max": evalues.append(max(values)) else: evalues.append(min(values)) if doPlotDistr: plt.hist(evalues, bins=nbins, cumulative=False, density=True) plt.show() result = self.__printResult("extremeValues", evalues) return result def getEntropy(self, ds, nbins=20): """ get entropy Parameters ds: data set name or list or numpy array nbins: num of bins """ self.__printBanner("getting entropy", ds) data = self.getNumericData(ds) result = self.getFreqDistr(data, nbins) entropy = sta.entropy(result["frequency"]) result = self.__printResult("entropy", entropy) return result def getRelEntropy(self, ds1, ds2, nbins=20): """ get relative entropy or KL divergence with both data sets numeric Parameters ds1: data set name or list or numpy array ds2: data set name or list or numpy array nbins: num of bins """ self.__printBanner("getting relative entropy or KL divergence", ds1, ds2) data1 = self.getNumericData(ds1) data2 = self.getNumericData(ds2) result1 = self .getFeqDistr(data1, nbins) freq1 = result1["frequency"] result2 = self .getFeqDistr(data2, nbins) freq2 = result2["frequency"] entropy = sta.entropy(freq1, freq2) result = self.__printResult("relEntropy", entropy) return result def getAnyEntropy(self, ds, dt, nbins=20): """ get entropy of any data typr numeric or categorical Parameters ds: data set name or list or numpy array dt : data type num or cat nbins: num of bins """ entropy = self.getEntropy(ds, nbins)["entropy"] if dt == "num" else self.getStatsCat(ds)["entropy"] result = self.__printResult("entropy", entropy) return result def getJointEntropy(self, ds1, ds2, nbins=20): """ get joint entropy with both data sets numeric Parameters ds1: data set name or list or numpy array ds2: data set name or list or numpy array nbins: num of bins """ self.__printBanner("getting join entropy", ds1, ds2) data1 = self.getNumericData(ds1) data2 = self.getNumericData(ds2) self.ensureSameSize([data1, data2]) hist, xedges, yedges = np.histogram2d(data1, data2, bins=nbins) hist = hist.flatten() ssize = len(data1) hist = hist / ssize entropy = sta.entropy(hist) result = self.__printResult("jointEntropy", entropy) return result def getAllNumMutualInfo(self, ds1, ds2, nbins=20): """ get mutual information for both numeric data Parameters ds1: data set name or list or numpy array ds2: data set name or list or numpy array nbins: num of bins """ self.__printBanner("getting mutual information", ds1, ds2) en1 = self.getEntropy(ds1,nbins) en2 = self.getEntropy(ds2,nbins) en = self.getJointEntropy(ds1, ds2, nbins) mutInfo = en1["entropy"] + en2["entropy"] - en["jointEntropy"] result = self.__printResult("mutInfo", mutInfo) return result def getNumCatMutualInfo(self, nds, cds ,nbins=20): """ get mutiual information between numeric and categorical data Parameters nds: numeric data set name or list or numpy array cds: categoric data set name or list nbins: num of bins """ self.__printBanner("getting mutual information of numerical and categorical data", nds, cds) ndata = self.getNumericData(nds) cds = self.getCatData(cds) nentr = self.getEntropy(nds)["entropy"] #conditional entropy cdistr = self.getStatsCat(cds)["distr"] grdata = self.getGroupByData(nds, cds, True)["groupedData"] cnentr = 0 for gr, data in grdata.items(): self.addListNumericData(data, "grdata") gnentr = self.getEntropy("grdata")["entropy"] cnentr += gnentr * cdistr[gr] mutInfo = nentr - cnentr result = self.__printResult("mutInfo", mutInfo, "entropy", nentr, "condEntropy", cnentr) return result def getTwoCatMutualInfo(self, cds1, cds2): """ get mutiual information between 2 categorical data sets Parameters cds1 : categoric data set name or list cds2 : categoric data set name or list """ self.__printBanner("getting mutual information of two categorical data sets", cds1, cds2) cdata1 = self.getCatData(cds1) cdata2 = self.getCatData(cds1) centr = self.getStatsCat(cds1)["entropy"] #conditional entropy cdistr = self.getStatsCat(cds2)["distr"] grdata = self.getGroupByData(cds1, cds2, True)["groupedData"] ccentr = 0 for gr, data in grdata.items(): self.addListCatData(data, "grdata") gcentr = self.getStatsCat("grdata")["entropy"] ccentr += gcentr * cdistr[gr] mutInfo = centr - ccentr result = self.__printResult("mutInfo", mutInfo, "entropy", centr, "condEntropy", ccentr) return result def getMutualInfo(self, dst, nbins=20): """ get mutiual information between 2 data sets,any combination numerical and categorical Parameters dst : data source , data type, data source , data type nbins : num of bins """ assertEqual(len(dst), 4, "invalid data source and data type list size") dtypes = ["num", "cat"] assertInList(dst[1], dtypes, "invalid data type") assertInList(dst[3], dtypes, "invalid data type") self.__printBanner("getting mutual information of any mix numerical and categorical data", dst[0], dst[2]) if dst[1] == "num": mutInfo = self.getAllNumMutualInfo(dst[0], dst[2], nbins)["mutInfo"] if dst[3] == "num" \ else self.getNumCatMutualInfo(dst[0], dst[2], nbins)["mutInfo"] else: mutInfo = self.getNumCatMutualInfo(dst[2], dst[0], nbins)["mutInfo"] if dst[3] == "num" \ else self.getTwoCatMutualInfo(dst[2], dst[0])["mutInfo"] result = self.__printResult("mutInfo", mutInfo) return result def getCondMutualInfo(self, dst, nbins=20): """ get conditional mutiual information between 2 data sets,any combination numerical and categorical Parameters dst : data source , data type, data source , data type, data source , data type nbins : num of bins """ assertEqual(len(dst), 6, "invalid data source and data type list size") dtypes = ["num", "cat"] assertInList(dst[1], dtypes, "invalid data type") assertInList(dst[3], dtypes, "invalid data type") assertInList(dst[5], dtypes, "invalid data type") self.__printBanner("getting conditional mutual information of any mix numerical and categorical data", dst[0], dst[2]) if dst[5] == "cat": cdistr = self.getStatsCat(dst[4])["distr"] grdata1 = self.getGroupByData(dst[0], dst[4], True)["groupedData"] grdata2 = self.getGroupByData(dst[2], dst[4], True)["groupedData"] else: gdata = self.getNumericData(dst[4]) hist = Histogram.createWithNumBins(gdata, nbins) cdistr = hist.distr() grdata1 = self.getGroupByData(dst[0], dst[4], False)["groupedData"] grdata2 = self.getGroupByData(dst[2], dst[4], False)["groupedData"] cminfo = 0 for gr in grdata1.keys(): data1 = grdata1[gr] data2 = grdata2[gr] if dst[1] == "num": self.addListNumericData(data1, "grdata1") else: self.addListCatData(data1, "grdata1") if dst[3] == "num": self.addListNumericData(data2, "grdata2") else: self.addListCatData(data2, "grdata2") gdst = ["grdata1", dst[1], "grdata2", dst[3]] minfo = self.getMutualInfo(gdst, nbins)["mutInfo"] cminfo += minfo * cdistr[gr] result = self.__printResult("condMutInfo", cminfo) return result def getPercentile(self, ds, value): """ gets percentile Parameters ds: data set name or list or numpy array value: the value """ self.__printBanner("getting percentile", ds) data = self.getNumericData(ds) percent = sta.percentileofscore(data, value) result = self.__printResult("value", value, "percentile", percent) return result def getValueRangePercentile(self, ds, value1, value2): """ gets percentile Parameters ds: data set name or list or numpy array value1: first value value2: second value """ self.__printBanner("getting percentile difference for value range", ds) if value1 < value2: v1 = value1 v2 = value2 else: v1 = value2 v2 = value1 data = self.getNumericData(ds) per1 = sta.percentileofscore(data, v1) per2 = sta.percentileofscore(data, v2) result = self.__printResult("valueFirst", value1, "valueSecond", value2, "percentileDiff", per2 - per1) return result def getValueAtPercentile(self, ds, percent): """ gets value at percentile Parameters ds: data set name or list or numpy array percent: percentile """ self.__printBanner("getting value at percentile", ds) data = self.getNumericData(ds) assert isInRange(percent, 0, 100), "percent should be between 0 and 100" value = sta.scoreatpercentile(data, percent) result = self.__printResult("value", value, "percentile", percent) return result def getLessThanValues(self, ds, cvalue): """ gets values less than given value Parameters ds: data set name or list or numpy array cvalue: condition value """ self.__printBanner("getting values less than", ds) fdata = self.__getCondValues(ds, cvalue, "lt") result = self.__printResult("count", len(fdata), "lessThanvalues", fdata ) return result def getGreaterThanValues(self, ds, cvalue): """ gets values greater than given value Parameters ds: data set name or list or numpy array cvalue: condition value """ self.__printBanner("getting values greater than", ds) fdata = self.__getCondValues(ds, cvalue, "gt") result = self.__printResult("count", len(fdata), "greaterThanvalues", fdata ) return result def __getCondValues(self, ds, cvalue, cond): """ gets cinditional values Parameters ds: data set name or list or numpy array cvalue: condition value cond: condition """ data = self.getNumericData(ds) if cond == "lt": ind = np.where(data < cvalue) else: ind = np.where(data > cvalue) fdata = data[ind] return fdata def getUniqueValueCounts(self, ds, maxCnt=10): """ gets unique values and counts Parameters ds: data set name or list or numpy array maxCnt; max value count pairs to return """ self.__printBanner("getting unique values and counts", ds) data = self.getNumericData(ds) values, counts = sta.find_repeats(data) cardinality = len(values) vc = list(zip(values, counts)) vc.sort(key = lambda v : v[1], reverse = True) result = self.__printResult("cardinality", cardinality, "vunique alues and repeat counts", vc[:maxCnt]) return result def getCatUniqueValueCounts(self, ds, maxCnt=10): """ gets unique categorical values and counts Parameters ds: data set name or list or numpy array maxCnt: max value count pairs to return """ self.__printBanner("getting unique categorical values and counts", ds) data = self.getCatData(ds) series = pd.Series(data) uvalues = series.value_counts() values = uvalues.index.tolist() counts = uvalues.tolist() vc = list(zip(values, counts)) vc.sort(key = lambda v : v[1], reverse = True) result = self.__printResult("cardinality", len(values), "unique values and repeat counts", vc[:maxCnt]) return result def getCatAlphaValueCounts(self, ds): """ gets alphabetic value count Parameters ds: data set name or list or numpy array """ self.__printBanner("getting alphabetic value counts", ds) data = self.getCatData(ds) series = pd.Series(data) flags = series.str.isalpha().tolist() count = sum(flags) result = self.__printResult("alphabeticValueCount", count) return result def getCatNumValueCounts(self, ds): """ gets numeric value count Parameters ds: data set name or list or numpy array """ self.__printBanner("getting numeric value counts", ds) data = self.getCatData(ds) series = pd.Series(data) flags = series.str.isnumeric().tolist() count = sum(flags) result = self.__printResult("numericValueCount", count) return result def getCatAlphaNumValueCounts(self, ds): """ gets alpha numeric value count Parameters ds: data set name or list or numpy array """ self.__printBanner("getting alpha numeric value counts", ds) data = self.getCatData(ds) series = pd.Series(data) flags = series.str.isalnum().tolist() count = sum(flags) result = self.__printResult("alphaNumericValueCount", count) return result def getCatAllCharCounts(self, ds): """ gets alphabetic, numeric and special char count list Parameters ds: data set name or list or numpy array """ self.__printBanner("getting alphabetic, numeric and special char counts", ds) data = self.getCatData(ds) counts = list() for d in data: r = getAlphaNumCharCount(d) counts.append(r) result = self.__printResult("allTypeCharCounts", counts) return result def getCatAlphaCharCounts(self, ds): """ gets alphabetic char count list Parameters ds: data set name or list or numpy array """ self.__printBanner("getting alphabetic char counts", ds) data = self.getCatData(ds) counts = self.getCatAllCharCounts(ds)["allTypeCharCounts"] counts = list(map(lambda r : r[0], counts)) result = self.__printResult("alphaCharCounts", counts) return result def getCatNumCharCounts(self, ds): """ gets numeric char count list Parameters ds: data set name or list or numpy array """ self.__printBanner("getting numeric char counts", ds) data = self.getCatData(ds) counts = self.getCatAllCharCounts(ds)["allTypeCharCounts"] counts = list(map(lambda r : r[1], counts)) result = self.__printResult("numCharCounts", counts) return result def getCatSpecialCharCounts(self, ds): """ gets special char count list Parameters ds: data set name or list or numpy array """ self.__printBanner("getting special char counts", ds) counts = self.getCatAllCharCounts(ds)["allTypeCharCounts"] counts = list(map(lambda r : r[2], counts)) result = self.__printResult("specialCharCounts", counts) return result def getCatAlphaCharCountStats(self, ds): """ gets alphabetic char count stats Parameters ds: data set name or list or numpy array """ self.__printBanner("getting alphabetic char count stats", ds) counts = self.getCatAlphaCharCounts(ds)["alphaCharCounts"] nz = counts.count(0) st = self.__getBasicStats(np.array(counts)) result = self.__printResult("mean", st[0], "std dev", st[1], "max", st[2], "min", st[3], "zeroCount", nz) return result def getCatNumCharCountStats(self, ds): """ gets numeric char count stats Parameters ds: data set name or list or numpy array """ self.__printBanner("getting numeric char count stats", ds) counts = self.getCatNumCharCounts(ds)["numCharCounts"] nz = counts.count(0) st = self.__getBasicStats(np.array(counts)) result = self.__printResult("mean", st[0], "std dev", st[1], "max", st[2], "min", st[3], "zeroCount", nz) return result def getCatSpecialCharCountStats(self, ds): """ gets special char count stats Parameters ds: data set name or list or numpy array """ self.__printBanner("getting special char count stats", ds) counts = self.getCatSpecialCharCounts(ds)["specialCharCounts"] nz = counts.count(0) st = self.__getBasicStats(np.array(counts)) result = self.__printResult("mean", st[0], "std dev", st[1], "max", st[2], "min", st[3], "zeroCount", nz) return result def getCatFldLenStats(self, ds): """ gets field length stats Parameters ds: data set name or list or numpy array """ self.__printBanner("getting field length stats", ds) data = self.getCatData(ds) le = list(map(lambda d: len(d), data)) st = self.__getBasicStats(np.array(le)) result = self.__printResult("mean", st[0], "std dev", st[1], "max", st[2], "min", st[3]) return result def getCatCharCountStats(self, ds, ch): """ gets specified char ocuurence count stats Parameters ds: data set name or list or numpy array ch : character """ self.__printBanner("getting field length stats", ds) data = self.getCatData(ds) counts = list(map(lambda d: d.count(ch), data)) nz = counts.count(0) st = self.__getBasicStats(np.array(counts)) result = self.__printResult("mean", st[0], "std dev", st[1], "max", st[2], "min", st[3], "zeroCount", nz) return result def getStats(self, ds, nextreme=5): """ gets summary statistics Parameters ds: data set name or list or numpy array nextreme: num of extreme values """ self.__printBanner("getting summary statistics", ds) data = self.getNumericData(ds) stat = dict() stat["length"] = len(data) stat["min"] = data.min() stat["max"] = data.max() series = pd.Series(data) stat["n smallest"] = series.nsmallest(n=nextreme).tolist() stat["n largest"] = series.nlargest(n=nextreme).tolist() stat["mean"] = data.mean() stat["median"] = np.median(data) mode, modeCnt = sta.mode(data) stat["mode"] = mode[0] stat["mode count"] = modeCnt[0] stat["std"] = np.std(data) stat["skew"] = sta.skew(data) stat["kurtosis"] = sta.kurtosis(data) stat["mad"] = sta.median_absolute_deviation(data) self.pp.pprint(stat) return stat def getStatsCat(self, ds): """ gets summary statistics for categorical data Parameters ds: data set name or list or numpy array """ self.__printBanner("getting summary statistics for categorical data", ds) data = self.getCatData(ds) ch = CatHistogram() for d in data: ch.add(d) mode = ch.getMode() entr = ch.getEntropy() uvalues = ch.getUniqueValues() distr = ch.getDistr() result = self.__printResult("entropy", entr, "mode", mode, "uniqueValues", uvalues, "distr", distr) return result def getGroupByData(self, ds, gds, gdtypeCat, numBins=20): """ group by Parameters ds: data set name or list or numpy array gds: group by data set name or list or numpy array gdtpe : group by data type """ self.__printBanner("getting group by data", ds) data = self.getAnyData(ds) if gdtypeCat: gdata = self.getCatData(gds) else: gdata = self.getNumericData(gds) hist = Histogram.createWithNumBins(gdata, numBins) gdata = list(map(lambda d : hist.bin(d), gdata)) self.ensureSameSize([data, gdata]) groups = dict() for g,d in zip(gdata, data): appendKeyedList(groups, g, d) ve = self.verbose self.verbose = False result = self.__printResult("groupedData", groups) self.verbose = ve return result def getDifference(self, ds, order, doPlot=False): """ gets difference of given order Parameters ds: data set name or list or numpy array order: order of difference doPlot : True for plot """ self.__printBanner("getting difference of given order", ds) data = self.getNumericData(ds) diff = difference(data, order) if doPlot: drawLine(diff) return diff def getTrend(self, ds, doPlot=False): """ get trend Parameters ds: data set name or list or numpy array doPlot: true if plotting needed """ self.__printBanner("getting trend") data = self.getNumericData(ds) sz = len(data) X = list(range(0, sz)) X = np.reshape(X, (sz, 1)) model = LinearRegression() model.fit(X, data) trend = model.predict(X) sc = model.score(X, data) coef = model.coef_ intc = model.intercept_ result = self.__printResult("coeff", coef, "intercept", intc, "r square error", sc, "trend", trend) if doPlot: plt.plot(data) plt.plot(trend) plt.show() return result def getDiffSdNoisiness(self, ds): """ get noisiness based on std dev of first order difference Parameters ds: data set name or list or numpy array """ diff = self.getDifference(ds, 1) noise = np.std(np.array(diff)) result = self.__printResult("noisiness", noise) return result def getMaRmseNoisiness(self, ds, wsize=5): """ gets noisiness based on RMSE with moving average Parameters ds: data set name or list or numpy array wsize : window size """ assert wsize % 2 == 1, "window size must be odd" data = self.getNumericData(ds) wind = data[:wsize] wstat = SlidingWindowStat.initialize(wind.tolist()) whsize = int(wsize / 2) beg = whsize end = len(data) - whsize - 1 sumSq = 0.0 mean = wstat.getStat()[0] diff = data[beg] - mean sumSq += diff * diff for i in range(beg + 1, end, 1): mean = wstat.addGetStat(data[i + whsize])[0] diff = data[i] - mean sumSq += (diff * diff) noise = math.sqrt(sumSq / (len(data) - 2 * whsize)) result = self.__printResult("noisiness", noise) return result def deTrend(self, ds, trend, doPlot=False): """ de trend Parameters ds: data set name or list or numpy array ternd : trend data doPlot: true if plotting needed """ self.__printBanner("doing de trend", ds) data = self.getNumericData(ds) sz = len(data) detrended = list(map(lambda i : data[i]-trend[i], range(sz))) if doPlot: drawLine(detrended) return detrended def getTimeSeriesComponents(self, ds, model, freq, summaryOnly, doPlot=False): """ extracts trend, cycle and residue components of time series Parameters ds: data set name or list or numpy array model : model type freq : seasnality period summaryOnly : True if only summary needed in output doPlot: true if plotting needed """ self.__printBanner("extracting trend, cycle and residue components of time series", ds) assert model == "additive" or model == "multiplicative", "model must be additive or multiplicative" data = self.getNumericData(ds) res = seasonal_decompose(data, model=model, period=freq) if doPlot: res.plot() plt.show() #summar of componenets trend = np.array(removeNan(res.trend)) trendMean = trend.mean() trendSlope = (trend[-1] - trend[0]) / (len(trend) - 1) seasonal = np.array(removeNan(res.seasonal)) seasonalAmp = (seasonal.max() - seasonal.min()) / 2 resid = np.array(removeNan(res.resid)) residueMean = resid.mean() residueStdDev = np.std(resid) if summaryOnly: result = self.__printResult("trendMean", trendMean, "trendSlope", trendSlope, "seasonalAmp", seasonalAmp, "residueMean", residueMean, "residueStdDev", residueStdDev) else: result = self.__printResult("trendMean", trendMean, "trendSlope", trendSlope, "seasonalAmp", seasonalAmp, "residueMean", residueMean, "residueStdDev", residueStdDev, "trend", res.trend, "seasonal", res.seasonal, "residual", res.resid) return result def getGausianMixture(self, ncomp, cvType, ninit, *dsl): """ finds gaussian mixture parameters Parameters ncomp : num of gaussian componenets cvType : co variance type ninit: num of intializations dsl: list of data set name or list or numpy array """ self.__printBanner("getting gaussian mixture parameters", *dsl) assertInList(cvType, ["full", "tied", "diag", "spherical"], "invalid covariance type") dmat = self.__stackData(*dsl) gm = GaussianMixture(n_components=ncomp, covariance_type=cvType, n_init=ninit) gm.fit(dmat) weights = gm.weights_ means = gm.means_ covars = gm.covariances_ converged = gm.converged_ niter = gm.n_iter_ aic = gm.aic(dmat) result = self.__printResult("weights", weights, "mean", means, "covariance", covars, "converged", converged, "num iterations", niter, "aic", aic) return result def getKmeansCluster(self, nclust, ninit, *dsl): """ gets cluster parameters Parameters nclust : num of clusters ninit: num of intializations dsl: list of data set name or list or numpy array """ self.__printBanner("getting kmean cluster parameters", *dsl) dmat = self.__stackData(*dsl) nsamp = dmat.shape[0] km = KMeans(n_clusters=nclust, n_init=ninit) km.fit(dmat) centers = km.cluster_centers_ avdist = sqrt(km.inertia_ / nsamp) niter = km.n_iter_ score = km.score(dmat) result = self.__printResult("centers", centers, "average distance", avdist, "num iterations", niter, "score", score) return result def getPrincComp(self, ncomp, *dsl): """ finds pricipal componenet parameters Parameters ncomp : num of pricipal componenets dsl: list of data set name or list or numpy array """ self.__printBanner("getting principal componenet parameters", *dsl) dmat = self.__stackData(*dsl) nfeat = dmat.shape[1] assertGreater(nfeat, 1, "requires multiple features") assertLesserEqual(ncomp, nfeat, "num of componenets greater than num of features") pca = PCA(n_components=ncomp) pca.fit(dmat) comps = pca.components_ var = pca.explained_variance_ varr = pca.explained_variance_ratio_ svalues = pca.singular_values_ result = self.__printResult("componenets", comps, "variance", var, "variance ratio", varr, "singular values", svalues) return result def getOutliersWithIsoForest(self, contamination, *dsl): """ finds outliers using isolation forest Parameters contamination : proportion of outliers in the data set dsl: list of data set name or list or numpy array """ self.__printBanner("getting outliers using isolation forest", *dsl) assert contamination >= 0 and contamination <= 0.5, "contamination outside valid range" dmat = self.__stackData(*dsl) isf = IsolationForest(contamination=contamination, behaviour="new") ypred = isf.fit_predict(dmat) mask = ypred == -1 doul = dmat[mask, :] mask = ypred != -1 dwoul = dmat[mask, :] result = self.__printResult("numOutliers", doul.shape[0], "outliers", doul, "dataWithoutOutliers", dwoul) return result def getOutliersWithLocalFactor(self, contamination, *dsl): """ gets outliers using local outlier factor Parameters contamination : proportion of outliers in the data set dsl: list of data set name or list or numpy array """ self.__printBanner("getting outliers using local outlier factor", *dsl) assert contamination >= 0 and contamination <= 0.5, "contamination outside valid range" dmat = self.__stackData(*dsl) lof = LocalOutlierFactor(contamination=contamination) ypred = lof.fit_predict(dmat) mask = ypred == -1 doul = dmat[mask, :] mask = ypred != -1 dwoul = dmat[mask, :] result = self.__printResult("numOutliers", doul.shape[0], "outliers", doul, "dataWithoutOutliers", dwoul) return result def getOutliersWithSupVecMach(self, nu, *dsl): """ gets outliers using one class svm Parameters nu : upper bound on the fraction of training errors and a lower bound of the fraction of support vectors dsl: list of data set name or list or numpy array """ self.__printBanner("getting outliers using one class svm", *dsl) assert nu >= 0 and nu <= 0.5, "error upper bound outside valid range" dmat = self.__stackData(*dsl) svm = OneClassSVM(nu=nu) ypred = svm.fit_predict(dmat) mask = ypred == -1 doul = dmat[mask, :] mask = ypred != -1 dwoul = dmat[mask, :] result = self.__printResult("numOutliers", doul.shape[0], "outliers", doul, "dataWithoutOutliers", dwoul) return result def getOutliersWithCovarDeterminant(self, contamination, *dsl): """ gets outliers using covariance determinan Parameters contamination : proportion of outliers in the data set dsl: list of data set name or list or numpy array """ self.__printBanner("getting outliers using using covariance determinant", *dsl) assert contamination >= 0 and contamination <= 0.5, "contamination outside valid range" dmat = self.__stackData(*dsl) lof = EllipticEnvelope(contamination=contamination) ypred = lof.fit_predict(dmat) mask = ypred == -1 doul = dmat[mask, :] mask = ypred != -1 dwoul = dmat[mask, :] result = self.__printResult("numOutliers", doul.shape[0], "outliers", doul, "dataWithoutOutliers", dwoul) return result def getOutliersWithZscore(self, ds, zthreshold, stats=None): """ gets outliers using zscore Parameters ds: data set name or list or numpy array zthreshold : z score threshold stats : tuple cintaining mean and std dev """ self.__printBanner("getting outliers using zscore", ds) data = self.getNumericData(ds) if stats is None: mean = data.mean() sd = np.std(data) else: mean = stats[0] sd = stats[1] zs = list(map(lambda d : abs((d - mean) / sd), data)) outliers = list(filter(lambda r : r[1] > zthreshold, enumerate(zs))) result = self.__printResult("outliers", outliers) return result def getOutliersWithRobustZscore(self, ds, zthreshold, stats=None): """ gets outliers using robust zscore Parameters ds: data set name or list or numpy array zthreshold : z score threshold stats : tuple containing median and median absolute deviation """ self.__printBanner("getting outliers using robust zscore", ds) data = self.getNumericData(ds) if stats is None: med = np.median(data) dev = np.array(list(map(lambda d : abs(d - med), data))) mad = 1.4296 * np.median(dev) else: med = stats[0] mad = stats[1] rzs = list(map(lambda d : abs((d - med) / mad), data)) outliers = list(filter(lambda r : r[1] > zthreshold, enumerate(rzs))) result = self.__printResult("outliers", outliers) return result def getSubsequenceOutliersWithDissimilarity(self, subSeqSize, ds): """ gets subsequence outlier with subsequence pairwise disimilarity Parameters subSeqSize : sub sequence size ds: data set name or list or numpy array """ self.__printBanner("doing sub sequence anomaly detection with dissimilarity", ds) data = self.getNumericData(ds) sz = len(data) dist = dict() minDist = dict() for i in range(sz - subSeqSize): #first window w1 = data[i : i + subSeqSize] dmin = None for j in range(sz - subSeqSize): #second window not overlapping with the first if j + subSeqSize <=i or j >= i + subSeqSize: w2 = data[j : j + subSeqSize] k = (j,i) if k in dist: d = dist[k] else: d = euclideanDistance(w1,w2) k = (i,j) dist[k] = d if dmin is None: dmin = d else: dmin = d if d < dmin else dmin minDist[i] = dmin #find max of min dmax = None offset = None for k in minDist.keys(): d = minDist[k] if dmax is None: dmax = d offset = k else: if d > dmax: dmax = d offset = k result = self.__printResult("subSeqOffset", offset, "outlierScore", dmax) return result def getNullCount(self, ds): """ get count of null fields Parameters ds : data set name or list or numpy array with data """ self.__printBanner("getting null value count", ds) if type(ds) == str: assert ds in self.dataSets, "data set {} does not exist, please add it first".format(ds) data = self.dataSets[ds] ser = pd.Series(data) elif type(ds) == list or type(ds) == np.ndarray: ser = pd.Series(ds) data = ds else: raise ValueError("invalid data type") nv = ser.isnull().tolist() nullCount = nv.count(True) nullFraction = nullCount / len(data) result = self.__printResult("nullFraction", nullFraction, "nullCount", nullCount) return result def fitLinearReg(self, dsx, ds, doPlot=False): """ fit linear regression Parameters dsx: x data set name or None ds: data set name or list or numpy array doPlot: true if plotting needed """ self.__printBanner("fitting linear regression", ds) data = self.getNumericData(ds) if dsx is None: x = np.arange(len(data)) else: x = self.getNumericData(dsx) slope, intercept, rvalue, pvalue, stderr = sta.linregress(x, data) result = self.__printResult("slope", slope, "intercept", intercept, "rvalue", rvalue, "pvalue", pvalue, "stderr", stderr) if doPlot: self.regFitPlot(x, data, slope, intercept) return result def fitSiegelRobustLinearReg(self, ds, doPlot=False): """ siegel robust linear regression fit based on median Parameters ds: data set name or list or numpy array doPlot: true if plotting needed """ self.__printBanner("fitting siegel robust linear regression based on median", ds) data = self.getNumericData(ds) slope , intercept = sta.siegelslopes(data) result = self.__printResult("slope", slope, "intercept", intercept) if doPlot: x = np.arange(len(data)) self.regFitPlot(x, data, slope, intercept) return result def fitTheilSenRobustLinearReg(self, ds, doPlot=False): """ thiel sen robust linear fit regression based on median Parameters ds: data set name or list or numpy array doPlot: true if plotting needed """ self.__printBanner("fitting thiel sen robust linear regression based on median", ds) data = self.getNumericData(ds) slope, intercept, loSlope, upSlope = sta.theilslopes(data) result = self.__printResult("slope", slope, "intercept", intercept, "lower slope", loSlope, "upper slope", upSlope) if doPlot: x = np.arange(len(data)) self.regFitPlot(x, data, slope, intercept) return result def plotRegFit(self, x, y, slope, intercept): """ plot linear rgeression fit line Parameters x : x values y : y values slope : slope intercept : intercept """ self.__printBanner("plotting linear rgeression fit line") fig = plt.figure() ax = fig.add_subplot(111) ax.plot(x, y, "b.") ax.plot(x, intercept + slope * x, "r-") plt.show() def getRegFit(self, xvalues, yvalues, slope, intercept): """ gets fitted line and residue Parameters x : x values y : y values slope : regression slope intercept : regressiob intercept """ yfit = list() residue = list() for x,y in zip(xvalues, yvalues): yf = x * slope + intercept yfit.append(yf) r = y - yf residue.append(r) result = self.__printResult("fitted line", yfit, "residue", residue) return result def getInfluentialPoints(self, dsx, dsy): """ gets influential points in regression model with Cook's distance Parameters dsx : data set name or list or numpy array for x dsy : data set name or list or numpy array for y """ self.__printBanner("finding influential points for linear regression", dsx, dsy) y = self.getNumericData(dsy) x = np.arange(len(data)) if dsx is None else self.getNumericData(dsx) model = sm.OLS(y, x).fit() np.set_printoptions(suppress=True) influence = model.get_influence() cooks = influence.cooks_distance result = self.__printResult("Cook distance", cooks) return result def getCovar(self, *dsl): """ gets covariance Parameters dsl: list of data set name or list or numpy array """ self.__printBanner("getting covariance", *dsl) data = list(map(lambda ds : self.getNumericData(ds), dsl)) self.ensureSameSize(data) data = np.vstack(data) cv = np.cov(data) print(cv) return cv def getPearsonCorr(self, ds1, ds2, sigLev=.05): """ gets pearson correlation coefficient Parameters ds1: data set name or list or numpy array ds2: data set name or list or numpy array """ self.__printBanner("getting pearson correlation coefficient ", ds1, ds2) data1 = self.getNumericData(ds1) data2 = self.getNumericData(ds2) self.ensureSameSize([data1, data2]) stat, pvalue = sta.pearsonr(data1, data2) result = self.__printResult("stat", stat, "pvalue", pvalue) self.__printStat(stat, pvalue, "probably uncorrelated", "probably correlated", sigLev) return result def getSpearmanRankCorr(self, ds1, ds2, sigLev=.05): """ gets spearman correlation coefficient Parameters ds1: data set name or list or numpy array ds2: data set name or list or numpy array sigLev: statistical significance level """ self.__printBanner("getting spearman correlation coefficient",ds1, ds2) data1 = self.getNumericData(ds1) data2 = self.getNumericData(ds2) self.ensureSameSize([data1, data2]) stat, pvalue = sta.spearmanr(data1, data2) result = self.__printResult("stat", stat, "pvalue", pvalue) self.__printStat(stat, pvalue, "probably uncorrelated", "probably correlated", sigLev) return result def getKendalRankCorr(self, ds1, ds2, sigLev=.05): """ kendall’s tau, a correlation measure for ordinal data Parameters ds1: data set name or list or numpy array ds2: data set name or list or numpy array sigLev: statistical significance level """ self.__printBanner("getting kendall’s tau, a correlation measure for ordinal data", ds1, ds2) data1 = self.getNumericData(ds1) data2 = self.getNumericData(ds2) self.ensureSameSize([data1, data2]) stat, pvalue = sta.kendalltau(data1, data2) result = self.__printResult("stat", stat, "pvalue", pvalue) self.__printStat(stat, pvalue, "probably uncorrelated", "probably correlated", sigLev) return result def getPointBiserialCorr(self, ds1, ds2, sigLev=.05): """ point biserial correlation between binary and numeric Parameters ds1: data set name or list or numpy array ds2: data set name or list or numpy array sigLev: statistical significance level """ self.__printBanner("getting point biserial correlation between binary and numeric", ds1, ds2) data1 = self.getNumericData(ds1) data2 = self.getNumericData(ds2) assert isBinary(data1), "first data set is not binary" self.ensureSameSize([data1, data2]) stat, pvalue = sta.pointbiserialr(data1, data2) result = self.__printResult("stat", stat, "pvalue", pvalue) self.__printStat(stat, pvalue, "probably uncorrelated", "probably correlated", sigLev) return result def getConTab(self, ds1, ds2): """ get contingency table for categorical data pair Parameters ds1: data set name or list or numpy array ds2: data set name or list or numpy array """ self.__printBanner("getting contingency table for categorical data", ds1, ds2) data1 = self.getCatData(ds1) data2 = self.getCatData(ds2) self.ensureSameSize([data1, data2]) crosstab = pd.crosstab(pd.Series(data1), pd.Series(data2), margins = False) ctab = crosstab.values print("contingency table") print(ctab) return ctab def getChiSqCorr(self, ds1, ds2, sigLev=.05): """ chi square correlation for categorical data pair Parameters ds1: data set name or list or numpy array ds2: data set name or list or numpy array sigLev: statistical significance level """ self.__printBanner("getting chi square correlation for two categorical", ds1, ds2) ctab = self.getConTab(ds1, ds2) stat, pvalue, dof, expctd = sta.chi2_contingency(ctab) result = self.__printResult("stat", stat, "pvalue", pvalue, "dof", dof, "expected", expctd) self.__printStat(stat, pvalue, "probably uncorrelated", "probably correlated", sigLev) return result def getSizeCorrectChiSqCorr(self, ds1, ds2, chisq): """ cramerV size corrected chi square correlation for categorical data pair Parameters ds1: data set name or list or numpy array ds2: data set name or list or numpy array chisq: chisq stat """ self.__printBanner("getting size corrected chi square correlation for two categorical", ds1, ds2) c1 = self.getCatUniqueValueCounts(ds1)["cardinality"] c2 = self.getCatUniqueValueCounts(ds2)["cardinality"] c = min(c1,c2) assertGreater(c, 1, "min cardinality should be greater than 1") l = len(self.getCatData(ds1)) t = l * (c - 1) stat = math.sqrt(chisq / t) result = self.__printResult("stat", stat) return result def getAnovaCorr(self, ds1, ds2, grByCol, sigLev=.05): """ anova correlation for numerical categorical Parameters ds1: data set name or list or numpy array ds2: data set name or list or numpy array grByCol : group by column sigLev: statistical significance level """ self.__printBanner("anova correlation for numerical categorical", ds1, ds2) df = self.loadCatFloatDataFrame(ds1, ds2) if grByCol == 0 else self.loadCatFloatDataFrame(ds2, ds1) grByCol = 0 dCol = 1 grouped = df.groupby([grByCol]) dlist = list(map(lambda v : v[1].loc[:, dCol].values, grouped)) stat, pvalue = sta.f_oneway(*dlist) result = self.__printResult("stat", stat, "pvalue", pvalue) self.__printStat(stat, pvalue, "probably uncorrelated", "probably correlated", sigLev) return result def plotAutoCorr(self, ds, lags, alpha, diffOrder=0): """ plots auto correlation Parameters ds: data set name or list or numpy array lags: num of lags alpha: confidence level """ self.__printBanner("plotting auto correlation", ds) data = self.getNumericData(ds) ddata = difference(data, diffOrder) if diffOrder > 0 else data tsaplots.plot_acf(ddata, lags = lags, alpha = alpha) plt.show() def getAutoCorr(self, ds, lags, alpha=.05): """ gets auts correlation Parameters ds: data set name or list or numpy array lags: num of lags alpha: confidence level """ self.__printBanner("getting auto correlation", ds) data = self.getNumericData(ds) autoCorr, confIntv = stt.acf(data, nlags=lags, fft=False, alpha=alpha) result = self.__printResult("autoCorr", autoCorr, "confIntv", confIntv) return result def plotParAcf(self, ds, lags, alpha): """ partial auto correlation Parameters ds: data set name or list or numpy array lags: num of lags alpha: confidence level """ self.__printBanner("plotting partial auto correlation", ds) data = self.getNumericData(ds) tsaplots.plot_pacf(data, lags = lags, alpha = alpha) plt.show() def getParAutoCorr(self, ds, lags, alpha=.05): """ gets partial auts correlation Parameters ds: data set name or list or numpy array lags: num of lags alpha: confidence level """ self.__printBanner("getting partial auto correlation", ds) data = self.getNumericData(ds) partAutoCorr, confIntv = stt.pacf(data, nlags=lags, alpha=alpha) result = self.__printResult("partAutoCorr", partAutoCorr, "confIntv", confIntv) return result def getHurstExp(self, ds, kind, doPlot=True): """ gets Hurst exponent of time series Parameters ds: data set name or list or numpy array kind: kind of data change, random_walk, price doPlot: True for plot """ self.__printBanner("getting Hurst exponent", ds) data = self.getNumericData(ds) h, c, odata = hurst.compute_Hc(data, kind=kind, simplified=False) if doPlot: f, ax = plt.subplots() ax.plot(odata[0], c * odata[0] ** h, color="deepskyblue") ax.scatter(odata[0], odata[1], color="purple") ax.set_xscale("log") ax.set_yscale("log") ax.set_xlabel("time interval") ax.set_ylabel("cum dev range and std dev ratio") ax.grid(True) plt.show() result = self.__printResult("hurstExponent", h, "hurstConstant", c) return result def approxEntropy(self, ds, m, r): """ gets apprx entroty of time series (ref: wikipedia) Parameters ds: data set name or list or numpy array m: length of compared run of data r: filtering level """ self.__printBanner("getting approximate entropy", ds) ldata = self.getNumericData(ds) aent = abs(self.__phi(ldata, m + 1, r) - self.__phi(ldata, m, r)) result = self.__printResult("approxEntropy", aent) return result def __phi(self, ldata, m, r): """ phi function for approximate entropy Parameters ldata: data array m: length of compared run of data r: filtering level """ le = len(ldata) x = [[ldata[j] for j in range(i, i + m - 1 + 1)] for i in range(le - m + 1)] lex = len(x) c = list() for i in range(lex): cnt = 0 for j in range(lex): cnt += (1 if maxListDist(x[i], x[j]) <= r else 0) cnt /= (le - m + 1.0) c.append(cnt) return sum(np.log(c)) / (le - m + 1.0) def oneSpaceEntropy(self, ds, scaMethod="zscale"): """ gets one space entroty (ref: Estimating mutual information by Kraskov) Parameters ds: data set name or list or numpy array """ self.__printBanner("getting one space entropy", ds) data = self.getNumericData(ds) sdata = sorted(data) sdata = scaleData(sdata, scaMethod) su = 0 n = len(sdata) for i in range(1, n, 1): t = abs(sdata[i] - sdata[i-1]) if t > 0: su += log(t) su /= (n -1) #print(su) ose = digammaFun(n) - digammaFun(1) + su result = self.__printResult("entropy", ose) return result def plotCrossCorr(self, ds1, ds2, normed, lags): """ plots cross correlation Parameters ds1: data set name or list or numpy array ds2: data set name or list or numpy array normed: If True, input vectors are normalised to unit lags: num of lags """ self.__printBanner("plotting cross correlation between two numeric", ds1, ds2) data1 = self.getNumericData(ds1) data2 = self.getNumericData(ds2) self.ensureSameSize([data1, data2]) plt.xcorr(data1, data2, normed=normed, maxlags=lags) plt.show() def getCrossCorr(self, ds1, ds2): """ gets cross correlation Parameters ds1: data set name or list or numpy array ds2: data set name or list or numpy array """ self.__printBanner("getting cross correlation", ds1, ds2) data1 = self.getNumericData(ds1) data2 = self.getNumericData(ds2) self.ensureSameSize([data1, data2]) crossCorr = stt.ccf(data1, data2) result = self.__printResult("crossCorr", crossCorr) return result def getFourierTransform(self, ds): """ gets fast fourier transform Parameters ds: data set name or list or numpy array """ self.__printBanner("getting fourier transform", ds) data = self.getNumericData(ds) ft = np.fft.rfft(data) result = self.__printResult("fourierTransform", ft) return result def testStationaryAdf(self, ds, regression, autolag, sigLev=.05): """ Adf stationary test null hyp not stationary Parameters ds: data set name or list or numpy array regression: constant and trend order to include in regression autolag: method to use when automatically determining the lag sigLev: statistical significance level """ self.__printBanner("doing ADF stationary test", ds) relist = ["c","ct","ctt","nc"] assert regression in relist, "invalid regression value" alList = ["AIC", "BIC", "t-stat", None] assert autolag in alList, "invalid autolag value" data = self.getNumericData(ds) re = stt.adfuller(data, regression=regression, autolag=autolag) result = self.__printResult("stat", re[0], "pvalue", re[1] , "num lags", re[2] , "num observation for regression", re[3], "critial values", re[4]) self.__printStat(re[0], re[1], "probably not stationary", "probably stationary", sigLev) return result def testStationaryKpss(self, ds, regression, nlags, sigLev=.05): """ Kpss stationary test null hyp stationary Parameters ds: data set name or list or numpy array regression: constant and trend order to include in regression nlags : no of lags sigLev: statistical significance level """ self.__printBanner("doing KPSS stationary test", ds) relist = ["c","ct"] assert regression in relist, "invalid regression value" nlList =[None, "auto", "legacy"] assert nlags in nlList or type(nlags) == int, "invalid nlags value" data = self.getNumericData(ds) stat, pvalue, nLags, criticalValues = stt.kpss(data, regression=regression, lags=nlags) result = self.__printResult("stat", stat, "pvalue", pvalue, "num lags", nLags, "critial values", criticalValues) self.__printStat(stat, pvalue, "probably stationary", "probably not stationary", sigLev) return result def testNormalJarqBera(self, ds, sigLev=.05): """ jarque bera normalcy test Parameters ds: data set name or list or numpy array sigLev: statistical significance level """ self.__printBanner("doing ajrque bera normalcy test", ds) data = self.getNumericData(ds) jb, jbpv, skew, kurtosis = sstt.jarque_bera(data) result = self.__printResult("stat", jb, "pvalue", jbpv, "skew", skew, "kurtosis", kurtosis) self.__printStat(jb, jbpv, "probably gaussian", "probably not gaussian", sigLev) return result def testNormalShapWilk(self, ds, sigLev=.05): """ shapiro wilks normalcy test Parameters ds: data set name or list or numpy array sigLev: statistical significance level """ self.__printBanner("doing shapiro wilks normalcy test", ds) data = self.getNumericData(ds) stat, pvalue = sta.shapiro(data) result = self.__printResult("stat", stat, "pvalue", pvalue) self.__printStat(stat, pvalue, "probably gaussian", "probably not gaussian", sigLev) return result def testNormalDagast(self, ds, sigLev=.05): """ D’Agostino’s K square normalcy test Parameters ds: data set name or list or numpy array sigLev: statistical significance level """ self.__printBanner("doing D’Agostino’s K square normalcy test", ds) data = self.getNumericData(ds) stat, pvalue = sta.normaltest(data) result = self.__printResult("stat", stat, "pvalue", pvalue) self.__printStat(stat, pvalue, "probably gaussian", "probably not gaussian", sigLev) return result def testDistrAnderson(self, ds, dist, sigLev=.05): """ Anderson test for normal, expon, logistic, gumbel, gumbel_l, gumbel_r Parameters ds: data set name or list or numpy array dist: type of distribution sigLev: statistical significance level """ self.__printBanner("doing Anderson test for for various distributions", ds) diList = ["norm", "expon", "logistic", "gumbel", "gumbel_l", "gumbel_r", "extreme1"] assert dist in diList, "invalid distribution" data = self.getNumericData(ds) re = sta.anderson(data) slAlpha = int(100 * sigLev) msg = "significnt value not found" for i in range(len(re.critical_values)): sl, cv = re.significance_level[i], re.critical_values[i] if int(sl) == slAlpha: if re.statistic < cv: msg = "probably {} at the {:.3f} siginificance level".format(dist, sl) else: msg = "probably not {} at the {:.3f} siginificance level".format(dist, sl) result = self.__printResult("stat", re.statistic, "test", msg) print(msg) return result def testSkew(self, ds, sigLev=.05): """ test skew wrt normal distr Parameters ds: data set name or list or numpy array sigLev: statistical significance level """ self.__printBanner("testing skew wrt normal distr", ds) data = self.getNumericData(ds) stat, pvalue = sta.skewtest(data) result = self.__printResult("stat", stat, "pvalue", pvalue) self.__printStat(stat, pvalue, "probably same skew as normal distribution", "probably not same skew as normal distribution", sigLev) return result def testTwoSampleStudent(self, ds1, ds2, sigLev=.05): """ student t 2 sample test Parameters ds1: data set name or list or numpy array ds2: data set name or list or numpy array sigLev: statistical significance level """ self.__printBanner("doing student t 2 sample test", ds1, ds2) data1 = self.getNumericData(ds1) data2 = self.getNumericData(ds2) stat, pvalue = sta.ttest_ind(data1, data2) result = self.__printResult("stat", stat, "pvalue", pvalue) self.__printStat(stat, pvalue, "probably same distribution", "probably not same distribution", sigLev) return result def testTwoSampleKs(self, ds1, ds2, sigLev=.05): """ Kolmogorov Sminov 2 sample statistic Parameters ds1: data set name or list or numpy array ds2: data set name or list or numpy array sigLev: statistical significance level """ self.__printBanner("doing Kolmogorov Sminov 2 sample test", ds1, ds2) data1 = self.getNumericData(ds1) data2 = self.getNumericData(ds2) stat, pvalue = sta.ks_2samp(data1, data2) result = self.__printResult("stat", stat, "pvalue", pvalue) self.__printStat(stat, pvalue, "probably same distribution", "probably not same distribution", sigLev) def testTwoSampleMw(self, ds1, ds2, sigLev=.05): """ Mann-Whitney 2 sample statistic Parameters ds1: data set name or list or numpy array ds2: data set name or list or numpy array sigLev: statistical significance level """ self.__printBanner("doing Mann-Whitney 2 sample test", ds1, ds2) data1 = self.getNumericData(ds1) data2 = self.getNumericData(ds2) stat, pvalue = sta.mannwhitneyu(data1, data2) result = self.__printResult("stat", stat, "pvalue", pvalue) self.__printStat(stat, pvalue, "probably same distribution", "probably not same distribution", sigLev) def testTwoSampleWilcox(self, ds1, ds2, sigLev=.05): """ Wilcoxon Signed-Rank 2 sample statistic Parameters ds1: data set name or list or numpy array ds2: data set name or list or numpy array sigLev: statistical significance level """ self.__printBanner("doing Wilcoxon Signed-Rank 2 sample test", ds1, ds2) data1 = self.getNumericData(ds1) data2 = self.getNumericData(ds2) stat, pvalue = sta.wilcoxon(data1, data2) result = self.__printResult("stat", stat, "pvalue", pvalue) self.__printStat(stat, pvalue, "probably same distribution", "probably not same distribution", sigLev) def testTwoSampleKw(self, ds1, ds2, sigLev=.05): """ Kruskal-Wallis 2 sample statistic Parameters ds1: data set name or list or numpy array ds2: data set name or list or numpy array sigLev: statistical significance level """ self.__printBanner("doing Kruskal-Wallis 2 sample test", ds1, ds2) data1 = self.getNumericData(ds1) data2 = self.getNumericData(ds2) stat, pvalue = sta.kruskal(data1, data2) result = self.__printResult("stat", stat, "pvalue", pvalue) self.__printStat(stat, pvalue, "probably same distribution", "probably snot ame distribution", sigLev) def testTwoSampleFriedman(self, ds1, ds2, ds3, sigLev=.05): """ Friedman 2 sample statistic Parameters ds1: data set name or list or numpy array ds2: data set name or list or numpy array sigLev: statistical significance level """ self.__printBanner("doing Friedman 2 sample test", ds1, ds2) data1 = self.getNumericData(ds1) data2 = self.getNumericData(ds2) data3 = self.getNumericData(ds3) stat, pvalue = sta.friedmanchisquare(data1, data2, data3) result = self.__printResult("stat", stat, "pvalue", pvalue) self.__printStat(stat, pvalue, "probably same distribution", "probably not same distribution", sigLev) def testTwoSampleEs(self, ds1, ds2, sigLev=.05): """ Epps Singleton 2 sample statistic Parameters ds1: data set name or list or numpy array ds2: data set name or list or numpy array sigLev: statistical significance level """ self.__printBanner("doing Epps Singleton 2 sample test", ds1, ds2) data1 = self.getNumericData(ds1) data2 = self.getNumericData(ds2) stat, pvalue = sta.epps_singleton_2samp(data1, data2) result = self.__printResult("stat", stat, "pvalue", pvalue) self.__printStat(stat, pvalue, "probably same distribution", "probably not same distribution", sigLev) def testTwoSampleAnderson(self, ds1, ds2, sigLev=.05): """ Anderson 2 sample statistic Parameters ds1: data set name or list or numpy array ds2: data set name or list or numpy array sigLev: statistical significance level """ self.__printBanner("doing Anderson 2 sample test", ds1, ds2) data1 = self.getNumericData(ds1) data2 = self.getNumericData(ds2) dseq = (data1, data2) stat, critValues, sLev = sta.anderson_ksamp(dseq) slAlpha = 100 * sigLev if slAlpha == 10: cv = critValues[1] elif slAlpha == 5: cv = critValues[2] elif slAlpha == 2.5: cv = critValues[3] elif slAlpha == 1: cv = critValues[4] else: cv = None result = self.__printResult("stat", stat, "critValues", critValues, "critValue", cv, "significanceLevel", sLev) print("stat: {:.3f}".format(stat)) if cv is None: msg = "critical values value not found for provided siginificance level" else: if stat < cv: msg = "probably same distribution at the {:.3f} siginificance level".format(sigLev) else: msg = "probably not same distribution at the {:.3f} siginificance level".format(sigLev) print(msg) return result def testTwoSampleScaleAb(self, ds1, ds2, sigLev=.05): """ Ansari Bradley 2 sample scale statistic Parameters ds1: data set name or list or numpy array ds2: data set name or list or numpy array sigLev: statistical significance level """ self.__printBanner("doing Ansari Bradley 2 sample scale test", ds1, ds2) data1 = self.getNumericData(ds1) data2 = self.getNumericData(ds2) stat, pvalue = sta.ansari(data1, data2) result = self.__printResult("stat", stat, "pvalue", pvalue) self.__printStat(stat, pvalue, "probably same scale", "probably not same scale", sigLev) return result def testTwoSampleScaleMood(self, ds1, ds2, sigLev=.05): """ Mood 2 sample scale statistic Parameters ds1: data set name or list or numpy array ds2: data set name or list or numpy array sigLev: statistical significance level """ self.__printBanner("doing Mood 2 sample scale test", ds1, ds2) data1 = self.getNumericData(ds1) data2 = self.getNumericData(ds2) stat, pvalue = sta.mood(data1, data2) result = self.__printResult("stat", stat, "pvalue", pvalue) self.__printStat(stat, pvalue, "probably same scale", "probably not same scale", sigLev) return result def testTwoSampleVarBartlet(self, ds1, ds2, sigLev=.05): """ Ansari Bradley 2 sample scale statistic Parameters ds1: data set name or list or numpy array ds2: data set name or list or numpy array sigLev: statistical significance level """ self.__printBanner("doing Ansari Bradley 2 sample scale test", ds1, ds2) data1 = self.getNumericData(ds1) data2 = self.getNumericData(ds2) stat, pvalue = sta.bartlett(data1, data2) result = self.__printResult("stat", stat, "pvalue", pvalue) self.__printStat(stat, pvalue, "probably same variance", "probably not same variance", sigLev) return result def testTwoSampleVarLevene(self, ds1, ds2, sigLev=.05): """ Levene 2 sample variance statistic Parameters ds1: data set name or list or numpy array ds2: data set name or list or numpy array sigLev: statistical significance level """ self.__printBanner("doing Levene 2 sample variance test", ds1, ds2) data1 = self.getNumericData(ds1) data2 = self.getNumericData(ds2) stat, pvalue = sta.levene(data1, data2) result = self.__printResult("stat", stat, "pvalue", pvalue) self.__printStat(stat, pvalue, "probably same variance", "probably not same variance", sigLev) return result def testTwoSampleVarFk(self, ds1, ds2, sigLev=.05): """ Fligner-Killeen 2 sample variance statistic Parameters ds1: data set name or list or numpy array ds2: data set name or list or numpy array sigLev: statistical significance level """ self.__printBanner("doing Fligner-Killeen 2 sample variance test", ds1, ds2) data1 = self.getNumericData(ds1) data2 = self.getNumericData(ds2) stat, pvalue = sta.fligner(data1, data2) result = self.__printResult("stat", stat, "pvalue", pvalue) self.__printStat(stat, pvalue, "probably same variance", "probably not same variance", sigLev) return result def testTwoSampleMedMood(self, ds1, ds2, sigLev=.05): """ Mood 2 sample median statistic Parameters ds1: data set name or list or numpy array ds2: data set name or list or numpy array sigLev: statistical significance level """ self.__printBanner("doing Mood 2 sample median test", ds1, ds2) data1 = self.getNumericData(ds1) data2 = self.getNumericData(ds2) stat, pvalue, median, ctable = sta.median_test(data1, data2) result = self.__printResult("stat", stat, "pvalue", pvalue, "median", median, "contigencyTable", ctable) self.__printStat(stat, pvalue, "probably same median", "probably not same median", sigLev) return result def testTwoSampleZc(self, ds1, ds2, sigLev=.05): """ Zhang-C 2 sample statistic Parameters ds1: data set name or list or numpy array ds2: data set name or list or numpy array sigLev: statistical significance level """ self.__printBanner("doing Zhang-C 2 sample test", ds1, ds2) data1 = self.getNumericData(ds1) data2 = self.getNumericData(ds2) l1 = len(data1) l2 = len(data2) l = l1 + l2 #find ranks pooled = np.concatenate([data1, data2]) ranks = findRanks(data1, pooled) ranks.extend(findRanks(data2, pooled)) s1 = 0.0 for i in range(1, l1+1): s1 += math.log(l1 / (i - 0.5) - 1.0) * math.log(l / (ranks[i-1] - 0.5) - 1.0) s2 = 0.0 for i in range(1, l2+1): s2 += math.log(l2 / (i - 0.5) - 1.0) * math.log(l / (ranks[l1 + i - 1] - 0.5) - 1.0) stat = (s1 + s2) / l print(formatFloat(3, stat, "stat:")) return stat def testTwoSampleZa(self, ds1, ds2, sigLev=.05): """ Zhang-A 2 sample statistic Parameters ds1: data set name or list or numpy array ds2: data set name or list or numpy array sigLev: statistical significance level """ self.__printBanner("doing Zhang-A 2 sample test", ds1, ds2) data1 = self.getNumericData(ds1) data2 = self.getNumericData(ds2) l1 = len(data1) l2 = len(data2) l = l1 + l2 pooled = np.concatenate([data1, data2]) cd1 = CumDistr(data1) cd2 = CumDistr(data2) sum = 0.0 for i in range(1, l+1): v = pooled[i-1] f1 = cd1.getDistr(v) f2 = cd2.getDistr(v) t1 = f1 * math.log(f1) t2 = 0 if f1 == 1.0 else (1.0 - f1) * math.log(1.0 - f1) sum += l1 * (t1 + t2) / ((i - 0.5) * (l - i + 0.5)) t1 = f2 * math.log(f2) t2 = 0 if f2 == 1.0 else (1.0 - f2) * math.log(1.0 - f2) sum += l2 * (t1 + t2) / ((i - 0.5) * (l - i + 0.5)) stat = -sum print(formatFloat(3, stat, "stat:")) return stat def testTwoSampleZk(self, ds1, ds2, sigLev=.05): """ Zhang-K 2 sample statistic Parameters ds1: data set name or list or numpy array ds2: data set name or list or numpy array sigLev: statistical significance level """ self.__printBanner("doing Zhang-K 2 sample test", ds1, ds2) data1 = self.getNumericData(ds1) data2 = self.getNumericData(ds2) l1 = len(data1) l2 = len(data2) l = l1 + l2 pooled = np.concatenate([data1, data2]) cd1 = CumDistr(data1) cd2 = CumDistr(data2) cd = CumDistr(pooled) maxStat = None for i in range(1, l+1): v = pooled[i-1] f1 = cd1.getDistr(v) f2 = cd2.getDistr(v) f = cd.getDistr(v) t1 = 0 if f1 == 0 else f1 * math.log(f1 / f) t2 = 0 if f1 == 1.0 else (1.0 - f1) * math.log((1.0 - f1) / (1.0 - f)) stat = l1 * (t1 + t2) t1 = 0 if f2 == 0 else f2 * math.log(f2 / f) t2 = 0 if f2 == 1.0 else (1.0 - f2) * math.log((1.0 - f2) / (1.0 - f)) stat += l2 * (t1 + t2) if maxStat is None or stat > maxStat: maxStat = stat print(formatFloat(3, maxStat, "stat:")) return maxStat def testTwoSampleCvm(self, ds1, ds2, sigLev=.05): """ 2 sample cramer von mises Parameters ds1: data set name or list or numpy array ds2: data set name or list or numpy array sigLev: statistical significance level """ self.__printBanner("doing 2 sample CVM test", ds1, ds2) data1 = self.getNumericData(ds1) data2 = self.getNumericData(ds2) data = np.concatenate((data1,data2)) rdata = sta.rankdata(data) n = len(data1) m = len(data2) l = n + m s1 = 0 for i in range(n): t = rdata[i] - (i+1) s1 += (t * t) s1 *= n s2 = 0 for i in range(m): t = rdata[i + n] - (i+1) s2 += (t * t) s2 *= m u = s1 + s2 stat = u / (n * m * l) - (4 * m * n - 1) / (6 * l) result = self.__printResult("stat", stat) return result def ensureSameSize(self, dlist): """ ensures all data sets are of same size Parameters dlist : data source list """ le = None for d in dlist: cle = len(d) if le is None: le = cle else: assert cle == le, "all data sets need to be of same size" def testTwoSampleWasserstein(self, ds1, ds2): """ Wasserstein 2 sample statistic Parameters ds1: data set name or list or numpy array ds2: data set name or list or numpy array """ self.__printBanner("doing Wasserstein distance2 sample test", ds1, ds2) data1 = self.getNumericData(ds1) data2 = self.getNumericData(ds2) stat = sta.wasserstein_distance(data1, data2) sd = np.std(np.concatenate([data1, data2])) nstat = stat / sd result = self.__printResult("stat", stat, "normalizedStat", nstat) return result def getMaxRelMinRedFeatures(self, fdst, tdst, nfeatures, nbins=20): """ get top n features based on max relevance and min redudancy algorithm Parameters fdst: list of pair of data set name or list or numpy array and data type tdst: target data set name or list or numpy array and data type (cat for classification num for regression) nfeatures : desired no of features nbins : no of bins for numerical data """ self.__printBanner("doing max relevance min redundancy feature selection") return self.getMutInfoFeatures(fdst, tdst, nfeatures, "mrmr", nbins) def getJointMutInfoFeatures(self, fdst, tdst, nfeatures, nbins=20): """ get top n features based on joint mutual infoormation algorithm Parameters fdst: list of pair of data set name or list or numpy array and data type tdst: target data set name or list or numpy array and data type (cat for classification num for regression) nfeatures : desired no of features nbins : no of bins for numerical data """ self.__printBanner("doingjoint mutual info feature selection") return self.getMutInfoFeatures(fdst, tdst, nfeatures, "jmi", nbins) def getCondMutInfoMaxFeatures(self, fdst, tdst, nfeatures, nbins=20): """ get top n features based on condition mutual information maximization algorithm Parameters fdst: list of pair of data set name or list or numpy array and data type tdst: target data set name or list or numpy array and data type (cat for classification num for regression) nfeatures : desired no of features nbins : no of bins for numerical data """ self.__printBanner("doing conditional mutual info max feature selection") return self.getMutInfoFeatures(fdst, tdst, nfeatures, "cmim", nbins) def getInteractCapFeatures(self, fdst, tdst, nfeatures, nbins=20): """ get top n features based on interaction capping algorithm Parameters fdst: list of pair of data set name or list or numpy array and data type tdst: target data set name or list or numpy array and data type (cat for classification num for regression) nfeatures : desired no of features nbins : no of bins for numerical data """ self.__printBanner("doing interaction capped feature selection") return self.getMutInfoFeatures(fdst, tdst, nfeatures, "icap", nbins) def getMutInfoFeatures(self, fdst, tdst, nfeatures, algo, nbins=20): """ get top n features based on various mutual information based algorithm ref: Conditional likelihood maximisation : A unifying framework for information theoretic feature selection, Gavin Brown Parameters fdst: list of pair of data set name or list or numpy array and data type tdst: target data set name or list or numpy array and data type (cat for classification num for regression) nfeatures : desired no of features algo: mi based feature selection algorithm nbins : no of bins for numerical data """ #verify data source types types le = len(fdst) nfeatGiven = int(le / 2) assertGreater(nfeatGiven, nfeatures, "no of features should be greater than no of features to be selected") fds = list() types = ["num", "cat"] for i in range (0, le, 2): ds = fdst[i] dt = fdst[i+1] assertInList(dt, types, "invalid type for data source " + dt) data = self.getNumericData(ds) if dt == "num" else self.getCatData(ds) p =(ds, dt) fds.append(p) algos = ["mrmr", "jmi", "cmim", "icap"] assertInList(algo, algos, "invalid feature selection algo " + algo) assertInList(tdst[1], types, "invalid type for data source " + tdst[1]) data = self.getNumericData(tdst[0]) if tdst[1] == "num" else self.getCatData(tdst[0]) #print(fds) sfds = list() selected = set() relevancies = dict() for i in range(nfeatures): #print(i) scorem = None dsm = None dsmt = None for ds, dt in fds: #print(ds, dt) if ds not in selected: #relevancy if ds in relevancies: mutInfo = relevancies[ds] else: mutInfo = self.getMutualInfo([ds, dt, tdst[0], tdst[1]], nbins)["mutInfo"] relevancies[ds] = mutInfo relev = mutInfo #print("relev", relev) #redundancy smi = 0 reds = list() for sds, sdt, _ in sfds: #print(sds, sdt) mutInfo = self.getMutualInfo([ds, dt, sds, sdt], nbins)["mutInfo"] mutInfoCnd = self.getCondMutualInfo([ds, dt, sds, sdt, tdst[0], tdst[1]], nbins)["condMutInfo"] \ if algo != "mrmr" else 0 red = mutInfo - mutInfoCnd reds.append(red) if algo == "mrmr" or algo == "jmi": redun = sum(reds) / len(sfds) if len(sfds) > 0 else 0 elif algo == "cmim" or algo == "icap": redun = max(reds) if len(sfds) > 0 else 0 if algo == "icap": redun = max(0, redun) #print("redun", redun) score = relev - redun if scorem is None or score > scorem: scorem = score dsm = ds dsmt = dt pa = (dsm, dsmt, scorem) #print(pa) sfds.append(pa) selected.add(dsm) selFeatures = list(map(lambda r : (r[0], r[2]), sfds)) result = self.__printResult("selFeatures", selFeatures) return result def getFastCorrFeatures(self, fdst, tdst, delta, nbins=20): """ get top features based on Fast Correlation Based Filter (FCBF) ref: Feature Selection for High-Dimensional Data: A Fast Correlation-Based Filter Solution Lei Yu Parameters fdst: list of pair of data set name or list or numpy array and data type tdst: target data set name or list or numpy array and data type (cat for classification num for regression) delta : feature, target correlation threshold nbins : no of bins for numerical data """ le = len(fdst) nfeatGiven = int(le / 2) fds = list() types = ["num", "cat"] for i in range (0, le, 2): ds = fdst[i] dt = fdst[i+1] assertInList(dt, types, "invalid type for data source " + dt) data = self.getNumericData(ds) if dt == "num" else self.getCatData(ds) p =(ds, dt) fds.append(p) assertInList(tdst[1], types, "invalid type for data source " + tdst[1]) data = self.getNumericData(tdst[0]) if tdst[1] == "num" else self.getCatData(tdst[0]) # get features with symetric uncertainty above threshold tentr = self.getAnyEntropy(tdst[0], tdst[1], nbins)["entropy"] rfeatures = list() fentrs = dict() for ds, dt in fds: mutInfo = self.getMutualInfo([ds, dt, tdst[0], tdst[1]], nbins)["mutInfo"] fentr = self.getAnyEntropy(ds, dt, nbins)["entropy"] sunc = 2 * mutInfo / (tentr + fentr) #print("ds {} sunc {:.3f}".format(ds, sunc)) if sunc >= delta: f = [ds, dt, sunc, False] rfeatures.append(f) fentrs[ds] = fentr # sort descending of sym uncertainty rfeatures.sort(key=lambda e : e[2], reverse=True) #disccard redundant features le = len(rfeatures) for i in range(le): if rfeatures[i][3]: continue for j in range(i+1, le, 1): if rfeatures[j][3]: continue mutInfo = self.getMutualInfo([rfeatures[i][0], rfeatures[i][1], rfeatures[j][0], rfeatures[j][1]], nbins)["mutInfo"] sunc = 2 * mutInfo / (fentrs[rfeatures[i][0]] + fentrs[rfeatures[j][0]]) if sunc >= rfeatures[j][2]: rfeatures[j][3] = True frfeatures = list(filter(lambda f : not f[3], rfeatures)) selFeatures = list(map(lambda f : [f[0], f[2]], frfeatures)) result = self.__printResult("selFeatures", selFeatures) return result def getInfoGainFeatures(self, fdst, tdst, nfeatures, nsplit, nbins=20): """ get top n features based on information gain or entropy loss Parameters fdst: list of pair of data set name or list or numpy array and data type tdst: target data set name or list or numpy array and data type (cat for classification num for regression) nsplit : num of splits nfeatures : desired no of features nbins : no of bins for numerical data """ le = len(fdst) nfeatGiven = int(le / 2) assertGreater(nfeatGiven, nfeatures, "available features should be greater than desired") fds = list() types = ["num", "cat"] for i in range (0, le, 2): ds = fdst[i] dt = fdst[i+1] assertInList(dt, types, "invalid type for data source " + dt) data = self.getNumericData(ds) if dt == "num" else self.getCatData(ds) p =(ds, dt) fds.append(p) assertInList(tdst[1], types, "invalid type for data source " + tdst[1]) assertGreater(nsplit, 3, "minimum 4 splits necessary") tdata = self.getNumericData(tdst[0]) if tdst[1] == "num" else self.getCatData(tdst[0]) tentr = self.getAnyEntropy(tdst[0], tdst[1], nbins)["entropy"] sz =len(tdata) sfds = list() for ds, dt in fds: #print(ds, dt) if dt == "num": fd = self.getNumericData(ds) _ , _ , vmax, vmin = self.__getBasicStats(fd) intv = (vmax - vmin) / nsplit maxig = None spmin = vmin + intv spmax = vmax - 0.9 * intv #iterate all splits for sp in np.arange(spmin, spmax, intv): ltvals = list() gevals = list() for i in range(len(fd)): if fd[i] < sp: ltvals.append(tdata[i]) else: gevals.append(tdata[i]) self.addListNumericData(ltvals, "spds") if tdst[1] == "num" else self.addListCatData(ltvals, "spds") lten = self.getAnyEntropy("spds", tdst[1], nbins)["entropy"] self.addListNumericData(gevals, "spds") if tdst[1] == "num" else self.addListCatData(gevals, "spds") geen = self.getAnyEntropy("spds", tdst[1], nbins)["entropy"] #info gain ig = tentr - (len(ltvals) * lten / sz + len(gevals) * geen / sz) if maxig is None or ig > maxig: maxig = ig pa = (ds, maxig) sfds.append(pa) else: fd = self.getCatData(ds) fds = set(fd) fdps = genPowerSet(fds) maxig = None #iterate all subsets for s in fdps: if len(s) == len(fds): continue invals = list() exvals = list() for i in range(len(fd)): if fd[i] in s: invals.append(tdata[i]) else: exvals.append(tdata[i]) self.addListNumericData(invals, "spds") if tdst[1] == "num" else self.addListCatData(invals, "spds") inen = self.getAnyEntropy("spds", tdst[1], nbins)["entropy"] self.addListNumericData(exvals, "spds") if tdst[1] == "num" else self.addListCatData(exvals, "spds") exen = self.getAnyEntropy("spds", tdst[1], nbins)["entropy"] ig = tentr - (len(invals) * inen / sz + len(exvals) * exen / sz) if maxig is None or ig > maxig: maxig = ig pa = (ds, maxig) sfds.append(pa) #sort of info gain sfds.sort(key = lambda v : v[1], reverse = True) result = self.__printResult("selFeatures", sfds[:nfeatures]) return result def __stackData(self, *dsl): """ stacks collumd to create matrix Parameters dsl: data source list """ dlist = tuple(map(lambda ds : self.getNumericData(ds), dsl)) self.ensureSameSize(dlist) dmat = np.column_stack(dlist) return dmat def __printBanner(self, msg, *dsl): """ print banner for any function Parameters msg: message dsl: list of data set name or list or numpy array """ tags = list(map(lambda ds : ds if type(ds) == str else "annoynymous", dsl)) forData = " for data sets " if tags else "" msg = msg + forData + " ".join(tags) if self.verbose: print("\n== " + msg + " ==") def __printDone(self): """ print banner for any function """ if self.verbose: print("done") def __printStat(self, stat, pvalue, nhMsg, ahMsg, sigLev=.05): """ generic stat and pvalue output Parameters stat : stat value pvalue : p value nhMsg : null hypothesis violation message ahMsg : null hypothesis message sigLev : significance level """ if self.verbose: print("\ntest result:") print("stat: {:.3f}".format(stat)) print("pvalue: {:.3f}".format(pvalue)) print("significance level: {:.3f}".format(sigLev)) print(nhMsg if pvalue > sigLev else ahMsg) def __printResult(self, *values): """ print results Parameters values : flattened kay and value pairs """ result = dict() assert len(values) % 2 == 0, "key value list should have even number of items" for i in range(0, len(values), 2): result[values[i]] = values[i+1] if self.verbose: print("result details:") self.pp.pprint(result) return result def __getBasicStats(self, data): """ get mean and std dev Parameters data : numpy array """ mean = np.average(data) sd = np.std(data) r = (mean, sd, np.max(data), np.min(data)) return r