#!/usr/local/bin/python3

# avenir-python: Machine Learning
# Author: Pranab Ghosh
#
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You may
# obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.

# Package imports
import os
import sys
import matplotlib.pyplot as plt
import numpy as np
import matplotlib
import random
import jprops
import statistics
from matplotlib import pyplot
from .util import *
from .mlutil import *
from .sampler import *


class MonteCarloSimulator(object):
    """
    monte carlo simulator for integration and various statistics of complex functions

    Samplers are registered one per input variable. On every iteration each sampler
    is sampled once and the collected values are handed to the user callback, whose
    return value is recorded as the output for that iteration.
    """

    def __init__(self, numIter, callback, logFilePath, logLevName):
        """
        constructor

        Parameters
            numIter : num of iterations
            callback : call back method, invoked with a single list argument
            logFilePath : log file path, None disables logging
            logLevName : log level
        """
        self.samplers = list()
        self.numIter = numIter
        self.callback = callback
        self.extraArgs = None
        self.output = list()
        self.sum = None
        self.mean = None
        self.sd = None
        self.numVars = 0
        self.replSamplers = dict()
        self.prSamples = None

        self.logger = None
        if logFilePath is not None:
            self.logger = createLogger(__name__, logFilePath, logLevName)
            self.logger.info("******** stating new session of MonteCarloSimulator")

    def registerBernoulliTrialSampler(self, pr):
        """
        bernoulli trial sampler

        Parameters
            pr : probability
        """
        self.samplers.append(BernoulliTrialSampler(pr))

    def registerPoissonSampler(self, rateOccur, maxSamp):
        """
        poisson sampler

        Parameters
            rateOccur : rate of occurrence
            maxSamp : max limit on no of samples
        """
        self.samplers.append(PoissonSampler(rateOccur, maxSamp))

    def registerUniformSampler(self, minv, maxv):
        """
        uniform sampler

        Parameters
            minv : min value
            maxv : max value
        """
        self.samplers.append(UniformNumericSampler(minv, maxv))

    def registerTriangularSampler(self, min, max, vertexValue, vertexPos=None):
        """
        triangular sampler

        Parameters
            min : min value
            max : max value
            vertexValue : distr value at vertex
            vertexPos : vertex position
        """
        self.samplers.append(TriangularRejectSampler(min, max, vertexValue, vertexPos))

    def registerGaussianSampler(self, mean, sd):
        """
        gaussian sampler

        Parameters
            mean : mean
            sd : std deviation
        """
        self.samplers.append(GaussianRejectSampler(mean, sd))

    def registerNormalSampler(self, mean, sd):
        """
        gaussian sampler using numpy

        Parameters
            mean : mean
            sd : std deviation
        """
        self.samplers.append(NormalSampler(mean, sd))

    def registerLogNormalSampler(self, mean, sd):
        """
        log normal sampler using numpy

        Parameters
            mean : mean
            sd : std deviation
        """
        self.samplers.append(LogNormalSampler(mean, sd))

    def registerParetoSampler(self, mode, shape):
        """
        pareto sampler using numpy

        Parameters
            mode : mode
            shape : shape
        """
        self.samplers.append(ParetoSampler(mode, shape))

    def registerGammaSampler(self, shape, scale):
        """
        gamma sampler using numpy

        Parameters
            shape : shape
            scale : scale
        """
        self.samplers.append(GammaSampler(shape, scale))

    def registerDiscreteRejectSampler(self, xmin, xmax, step, *values):
        """
        discrete int sampler

        Parameters
            xmin : min value
            xmax : max value
            step : discrete step
            values : distr values
        """
        self.samplers.append(DiscreteRejectSampler(xmin, xmax, step, *values))

    def registerNonParametricSampler(self, minv, binWidth, *values):
        """
        nonparametric sampler

        Parameters
            minv : min value
            binWidth : bin width
            values : distr values
        """
        sampler = NonParamRejectSampler(minv, binWidth, *values)
        sampler.sampleAsFloat()
        self.samplers.append(sampler)

    def registerMultiVarNormalSampler(self, numVar, *values):
        """
        multi var gaussian sampler using numpy

        Parameters
            numVar : no of variables
            values : numVar mean values followed by numVar x numVar values for covar matrix
        """
        self.samplers.append(MultiVarNormalSampler(numVar, *values))

    def registerJointNonParamRejectSampler(self, xmin, xbinWidth, xnbin, ymin, ybinWidth, ynbin, *values):
        """
        joint nonparametric sampler

        Parameters
            xmin : min value for x
            xbinWidth : bin width for x
            xnbin : no of bins for x
            ymin : min value for y
            ybinWidth : bin width for y
            ynbin : no of bins for y
            values : distr values
        """
        self.samplers.append(JointNonParamRejectSampler(xmin, xbinWidth, xnbin, ymin, ybinWidth, ynbin, *values))

    def registerRangePermutationSampler(self, minv, maxv, *numShuffles):
        """
        permutation sampler with range

        Parameters
            minv : min of range
            maxv : max of range
            numShuffles : no of shuffles or range of no of shuffles
        """
        self.samplers.append(PermutationSampler.createSamplerWithRange(minv, maxv, *numShuffles))

    def registerValuesPermutationSampler(self, values, *numShuffles):
        """
        permutation sampler with values

        Parameters
            values : list data
            numShuffles : no of shuffles or range of no of shuffles
        """
        self.samplers.append(PermutationSampler.createSamplerWithValues(values, *numShuffles))

    def registerNormalSamplerWithTrendCycle(self, mean, stdDev, trend, cycle, step=1):
        """
        normal sampler with trend and cycle

        Parameters
            mean : mean
            stdDev : std deviation
            trend : trend delta
            cycle : cycle values wrt base mean
            step : adjustment step for cycle and trend
        """
        self.samplers.append(NormalSamplerWithTrendCycle(mean, stdDev, trend, cycle, step))

    def registerCustomSampler(self, sampler):
        """
        custom sampler

        Parameters
            sampler : sampler with sample() method
        """
        self.samplers.append(sampler)

    def registerEventSampler(self, intvSampler, valSampler=None):
        """
        event sampler

        Parameters
            intvSampler : interval sampler
            valSampler : value sampler
        """
        self.samplers.append(EventSampler(intvSampler, valSampler))

    def registerMetropolitanSampler(self, propStdDev, minv, binWidth, values):
        """
        metropolitan sampler

        Parameters
            propStdDev : proposal distr std dev
            minv : min domain value for target distr
            binWidth : bin width
            values : target distr values
        """
        self.samplers.append(MetropolitanSampler(propStdDev, minv, binWidth, values))

    def setSampler(self, var, iter, sampler):
        """
        set sampler for some variable when iteration reaches certain point

        Parameters
            var : sampler index
            iter : iteration count
            sampler : new sampler
        """
        self.replSamplers[(var, iter)] = sampler

    def registerExtraArgs(self, *args):
        """
        extra args, appended to the sampled values passed to the callback

        Parameters
            args : extra argument list
        """
        self.extraArgs = args

    def replSampler(self, iter):
        """
        replace samplers scheduled for this iteration

        Parameters
            iter : iteration number
        """
        if not self.replSamplers:
            return

        # bounded by the current sampler count so this is safe to call before run()
        for v in range(len(self.samplers)):
            key = (v, iter)
            if key in self.replSamplers:
                self.samplers[v] = self.replSamplers[key]

    def run(self):
        """
        run simulator

        The callback is invoked once per iteration with one list holding the sampled
        values, then any extra args, then this simulator and the iteration index.
        """
        # invalidate cached statistics and drop output of any earlier run, so that
        # len(self.output) stays equal to self.numIter
        self.sum = None
        self.mean = None
        self.sd = None
        self.output = list()
        self.numVars = len(self.samplers)

        for i in range(self.numIter):
            self.replSampler(i)
            args = list()
            for s in self.samplers:
                arg = s.sample()
                # samplers returning a list contribute several values
                if isinstance(arg, list):
                    args.extend(arg)
                else:
                    args.append(arg)

            slen = len(args)
            if self.extraArgs:
                args.extend(self.extraArgs)
            args.append(self)
            args.append(i)
            self.output.append(self.callback(args))

            # sampled values of this iteration, without the trailing extras
            self.prSamples = args[:slen]

    def getOutput(self):
        """
        get raw output
        """
        return self.output

    def setOutput(self, values):
        """
        set raw output

        Parameters
            values : output values
        """
        self.output = values
        self.numIter = len(values)

        # statistics cached for the previous output are no longer valid
        self.sum = None
        self.mean = None
        self.sd = None

    def drawHist(self, myTitle, myXlabel, myYlabel):
        """
        draw histogram

        Parameters
            myTitle : title
            myXlabel : label for x
            myYlabel : label for y
        """
        pyplot.hist(self.output, density=True)
        pyplot.title(myTitle)
        pyplot.xlabel(myXlabel)
        pyplot.ylabel(myYlabel)
        pyplot.show()

    def getSum(self):
        """
        get sum
        """
        # compare with None, a sum of 0 is a valid cached value
        if self.sum is None:
            self.sum = sum(self.output)
        return self.sum

    def getMean(self):
        """
        get average
        """
        if self.mean is None:
            self.mean = statistics.mean(self.output)
        return self.mean

    def getStdDev(self):
        """
        get std dev
        """
        if self.sd is None:
            # xbar=None makes stdev compute the mean itself
            self.sd = statistics.stdev(self.output, xbar=self.mean)
        return self.sd

    def getMedian(self):
        """
        get median
        """
        return statistics.median(self.output)

    def getMax(self):
        """
        get max
        """
        return max(self.output)

    def getMin(self):
        """
        get min
        """
        return min(self.output)

    def getIntegral(self, bounds):
        """
        integral

        Parameters
            bounds : bound on sum
        """
        return self.getSum() * bounds / self.numIter

    def getLowerTailStat(self, zvalue, numIntPoints=50):
        """
        get lower tail stat

        Parameters
            zvalue : zscore upper bound
            numIntPoints : no of interpolation point for cum distribution
        """
        mean = self.getMean()
        sd = self.getStdDev()
        tailStart = self.getMin()
        tailEnd = mean - zvalue * sd
        cvaCounts = self.cumDistr(tailStart, tailEnd, numIntPoints)

        reqConf = floatRange(0.0, 0.150, .01)
        msg = "p value outside interpolation range, reduce zvalue and try again {:.5f}  {:.5f}".format(reqConf[-1], cvaCounts[-1][1])
        assert reqConf[-1] < cvaCounts[-1][1], msg
        critValues = self.interpolateCritValues(reqConf, cvaCounts, True, tailStart, tailEnd)
        return critValues

    def getPercentile(self, cvalue):
        """
        percentile

        Parameters
            cvalue : value for percentile
        """
        count = sum(1 for v in self.output if v < cvalue)
        return int(count * 100.0 / self.numIter)

    def getCritValue(self, pvalue):
        """
        critical value for probability threshold, i.e. the empirical quantile of the
        output found by linear interpolation between adjacent sorted output values

        Parameters
            pvalue : pvalue
        """
        assertWithinRange(pvalue, 0.0, 1.0, "invalid probabaility value")
        svalues = sorted(self.output)
        nvalues = len(svalues)
        if nvalues == 0:
            raise ValueError("no output available for critical value")
        if nvalues == 1:
            return svalues[0]

        # sorted values sit at cumulative probabilities 0, 1/(n-1), ..., 1
        pos = pvalue * (nvalues - 1)
        lower = min(int(pos), nvalues - 2)
        frac = pos - lower
        return svalues[lower] + (svalues[lower + 1] - svalues[lower]) * frac

    def getUpperTailStat(self, zvalue, numIntPoints=50):
        """
        upper tail stat

        Parameters
            zvalue : zscore upper bound
            numIntPoints : no of interpolation point for cum distribution
        """
        mean = self.getMean()
        sd = self.getStdDev()
        tailStart = mean + zvalue * sd
        tailEnd = self.getMax()
        cvaCounts = self.cumDistr(tailStart, tailEnd, numIntPoints)

        reqConf = floatRange(0.85, 1.0, .01)
        msg = "p value outside interpolation range, reduce zvalue and try again {:.5f}  {:.5f}".format(reqConf[0], cvaCounts[0][1])
        assert reqConf[0] > cvaCounts[0][1], msg
        critValues = self.interpolateCritValues(reqConf, cvaCounts, False, tailStart, tailEnd)
        return critValues

    def cumDistr(self, tailStart, tailEnd, numIntPoints):
        """
        cumulative distribution at tail, as a list of (value, fraction of output below value)

        Parameters
            tailStart : tail start
            tailEnd : tail end
            numIntPoints : no of interpolation points
        """
        delta = (tailEnd - tailStart) / numIntPoints
        cvalues = floatRange(tailStart, tailEnd, delta)
        cvaCounts = list()
        for cv in cvalues:
            count = sum(1 for v in self.output if v < cv)
            p = (cv, count / self.numIter)
            if self.logger is not None:
                self.logger.info("{:.3f} {:.3f}".format(p[0], p[1]))
            cvaCounts.append(p)
        return cvaCounts

    def interpolateCritValues(self, reqConf, cvaCounts, lowertTail, tailStart, tailEnd):
        """
        interpolate for specific confidence limits, returns list of (confidence, critical value)

        Parameters
            reqConf : confidence level values
            cvaCounts : cum values
            lowertTail : True if lower tail
            tailStart : tail start
            tailEnd : tail end
        """
        critValues = list()
        if self.logger is not None:
            self.logger.info("target conf limit " + str(reqConf))

        # the extreme confidence level (0 or 1) is pinned to the tail boundary below
        reqConfSub = reqConf[1:] if lowertTail else reqConf[:-1]
        for rc in reqConfSub:
            for (lval, lprob), (uval, uprob) in zip(cvaCounts, cvaCounts[1:]):
                if lprob <= rc < uprob:
                    slope = (uval - lval) / (uprob - lprob)
                    cval = lval + slope * (rc - lprob)
                    p = (rc, cval)
                    if self.logger is not None:
                        self.logger.debug("interpolated crit values {:.3f} {:.3f}".format(p[0], p[1]))
                    critValues.append(p)
                    break

        if lowertTail:
            critValues.insert(0, (0.0, tailStart))
        else:
            critValues.append((1.0, tailEnd))
        return critValues