# Priyanka-Kumavat-At-TE's picture
# Upload 7 files
# e03eaf2
#!/usr/local/bin/python3
# avenir-python: Machine Learning
# Author: Pranab Ghosh
#
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You may
# obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.
# Package imports
import os
import sys
import matplotlib.pyplot as plt
import numpy as np
import matplotlib
import random
import jprops
import statistics
from matplotlib import pyplot
from .util import *
from .mlutil import *
from .sampler import *
class MonteCarloSimulator(object):
    """
    Monte Carlo simulator for integration and various statistics of complex functions.

    Samplers are registered one per random variable. On every iteration each
    sampler is sampled once and the collected values are handed to the user
    supplied callback; the callback return values form the simulation output
    from which the statistics are computed.
    """

    def __init__(self, numIter, callback, logFilePath, logLevName):
        """
        constructor

        Parameters
            numIter : num of iterations
            callback : call back method, invoked with a single list argument (see run())
            logFilePath : log file path, no logging when None
            logLevName : log level
        """
        self.samplers = list()
        self.numIter = numIter
        self.callback = callback
        self.extraArgs = None
        self.output = list()
        # cached statistics, None means not computed yet
        self.sum = None
        self.mean = None
        self.sd = None
        # replacement samplers keyed by (sampler index, iteration)
        self.replSamplers = dict()
        # sampled values from the most recent iteration
        self.prSamples = None
        self.numVars = 0

        self.logger = None
        if logFilePath is not None:
            self.logger = createLogger(__name__, logFilePath, logLevName)
            self.logger.info("******** stating new session of MonteCarloSimulator")

    def registerBernoulliTrialSampler(self, pr):
        """
        bernoulli trial sampler

        Parameters
            pr : probability
        """
        self.samplers.append(BernoulliTrialSampler(pr))

    def registerPoissonSampler(self, rateOccur, maxSamp):
        """
        poisson sampler

        Parameters
            rateOccur : rate of occurrence
            maxSamp : max limit on no of samples
        """
        self.samplers.append(PoissonSampler(rateOccur, maxSamp))

    def registerUniformSampler(self, minv, maxv):
        """
        uniform sampler

        Parameters
            minv : min value
            maxv : max value
        """
        self.samplers.append(UniformNumericSampler(minv, maxv))

    def registerTriangularSampler(self, min, max, vertexValue, vertexPos=None):
        """
        triangular sampler

        Parameters
            min : min value
            max : max value
            vertexValue : distr value at vertex
            vertexPos : vertex position
        """
        # parameter names shadow builtins but are kept for caller compatibility
        self.samplers.append(TriangularRejectSampler(min, max, vertexValue, vertexPos))

    def registerGaussianSampler(self, mean, sd):
        """
        gaussian sampler

        Parameters
            mean : mean
            sd : std deviation
        """
        self.samplers.append(GaussianRejectSampler(mean, sd))

    def registerNormalSampler(self, mean, sd):
        """
        gaussian sampler using numpy

        Parameters
            mean : mean
            sd : std deviation
        """
        self.samplers.append(NormalSampler(mean, sd))

    def registerLogNormalSampler(self, mean, sd):
        """
        log normal sampler using numpy

        Parameters
            mean : mean
            sd : std deviation
        """
        self.samplers.append(LogNormalSampler(mean, sd))

    def registerParetoSampler(self, mode, shape):
        """
        pareto sampler using numpy

        Parameters
            mode : mode
            shape : shape
        """
        self.samplers.append(ParetoSampler(mode, shape))

    def registerGammaSampler(self, shape, scale):
        """
        gamma sampler using numpy

        Parameters
            shape : shape
            scale : scale
        """
        self.samplers.append(GammaSampler(shape, scale))

    def registerDiscreteRejectSampler(self, xmin, xmax, step, *values):
        """
        discrete int sampler

        Parameters
            xmin : min value
            xmax : max value
            step : discrete step
            values : distr values
        """
        self.samplers.append(DiscreteRejectSampler(xmin, xmax, step, *values))

    def registerNonParametricSampler(self, minv, binWidth, *values):
        """
        nonparametric sampler

        Parameters
            minv : min value
            binWidth : bin width
            values : distr values
        """
        sampler = NonParamRejectSampler(minv, binWidth, *values)
        sampler.sampleAsFloat()
        self.samplers.append(sampler)

    def registerMultiVarNormalSampler(self, numVar, *values):
        """
        multi var gaussian sampler using numpy

        Parameters
            numVar : no of variables
            values : numVar mean values followed by numVar x numVar values for covar matrix
        """
        self.samplers.append(MultiVarNormalSampler(numVar, *values))

    def registerJointNonParamRejectSampler(self, xmin, xbinWidth, xnbin, ymin, ybinWidth, ynbin, *values):
        """
        joint nonparametric sampler

        Parameters
            xmin : min value for x
            xbinWidth : bin width for x
            xnbin : no of bins for x
            ymin : min value for y
            ybinWidth : bin width for y
            ynbin : no of bins for y
            values : distr values
        """
        self.samplers.append(
            JointNonParamRejectSampler(xmin, xbinWidth, xnbin, ymin, ybinWidth, ynbin, *values))

    def registerRangePermutationSampler(self, minv, maxv, *numShuffles):
        """
        permutation sampler with range

        Parameters
            minv : min of range
            maxv : max of range
            numShuffles : no of shuffles or range of no of shuffles
        """
        self.samplers.append(PermutationSampler.createSamplerWithRange(minv, maxv, *numShuffles))

    def registerValuesPermutationSampler(self, values, *numShuffles):
        """
        permutation sampler with values

        Parameters
            values : list data
            numShuffles : no of shuffles or range of no of shuffles
        """
        self.samplers.append(PermutationSampler.createSamplerWithValues(values, *numShuffles))

    def registerNormalSamplerWithTrendCycle(self, mean, stdDev, trend, cycle, step=1):
        """
        normal sampler with trend and cycle

        Parameters
            mean : mean
            stdDev : std deviation
            trend : trend delta
            cycle : cycle values wrt base mean
            step : adjustment step for cycle and trend
        """
        self.samplers.append(NormalSamplerWithTrendCycle(mean, stdDev, trend, cycle, step))

    def registerCustomSampler(self, sampler):
        """
        custom sampler

        Parameters
            sampler : sampler with sample() method
        """
        self.samplers.append(sampler)

    def registerEventSampler(self, intvSampler, valSampler=None):
        """
        event sampler

        Parameters
            intvSampler : interval sampler
            valSampler : value sampler
        """
        self.samplers.append(EventSampler(intvSampler, valSampler))

    def registerMetropolitanSampler(self, propStdDev, minv, binWidth, values):
        """
        metropolitan sampler

        Parameters
            propStdDev : proposal distr std dev
            minv : min domain value for target distr
            binWidth : bin width
            values : target distr values
        """
        self.samplers.append(MetropolitanSampler(propStdDev, minv, binWidth, values))

    def setSampler(self, var, iter, sampler):
        """
        set sampler for some variable when iteration reaches certain point

        Parameters
            var : sampler index
            iter : iteration count
            sampler : new sampler
        """
        self.replSamplers[(var, iter)] = sampler

    def registerExtraArgs(self, *args):
        """
        extra args, passed to the callback after the sampled values

        Parameters
            args : extra argument list
        """
        self.extraArgs = args

    def replSampler(self, iter):
        """
        replace samplers scheduled for this iteration (see setSampler())

        Parameters
            iter : iteration number
        """
        if not self.replSamplers:
            return
        # use the live sampler count so this also works before run() has set numVars
        for v in range(len(self.samplers)):
            key = (v, iter)
            if key in self.replSamplers:
                self.samplers[v] = self.replSamplers[key]

    def run(self):
        """
        run simulator

        For each iteration the callback is called with one list holding, in
        order: the sampled values (a list returned by a sampler is flattened
        into it), the extra args if any, this simulator and the iteration index.
        The callback return value is appended to the output.
        """
        # start from a clean state so that a repeated run does not mix results
        # with a previous one (output length must stay equal to numIter)
        self.output = list()
        self.sum = None
        self.mean = None
        self.sd = None
        self.numVars = len(self.samplers)

        for i in range(self.numIter):
            self.replSampler(i)
            args = list()
            for s in self.samplers:
                arg = s.sample()
                if isinstance(arg, list):
                    args.extend(arg)
                else:
                    args.append(arg)
            slen = len(args)
            if self.extraArgs:
                args.extend(self.extraArgs)
            args.append(self)
            args.append(i)
            self.output.append(self.callback(args))
            # remember only the sampled part of the argument list
            self.prSamples = args[:slen]

    def getOutput(self):
        """
        get raw output
        """
        return self.output

    def setOutput(self, values):
        """
        set raw output

        Parameters
            values : output values
        """
        self.output = values
        self.numIter = len(values)
        # cached statistics belong to the previous output
        self.sum = None
        self.mean = None
        self.sd = None

    def drawHist(self, myTitle, myXlabel, myYlabel):
        """
        draw histogram

        Parameters
            myTitle : title
            myXlabel : label for x
            myYlabel : label for y
        """
        pyplot.hist(self.output, density=True)
        pyplot.title(myTitle)
        pyplot.xlabel(myXlabel)
        pyplot.ylabel(myYlabel)
        pyplot.show()

    def getSum(self):
        """
        get sum
        """
        # explicit None check, a sum of 0 is a valid cached value
        if self.sum is None:
            self.sum = sum(self.output)
        return self.sum

    def getMean(self):
        """
        get average
        """
        if self.mean is None:
            self.mean = statistics.mean(self.output)
        return self.mean

    def getStdDev(self):
        """
        get std dev
        """
        if self.sd is None:
            if self.mean is not None:
                # reuse the already computed mean
                self.sd = statistics.stdev(self.output, xbar=self.mean)
            else:
                self.sd = statistics.stdev(self.output)
        return self.sd

    def getMedian(self):
        """
        get median
        """
        return statistics.median(self.output)

    def getMax(self):
        """
        get max
        """
        return max(self.output)

    def getMin(self):
        """
        get min
        """
        return min(self.output)

    def getIntegral(self, bounds):
        """
        integral, output sum scaled by bounds and averaged over the iterations

        Parameters
            bounds : bound on sum
        """
        return self.getSum() * bounds / self.numIter

    def getLowerTailStat(self, zvalue, numIntPoints=50):
        """
        get lower tail stat, list of (probability, critical value) pairs

        Parameters
            zvalue : zscore upper bound
            numIntPoints : no of interpolation point for cum distribution
        """
        mean = self.getMean()
        sd = self.getStdDev()
        tailStart = self.getMin()
        tailEnd = mean - zvalue * sd
        cvaCounts = self.cumDistr(tailStart, tailEnd, numIntPoints)

        reqConf = floatRange(0.0, 0.150, .01)
        msg = "p value outside interpolation range, reduce zvalue and try again {:.5f} {:.5f}".format(reqConf[-1], cvaCounts[-1][1])
        assert reqConf[-1] < cvaCounts[-1][1], msg
        return self.interpolateCritValues(reqConf, cvaCounts, True, tailStart, tailEnd)

    def getPercentile(self, cvalue):
        """
        percentile, integer percentage of output values below the given value

        Parameters
            cvalue : value for percentile
        """
        count = sum(1 for v in self.output if v < cvalue)
        return int(count * 100.0 / self.numIter)

    def getCritValue(self, pvalue):
        """
        critical value for probability threshold, linearly interpolated
        between consecutive sorted output values

        Parameters
            pvalue : pvalue
        """
        assertWithinRange(pvalue, 0.0, 1.0, "invalid probabaility value")
        svalues = sorted(self.output)
        n = len(svalues)
        if n == 0:
            raise ValueError("no simulation output available")

        # cumulative probability at sorted index i is (i + 1) / n
        ppval = 0.0
        for i, v in enumerate(svalues):
            cpval = (i + 1) / n
            if cpval > pvalue:
                if i == 0:
                    # below the first cumulative step, nothing to interpolate with
                    return v
                slope = (v - svalues[i - 1]) / (cpval - ppval)
                return svalues[i - 1] + slope * (pvalue - ppval)
            ppval = cpval
        # pvalue is at the top of the range
        return svalues[-1]

    def getUpperTailStat(self, zvalue, numIntPoints=50):
        """
        upper tail stat, list of (probability, critical value) pairs

        Parameters
            zvalue : zscore upper bound
            numIntPoints : no of interpolation point for cum distribution
        """
        mean = self.getMean()
        sd = self.getStdDev()
        tailStart = mean + zvalue * sd
        tailEnd = self.getMax()
        cvaCounts = self.cumDistr(tailStart, tailEnd, numIntPoints)

        reqConf = floatRange(0.85, 1.0, .01)
        msg = "p value outside interpolation range, reduce zvalue and try again {:.5f} {:.5f}".format(reqConf[0], cvaCounts[0][1])
        assert reqConf[0] > cvaCounts[0][1], msg
        return self.interpolateCritValues(reqConf, cvaCounts, False, tailStart, tailEnd)

    def cumDistr(self, tailStart, tailEnd, numIntPoints):
        """
        cumulative distribution at tail, list of (value, fraction of output below value)

        Parameters
            tailStart : tail start
            tailEnd : tail end
            numIntPoints : no of interpolation points
        """
        delta = (tailEnd - tailStart) / numIntPoints
        cvalues = floatRange(tailStart, tailEnd, delta)
        cvaCounts = list()
        for cv in cvalues:
            count = sum(1 for v in self.output if v < cv)
            p = (cv, count / self.numIter)
            if self.logger is not None:
                self.logger.info("{:.3f} {:.3f}".format(p[0], p[1]))
            cvaCounts.append(p)
        return cvaCounts

    def interpolateCritValues(self, reqConf, cvaCounts, lowertTail, tailStart, tailEnd):
        """
        interpolate for specific confidence limits, returns list of
        (confidence level, critical value) pairs

        Parameters
            reqConf : confidence level values
            cvaCounts : cum values as (value, cumulative fraction) pairs
            lowertTail : True if lower tail
            tailStart : tail start
            tailEnd : tail end
        """
        critValues = list()
        if self.logger is not None:
            self.logger.info("target conf limit " + str(reqConf))

        # the extreme level (first for lower tail, last for upper tail) is not
        # interpolated, the tail boundary is used for it below
        reqConfSub = reqConf[1:] if lowertTail else reqConf[:-1]
        for rc in reqConfSub:
            for i in range(len(cvaCounts) - 1):
                lo, hi = cvaCounts[i], cvaCounts[i + 1]
                if lo[1] <= rc < hi[1]:
                    slope = (hi[0] - lo[0]) / (hi[1] - lo[1])
                    cval = lo[0] + slope * (rc - lo[1])
                    p = (rc, cval)
                    if self.logger is not None:
                        self.logger.debug("interpolated crit values {:.3f} {:.3f}".format(p[0], p[1]))
                    critValues.append(p)
                    break

        if lowertTail:
            critValues.insert(0, (0.0, tailStart))
        else:
            critValues.append((1.0, tailEnd))
        return critValues