#!/usr/local/bin/python3
# Author: Pranab Ghosh
#
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You may
# obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.
# Package imports
import os
import sys
import numpy as np
import pandas as pd
import sklearn as sk
from sklearn import preprocessing
from sklearn import metrics
import random
import math
from math import *
from decimal import Decimal
import pprint
from statsmodels.graphics import tsaplots
from statsmodels.tsa import stattools as stt
from statsmodels.stats import stattools as sstt
from sklearn.linear_model import LinearRegression
from matplotlib import pyplot as plt
from scipy import stats as sta
from statsmodels.tsa.seasonal import seasonal_decompose
import statsmodels.api as sm
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM
from sklearn.covariance import EllipticEnvelope
from sklearn.mixture import GaussianMixture
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
import hurst
from .util import *
from .mlutil import *
from .sampler import *
from .stats import *
"""
Load data from a CSV file, data frame, numpy array or list
Each data set (array like) is given a name while loading
Perform various data exploration operations referring to the data sets by name
Save and restore workspace if needed
"""
class DataSetMetaData:
"""
data set meta data
"""
dtypeNum = 1
dtypeCat = 2
dtypeBin = 3
def __init__(self, dtype):
self.notes = list()
self.dtype = dtype
def addNote(self, note):
"""
add note
"""
self.notes.append(note)
class DataExplorer:
"""
various data exploration functions
"""
def __init__(self, verbose=True):
"""
initialize
Parameters
verbose : True for verbosity
"""
self.dataSets = dict()
self.metaData = dict()
self.pp = pprint.PrettyPrinter(indent=4)
self.verbose = verbose
def setVerbose(self, verbose):
"""
sets verbose
Parameters
verbose : True for verbosity
"""
self.verbose = verbose
def save(self, filePath):
"""
save checkpoint
Parameters
filePath : path of file where saved
"""
self.__printBanner("saving workspace")
ws = dict()
ws["data"] = self.dataSets
ws["metaData"] = self.metaData
saveObject(ws, filePath)
self.__printDone()
def restore(self, filePath):
"""
restore checkpoint
Parameters
			filePath : path of file from where to restore
"""
self.__printBanner("restoring workspace")
ws = restoreObject(filePath)
self.dataSets = ws["data"]
self.metaData = ws["metaData"]
self.__printDone()
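	#The workspace is a plain dict holding the data sets and their meta data,
	#serialized with saveObject() / restoreObject() from the util module. A
	#hedged round trip sketch (the file path is hypothetical):
	#
	#	exp.save("./model/explorer.ws")
	#	exp = DataExplorer()
	#	exp.restore("./model/explorer.ws")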
def queryFileData(self, filePath, *columns):
"""
query column data type from a data file
Parameters
filePath : path of file with data
columns : indexes followed by column names or column names
"""
self.__printBanner("querying column data type from a data frame")
lcolumns = list(columns)
noHeader = type(lcolumns[0]) == int
if noHeader:
df = pd.read_csv(filePath, header=None)
else:
df = pd.read_csv(filePath, header=0)
return self.queryDataFrameData(df, *columns)
def queryDataFrameData(self, df, *columns):
"""
query column data type from a data frame
Parameters
df : data frame with data
columns : indexes followed by column name or column names
"""
self.__printBanner("querying column data type from a data frame")
columns = list(columns)
noHeader = type(columns[0]) == int
dtypes = list()
if noHeader:
nCols = int(len(columns) / 2)
colIndexes = columns[:nCols]
cnames = columns[nCols:]
nColsDf = len(df.columns)
for i in range(nCols):
ci = colIndexes[i]
assert ci < nColsDf, "col index {} outside range".format(ci)
col = df.loc[ : , ci]
dtypes.append(self.getDataType(col))
else:
cnames = columns
for c in columns:
col = df[c]
dtypes.append(self.getDataType(col))
nt = list(zip(cnames, dtypes))
result = self.__printResult("columns and data types", nt)
return result
def getDataType(self, col):
"""
get data type
Parameters
col : contains data array like
"""
if isBinary(col):
dtype = "binary"
elif isInteger(col):
dtype = "integer"
elif isFloat(col):
dtype = "float"
elif isCategorical(col):
dtype = "categorical"
else:
dtype = "mixed"
return dtype
	def addFileNumericData(self, filePath, *columns):
"""
add numeric columns from a file
Parameters
filePath : path of file with data
columns : indexes followed by column names or column names
"""
self.__printBanner("adding numeric columns from a file")
self.addFileData(filePath, True, *columns)
self.__printDone()
	def addFileBinaryData(self, filePath, *columns):
"""
add binary columns from a file
Parameters
filePath : path of file with data
columns : indexes followed by column names or column names
"""
self.__printBanner("adding binary columns from a file")
self.addFileData(filePath, False, *columns)
self.__printDone()
def addFileData(self, filePath, numeric, *columns):
"""
add columns from a file
Parameters
filePath : path of file with data
			numeric : True if numeric, False if binary
columns : indexes followed by column names or column names
"""
columns = list(columns)
noHeader = type(columns[0]) == int
if noHeader:
df = pd.read_csv(filePath, header=None)
else:
df = pd.read_csv(filePath, header=0)
self.addDataFrameData(df, numeric, *columns)
	def addDataFrameNumericData(self, df, *columns):
		"""
		add numeric columns from a data frame
		Parameters
			df : data frame with data
			columns : indexes followed by column names or column names
		"""
		self.__printBanner("adding numeric columns from a data frame")
		self.addDataFrameData(df, True, *columns)
	def addDataFrameBinaryData(self, df, *columns):
		"""
		add binary columns from a data frame
		Parameters
			df : data frame with data
			columns : indexes followed by column names or column names
		"""
		self.__printBanner("adding binary columns from a data frame")
		self.addDataFrameData(df, False, *columns)
def addDataFrameData(self, df, numeric, *columns):
"""
add columns from a data frame
Parameters
df : data frame with data
			numeric : True if numeric, False if binary
columns : indexes followed by column names or column names
"""
columns = list(columns)
noHeader = type(columns[0]) == int
if noHeader:
nCols = int(len(columns) / 2)
colIndexes = columns[:nCols]
nColsDf = len(df.columns)
for i in range(nCols):
ci = colIndexes[i]
assert ci < nColsDf, "col index {} outside range".format(ci)
col = df.loc[ : , ci]
if numeric:
assert isNumeric(col), "data is not numeric"
else:
assert isBinary(col), "data is not binary"
col = col.to_numpy()
cn = columns[i + nCols]
dtype = DataSetMetaData.dtypeNum if numeric else DataSetMetaData.dtypeBin
self.__addDataSet(cn, col, dtype)
else:
for c in columns:
col = df[c]
if numeric:
assert isNumeric(col), "data is not numeric"
else:
assert isBinary(col), "data is not binary"
col = col.to_numpy()
dtype = DataSetMetaData.dtypeNum if numeric else DataSetMetaData.dtypeBin
self.__addDataSet(c, col, dtype)
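	#The column arguments follow one convention throughout: for a headerless
	#source pass the column indexes first, then one data set name per index;
	#for a source with a header pass just the column names. A hedged sketch
	#(df and the names are hypothetical):
	#
	#	exp.addDataFrameData(df, True, 0, 2, "price", "volume")	#no header
	#	exp.addDataFrameData(df, True, "price", "volume")	#with header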
def __addDataSet(self, dsn, data, dtype):
"""
		add data set
		Parameters
			dsn : data set name
			data : numpy array data
			dtype : data type
"""
self.dataSets[dsn] = data
self.metaData[dsn] = DataSetMetaData(dtype)
def addListNumericData(self, ds, name):
"""
add numeric data from a list
Parameters
ds : list with data
name : name of data set
"""
self.__printBanner("add numeric data from a list")
self.addListData(ds, True, name)
self.__printDone()
def addListBinaryData(self, ds, name):
"""
add binary data from a list
Parameters
ds : list with data
name : name of data set
"""
self.__printBanner("adding binary data from a list")
self.addListData(ds, False, name)
self.__printDone()
def addListData(self, ds, numeric, name):
"""
adds list data
Parameters
ds : list with data
			numeric : True if numeric, False if binary
name : name of data set
"""
assert type(ds) == list, "data not a list"
if numeric:
assert isNumeric(ds), "data is not numeric"
else:
assert isBinary(ds), "data is not binary"
dtype = DataSetMetaData.dtypeNum if numeric else DataSetMetaData.dtypeBin
self.dataSets[name] = np.array(ds)
self.metaData[name] = DataSetMetaData(dtype)
def addFileCatData(self, filePath, *columns):
"""
add categorical columns from a file
Parameters
filePath : path of file with data
columns : indexes followed by column names or column names
"""
self.__printBanner("adding categorical columns from a file")
columns = list(columns)
noHeader = type(columns[0]) == int
if noHeader:
df = pd.read_csv(filePath, header=None)
else:
df = pd.read_csv(filePath, header=0)
self.addDataFrameCatData(df, *columns)
self.__printDone()
def addDataFrameCatData(self, df, *columns):
"""
add categorical columns from a data frame
Parameters
df : data frame with data
columns : indexes followed by column names or column names
"""
self.__printBanner("adding categorical columns from a data frame")
columns = list(columns)
noHeader = type(columns[0]) == int
if noHeader:
nCols = int(len(columns) / 2)
colIndexes = columns[:nCols]
nColsDf = len(df.columns)
for i in range(nCols):
ci = colIndexes[i]
assert ci < nColsDf, "col index {} outside range".format(ci)
col = df.loc[ : , ci]
assert isCategorical(col), "data is not categorical"
col = col.tolist()
cn = columns[i + nCols]
self.__addDataSet(cn, col, DataSetMetaData.dtypeCat)
else:
for c in columns:
col = df[c].tolist()
self.__addDataSet(c, col, DataSetMetaData.dtypeCat)
def addListCatData(self, ds, name):
"""
add categorical list data
Parameters
ds : list with data
name : name of data set
"""
self.__printBanner("adding categorical list data")
assert type(ds) == list, "data not a list"
assert isCategorical(ds), "data is not categorical"
self.__addDataSet(name, ds, DataSetMetaData.dtypeCat)
self.__printDone()
def remData(self, ds):
"""
removes data set
Parameters
ds : data set name
"""
self.__printBanner("removing data set", ds)
assert ds in self.dataSets, "data set {} does not exist, please add it first".format(ds)
self.dataSets.pop(ds)
self.metaData.pop(ds)
names = self.showNames()
self.__printDone()
return names
def addNote(self, ds, note):
"""
		adds note to a data set
		Parameters
			ds : data set name
note: note text
"""
self.__printBanner("adding note")
assert ds in self.dataSets, "data set {} does not exist, please add it first".format(ds)
mdata = self.metaData[ds]
mdata.addNote(note)
self.__printDone()
def getNotes(self, ds):
"""
		gets notes for a data set
		Parameters
			ds : data set name
"""
self.__printBanner("getting notes")
assert ds in self.dataSets, "data set {} does not exist, please add it first".format(ds)
mdata = self.metaData[ds]
dnotes = mdata.notes
if self.verbose:
for dn in dnotes:
print(dn)
return dnotes
def getNumericData(self, ds):
"""
get numeric data
Parameters
ds : data set name or list or numpy array with data
"""
if type(ds) == str:
assert ds in self.dataSets, "data set {} does not exist, please add it first".format(ds)
assert self.metaData[ds].dtype == DataSetMetaData.dtypeNum, "data set {} is expected to be numerical type for this operation".format(ds)
data = self.dataSets[ds]
elif type(ds) == list:
assert isNumeric(ds), "data is not numeric"
data = np.array(ds)
elif type(ds) == np.ndarray:
data = ds
else:
raise "invalid type, expecting data set name, list or ndarray"
return data
def getCatData(self, ds):
"""
get categorical data
Parameters
ds : data set name or list with data
"""
if type(ds) == str:
assert ds in self.dataSets, "data set {} does not exist, please add it first".format(ds)
assert self.metaData[ds].dtype == DataSetMetaData.dtypeCat, "data set {} is expected to be categorical type for this operation".format(ds)
data = self.dataSets[ds]
elif type(ds) == list:
assert isCategorical(ds), "data is not categorical"
data = ds
else:
raise "invalid type, expecting data set name or list"
return data
def getAnyData(self, ds):
"""
get any data
Parameters
ds : data set name or list with data
"""
if type(ds) == str:
assert ds in self.dataSets, "data set {} does not exist, please add it first".format(ds)
data = self.dataSets[ds]
elif type(ds) == list:
data = ds
else:
raise "invalid type, expecting data set name or list"
return data
def loadCatFloatDataFrame(self, ds1, ds2):
"""
loads float and cat data into data frame
Parameters
ds1: data set name or list
ds2: data set name or list or numpy array
"""
data1 = self.getCatData(ds1)
data2 = self.getNumericData(ds2)
self.ensureSameSize([data1, data2])
df1 = pd.DataFrame(data=data1)
df2 = pd.DataFrame(data=data2)
df = pd.concat([df1,df2], axis=1)
df.columns = range(df.shape[1])
return df
def showNames(self):
"""
lists data set names
"""
self.__printBanner("listing data set names")
names = self.dataSets.keys()
if self.verbose:
print("data sets")
for ds in names:
print(ds)
self.__printDone()
return names
def plot(self, ds, yscale=None):
"""
plots data
Parameters
ds: data set name or list or numpy array
yscale: y scale
"""
self.__printBanner("plotting data", ds)
data = self.getNumericData(ds)
drawLine(data, yscale)
def plotZoomed(self, ds, beg, end, yscale=None):
"""
plots zoomed data
Parameters
ds: data set name or list or numpy array
beg: begin offset
end: end offset
yscale: y scale
"""
self.__printBanner("plotting data", ds)
data = self.getNumericData(ds)
drawLine(data[beg:end], yscale)
def scatterPlot(self, ds1, ds2):
"""
scatter plots data
Parameters
ds1: data set name or list or numpy array
ds2: data set name or list or numpy array
"""
self.__printBanner("scatter plotting data", ds1, ds2)
data1 = self.getNumericData(ds1)
data2 = self.getNumericData(ds2)
self.ensureSameSize([data1, data2])
x = np.arange(1, len(data1)+1, 1)
plt.scatter(x, data1 ,color="red")
plt.scatter(x, data2 ,color="blue")
plt.show()
def print(self, ds):
"""
		print data
Parameters
ds: data set name or list or numpy array
"""
self.__printBanner("printing data", ds)
assert ds in self.dataSets, "data set {} does not exist, please add it first".format(ds)
data = self.dataSets[ds]
		if self.verbose:
print(formatAny(len(data), "size"))
print("showing first 50 elements" )
print(data[:50])
def plotHist(self, ds, cumulative, density, nbins=20):
"""
plots histogram
Parameters
ds: data set name or list or numpy array
cumulative : True if cumulative
density : True to normalize for probability density
nbins : no of bins
"""
self.__printBanner("plotting histogram", ds)
data = self.getNumericData(ds)
plt.hist(data, bins=nbins, cumulative=cumulative, density=density)
plt.show()
def isMonotonicallyChanging(self, ds):
"""
checks if monotonically increasing or decreasing
Parameters
ds: data set name or list or numpy array
"""
self.__printBanner("checking monotonic change", ds)
data = self.getNumericData(ds)
monoIncreasing = all(list(map(lambda i : data[i] >= data[i-1], range(1, len(data), 1))))
monoDecreasing = all(list(map(lambda i : data[i] <= data[i-1], range(1, len(data), 1))))
result = self.__printResult("monoIncreasing", monoIncreasing, "monoDecreasing", monoDecreasing)
return result
def getFreqDistr(self, ds, nbins=20):
"""
get histogram
Parameters
ds: data set name or list or numpy array
nbins: num of bins
"""
self.__printBanner("getting histogram", ds)
data = self.getNumericData(ds)
frequency, lowLimit, binsize, extraPoints = sta.relfreq(data, numbins=nbins)
result = self.__printResult("frequency", frequency, "lowLimit", lowLimit, "binsize", binsize, "extraPoints", extraPoints)
return result
def getCumFreqDistr(self, ds, nbins=20):
"""
get cumulative freq distribution
Parameters
ds: data set name or list or numpy array
nbins: num of bins
"""
self.__printBanner("getting cumulative freq distribution", ds)
data = self.getNumericData(ds)
cumFrequency, lowLimit, binsize, extraPoints = sta.cumfreq(data, numbins=nbins)
result = self.__printResult("cumFrequency", cumFrequency, "lowLimit", lowLimit, "binsize", binsize, "extraPoints", extraPoints)
return result
def getExtremeValue(self, ds, ensamp, nsamp, polarity, doPlotDistr, nbins=20):
"""
get extreme values
Parameters
ds: data set name or list or numpy array
ensamp: num of samples for extreme values
nsamp: num of samples
polarity: max or min
doPlotDistr: plot distr
nbins: num of bins
"""
self.__printBanner("getting extreme values", ds)
data = self.getNumericData(ds)
evalues = list()
for _ in range(ensamp):
values = selectRandomSubListFromListWithRepl(data, nsamp)
if polarity == "max":
evalues.append(max(values))
else:
evalues.append(min(values))
if doPlotDistr:
plt.hist(evalues, bins=nbins, cumulative=False, density=True)
plt.show()
result = self.__printResult("extremeValues", evalues)
return result
def getEntropy(self, ds, nbins=20):
"""
get entropy
Parameters
ds: data set name or list or numpy array
nbins: num of bins
"""
self.__printBanner("getting entropy", ds)
data = self.getNumericData(ds)
result = self.getFreqDistr(data, nbins)
entropy = sta.entropy(result["frequency"])
result = self.__printResult("entropy", entropy)
return result
def getRelEntropy(self, ds1, ds2, nbins=20):
"""
get relative entropy or KL divergence with both data sets numeric
Parameters
ds1: data set name or list or numpy array
ds2: data set name or list or numpy array
nbins: num of bins
"""
self.__printBanner("getting relative entropy or KL divergence", ds1, ds2)
data1 = self.getNumericData(ds1)
data2 = self.getNumericData(ds2)
		result1 = self.getFreqDistr(data1, nbins)
		freq1 = result1["frequency"]
		result2 = self.getFreqDistr(data2, nbins)
		freq2 = result2["frequency"]
entropy = sta.entropy(freq1, freq2)
result = self.__printResult("relEntropy", entropy)
return result
def getAnyEntropy(self, ds, dt, nbins=20):
"""
		get entropy of any data type, numeric or categorical
Parameters
ds: data set name or list or numpy array
dt : data type num or cat
nbins: num of bins
"""
entropy = self.getEntropy(ds, nbins)["entropy"] if dt == "num" else self.getStatsCat(ds)["entropy"]
result = self.__printResult("entropy", entropy)
return result
def getJointEntropy(self, ds1, ds2, nbins=20):
"""
get joint entropy with both data sets numeric
Parameters
ds1: data set name or list or numpy array
ds2: data set name or list or numpy array
nbins: num of bins
"""
self.__printBanner("getting join entropy", ds1, ds2)
data1 = self.getNumericData(ds1)
data2 = self.getNumericData(ds2)
self.ensureSameSize([data1, data2])
hist, xedges, yedges = np.histogram2d(data1, data2, bins=nbins)
hist = hist.flatten()
ssize = len(data1)
hist = hist / ssize
entropy = sta.entropy(hist)
result = self.__printResult("jointEntropy", entropy)
return result
def getAllNumMutualInfo(self, ds1, ds2, nbins=20):
"""
get mutual information for both numeric data
Parameters
ds1: data set name or list or numpy array
ds2: data set name or list or numpy array
nbins: num of bins
"""
self.__printBanner("getting mutual information", ds1, ds2)
en1 = self.getEntropy(ds1,nbins)
en2 = self.getEntropy(ds2,nbins)
en = self.getJointEntropy(ds1, ds2, nbins)
mutInfo = en1["entropy"] + en2["entropy"] - en["jointEntropy"]
result = self.__printResult("mutInfo", mutInfo)
return result
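	#Mutual information above uses the identity I(X;Y) = H(X) + H(Y) - H(X,Y),
	#with all three entropies estimated from histograms of the same bin count.
	#A hedged usage sketch (data set names are hypothetical):
	#
	#	exp.getAllNumMutualInfo("price", "demand", nbins=20)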
def getNumCatMutualInfo(self, nds, cds ,nbins=20):
"""
		get mutual information between numeric and categorical data
Parameters
nds: numeric data set name or list or numpy array
cds: categoric data set name or list
nbins: num of bins
"""
self.__printBanner("getting mutual information of numerical and categorical data", nds, cds)
ndata = self.getNumericData(nds)
cds = self.getCatData(cds)
nentr = self.getEntropy(nds)["entropy"]
#conditional entropy
cdistr = self.getStatsCat(cds)["distr"]
grdata = self.getGroupByData(nds, cds, True)["groupedData"]
cnentr = 0
for gr, data in grdata.items():
self.addListNumericData(data, "grdata")
gnentr = self.getEntropy("grdata")["entropy"]
cnentr += gnentr * cdistr[gr]
mutInfo = nentr - cnentr
result = self.__printResult("mutInfo", mutInfo, "entropy", nentr, "condEntropy", cnentr)
return result
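	#The numeric categorical case uses I(X;C) = H(X) - H(X|C), where the
	#conditional entropy H(X|C) is the category distribution weighted sum of
	#the entropies of the numeric data grouped by category value.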
def getTwoCatMutualInfo(self, cds1, cds2):
"""
		get mutual information between 2 categorical data sets
Parameters
cds1 : categoric data set name or list
cds2 : categoric data set name or list
"""
self.__printBanner("getting mutual information of two categorical data sets", cds1, cds2)
cdata1 = self.getCatData(cds1)
		cdata2 = self.getCatData(cds2)
centr = self.getStatsCat(cds1)["entropy"]
#conditional entropy
cdistr = self.getStatsCat(cds2)["distr"]
grdata = self.getGroupByData(cds1, cds2, True)["groupedData"]
ccentr = 0
for gr, data in grdata.items():
self.addListCatData(data, "grdata")
gcentr = self.getStatsCat("grdata")["entropy"]
ccentr += gcentr * cdistr[gr]
mutInfo = centr - ccentr
result = self.__printResult("mutInfo", mutInfo, "entropy", centr, "condEntropy", ccentr)
return result
def getMutualInfo(self, dst, nbins=20):
"""
		get mutual information between 2 data sets, any combination of numerical and categorical
Parameters
dst : data source , data type, data source , data type
nbins : num of bins
"""
assertEqual(len(dst), 4, "invalid data source and data type list size")
dtypes = ["num", "cat"]
assertInList(dst[1], dtypes, "invalid data type")
assertInList(dst[3], dtypes, "invalid data type")
self.__printBanner("getting mutual information of any mix numerical and categorical data", dst[0], dst[2])
if dst[1] == "num":
mutInfo = self.getAllNumMutualInfo(dst[0], dst[2], nbins)["mutInfo"] if dst[3] == "num" \
else self.getNumCatMutualInfo(dst[0], dst[2], nbins)["mutInfo"]
else:
mutInfo = self.getNumCatMutualInfo(dst[2], dst[0], nbins)["mutInfo"] if dst[3] == "num" \
else self.getTwoCatMutualInfo(dst[2], dst[0])["mutInfo"]
result = self.__printResult("mutInfo", mutInfo)
return result
def getCondMutualInfo(self, dst, nbins=20):
"""
		get conditional mutual information between 2 data sets, any combination of numerical and categorical
Parameters
dst : data source , data type, data source , data type, data source , data type
nbins : num of bins
"""
assertEqual(len(dst), 6, "invalid data source and data type list size")
dtypes = ["num", "cat"]
assertInList(dst[1], dtypes, "invalid data type")
assertInList(dst[3], dtypes, "invalid data type")
assertInList(dst[5], dtypes, "invalid data type")
self.__printBanner("getting conditional mutual information of any mix numerical and categorical data", dst[0], dst[2])
if dst[5] == "cat":
cdistr = self.getStatsCat(dst[4])["distr"]
grdata1 = self.getGroupByData(dst[0], dst[4], True)["groupedData"]
grdata2 = self.getGroupByData(dst[2], dst[4], True)["groupedData"]
else:
gdata = self.getNumericData(dst[4])
hist = Histogram.createWithNumBins(gdata, nbins)
cdistr = hist.distr()
grdata1 = self.getGroupByData(dst[0], dst[4], False)["groupedData"]
grdata2 = self.getGroupByData(dst[2], dst[4], False)["groupedData"]
cminfo = 0
for gr in grdata1.keys():
data1 = grdata1[gr]
data2 = grdata2[gr]
if dst[1] == "num":
self.addListNumericData(data1, "grdata1")
else:
self.addListCatData(data1, "grdata1")
if dst[3] == "num":
self.addListNumericData(data2, "grdata2")
else:
self.addListCatData(data2, "grdata2")
gdst = ["grdata1", dst[1], "grdata2", dst[3]]
minfo = self.getMutualInfo(gdst, nbins)["mutInfo"]
cminfo += minfo * cdistr[gr]
result = self.__printResult("condMutInfo", cminfo)
return result
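	#The dst argument packs data sources and types pairwise. A hedged sketch
	#for conditional mutual information of two numeric data sets given a
	#categorical one (data set names are hypothetical):
	#
	#	dst = ["price", "num", "demand", "num", "region", "cat"]
	#	exp.getCondMutualInfo(dst, nbins=20)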
def getPercentile(self, ds, value):
"""
gets percentile
Parameters
ds: data set name or list or numpy array
value: the value
"""
self.__printBanner("getting percentile", ds)
data = self.getNumericData(ds)
percent = sta.percentileofscore(data, value)
result = self.__printResult("value", value, "percentile", percent)
return result
def getValueRangePercentile(self, ds, value1, value2):
"""
gets percentile
Parameters
ds: data set name or list or numpy array
value1: first value
value2: second value
"""
self.__printBanner("getting percentile difference for value range", ds)
if value1 < value2:
v1 = value1
v2 = value2
else:
v1 = value2
v2 = value1
data = self.getNumericData(ds)
per1 = sta.percentileofscore(data, v1)
per2 = sta.percentileofscore(data, v2)
result = self.__printResult("valueFirst", value1, "valueSecond", value2, "percentileDiff", per2 - per1)
return result
def getValueAtPercentile(self, ds, percent):
"""
gets value at percentile
Parameters
ds: data set name or list or numpy array
percent: percentile
"""
self.__printBanner("getting value at percentile", ds)
data = self.getNumericData(ds)
assert isInRange(percent, 0, 100), "percent should be between 0 and 100"
value = sta.scoreatpercentile(data, percent)
result = self.__printResult("value", value, "percentile", percent)
return result
def getLessThanValues(self, ds, cvalue):
"""
gets values less than given value
Parameters
ds: data set name or list or numpy array
cvalue: condition value
"""
self.__printBanner("getting values less than", ds)
fdata = self.__getCondValues(ds, cvalue, "lt")
result = self.__printResult("count", len(fdata), "lessThanvalues", fdata )
return result
def getGreaterThanValues(self, ds, cvalue):
"""
gets values greater than given value
Parameters
ds: data set name or list or numpy array
cvalue: condition value
"""
self.__printBanner("getting values greater than", ds)
fdata = self.__getCondValues(ds, cvalue, "gt")
result = self.__printResult("count", len(fdata), "greaterThanvalues", fdata )
return result
def __getCondValues(self, ds, cvalue, cond):
"""
		gets conditional values
Parameters
ds: data set name or list or numpy array
cvalue: condition value
cond: condition
"""
data = self.getNumericData(ds)
if cond == "lt":
ind = np.where(data < cvalue)
else:
ind = np.where(data > cvalue)
fdata = data[ind]
return fdata
def getUniqueValueCounts(self, ds, maxCnt=10):
"""
gets unique values and counts
Parameters
ds: data set name or list or numpy array
			maxCnt : max value count pairs to return
"""
self.__printBanner("getting unique values and counts", ds)
data = self.getNumericData(ds)
values, counts = sta.find_repeats(data)
cardinality = len(values)
vc = list(zip(values, counts))
vc.sort(key = lambda v : v[1], reverse = True)
result = self.__printResult("cardinality", cardinality, "vunique alues and repeat counts", vc[:maxCnt])
return result
def getCatUniqueValueCounts(self, ds, maxCnt=10):
"""
gets unique categorical values and counts
Parameters
ds: data set name or list or numpy array
maxCnt: max value count pairs to return
"""
self.__printBanner("getting unique categorical values and counts", ds)
data = self.getCatData(ds)
series = pd.Series(data)
uvalues = series.value_counts()
values = uvalues.index.tolist()
counts = uvalues.tolist()
vc = list(zip(values, counts))
vc.sort(key = lambda v : v[1], reverse = True)
result = self.__printResult("cardinality", len(values), "unique values and repeat counts", vc[:maxCnt])
return result
def getCatAlphaValueCounts(self, ds):
"""
gets alphabetic value count
Parameters
ds: data set name or list or numpy array
"""
self.__printBanner("getting alphabetic value counts", ds)
data = self.getCatData(ds)
series = pd.Series(data)
flags = series.str.isalpha().tolist()
count = sum(flags)
result = self.__printResult("alphabeticValueCount", count)
return result
def getCatNumValueCounts(self, ds):
"""
gets numeric value count
Parameters
ds: data set name or list or numpy array
"""
self.__printBanner("getting numeric value counts", ds)
data = self.getCatData(ds)
series = pd.Series(data)
flags = series.str.isnumeric().tolist()
count = sum(flags)
result = self.__printResult("numericValueCount", count)
return result
def getCatAlphaNumValueCounts(self, ds):
"""
gets alpha numeric value count
Parameters
ds: data set name or list or numpy array
"""
self.__printBanner("getting alpha numeric value counts", ds)
data = self.getCatData(ds)
series = pd.Series(data)
flags = series.str.isalnum().tolist()
count = sum(flags)
result = self.__printResult("alphaNumericValueCount", count)
return result
def getCatAllCharCounts(self, ds):
"""
gets alphabetic, numeric and special char count list
Parameters
ds: data set name or list or numpy array
"""
self.__printBanner("getting alphabetic, numeric and special char counts", ds)
data = self.getCatData(ds)
counts = list()
for d in data:
r = getAlphaNumCharCount(d)
counts.append(r)
result = self.__printResult("allTypeCharCounts", counts)
return result
def getCatAlphaCharCounts(self, ds):
"""
gets alphabetic char count list
Parameters
ds: data set name or list or numpy array
"""
self.__printBanner("getting alphabetic char counts", ds)
data = self.getCatData(ds)
counts = self.getCatAllCharCounts(ds)["allTypeCharCounts"]
counts = list(map(lambda r : r[0], counts))
result = self.__printResult("alphaCharCounts", counts)
return result
def getCatNumCharCounts(self, ds):
"""
gets numeric char count list
Parameters
ds: data set name or list or numpy array
"""
self.__printBanner("getting numeric char counts", ds)
data = self.getCatData(ds)
counts = self.getCatAllCharCounts(ds)["allTypeCharCounts"]
counts = list(map(lambda r : r[1], counts))
result = self.__printResult("numCharCounts", counts)
return result
def getCatSpecialCharCounts(self, ds):
"""
gets special char count list
Parameters
ds: data set name or list or numpy array
"""
self.__printBanner("getting special char counts", ds)
counts = self.getCatAllCharCounts(ds)["allTypeCharCounts"]
counts = list(map(lambda r : r[2], counts))
result = self.__printResult("specialCharCounts", counts)
return result
def getCatAlphaCharCountStats(self, ds):
"""
gets alphabetic char count stats
Parameters
ds: data set name or list or numpy array
"""
self.__printBanner("getting alphabetic char count stats", ds)
counts = self.getCatAlphaCharCounts(ds)["alphaCharCounts"]
nz = counts.count(0)
st = self.__getBasicStats(np.array(counts))
result = self.__printResult("mean", st[0], "std dev", st[1], "max", st[2], "min", st[3], "zeroCount", nz)
return result
def getCatNumCharCountStats(self, ds):
"""
gets numeric char count stats
Parameters
ds: data set name or list or numpy array
"""
self.__printBanner("getting numeric char count stats", ds)
counts = self.getCatNumCharCounts(ds)["numCharCounts"]
nz = counts.count(0)
st = self.__getBasicStats(np.array(counts))
result = self.__printResult("mean", st[0], "std dev", st[1], "max", st[2], "min", st[3], "zeroCount", nz)
return result
def getCatSpecialCharCountStats(self, ds):
"""
gets special char count stats
Parameters
ds: data set name or list or numpy array
"""
self.__printBanner("getting special char count stats", ds)
counts = self.getCatSpecialCharCounts(ds)["specialCharCounts"]
nz = counts.count(0)
st = self.__getBasicStats(np.array(counts))
result = self.__printResult("mean", st[0], "std dev", st[1], "max", st[2], "min", st[3], "zeroCount", nz)
return result
def getCatFldLenStats(self, ds):
"""
gets field length stats
Parameters
ds: data set name or list or numpy array
"""
self.__printBanner("getting field length stats", ds)
data = self.getCatData(ds)
le = list(map(lambda d: len(d), data))
st = self.__getBasicStats(np.array(le))
result = self.__printResult("mean", st[0], "std dev", st[1], "max", st[2], "min", st[3])
return result
def getCatCharCountStats(self, ds, ch):
"""
		gets specified char occurrence count stats
Parameters
ds: data set name or list or numpy array
ch : character
"""
self.__printBanner("getting field length stats", ds)
data = self.getCatData(ds)
counts = list(map(lambda d: d.count(ch), data))
nz = counts.count(0)
st = self.__getBasicStats(np.array(counts))
result = self.__printResult("mean", st[0], "std dev", st[1], "max", st[2], "min", st[3], "zeroCount", nz)
return result
def getStats(self, ds, nextreme=5):
"""
gets summary statistics
Parameters
ds: data set name or list or numpy array
nextreme: num of extreme values
"""
self.__printBanner("getting summary statistics", ds)
data = self.getNumericData(ds)
stat = dict()
stat["length"] = len(data)
stat["min"] = data.min()
stat["max"] = data.max()
series = pd.Series(data)
stat["n smallest"] = series.nsmallest(n=nextreme).tolist()
stat["n largest"] = series.nlargest(n=nextreme).tolist()
stat["mean"] = data.mean()
stat["median"] = np.median(data)
mode, modeCnt = sta.mode(data)
stat["mode"] = mode[0]
stat["mode count"] = modeCnt[0]
stat["std"] = np.std(data)
stat["skew"] = sta.skew(data)
stat["kurtosis"] = sta.kurtosis(data)
stat["mad"] = sta.median_absolute_deviation(data)
self.pp.pprint(stat)
return stat
def getStatsCat(self, ds):
"""
gets summary statistics for categorical data
Parameters
ds: data set name or list or numpy array
"""
self.__printBanner("getting summary statistics for categorical data", ds)
data = self.getCatData(ds)
ch = CatHistogram()
for d in data:
ch.add(d)
mode = ch.getMode()
entr = ch.getEntropy()
uvalues = ch.getUniqueValues()
distr = ch.getDistr()
result = self.__printResult("entropy", entr, "mode", mode, "uniqueValues", uvalues, "distr", distr)
return result
def getGroupByData(self, ds, gds, gdtypeCat, numBins=20):
"""
group by
Parameters
ds: data set name or list or numpy array
gds: group by data set name or list or numpy array
			gdtypeCat : True if group by data is categorical
			numBins : num of bins when group by data is numeric
"""
self.__printBanner("getting group by data", ds)
data = self.getAnyData(ds)
if gdtypeCat:
gdata = self.getCatData(gds)
else:
gdata = self.getNumericData(gds)
hist = Histogram.createWithNumBins(gdata, numBins)
gdata = list(map(lambda d : hist.bin(d), gdata))
self.ensureSameSize([data, gdata])
groups = dict()
for g,d in zip(gdata, data):
appendKeyedList(groups, g, d)
ve = self.verbose
self.verbose = False
result = self.__printResult("groupedData", groups)
self.verbose = ve
return result
def getDifference(self, ds, order, doPlot=False):
"""
gets difference of given order
Parameters
ds: data set name or list or numpy array
order: order of difference
doPlot : True for plot
"""
self.__printBanner("getting difference of given order", ds)
data = self.getNumericData(ds)
diff = difference(data, order)
if doPlot:
drawLine(diff)
return diff
def getTrend(self, ds, doPlot=False):
"""
get trend
Parameters
ds: data set name or list or numpy array
doPlot: true if plotting needed
"""
self.__printBanner("getting trend")
data = self.getNumericData(ds)
sz = len(data)
X = list(range(0, sz))
X = np.reshape(X, (sz, 1))
model = LinearRegression()
model.fit(X, data)
trend = model.predict(X)
sc = model.score(X, data)
coef = model.coef_
intc = model.intercept_
result = self.__printResult("coeff", coef, "intercept", intc, "r square error", sc, "trend", trend)
if doPlot:
plt.plot(data)
plt.plot(trend)
plt.show()
return result
def getDiffSdNoisiness(self, ds):
"""
get noisiness based on std dev of first order difference
Parameters
ds: data set name or list or numpy array
"""
diff = self.getDifference(ds, 1)
noise = np.std(np.array(diff))
result = self.__printResult("noisiness", noise)
return result
def getMaRmseNoisiness(self, ds, wsize=5):
"""
gets noisiness based on RMSE with moving average
Parameters
ds: data set name or list or numpy array
wsize : window size
"""
assert wsize % 2 == 1, "window size must be odd"
data = self.getNumericData(ds)
wind = data[:wsize]
wstat = SlidingWindowStat.initialize(wind.tolist())
whsize = int(wsize / 2)
beg = whsize
end = len(data) - whsize - 1
sumSq = 0.0
mean = wstat.getStat()[0]
diff = data[beg] - mean
sumSq += diff * diff
for i in range(beg + 1, end, 1):
mean = wstat.addGetStat(data[i + whsize])[0]
diff = data[i] - mean
sumSq += (diff * diff)
noise = math.sqrt(sumSq / (len(data) - 2 * whsize))
result = self.__printResult("noisiness", noise)
return result
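	#This noisiness measure is the RMSE of the series against its centered
	#moving average, so a larger window smooths more and attributes more of
	#the variation to noise. A hedged sketch (data set name is hypothetical):
	#
	#	exp.getMaRmseNoisiness("sensor", wsize=7)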
def deTrend(self, ds, trend, doPlot=False):
"""
de trend
Parameters
ds: data set name or list or numpy array
			trend : trend data
doPlot: true if plotting needed
"""
self.__printBanner("doing de trend", ds)
data = self.getNumericData(ds)
sz = len(data)
detrended = list(map(lambda i : data[i]-trend[i], range(sz)))
if doPlot:
drawLine(detrended)
return detrended
def getTimeSeriesComponents(self, ds, model, freq, summaryOnly, doPlot=False):
"""
extracts trend, cycle and residue components of time series
Parameters
ds: data set name or list or numpy array
model : model type
			freq : seasonality period
summaryOnly : True if only summary needed in output
doPlot: true if plotting needed
"""
self.__printBanner("extracting trend, cycle and residue components of time series", ds)
assert model == "additive" or model == "multiplicative", "model must be additive or multiplicative"
data = self.getNumericData(ds)
res = seasonal_decompose(data, model=model, period=freq)
if doPlot:
res.plot()
plt.show()
		#summary of components
trend = np.array(removeNan(res.trend))
trendMean = trend.mean()
trendSlope = (trend[-1] - trend[0]) / (len(trend) - 1)
seasonal = np.array(removeNan(res.seasonal))
seasonalAmp = (seasonal.max() - seasonal.min()) / 2
resid = np.array(removeNan(res.resid))
residueMean = resid.mean()
residueStdDev = np.std(resid)
if summaryOnly:
result = self.__printResult("trendMean", trendMean, "trendSlope", trendSlope, "seasonalAmp", seasonalAmp,
"residueMean", residueMean, "residueStdDev", residueStdDev)
else:
result = self.__printResult("trendMean", trendMean, "trendSlope", trendSlope, "seasonalAmp", seasonalAmp,
"residueMean", residueMean, "residueStdDev", residueStdDev, "trend", res.trend, "seasonal", res.seasonal,
"residual", res.resid)
return result
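	#A hedged sketch for seasonal decomposition, assuming daily data with a
	#weekly cycle (data set name and period are hypothetical):
	#
	#	exp.getTimeSeriesComponents("demand", "additive", 7, True, doPlot=True)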
def getGausianMixture(self, ncomp, cvType, ninit, *dsl):
"""
finds gaussian mixture parameters
Parameters
			ncomp : num of gaussian components
			cvType : covariance type
			ninit : num of initializations
dsl: list of data set name or list or numpy array
"""
self.__printBanner("getting gaussian mixture parameters", *dsl)
assertInList(cvType, ["full", "tied", "diag", "spherical"], "invalid covariance type")
dmat = self.__stackData(*dsl)
gm = GaussianMixture(n_components=ncomp, covariance_type=cvType, n_init=ninit)
gm.fit(dmat)
weights = gm.weights_
means = gm.means_
covars = gm.covariances_
converged = gm.converged_
niter = gm.n_iter_
aic = gm.aic(dmat)
result = self.__printResult("weights", weights, "mean", means, "covariance", covars, "converged", converged, "num iterations", niter, "aic", aic)
return result
def getKmeansCluster(self, nclust, ninit, *dsl):
"""
gets cluster parameters
Parameters
nclust : num of clusters
			ninit : num of initializations
dsl: list of data set name or list or numpy array
"""
self.__printBanner("getting kmean cluster parameters", *dsl)
dmat = self.__stackData(*dsl)
nsamp = dmat.shape[0]
km = KMeans(n_clusters=nclust, n_init=ninit)
km.fit(dmat)
centers = km.cluster_centers_
avdist = sqrt(km.inertia_ / nsamp)
niter = km.n_iter_
score = km.score(dmat)
result = self.__printResult("centers", centers, "average distance", avdist, "num iterations", niter, "score", score)
return result
def getPrincComp(self, ncomp, *dsl):
"""
		finds principal component parameters
		Parameters
			ncomp : num of principal components
			dsl : list of data set name or list or numpy array
		"""
		self.__printBanner("getting principal component parameters", *dsl)
dmat = self.__stackData(*dsl)
nfeat = dmat.shape[1]
assertGreater(nfeat, 1, "requires multiple features")
assertLesserEqual(ncomp, nfeat, "num of componenets greater than num of features")
pca = PCA(n_components=ncomp)
pca.fit(dmat)
comps = pca.components_
var = pca.explained_variance_
varr = pca.explained_variance_ratio_
svalues = pca.singular_values_
result = self.__printResult("componenets", comps, "variance", var, "variance ratio", varr, "singular values", svalues)
return result
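	#The multivariate methods (GaussianMixture, KMeans, PCA and the outlier
	#detectors) stack the named one dimensional data sets column wise into a
	#single matrix. A hedged sketch (data set names are hypothetical):
	#
	#	exp.getPrincComp(2, "price", "demand", "inventory")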
def getOutliersWithIsoForest(self, contamination, *dsl):
"""
finds outliers using isolation forest
Parameters
contamination : proportion of outliers in the data set
dsl: list of data set name or list or numpy array
"""
self.__printBanner("getting outliers using isolation forest", *dsl)
assert contamination >= 0 and contamination <= 0.5, "contamination outside valid range"
dmat = self.__stackData(*dsl)
isf = IsolationForest(contamination=contamination, behaviour="new")
ypred = isf.fit_predict(dmat)
mask = ypred == -1
doul = dmat[mask, :]
mask = ypred != -1
dwoul = dmat[mask, :]
result = self.__printResult("numOutliers", doul.shape[0], "outliers", doul, "dataWithoutOutliers", dwoul)
return result
def getOutliersWithLocalFactor(self, contamination, *dsl):
"""
gets outliers using local outlier factor
Parameters
contamination : proportion of outliers in the data set
dsl: list of data set name or list or numpy array
"""
self.__printBanner("getting outliers using local outlier factor", *dsl)
assert contamination >= 0 and contamination <= 0.5, "contamination outside valid range"
dmat = self.__stackData(*dsl)
lof = LocalOutlierFactor(contamination=contamination)
ypred = lof.fit_predict(dmat)
mask = ypred == -1
doul = dmat[mask, :]
mask = ypred != -1
dwoul = dmat[mask, :]
result = self.__printResult("numOutliers", doul.shape[0], "outliers", doul, "dataWithoutOutliers", dwoul)
return result
def getOutliersWithSupVecMach(self, nu, *dsl):
"""
gets outliers using one class svm
Parameters
nu : upper bound on the fraction of training errors and a lower bound of the fraction of support vectors
dsl: list of data set name or list or numpy array
"""
self.__printBanner("getting outliers using one class svm", *dsl)
assert nu >= 0 and nu <= 0.5, "error upper bound outside valid range"
dmat = self.__stackData(*dsl)
svm = OneClassSVM(nu=nu)
ypred = svm.fit_predict(dmat)
mask = ypred == -1
doul = dmat[mask, :]
mask = ypred != -1
dwoul = dmat[mask, :]
result = self.__printResult("numOutliers", doul.shape[0], "outliers", doul, "dataWithoutOutliers", dwoul)
return result
def getOutliersWithCovarDeterminant(self, contamination, *dsl):
"""
		gets outliers using covariance determinant
Parameters
contamination : proportion of outliers in the data set
dsl: list of data set name or list or numpy array
"""
self.__printBanner("getting outliers using using covariance determinant", *dsl)
assert contamination >= 0 and contamination <= 0.5, "contamination outside valid range"
dmat = self.__stackData(*dsl)
lof = EllipticEnvelope(contamination=contamination)
ypred = lof.fit_predict(dmat)
mask = ypred == -1
doul = dmat[mask, :]
mask = ypred != -1
dwoul = dmat[mask, :]
result = self.__printResult("numOutliers", doul.shape[0], "outliers", doul, "dataWithoutOutliers", dwoul)
return result
def getOutliersWithZscore(self, ds, zthreshold, stats=None):
"""
gets outliers using zscore
Parameters
ds: data set name or list or numpy array
zthreshold : z score threshold
			stats : tuple containing mean and std dev
"""
self.__printBanner("getting outliers using zscore", ds)
data = self.getNumericData(ds)
if stats is None:
mean = data.mean()
sd = np.std(data)
else:
mean = stats[0]
sd = stats[1]
zs = list(map(lambda d : abs((d - mean) / sd), data))
outliers = list(filter(lambda r : r[1] > zthreshold, enumerate(zs)))
result = self.__printResult("outliers", outliers)
return result
def getOutliersWithRobustZscore(self, ds, zthreshold, stats=None):
"""
gets outliers using robust zscore
Parameters
ds: data set name or list or numpy array
zthreshold : z score threshold
stats : tuple containing median and median absolute deviation
"""
self.__printBanner("getting outliers using robust zscore", ds)
data = self.getNumericData(ds)
if stats is None:
med = np.median(data)
dev = np.array(list(map(lambda d : abs(d - med), data)))
			mad = 1.4826 * np.median(dev)
else:
med = stats[0]
mad = stats[1]
rzs = list(map(lambda d : abs((d - med) / mad), data))
outliers = list(filter(lambda r : r[1] > zthreshold, enumerate(rzs)))
result = self.__printResult("outliers", outliers)
return result
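	#The robust zscore replaces mean and std dev with median and scaled MAD:
	#rz = |x - median| / (1.4826 * MAD). It agrees with the ordinary zscore
	#for normal data but is not distorted by the outliers being sought.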
def getSubsequenceOutliersWithDissimilarity(self, subSeqSize, ds):
"""
		gets subsequence outlier with subsequence pairwise dissimilarity
Parameters
subSeqSize : sub sequence size
ds: data set name or list or numpy array
"""
self.__printBanner("doing sub sequence anomaly detection with dissimilarity", ds)
data = self.getNumericData(ds)
sz = len(data)
dist = dict()
minDist = dict()
for i in range(sz - subSeqSize):
#first window
w1 = data[i : i + subSeqSize]
dmin = None
for j in range(sz - subSeqSize):
#second window not overlapping with the first
if j + subSeqSize <=i or j >= i + subSeqSize:
w2 = data[j : j + subSeqSize]
k = (j,i)
if k in dist:
d = dist[k]
else:
d = euclideanDistance(w1,w2)
k = (i,j)
dist[k] = d
if dmin is None:
dmin = d
else:
dmin = d if d < dmin else dmin
minDist[i] = dmin
#find max of min
dmax = None
offset = None
for k in minDist.keys():
d = minDist[k]
if dmax is None:
dmax = d
offset = k
else:
if d > dmax:
dmax = d
offset = k
result = self.__printResult("subSeqOffset", offset, "outlierScore", dmax)
return result
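	#This is a discord style search: each window is scored by its distance to
	#the nearest non overlapping window, and the window maximizing that
	#minimum distance is reported. Cost is quadratic in the series length,
	#so it suits short to moderate series.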
def getNullCount(self, ds):
"""
get count of null fields
Parameters
ds : data set name or list or numpy array with data
"""
self.__printBanner("getting null value count", ds)
if type(ds) == str:
assert ds in self.dataSets, "data set {} does not exist, please add it first".format(ds)
data = self.dataSets[ds]
ser = pd.Series(data)
elif type(ds) == list or type(ds) == np.ndarray:
ser = pd.Series(ds)
data = ds
else:
raise ValueError("invalid data type")
nv = ser.isnull().tolist()
nullCount = nv.count(True)
nullFraction = nullCount / len(data)
result = self.__printResult("nullFraction", nullFraction, "nullCount", nullCount)
return result
def fitLinearReg(self, dsx, ds, doPlot=False):
"""
fit linear regression
Parameters
dsx: x data set name or None
ds: data set name or list or numpy array
doPlot: true if plotting needed
"""
self.__printBanner("fitting linear regression", ds)
data = self.getNumericData(ds)
if dsx is None:
x = np.arange(len(data))
else:
x = self.getNumericData(dsx)
slope, intercept, rvalue, pvalue, stderr = sta.linregress(x, data)
result = self.__printResult("slope", slope, "intercept", intercept, "rvalue", rvalue, "pvalue", pvalue, "stderr", stderr)
if doPlot:
self.regFitPlot(x, data, slope, intercept)
return result
def fitSiegelRobustLinearReg(self, ds, doPlot=False):
"""
siegel robust linear regression fit based on median
Parameters
ds: data set name or list or numpy array
doPlot: true if plotting needed
"""
self.__printBanner("fitting siegel robust linear regression based on median", ds)
data = self.getNumericData(ds)
slope , intercept = sta.siegelslopes(data)
result = self.__printResult("slope", slope, "intercept", intercept)
if doPlot:
x = np.arange(len(data))
self.regFitPlot(x, data, slope, intercept)
return result
def fitTheilSenRobustLinearReg(self, ds, doPlot=False):
"""
thiel sen robust linear fit regression based on median
Parameters
ds: data set name or list or numpy array
doPlot: true if plotting needed
"""
self.__printBanner("fitting thiel sen robust linear regression based on median", ds)
data = self.getNumericData(ds)
slope, intercept, loSlope, upSlope = sta.theilslopes(data)
result = self.__printResult("slope", slope, "intercept", intercept, "lower slope", loSlope, "upper slope", upSlope)
if doPlot:
x = np.arange(len(data))
self.regFitPlot(x, data, slope, intercept)
return result
	def regFitPlot(self, x, y, slope, intercept):
		"""
		plots linear regression fit line
		Parameters
			x : x values
			y : y values
			slope : slope
			intercept : intercept
		"""
		self.__printBanner("plotting linear regression fit line")
fig = plt.figure()
ax = fig.add_subplot(111)
ax.plot(x, y, "b.")
ax.plot(x, intercept + slope * x, "r-")
plt.show()
def getRegFit(self, xvalues, yvalues, slope, intercept):
"""
gets fitted line and residue
Parameters
			xvalues : x values
			yvalues : y values
slope : regression slope
			intercept : regression intercept
"""
yfit = list()
residue = list()
for x,y in zip(xvalues, yvalues):
yf = x * slope + intercept
yfit.append(yf)
r = y - yf
residue.append(r)
result = self.__printResult("fitted line", yfit, "residue", residue)
return result
def getInfluentialPoints(self, dsx, dsy):
"""
gets influential points in regression model with Cook's distance
Parameters
dsx : data set name or list or numpy array for x
dsy : data set name or list or numpy array for y
"""
self.__printBanner("finding influential points for linear regression", dsx, dsy)
y = self.getNumericData(dsy)
		x = np.arange(len(y)) if dsx is None else self.getNumericData(dsx)
model = sm.OLS(y, x).fit()
np.set_printoptions(suppress=True)
influence = model.get_influence()
cooks = influence.cooks_distance
result = self.__printResult("Cook distance", cooks)
return result
def getCovar(self, *dsl):
"""
gets covariance
Parameters
dsl: list of data set name or list or numpy array
"""
self.__printBanner("getting covariance", *dsl)
data = list(map(lambda ds : self.getNumericData(ds), dsl))
self.ensureSameSize(data)
data = np.vstack(data)
cv = np.cov(data)
print(cv)
return cv
def getPearsonCorr(self, ds1, ds2, sigLev=.05):
"""
gets pearson correlation coefficient
Parameters
ds1: data set name or list or numpy array
ds2: data set name or list or numpy array
"""
self.__printBanner("getting pearson correlation coefficient ", ds1, ds2)
data1 = self.getNumericData(ds1)
data2 = self.getNumericData(ds2)
self.ensureSameSize([data1, data2])
stat, pvalue = sta.pearsonr(data1, data2)
result = self.__printResult("stat", stat, "pvalue", pvalue)
self.__printStat(stat, pvalue, "probably uncorrelated", "probably correlated", sigLev)
return result
def getSpearmanRankCorr(self, ds1, ds2, sigLev=.05):
"""
gets spearman correlation coefficient
Parameters
ds1: data set name or list or numpy array
ds2: data set name or list or numpy array
sigLev: statistical significance level
"""
self.__printBanner("getting spearman correlation coefficient",ds1, ds2)
data1 = self.getNumericData(ds1)
data2 = self.getNumericData(ds2)
self.ensureSameSize([data1, data2])
stat, pvalue = sta.spearmanr(data1, data2)
result = self.__printResult("stat", stat, "pvalue", pvalue)
self.__printStat(stat, pvalue, "probably uncorrelated", "probably correlated", sigLev)
return result
def getKendalRankCorr(self, ds1, ds2, sigLev=.05):
"""
kendall’s tau, a correlation measure for ordinal data
Parameters
ds1: data set name or list or numpy array
ds2: data set name or list or numpy array
sigLev: statistical significance level
"""
self.__printBanner("getting kendall’s tau, a correlation measure for ordinal data", ds1, ds2)
data1 = self.getNumericData(ds1)
data2 = self.getNumericData(ds2)
self.ensureSameSize([data1, data2])
stat, pvalue = sta.kendalltau(data1, data2)
result = self.__printResult("stat", stat, "pvalue", pvalue)
self.__printStat(stat, pvalue, "probably uncorrelated", "probably correlated", sigLev)
return result
def getPointBiserialCorr(self, ds1, ds2, sigLev=.05):
"""
point biserial correlation between binary and numeric
Parameters
ds1: data set name or list or numpy array
ds2: data set name or list or numpy array
sigLev: statistical significance level
"""
self.__printBanner("getting point biserial correlation between binary and numeric", ds1, ds2)
data1 = self.getNumericData(ds1)
data2 = self.getNumericData(ds2)
assert isBinary(data1), "first data set is not binary"
self.ensureSameSize([data1, data2])
stat, pvalue = sta.pointbiserialr(data1, data2)
result = self.__printResult("stat", stat, "pvalue", pvalue)
self.__printStat(stat, pvalue, "probably uncorrelated", "probably correlated", sigLev)
return result
def getConTab(self, ds1, ds2):
"""
get contingency table for categorical data pair
Parameters
ds1: data set name or list or numpy array
ds2: data set name or list or numpy array
"""
self.__printBanner("getting contingency table for categorical data", ds1, ds2)
data1 = self.getCatData(ds1)
data2 = self.getCatData(ds2)
self.ensureSameSize([data1, data2])
crosstab = pd.crosstab(pd.Series(data1), pd.Series(data2), margins = False)
ctab = crosstab.values
print("contingency table")
print(ctab)
return ctab
def getChiSqCorr(self, ds1, ds2, sigLev=.05):
"""
chi square correlation for categorical data pair
Parameters
ds1: data set name or list or numpy array
ds2: data set name or list or numpy array
sigLev: statistical significance level
"""
self.__printBanner("getting chi square correlation for two categorical", ds1, ds2)
ctab = self.getConTab(ds1, ds2)
stat, pvalue, dof, expctd = sta.chi2_contingency(ctab)
result = self.__printResult("stat", stat, "pvalue", pvalue, "dof", dof, "expected", expctd)
self.__printStat(stat, pvalue, "probably uncorrelated", "probably correlated", sigLev)
return result
def getSizeCorrectChiSqCorr(self, ds1, ds2, chisq):
"""
cramerV size corrected chi square correlation for categorical data pair
Parameters
ds1: data set name or list or numpy array
ds2: data set name or list or numpy array
chisq: chisq stat
"""
self.__printBanner("getting size corrected chi square correlation for two categorical", ds1, ds2)
c1 = self.getCatUniqueValueCounts(ds1)["cardinality"]
c2 = self.getCatUniqueValueCounts(ds2)["cardinality"]
c = min(c1,c2)
assertGreater(c, 1, "min cardinality should be greater than 1")
l = len(self.getCatData(ds1))
t = l * (c - 1)
stat = math.sqrt(chisq / t)
result = self.__printResult("stat", stat)
return result
def getAnovaCorr(self, ds1, ds2, grByCol, sigLev=.05):
"""
anova correlation for numerical categorical
Parameters
ds1: data set name or list or numpy array
ds2: data set name or list or numpy array
grByCol : group by column
sigLev: statistical significance level
"""
self.__printBanner("anova correlation for numerical categorical", ds1, ds2)
df = self.loadCatFloatDataFrame(ds1, ds2) if grByCol == 0 else self.loadCatFloatDataFrame(ds2, ds1)
grByCol = 0
dCol = 1
grouped = df.groupby([grByCol])
dlist = list(map(lambda v : v[1].loc[:, dCol].values, grouped))
stat, pvalue = sta.f_oneway(*dlist)
result = self.__printResult("stat", stat, "pvalue", pvalue)
self.__printStat(stat, pvalue, "probably uncorrelated", "probably correlated", sigLev)
return result
def plotAutoCorr(self, ds, lags, alpha, diffOrder=0):
"""
plots auto correlation
Parameters
ds: data set name or list or numpy array
lags: num of lags
			alpha: confidence level
			diffOrder: differencing order applied before plotting
"""
self.__printBanner("plotting auto correlation", ds)
data = self.getNumericData(ds)
ddata = difference(data, diffOrder) if diffOrder > 0 else data
tsaplots.plot_acf(ddata, lags = lags, alpha = alpha)
plt.show()
def getAutoCorr(self, ds, lags, alpha=.05):
"""
		gets auto correlation
Parameters
ds: data set name or list or numpy array
lags: num of lags
alpha: confidence level
"""
self.__printBanner("getting auto correlation", ds)
data = self.getNumericData(ds)
autoCorr, confIntv = stt.acf(data, nlags=lags, fft=False, alpha=alpha)
result = self.__printResult("autoCorr", autoCorr, "confIntv", confIntv)
return result
def plotParAcf(self, ds, lags, alpha):
"""
partial auto correlation
Parameters
ds: data set name or list or numpy array
lags: num of lags
alpha: confidence level
"""
self.__printBanner("plotting partial auto correlation", ds)
data = self.getNumericData(ds)
tsaplots.plot_pacf(data, lags = lags, alpha = alpha)
plt.show()
def getParAutoCorr(self, ds, lags, alpha=.05):
"""
		gets partial auto correlation
Parameters
ds: data set name or list or numpy array
lags: num of lags
alpha: confidence level
"""
self.__printBanner("getting partial auto correlation", ds)
data = self.getNumericData(ds)
partAutoCorr, confIntv = stt.pacf(data, nlags=lags, alpha=alpha)
result = self.__printResult("partAutoCorr", partAutoCorr, "confIntv", confIntv)
return result
def getHurstExp(self, ds, kind, doPlot=True):
"""
gets Hurst exponent of time series
Parameters
ds: data set name or list or numpy array
kind: kind of data change, random_walk, price
doPlot: True for plot
"""
self.__printBanner("getting Hurst exponent", ds)
data = self.getNumericData(ds)
h, c, odata = hurst.compute_Hc(data, kind=kind, simplified=False)
if doPlot:
f, ax = plt.subplots()
ax.plot(odata[0], c * odata[0] ** h, color="deepskyblue")
ax.scatter(odata[0], odata[1], color="purple")
ax.set_xscale("log")
ax.set_yscale("log")
ax.set_xlabel("time interval")
ax.set_ylabel("cum dev range and std dev ratio")
ax.grid(True)
plt.show()
result = self.__printResult("hurstExponent", h, "hurstConstant", c)
return result
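	#Interpretation of the Hurst exponent: H near 0.5 indicates a random
	#walk, H > 0.5 a trending (persistent) series and H < 0.5 a mean
	#reverting (anti persistent) series.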
def approxEntropy(self, ds, m, r):
"""
		gets approximate entropy of time series (ref: wikipedia)
Parameters
ds: data set name or list or numpy array
m: length of compared run of data
r: filtering level
"""
self.__printBanner("getting approximate entropy", ds)
ldata = self.getNumericData(ds)
aent = abs(self.__phi(ldata, m + 1, r) - self.__phi(ldata, m, r))
result = self.__printResult("approxEntropy", aent)
return result
def __phi(self, ldata, m, r):
"""
phi function for approximate entropy
Parameters
ldata: data array
m: length of compared run of data
r: filtering level
"""
le = len(ldata)
x = [[ldata[j] for j in range(i, i + m - 1 + 1)] for i in range(le - m + 1)]
lex = len(x)
c = list()
for i in range(lex):
cnt = 0
for j in range(lex):
cnt += (1 if maxListDist(x[i], x[j]) <= r else 0)
cnt /= (le - m + 1.0)
c.append(cnt)
return sum(np.log(c)) / (le - m + 1.0)
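	#Approximate entropy compares log frequencies of matching runs of length
	#m and m + 1 within tolerance r; regular series score near 0, irregular
	#ones higher. A common hedged choice is m = 2 and r = 0.2 * std dev:
	#
	#	sd = np.std(exp.getNumericData("sensor"))
	#	exp.approxEntropy("sensor", 2, 0.2 * sd)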
def oneSpaceEntropy(self, ds, scaMethod="zscale"):
"""
		gets one space entropy (ref: Estimating mutual information by Kraskov)
Parameters
ds: data set name or list or numpy array
"""
self.__printBanner("getting one space entropy", ds)
data = self.getNumericData(ds)
sdata = sorted(data)
sdata = scaleData(sdata, scaMethod)
su = 0
n = len(sdata)
for i in range(1, n, 1):
t = abs(sdata[i] - sdata[i-1])
if t > 0:
su += log(t)
su /= (n -1)
#print(su)
ose = digammaFun(n) - digammaFun(1) + su
result = self.__printResult("entropy", ose)
return result
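#Usage sketch (hypothetical name "noise"; scaMethod must be a method
#supported by the scaleData utility, zscale being the default):
#  res = expl.oneSpaceEntropy("noise")
#  print(res["entropy"])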
def plotCrossCorr(self, ds1, ds2, normed, lags):
"""
plots cross correlation
Parameters
ds1: data set name or list or numpy array
ds2: data set name or list or numpy array
normed: if True, input vectors are normalised to unit length
lags: num of lags
"""
self.__printBanner("plotting cross correlation between two numeric", ds1, ds2)
data1 = self.getNumericData(ds1)
data2 = self.getNumericData(ds2)
self.ensureSameSize([data1, data2])
plt.xcorr(data1, data2, normed=normed, maxlags=lags)
plt.show()
def getCrossCorr(self, ds1, ds2):
"""
gets cross correlation
Parameters
ds1: data set name or list or numpy array
ds2: data set name or list or numpy array
"""
self.__printBanner("getting cross correlation", ds1, ds2)
data1 = self.getNumericData(ds1)
data2 = self.getNumericData(ds2)
self.ensureSameSize([data1, data2])
crossCorr = stt.ccf(data1, data2)
result = self.__printResult("crossCorr", crossCorr)
return result
def getFourierTransform(self, ds):
"""
gets fast Fourier transform
Parameters
ds: data set name or list or numpy array
"""
self.__printBanner("getting fourier transform", ds)
data = self.getNumericData(ds)
ft = np.fft.rfft(data)
result = self.__printResult("fourierTransform", ft)
return result
def testStationaryAdf(self, ds, regression, autolag, sigLev=.05):
"""
ADF stationarity test; null hypothesis: series is not stationary
Parameters
ds: data set name or list or numpy array
regression: constant and trend order to include in regression
autolag: method to use when automatically determining the lag
sigLev: statistical significance level
"""
self.__printBanner("doing ADF stationary test", ds)
relist = ["c","ct","ctt","nc"]
assert regression in relist, "invalid regression value"
alList = ["AIC", "BIC", "t-stat", None]
assert autolag in alList, "invalid autolag value"
data = self.getNumericData(ds)
re = stt.adfuller(data, regression=regression, autolag=autolag)
result = self.__printResult("stat", re[0], "pvalue", re[1] , "num lags", re[2] , "num observation for regression", re[3],
"critial values", re[4])
self.__printStat(re[0], re[1], "probably not stationary", "probably stationary", sigLev)
return result
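#Usage sketch (regression and autolag restricted to the values asserted
#above; a pvalue below sigLev suggests the series is probably stationary):
#  res = expl.testStationaryAdf("series", "c", "AIC")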
def testStationaryKpss(self, ds, regression, nlags, sigLev=.05):
"""
KPSS stationarity test; null hypothesis: series is stationary
Parameters
ds: data set name or list or numpy array
regression: constant and trend order to include in regression
nlags : no of lags
sigLev: statistical significance level
"""
self.__printBanner("doing KPSS stationary test", ds)
relist = ["c","ct"]
assert regression in relist, "invalid regression value"
nlList =[None, "auto", "legacy"]
assert nlags in nlList or type(nlags) == int, "invalid nlags value"
data = self.getNumericData(ds)
stat, pvalue, nLags, criticalValues = stt.kpss(data, regression=regression, nlags=nlags)
result = self.__printResult("stat", stat, "pvalue", pvalue, "num lags", nLags, "critical values", criticalValues)
self.__printStat(stat, pvalue, "probably stationary", "probably not stationary", sigLev)
return result
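#Usage sketch; note the null hypothesis is reversed relative to ADF, so a
#pvalue below sigLev suggests the series is probably not stationary:
#  res = expl.testStationaryKpss("series", "c", "auto")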
def testNormalJarqBera(self, ds, sigLev=.05):
"""
Jarque-Bera normality test
Parameters
ds: data set name or list or numpy array
sigLev: statistical significance level
"""
self.__printBanner("doing ajrque bera normalcy test", ds)
data = self.getNumericData(ds)
jb, jbpv, skew, kurtosis = sstt.jarque_bera(data)
result = self.__printResult("stat", jb, "pvalue", jbpv, "skew", skew, "kurtosis", kurtosis)
self.__printStat(jb, jbpv, "probably gaussian", "probably not gaussian", sigLev)
return result
def testNormalShapWilk(self, ds, sigLev=.05):
"""
Shapiro-Wilk normality test
Parameters
ds: data set name or list or numpy array
sigLev: statistical significance level
"""
self.__printBanner("doing shapiro wilks normalcy test", ds)
data = self.getNumericData(ds)
stat, pvalue = sta.shapiro(data)
result = self.__printResult("stat", stat, "pvalue", pvalue)
self.__printStat(stat, pvalue, "probably gaussian", "probably not gaussian", sigLev)
return result
def testNormalDagast(self, ds, sigLev=.05):
"""
D’Agostino’s K squared normality test
Parameters
ds: data set name or list or numpy array
sigLev: statistical significance level
"""
self.__printBanner("doing D’Agostino’s K square normalcy test", ds)
data = self.getNumericData(ds)
stat, pvalue = sta.normaltest(data)
result = self.__printResult("stat", stat, "pvalue", pvalue)
self.__printStat(stat, pvalue, "probably gaussian", "probably not gaussian", sigLev)
return result
def testDistrAnderson(self, ds, dist, sigLev=.05):
"""
Anderson test for norm, expon, logistic, gumbel, gumbel_l, gumbel_r and extreme1 distributions
Parameters
ds: data set name or list or numpy array
dist: type of distribution
sigLev: statistical significance level
"""
self.__printBanner("doing Anderson test for for various distributions", ds)
diList = ["norm", "expon", "logistic", "gumbel", "gumbel_l", "gumbel_r", "extreme1"]
assert dist in diList, "invalid distribution"
data = self.getNumericData(ds)
re = sta.anderson(data)
slAlpha = int(100 * sigLev)
msg = "significnt value not found"
for i in range(len(re.critical_values)):
sl, cv = re.significance_level[i], re.critical_values[i]
if int(sl) == slAlpha:
if re.statistic < cv:
msg = "probably {} at the {:.3f} siginificance level".format(dist, sl)
else:
msg = "probably not {} at the {:.3f} siginificance level".format(dist, sl)
result = self.__printResult("stat", re.statistic, "test", msg)
print(msg)
return result
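#Usage sketch (dist restricted to the values asserted above; the test is
#decided against the scipy critical value matching sigLev, if present):
#  res = expl.testDistrAnderson("series", "norm")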
def testSkew(self, ds, sigLev=.05):
"""
test skew wrt normal distr
Parameters
ds: data set name or list or numpy array
sigLev: statistical significance level
"""
self.__printBanner("testing skew wrt normal distr", ds)
data = self.getNumericData(ds)
stat, pvalue = sta.skewtest(data)
result = self.__printResult("stat", stat, "pvalue", pvalue)
self.__printStat(stat, pvalue, "probably same skew as normal distribution", "probably not same skew as normal distribution", sigLev)
return result
def testTwoSampleStudent(self, ds1, ds2, sigLev=.05):
"""
Student t 2 sample test for equality of means
Parameters
ds1: data set name or list or numpy array
ds2: data set name or list or numpy array
sigLev: statistical significance level
"""
self.__printBanner("doing student t 2 sample test", ds1, ds2)
data1 = self.getNumericData(ds1)
data2 = self.getNumericData(ds2)
stat, pvalue = sta.ttest_ind(data1, data2)
result = self.__printResult("stat", stat, "pvalue", pvalue)
self.__printStat(stat, pvalue, "probably same distribution", "probably not same distribution", sigLev)
return result
def testTwoSampleKs(self, ds1, ds2, sigLev=.05):
"""
Kolmogorov-Smirnov 2 sample statistic
Parameters
ds1: data set name or list or numpy array
ds2: data set name or list or numpy array
sigLev: statistical significance level
"""
self.__printBanner("doing Kolmogorov Sminov 2 sample test", ds1, ds2)
data1 = self.getNumericData(ds1)
data2 = self.getNumericData(ds2)
stat, pvalue = sta.ks_2samp(data1, data2)
result = self.__printResult("stat", stat, "pvalue", pvalue)
self.__printStat(stat, pvalue, "probably same distribution", "probably not same distribution", sigLev)
def testTwoSampleMw(self, ds1, ds2, sigLev=.05):
"""
Mann-Whitney 2 sample statistic
Parameters
ds1: data set name or list or numpy array
ds2: data set name or list or numpy array
sigLev: statistical significance level
"""
self.__printBanner("doing Mann-Whitney 2 sample test", ds1, ds2)
data1 = self.getNumericData(ds1)
data2 = self.getNumericData(ds2)
stat, pvalue = sta.mannwhitneyu(data1, data2)
result = self.__printResult("stat", stat, "pvalue", pvalue)
self.__printStat(stat, pvalue, "probably same distribution", "probably not same distribution", sigLev)
def testTwoSampleWilcox(self, ds1, ds2, sigLev=.05):
"""
Wilcoxon Signed-Rank 2 sample statistic
Parameters
ds1: data set name or list or numpy array
ds2: data set name or list or numpy array
sigLev: statistical significance level
"""
self.__printBanner("doing Wilcoxon Signed-Rank 2 sample test", ds1, ds2)
data1 = self.getNumericData(ds1)
data2 = self.getNumericData(ds2)
stat, pvalue = sta.wilcoxon(data1, data2)
result = self.__printResult("stat", stat, "pvalue", pvalue)
self.__printStat(stat, pvalue, "probably same distribution", "probably not same distribution", sigLev)
def testTwoSampleKw(self, ds1, ds2, sigLev=.05):
"""
Kruskal-Wallis 2 sample statistic
Parameters
ds1: data set name or list or numpy array
ds2: data set name or list or numpy array
sigLev: statistical significance level
"""
self.__printBanner("doing Kruskal-Wallis 2 sample test", ds1, ds2)
data1 = self.getNumericData(ds1)
data2 = self.getNumericData(ds2)
stat, pvalue = sta.kruskal(data1, data2)
result = self.__printResult("stat", stat, "pvalue", pvalue)
self.__printStat(stat, pvalue, "probably same distribution", "probably snot ame distribution", sigLev)
def testTwoSampleFriedman(self, ds1, ds2, ds3, sigLev=.05):
"""
Friedman test statistic for 3 related samples
Parameters
ds1: data set name or list or numpy array
ds2: data set name or list or numpy array
ds3: data set name or list or numpy array
sigLev: statistical significance level
"""
self.__printBanner("doing Friedman test", ds1, ds2, ds3)
data1 = self.getNumericData(ds1)
data2 = self.getNumericData(ds2)
data3 = self.getNumericData(ds3)
stat, pvalue = sta.friedmanchisquare(data1, data2, data3)
result = self.__printResult("stat", stat, "pvalue", pvalue)
self.__printStat(stat, pvalue, "probably same distribution", "probably not same distribution", sigLev)
def testTwoSampleEs(self, ds1, ds2, sigLev=.05):
"""
Epps Singleton 2 sample statistic
Parameters
ds1: data set name or list or numpy array
ds2: data set name or list or numpy array
sigLev: statistical significance level
"""
self.__printBanner("doing Epps Singleton 2 sample test", ds1, ds2)
data1 = self.getNumericData(ds1)
data2 = self.getNumericData(ds2)
stat, pvalue = sta.epps_singleton_2samp(data1, data2)
result = self.__printResult("stat", stat, "pvalue", pvalue)
self.__printStat(stat, pvalue, "probably same distribution", "probably not same distribution", sigLev)
def testTwoSampleAnderson(self, ds1, ds2, sigLev=.05):
"""
Anderson 2 sample statistic
Parameters
ds1: data set name or list or numpy array
ds2: data set name or list or numpy array
sigLev: statistical significance level
"""
self.__printBanner("doing Anderson 2 sample test", ds1, ds2)
data1 = self.getNumericData(ds1)
data2 = self.getNumericData(ds2)
dseq = (data1, data2)
stat, critValues, sLev = sta.anderson_ksamp(dseq)
slAlpha = 100 * sigLev
if slAlpha == 10:
cv = critValues[1]
elif slAlpha == 5:
cv = critValues[2]
elif slAlpha == 2.5:
cv = critValues[3]
elif slAlpha == 1:
cv = critValues[4]
else:
cv = None
result = self.__printResult("stat", stat, "critValues", critValues, "critValue", cv, "significanceLevel", sLev)
print("stat: {:.3f}".format(stat))
if cv is None:
msg = "critical value not found for provided significance level"
else:
if stat < cv:
msg = "probably same distribution at the {:.3f} significance level".format(sigLev)
else:
msg = "probably not same distribution at the {:.3f} significance level".format(sigLev)
print(msg)
return result
def testTwoSampleScaleAb(self, ds1, ds2, sigLev=.05):
"""
Ansari Bradley 2 sample scale statistic
Parameters
ds1: data set name or list or numpy array
ds2: data set name or list or numpy array
sigLev: statistical significance level
"""
self.__printBanner("doing Ansari Bradley 2 sample scale test", ds1, ds2)
data1 = self.getNumericData(ds1)
data2 = self.getNumericData(ds2)
stat, pvalue = sta.ansari(data1, data2)
result = self.__printResult("stat", stat, "pvalue", pvalue)
self.__printStat(stat, pvalue, "probably same scale", "probably not same scale", sigLev)
return result
def testTwoSampleScaleMood(self, ds1, ds2, sigLev=.05):
"""
Mood 2 sample scale statistic
Parameters
ds1: data set name or list or numpy array
ds2: data set name or list or numpy array
sigLev: statistical significance level
"""
self.__printBanner("doing Mood 2 sample scale test", ds1, ds2)
data1 = self.getNumericData(ds1)
data2 = self.getNumericData(ds2)
stat, pvalue = sta.mood(data1, data2)
result = self.__printResult("stat", stat, "pvalue", pvalue)
self.__printStat(stat, pvalue, "probably same scale", "probably not same scale", sigLev)
return result
def testTwoSampleVarBartlet(self, ds1, ds2, sigLev=.05):
"""
Bartlett 2 sample variance statistic
Parameters
ds1: data set name or list or numpy array
ds2: data set name or list or numpy array
sigLev: statistical significance level
"""
self.__printBanner("doing Ansari Bradley 2 sample scale test", ds1, ds2)
data1 = self.getNumericData(ds1)
data2 = self.getNumericData(ds2)
stat, pvalue = sta.bartlett(data1, data2)
result = self.__printResult("stat", stat, "pvalue", pvalue)
self.__printStat(stat, pvalue, "probably same variance", "probably not same variance", sigLev)
return result
def testTwoSampleVarLevene(self, ds1, ds2, sigLev=.05):
"""
Levene 2 sample variance statistic
Parameters
ds1: data set name or list or numpy array
ds2: data set name or list or numpy array
sigLev: statistical significance level
"""
self.__printBanner("doing Levene 2 sample variance test", ds1, ds2)
data1 = self.getNumericData(ds1)
data2 = self.getNumericData(ds2)
stat, pvalue = sta.levene(data1, data2)
result = self.__printResult("stat", stat, "pvalue", pvalue)
self.__printStat(stat, pvalue, "probably same variance", "probably not same variance", sigLev)
return result
def testTwoSampleVarFk(self, ds1, ds2, sigLev=.05):
"""
Fligner-Killeen 2 sample variance statistic
Parameters
ds1: data set name or list or numpy array
ds2: data set name or list or numpy array
sigLev: statistical significance level
"""
self.__printBanner("doing Fligner-Killeen 2 sample variance test", ds1, ds2)
data1 = self.getNumericData(ds1)
data2 = self.getNumericData(ds2)
stat, pvalue = sta.fligner(data1, data2)
result = self.__printResult("stat", stat, "pvalue", pvalue)
self.__printStat(stat, pvalue, "probably same variance", "probably not same variance", sigLev)
return result
def testTwoSampleMedMood(self, ds1, ds2, sigLev=.05):
"""
Mood 2 sample median statistic
Parameters
ds1: data set name or list or numpy array
ds2: data set name or list or numpy array
sigLev: statistical significance level
"""
self.__printBanner("doing Mood 2 sample median test", ds1, ds2)
data1 = self.getNumericData(ds1)
data2 = self.getNumericData(ds2)
stat, pvalue, median, ctable = sta.median_test(data1, data2)
result = self.__printResult("stat", stat, "pvalue", pvalue, "median", median, "contigencyTable", ctable)
self.__printStat(stat, pvalue, "probably same median", "probably not same median", sigLev)
return result
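#The next three methods implement the Zhang statistics, likelihood ratio
#based analogues of the Cramer von Mises (Zc), Anderson Darling (Za) and
#Kolmogorov Smirnov (Zk) statistics; only the statistic is returned because
#the null distributions lack closed forms and pvalues need simulation or tables.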
def testTwoSampleZc(self, ds1, ds2, sigLev=.05):
"""
Zhang-C 2 sample statistic
Parameters
ds1: data set name or list or numpy array
ds2: data set name or list or numpy array
sigLev: statistical significance level
"""
self.__printBanner("doing Zhang-C 2 sample test", ds1, ds2)
data1 = self.getNumericData(ds1)
data2 = self.getNumericData(ds2)
l1 = len(data1)
l2 = len(data2)
l = l1 + l2
#find ranks
pooled = np.concatenate([data1, data2])
ranks = findRanks(data1, pooled)
ranks.extend(findRanks(data2, pooled))
s1 = 0.0
#log comes from the math star import; the math module itself is not imported
for i in range(1, l1+1):
s1 += log(l1 / (i - 0.5) - 1.0) * log(l / (ranks[i-1] - 0.5) - 1.0)
s2 = 0.0
for i in range(1, l2+1):
s2 += log(l2 / (i - 0.5) - 1.0) * log(l / (ranks[l1 + i - 1] - 0.5) - 1.0)
stat = (s1 + s2) / l
print(formatFloat(3, stat, "stat:"))
return stat
def testTwoSampleZa(self, ds1, ds2, sigLev=.05):
"""
Zhang-A 2 sample statistic
Parameters
ds1: data set name or list or numpy array
ds2: data set name or list or numpy array
sigLev: statistical significance level
"""
self.__printBanner("doing Zhang-A 2 sample test", ds1, ds2)
data1 = self.getNumericData(ds1)
data2 = self.getNumericData(ds2)
l1 = len(data1)
l2 = len(data2)
l = l1 + l2
pooled = np.concatenate([data1, data2])
cd1 = CumDistr(data1)
cd2 = CumDistr(data2)
su = 0.0
for i in range(1, l+1):
v = pooled[i-1]
f1 = cd1.getDistr(v)
f2 = cd2.getDistr(v)
#guard against log(0) at the extremes of the empirical distributions
t1 = 0 if f1 == 0 else f1 * log(f1)
t2 = 0 if f1 == 1.0 else (1.0 - f1) * log(1.0 - f1)
su += l1 * (t1 + t2) / ((i - 0.5) * (l - i + 0.5))
t1 = 0 if f2 == 0 else f2 * log(f2)
t2 = 0 if f2 == 1.0 else (1.0 - f2) * log(1.0 - f2)
su += l2 * (t1 + t2) / ((i - 0.5) * (l - i + 0.5))
stat = -su
print(formatFloat(3, stat, "stat:"))
return stat
def testTwoSampleZk(self, ds1, ds2, sigLev=.05):
"""
Zhang-K 2 sample statistic
Parameters
ds1: data set name or list or numpy array
ds2: data set name or list or numpy array
sigLev: statistical significance level
"""
self.__printBanner("doing Zhang-K 2 sample test", ds1, ds2)
data1 = self.getNumericData(ds1)
data2 = self.getNumericData(ds2)
l1 = len(data1)
l2 = len(data2)
l = l1 + l2
pooled = np.concatenate([data1, data2])
cd1 = CumDistr(data1)
cd2 = CumDistr(data2)
cd = CumDistr(pooled)
maxStat = None
for i in range(1, l+1):
v = pooled[i-1]
f1 = cd1.getDistr(v)
f2 = cd2.getDistr(v)
f = cd.getDistr(v)
t1 = 0 if f1 == 0 else f1 * log(f1 / f)
t2 = 0 if f1 == 1.0 else (1.0 - f1) * log((1.0 - f1) / (1.0 - f))
stat = l1 * (t1 + t2)
t1 = 0 if f2 == 0 else f2 * log(f2 / f)
t2 = 0 if f2 == 1.0 else (1.0 - f2) * log((1.0 - f2) / (1.0 - f))
stat += l2 * (t1 + t2)
if maxStat is None or stat > maxStat:
maxStat = stat
print(formatFloat(3, maxStat, "stat:"))
return maxStat
def testTwoSampleCvm(self, ds1, ds2, sigLev=.05):
"""
Cramer von Mises 2 sample statistic
Parameters
ds1: data set name or list or numpy array
ds2: data set name or list or numpy array
sigLev: statistical significance level
"""
self.__printBanner("doing 2 sample CVM test", ds1, ds2)
data1 = self.getNumericData(ds1)
data2 = self.getNumericData(ds2)
data = np.concatenate((data1,data2))
rdata = sta.rankdata(data)
n = len(data1)
m = len(data2)
l = n + m
#the rank formula assumes ranks of each sample taken in sorted order
rd1 = np.sort(rdata[:n])
rd2 = np.sort(rdata[n:])
s1 = 0
for i in range(n):
t = rd1[i] - (i+1)
s1 += (t * t)
s1 *= n
s2 = 0
for i in range(m):
t = rd2[i] - (i+1)
s2 += (t * t)
s2 *= m
u = s1 + s2
stat = u / (n * m * l) - (4 * m * n - 1) / (6 * l)
result = self.__printResult("stat", stat)
return result
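#Usage sketch; the statistic is the standard 2 sample Cramer von Mises form
#T = U / (n m N) - (4 m n - 1) / (6 N) with U computed from ordered ranks:
#  res = expl.testTwoSampleCvm("sample1", "sample2")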
def ensureSameSize(self, dlist):
"""
ensures all data sets are of same size
Parameters
dlist : data source list
"""
le = None
for d in dlist:
cle = len(d)
if le is None:
le = cle
else:
assert cle == le, "all data sets need to be of same size"
def testTwoSampleWasserstein(self, ds1, ds2):
"""
Wasserstein 2 sample statistic
Parameters
ds1: data set name or list or numpy array
ds2: data set name or list or numpy array
"""
self.__printBanner("doing Wasserstein distance2 sample test", ds1, ds2)
data1 = self.getNumericData(ds1)
data2 = self.getNumericData(ds2)
stat = sta.wasserstein_distance(data1, data2)
sd = np.std(np.concatenate([data1, data2]))
nstat = stat / sd
result = self.__printResult("stat", stat, "normalizedStat", nstat)
return result
def getMaxRelMinRedFeatures(self, fdst, tdst, nfeatures, nbins=20):
"""
get top n features based on max relevance and min redundancy algorithm
Parameters
fdst: list of pair of data set name or list or numpy array and data type
tdst: target data set name or list or numpy array and data type (cat for classification num for regression)
nfeatures : desired no of features
nbins : no of bins for numerical data
"""
self.__printBanner("doing max relevance min redundancy feature selection")
return self.getMutInfoFeatures(fdst, tdst, nfeatures, "mrmr", nbins)
def getJointMutInfoFeatures(self, fdst, tdst, nfeatures, nbins=20):
"""
get top n features based on joint mutual information algorithm
Parameters
fdst: list of pair of data set name or list or numpy array and data type
tdst: target data set name or list or numpy array and data type (cat for classification num for regression)
nfeatures : desired no of features
nbins : no of bins for numerical data
"""
self.__printBanner("doingjoint mutual info feature selection")
return self.getMutInfoFeatures(fdst, tdst, nfeatures, "jmi", nbins)
def getCondMutInfoMaxFeatures(self, fdst, tdst, nfeatures, nbins=20):
"""
get top n features based on conditional mutual information maximization algorithm
Parameters
fdst: list of pair of data set name or list or numpy array and data type
tdst: target data set name or list or numpy array and data type (cat for classification num for regression)
nfeatures : desired no of features
nbins : no of bins for numerical data
"""
self.__printBanner("doing conditional mutual info max feature selection")
return self.getMutInfoFeatures(fdst, tdst, nfeatures, "cmim", nbins)
def getInteractCapFeatures(self, fdst, tdst, nfeatures, nbins=20):
"""
get top n features based on interaction capping algorithm
Parameters
fdst: list of pair of data set name or list or numpy array and data type
tdst: target data set name or list or numpy array and data type (cat for classification num for regression)
nfeatures : desired no of features
nbins : no of bins for numerical data
"""
self.__printBanner("doing interaction capped feature selection")
return self.getMutInfoFeatures(fdst, tdst, nfeatures, "icap", nbins)
def getMutInfoFeatures(self, fdst, tdst, nfeatures, algo, nbins=20):
"""
get top n features based on various mutual information based algorithms
ref: Conditional likelihood maximisation : A unifying framework for information
theoretic feature selection, Gavin Brown
Parameters
fdst: list of pair of data set name or list or numpy array and data type
tdst: target data set name or list or numpy array and data type (cat for classification num for regression)
nfeatures : desired no of features
algo: mi based feature selection algorithm
nbins : no of bins for numerical data
"""
#verify data source types
le = len(fdst)
nfeatGiven = int(le / 2)
assertGreater(nfeatGiven, nfeatures, "no of available features should be greater than no of features to be selected")
fds = list()
types = ["num", "cat"]
for i in range (0, le, 2):
ds = fdst[i]
dt = fdst[i+1]
assertInList(dt, types, "invalid type for data source " + dt)
data = self.getNumericData(ds) if dt == "num" else self.getCatData(ds)
p =(ds, dt)
fds.append(p)
algos = ["mrmr", "jmi", "cmim", "icap"]
assertInList(algo, algos, "invalid feature selection algo " + algo)
assertInList(tdst[1], types, "invalid type for data source " + tdst[1])
data = self.getNumericData(tdst[0]) if tdst[1] == "num" else self.getCatData(tdst[0])
#print(fds)
sfds = list()
selected = set()
relevancies = dict()
for i in range(nfeatures):
#print(i)
scorem = None
dsm = None
dsmt = None
for ds, dt in fds:
#print(ds, dt)
if ds not in selected:
#relevancy
if ds in relevancies:
mutInfo = relevancies[ds]
else:
mutInfo = self.getMutualInfo([ds, dt, tdst[0], tdst[1]], nbins)["mutInfo"]
relevancies[ds] = mutInfo
relev = mutInfo
#print("relev", relev)
#redundancy
smi = 0
reds = list()
for sds, sdt, _ in sfds:
#print(sds, sdt)
mutInfo = self.getMutualInfo([ds, dt, sds, sdt], nbins)["mutInfo"]
mutInfoCnd = self.getCondMutualInfo([ds, dt, sds, sdt, tdst[0], tdst[1]], nbins)["condMutInfo"] \
if algo != "mrmr" else 0
red = mutInfo - mutInfoCnd
reds.append(red)
if algo == "mrmr" or algo == "jmi":
redun = sum(reds) / len(sfds) if len(sfds) > 0 else 0
elif algo == "cmim" or algo == "icap":
redun = max(reds) if len(sfds) > 0 else 0
if algo == "icap":
redun = max(0, redun)
#print("redun", redun)
score = relev - redun
if scorem is None or score > scorem:
scorem = score
dsm = ds
dsmt = dt
pa = (dsm, dsmt, scorem)
#print(pa)
sfds.append(pa)
selected.add(dsm)
selFeatures = list(map(lambda r : (r[0], r[2]), sfds))
result = self.__printResult("selFeatures", selFeatures)
return result
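#Usage sketch for the mutual information based selectors above (hypothetical
#data set names; fdst alternates data set name and type, tdst is a
#(name, type) pair):
#  fdst = ["age", "num", "income", "num", "segment", "cat"]
#  res = expl.getMaxRelMinRedFeatures(fdst, ("label", "cat"), nfeatures=2)
#  print(res["selFeatures"])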
def getFastCorrFeatures(self, fdst, tdst, delta, nbins=20):
"""
get top features based on Fast Correlation Based Filter (FCBF)
ref: Feature Selection for High-Dimensional Data: A Fast Correlation-Based Filter Solution
Lei Yu
Parameters
fdst: list of pair of data set name or list or numpy array and data type
tdst: target data set name or list or numpy array and data type (cat for classification num for regression)
delta : feature, target correlation threshold
nbins : no of bins for numerical data
"""
le = len(fdst)
nfeatGiven = int(le / 2)
fds = list()
types = ["num", "cat"]
for i in range (0, le, 2):
ds = fdst[i]
dt = fdst[i+1]
assertInList(dt, types, "invalid type for data source " + dt)
data = self.getNumericData(ds) if dt == "num" else self.getCatData(ds)
p =(ds, dt)
fds.append(p)
assertInList(tdst[1], types, "invalid type for data source " + tdst[1])
data = self.getNumericData(tdst[0]) if tdst[1] == "num" else self.getCatData(tdst[0])
# get features with symmetric uncertainty above threshold
tentr = self.getAnyEntropy(tdst[0], tdst[1], nbins)["entropy"]
rfeatures = list()
fentrs = dict()
for ds, dt in fds:
mutInfo = self.getMutualInfo([ds, dt, tdst[0], tdst[1]], nbins)["mutInfo"]
fentr = self.getAnyEntropy(ds, dt, nbins)["entropy"]
sunc = 2 * mutInfo / (tentr + fentr)
#print("ds {} sunc {:.3f}".format(ds, sunc))
if sunc >= delta:
f = [ds, dt, sunc, False]
rfeatures.append(f)
fentrs[ds] = fentr
# sort descending by symmetric uncertainty
rfeatures.sort(key=lambda e : e[2], reverse=True)
#discard redundant features
le = len(rfeatures)
for i in range(le):
if rfeatures[i][3]:
continue
for j in range(i+1, le, 1):
if rfeatures[j][3]:
continue
mutInfo = self.getMutualInfo([rfeatures[i][0], rfeatures[i][1], rfeatures[j][0], rfeatures[j][1]], nbins)["mutInfo"]
sunc = 2 * mutInfo / (fentrs[rfeatures[i][0]] + fentrs[rfeatures[j][0]])
if sunc >= rfeatures[j][2]:
rfeatures[j][3] = True
frfeatures = list(filter(lambda f : not f[3], rfeatures))
selFeatures = list(map(lambda f : [f[0], f[2]], frfeatures))
result = self.__printResult("selFeatures", selFeatures)
return result
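#Usage sketch (delta is the minimum symmetric uncertainty between a feature
#and the target for the feature to be retained; fdst as in the earlier sketch):
#  res = expl.getFastCorrFeatures(fdst, ("label", "cat"), delta=.05)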
def getInfoGainFeatures(self, fdst, tdst, nfeatures, nsplit, nbins=20):
"""
get top n features based on information gain or entropy loss
Parameters
fdst: list of pair of data set name or list or numpy array and data type
tdst: target data set name or list or numpy array and data type (cat for classification num for regression)
nfeatures : desired no of features
nsplit : num of splits
nbins : no of bins for numerical data
"""
le = len(fdst)
nfeatGiven = int(le / 2)
assertGreater(nfeatGiven, nfeatures, "available features should be greater than desired")
fds = list()
types = ["num", "cat"]
for i in range (0, le, 2):
ds = fdst[i]
dt = fdst[i+1]
assertInList(dt, types, "invalid type for data source " + dt)
data = self.getNumericData(ds) if dt == "num" else self.getCatData(ds)
p =(ds, dt)
fds.append(p)
assertInList(tdst[1], types, "invalid type for data source " + tdst[1])
assertGreater(nsplit, 3, "minimum 4 splits necessary")
tdata = self.getNumericData(tdst[0]) if tdst[1] == "num" else self.getCatData(tdst[0])
tentr = self.getAnyEntropy(tdst[0], tdst[1], nbins)["entropy"]
sz =len(tdata)
sfds = list()
for ds, dt in fds:
#print(ds, dt)
if dt == "num":
fd = self.getNumericData(ds)
_ , _ , vmax, vmin = self.__getBasicStats(fd)
intv = (vmax - vmin) / nsplit
maxig = None
spmin = vmin + intv
spmax = vmax - 0.9 * intv
#iterate all splits
for sp in np.arange(spmin, spmax, intv):
ltvals = list()
gevals = list()
for i in range(len(fd)):
if fd[i] < sp:
ltvals.append(tdata[i])
else:
gevals.append(tdata[i])
self.addListNumericData(ltvals, "spds") if tdst[1] == "num" else self.addListCatData(ltvals, "spds")
lten = self.getAnyEntropy("spds", tdst[1], nbins)["entropy"]
self.addListNumericData(gevals, "spds") if tdst[1] == "num" else self.addListCatData(gevals, "spds")
geen = self.getAnyEntropy("spds", tdst[1], nbins)["entropy"]
#info gain
ig = tentr - (len(ltvals) * lten / sz + len(gevals) * geen / sz)
if maxig is None or ig > maxig:
maxig = ig
pa = (ds, maxig)
sfds.append(pa)
else:
fd = self.getCatData(ds)
#avoid shadowing the fds feature list being iterated over
fvals = set(fd)
fdps = genPowerSet(fvals)
maxig = None
#iterate all subsets
for s in fdps:
if len(s) == len(fvals):
continue
invals = list()
exvals = list()
for i in range(len(fd)):
if fd[i] in s:
invals.append(tdata[i])
else:
exvals.append(tdata[i])
self.addListNumericData(invals, "spds") if tdst[1] == "num" else self.addListCatData(invals, "spds")
inen = self.getAnyEntropy("spds", tdst[1], nbins)["entropy"]
self.addListNumericData(exvals, "spds") if tdst[1] == "num" else self.addListCatData(exvals, "spds")
exen = self.getAnyEntropy("spds", tdst[1], nbins)["entropy"]
ig = tentr - (len(invals) * inen / sz + len(exvals) * exen / sz)
if maxig is None or ig > maxig:
maxig = ig
pa = (ds, maxig)
sfds.append(pa)
#sort by info gain
sfds.sort(key = lambda v : v[1], reverse = True)
result = self.__printResult("selFeatures", sfds[:nfeatures])
return result
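#Usage sketch (nsplit sets how many candidate split points are tried for
#numeric features; at least 4 are required):
#  res = expl.getInfoGainFeatures(fdst, ("label", "cat"), nfeatures=2, nsplit=5)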
def __stackData(self, *dsl):
"""
stacks columns to create a matrix
Parameters
dsl: data source list
"""
dlist = tuple(map(lambda ds : self.getNumericData(ds), dsl))
self.ensureSameSize(dlist)
dmat = np.column_stack(dlist)
return dmat
def __printBanner(self, msg, *dsl):
"""
print banner for any function
Parameters
msg: message
dsl: list of data set name or list or numpy array
"""
tags = list(map(lambda ds : ds if type(ds) == str else "annoynymous", dsl))
forData = " for data sets " if tags else ""
msg = msg + forData + " ".join(tags)
if self.verbose:
print("\n== " + msg + " ==")
def __printDone(self):
"""
print done message
"""
if self.verbose:
print("done")
def __printStat(self, stat, pvalue, nhMsg, ahMsg, sigLev=.05):
"""
generic stat and pvalue output
Parameters
stat : stat value
pvalue : p value
nhMsg : message when the null hypothesis is not rejected
ahMsg : message when the null hypothesis is rejected
sigLev : significance level
"""
if self.verbose:
print("\ntest result:")
print("stat: {:.3f}".format(stat))
print("pvalue: {:.3f}".format(pvalue))
print("significance level: {:.3f}".format(sigLev))
print(nhMsg if pvalue > sigLev else ahMsg)
def __printResult(self, *values):
"""
print results
Parameters
values : flattened key and value pairs
"""
result = dict()
assert len(values) % 2 == 0, "key value list should have even number of items"
for i in range(0, len(values), 2):
result[values[i]] = values[i+1]
if self.verbose:
print("result details:")
self.pp.pprint(result)
return result
def __getBasicStats(self, data):
"""
get mean, std dev, max and min
Parameters
data : numpy array
"""
mean = np.average(data)
sd = np.std(data)
r = (mean, sd, np.max(data), np.min(data))
return r