Spaces:
Runtime error
Runtime error
#!/usr/local/bin/python3 | |
# avenir-python: Machine Learning | |
# Author: Pranab Ghosh | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); you | |
# may not use this file except in compliance with the License. You may | |
# obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
# implied. See the License for the specific language governing | |
# permissions and limitations under the License. | |
import sys | |
import random | |
import time | |
import math | |
import random | |
import numpy as np | |
from scipy import stats | |
from random import randint | |
from .util import * | |
from .stats import Histogram | |
def randomFloat(low, high): | |
""" | |
sample float within range | |
Parameters | |
low : low valuee | |
high : high valuee | |
""" | |
return random.random() * (high-low) + low | |
def randomInt(minv, maxv): | |
""" | |
sample int within range | |
Parameters | |
minv : low valuee | |
maxv : high valuee | |
""" | |
return randint(minv, maxv) | |
def randIndex(lData): | |
""" | |
random index of a list | |
Parameters | |
lData : list data | |
""" | |
return randint(0, len(lData)-1) | |
def randomUniformSampled(low, high): | |
""" | |
sample float within range | |
Parameters | |
low : low value | |
high : high value | |
""" | |
return np.random.uniform(low, high) | |
def randomUniformSampledList(low, high, size): | |
""" | |
sample floats within range to create list | |
Parameters | |
low : low value | |
high : high value | |
size ; size of list to be returned | |
""" | |
return np.random.uniform(low, high, size) | |
def randomNormSampled(mean, sd): | |
""" | |
sample float from normal | |
Parameters | |
mean : mean | |
sd : std deviation | |
""" | |
return np.random.normal(mean, sd) | |
def randomNormSampledList(mean, sd, size): | |
""" | |
sample float list from normal | |
Parameters | |
mean : mean | |
sd : std deviation | |
size : size of list to be returned | |
""" | |
return np.random.normal(mean, sd, size) | |
def randomSampledList(sampler, size): | |
""" | |
sample list from given sampler | |
Parameters | |
sampler : sampler object | |
size : size of list to be returned | |
""" | |
return list(map(lambda i : sampler.sample(), range(size))) | |
def minLimit(val, minv): | |
""" | |
min limit | |
Parameters | |
val : value | |
minv : min limit | |
""" | |
if (val < minv): | |
val = minv | |
return val | |
def rangeLimit(val, minv, maxv): | |
""" | |
range limit | |
Parameters | |
val : value | |
minv : min limit | |
maxv : max limit | |
""" | |
if (val < minv): | |
val = minv | |
elif (val > maxv): | |
val = maxv | |
return val | |
def sampleUniform(minv, maxv): | |
""" | |
sample int within range | |
Parameters | |
minv ; int min limit | |
maxv : int max limit | |
""" | |
return randint(minv, maxv) | |
def sampleFromBase(value, dev): | |
""" | |
sample int wrt base | |
Parameters | |
value : base value | |
dev : deviation | |
""" | |
return randint(value - dev, value + dev) | |
def sampleFloatFromBase(value, dev): | |
""" | |
sample float wrt base | |
Parameters | |
value : base value | |
dev : deviation | |
""" | |
return randomFloat(value - dev, value + dev) | |
def distrUniformWithRanndom(total, numItems, noiseLevel): | |
""" | |
uniformly distribute with some randomness and preserves total | |
Parameters | |
total : total count | |
numItems : no of bins | |
noiseLevel : noise level fraction | |
""" | |
perItem = total / numItems | |
var = perItem * noiseLevel | |
items = [] | |
for i in range(numItems): | |
item = perItem + randomFloat(-var, var) | |
items.append(item) | |
#adjust last item | |
sm = sum(items[:-1]) | |
items[-1] = total - sm | |
return items | |
def isEventSampled(threshold, maxv=100): | |
""" | |
sample event which occurs if sampled below threshold | |
Parameters | |
threshold : threshold for sampling | |
maxv : maximum values | |
""" | |
return randint(0, maxv) < threshold | |
def sampleBinaryEvents(events, probPercent): | |
""" | |
sample binary events | |
Parameters | |
events : two events | |
probPercent : probability as percentage | |
""" | |
if (randint(0, 100) < probPercent): | |
event = events[0] | |
else: | |
event = events[1] | |
return event | |
def addNoiseNum(value, sampler): | |
""" | |
add noise to numeric value | |
Parameters | |
value : base value | |
sampler : sampler for noise | |
""" | |
return value * (1 + sampler.sample()) | |
def addNoiseCat(value, values, noise): | |
""" | |
add noise to categorical value i.e with some probability change value | |
Parameters | |
value : cat value | |
values : cat values | |
noise : noise level fraction | |
""" | |
newValue = value | |
threshold = int(noise * 100) | |
if (isEventSampled(threshold)): | |
newValue = selectRandomFromList(values) | |
while newValue == value: | |
newValue = selectRandomFromList(values) | |
return newValue | |
def sampleWithReplace(data, sampSize): | |
""" | |
sample with replacement | |
Parameters | |
data : array | |
sampSize : sample size | |
""" | |
sampled = list() | |
le = len(data) | |
if sampSize is None: | |
sampSize = le | |
for i in range(sampSize): | |
j = random.randint(0, le - 1) | |
sampled.append(data[j]) | |
return sampled | |
class CumDistr: | |
""" | |
cumulative distr | |
""" | |
def __init__(self, data, numBins = None): | |
""" | |
initializer | |
Parameters | |
data : array | |
numBins : no of bins | |
""" | |
if not numBins: | |
numBins = int(len(data) / 5) | |
res = stats.cumfreq(data, numbins=numBins) | |
self.cdistr = res.cumcount / len(data) | |
self.loLim = res.lowerlimit | |
self.upLim = res.lowerlimit + res.binsize * res.cumcount.size | |
self.binWidth = res.binsize | |
def getDistr(self, value): | |
""" | |
get cumulative distribution | |
Parameters | |
value : value | |
""" | |
if value <= self.loLim: | |
d = 0.0 | |
elif value >= self.upLim: | |
d = 1.0 | |
else: | |
bin = int((value - self.loLim) / self.binWidth) | |
d = self.cdistr[bin] | |
return d | |
class BernoulliTrialSampler: | |
""" | |
bernoulli trial sampler return True or False | |
""" | |
def __init__(self, pr, events=None): | |
""" | |
initializer | |
Parameters | |
pr : probability | |
events : event values | |
""" | |
self.pr = pr | |
self.retEvent = False if events is None else True | |
self.events = events | |
def sample(self): | |
""" | |
samples value | |
""" | |
res = random.random() < self.pr | |
if self.retEvent: | |
res = self.events[0] if res else self.events[1] | |
return res | |
class PoissonSampler: | |
""" | |
poisson sampler returns number of events | |
""" | |
def __init__(self, rateOccur, maxSamp): | |
""" | |
initializer | |
Parameters | |
rateOccur : rate of occurence | |
maxSamp : max limit on no of samples | |
""" | |
self.rateOccur = rateOccur | |
self.maxSamp = int(maxSamp) | |
self.pmax = self.calculatePr(rateOccur) | |
def calculatePr(self, numOccur): | |
""" | |
calulates probability | |
Parameters | |
numOccur : no of occurence | |
""" | |
p = (self.rateOccur ** numOccur) * math.exp(-self.rateOccur) / math.factorial(numOccur) | |
return p | |
def sample(self): | |
""" | |
samples value | |
""" | |
done = False | |
samp = 0 | |
while not done: | |
no = randint(0, self.maxSamp) | |
sp = randomFloat(0.0, self.pmax) | |
ap = self.calculatePr(no) | |
if sp < ap: | |
done = True | |
samp = no | |
return samp | |
class ExponentialSampler: | |
""" | |
returns interval between events | |
""" | |
def __init__(self, rateOccur, maxSamp = None): | |
""" | |
initializer | |
Parameters | |
rateOccur : rate of occurence | |
maxSamp : max limit on interval | |
""" | |
self.interval = 1.0 / rateOccur | |
self.maxSamp = int(maxSamp) if maxSamp is not None else None | |
def sample(self): | |
""" | |
samples value | |
""" | |
sampled = np.random.exponential(scale=self.interval) | |
if self.maxSamp is not None: | |
while sampled > self.maxSamp: | |
sampled = np.random.exponential(scale=self.interval) | |
return sampled | |
class UniformNumericSampler: | |
""" | |
uniform sampler for numerical values | |
""" | |
def __init__(self, minv, maxv): | |
""" | |
initializer | |
Parameters | |
minv : min value | |
maxv : max value | |
""" | |
self.minv = minv | |
self.maxv = maxv | |
def isNumeric(self): | |
""" | |
returns true | |
""" | |
return True | |
def sample(self): | |
""" | |
samples value | |
""" | |
samp = sampleUniform(self.minv, self.maxv) if isinstance(self.minv, int) else randomFloat(self.minv, self.maxv) | |
return samp | |
class UniformCategoricalSampler: | |
""" | |
uniform sampler for categorical values | |
""" | |
def __init__(self, cvalues): | |
""" | |
initializer | |
Parameters | |
cvalues : categorical value list | |
""" | |
self.cvalues = cvalues | |
def isNumeric(self): | |
return False | |
def sample(self): | |
""" | |
samples value | |
""" | |
return selectRandomFromList(self.cvalues) | |
class NormalSampler: | |
""" | |
normal sampler | |
""" | |
def __init__(self, mean, stdDev): | |
""" | |
initializer | |
Parameters | |
mean : mean | |
stdDev : std deviation | |
""" | |
self.mean = mean | |
self.stdDev = stdDev | |
self.sampleAsInt = False | |
def isNumeric(self): | |
return True | |
def sampleAsIntValue(self): | |
""" | |
set True to sample as int | |
""" | |
self.sampleAsInt = True | |
def sample(self): | |
""" | |
samples value | |
""" | |
samp = np.random.normal(self.mean, self.stdDev) | |
if self.sampleAsInt: | |
samp = int(samp) | |
return samp | |
class LogNormalSampler: | |
""" | |
log normal sampler | |
""" | |
def __init__(self, mean, stdDev): | |
""" | |
initializer | |
Parameters | |
mean : mean | |
stdDev : std deviation | |
""" | |
self.mean = mean | |
self.stdDev = stdDev | |
def isNumeric(self): | |
return True | |
def sample(self): | |
""" | |
samples value | |
""" | |
return np.random.lognormal(self.mean, self.stdDev) | |
class NormalSamplerWithTrendCycle: | |
""" | |
normal sampler with cycle and trend | |
""" | |
def __init__(self, mean, stdDev, dmean, cycle, step=1): | |
""" | |
initializer | |
Parameters | |
mean : mean | |
stdDev : std deviation | |
dmean : trend delta | |
cycle : cycle values wrt base mean | |
step : adjustment step for cycle and trend | |
""" | |
self.mean = mean | |
self.cmean = mean | |
self.stdDev = stdDev | |
self.dmean = dmean | |
self.cycle = cycle | |
self.clen = len(cycle) if cycle is not None else 0 | |
self.step = step | |
self.count = 0 | |
def isNumeric(self): | |
return True | |
def sample(self): | |
""" | |
samples value | |
""" | |
s = np.random.normal(self.cmean, self.stdDev) | |
self.count += 1 | |
if self.count % self.step == 0: | |
cy = 0 | |
if self.clen > 1: | |
coff = self.count % self.clen | |
cy = self.cycle[coff] | |
tr = self.count * self.dmean | |
self.cmean = self.mean + tr + cy | |
return s | |
class ParetoSampler: | |
""" | |
pareto sampler | |
""" | |
def __init__(self, mode, shape): | |
""" | |
initializer | |
Parameters | |
mode : mode | |
shape : shape | |
""" | |
self.mode = mode | |
self.shape = shape | |
def isNumeric(self): | |
return True | |
def sample(self): | |
""" | |
samples value | |
""" | |
return (np.random.pareto(self.shape) + 1) * self.mode | |
class GammaSampler: | |
""" | |
pareto sampler | |
""" | |
def __init__(self, shape, scale): | |
""" | |
initializer | |
Parameters | |
shape : shape | |
scale : scale | |
""" | |
self.shape = shape | |
self.scale = scale | |
def isNumeric(self): | |
return True | |
def sample(self): | |
""" | |
samples value | |
""" | |
return np.random.gamma(self.shape, self.scale) | |
class GaussianRejectSampler: | |
""" | |
gaussian sampling based on rejection sampling | |
""" | |
def __init__(self, mean, stdDev): | |
""" | |
initializer | |
Parameters | |
mean : mean | |
stdDev : std deviation | |
""" | |
self.mean = mean | |
self.stdDev = stdDev | |
self.xmin = mean - 3 * stdDev | |
self.xmax = mean + 3 * stdDev | |
self.ymin = 0.0 | |
self.fmax = 1.0 / (math.sqrt(2.0 * 3.14) * stdDev) | |
self.ymax = 1.05 * self.fmax | |
self.sampleAsInt = False | |
def isNumeric(self): | |
return True | |
def sampleAsIntValue(self): | |
""" | |
sample as int value | |
""" | |
self.sampleAsInt = True | |
def sample(self): | |
""" | |
samples value | |
""" | |
done = False | |
samp = 0 | |
while not done: | |
x = randomFloat(self.xmin, self.xmax) | |
y = randomFloat(self.ymin, self.ymax) | |
f = self.fmax * math.exp(-(x - self.mean) * (x - self.mean) / (2.0 * self.stdDev * self.stdDev)) | |
if (y < f): | |
done = True | |
samp = x | |
if self.sampleAsInt: | |
samp = int(samp) | |
return samp | |
class DiscreteRejectSampler: | |
""" | |
non parametric sampling for discrete values using given distribution based | |
on rejection sampling | |
""" | |
def __init__(self, xmin, xmax, step, *values): | |
""" | |
initializer | |
Parameters | |
xmin : min value | |
xmax : max value | |
step : discrete step | |
values : distr values | |
""" | |
self.xmin = xmin | |
self.xmax = xmax | |
self.step = step | |
self.distr = values | |
if (len(self.distr) == 1): | |
self.distr = self.distr[0] | |
numSteps = int((self.xmax - self.xmin) / self.step) | |
#print("{:.3f} {:.3f} {:.3f} {}".format(self.xmin, self.xmax, self.step, numSteps)) | |
assert len(self.distr) == numSteps + 1, "invalid number of distr values expected {}".format(numSteps + 1) | |
self.ximin = 0 | |
self.ximax = numSteps | |
self.pmax = float(max(self.distr)) | |
def isNumeric(self): | |
return True | |
def sample(self): | |
""" | |
samples value | |
""" | |
done = False | |
samp = None | |
while not done: | |
xi = randint(self.ximin, self.ximax) | |
#print(formatAny(xi, "xi")) | |
ps = randomFloat(0.0, self.pmax) | |
pa = self.distr[xi] | |
if ps < pa: | |
samp = self.xmin + xi * self.step | |
done = True | |
return samp | |
class TriangularRejectSampler: | |
""" | |
non parametric sampling using triangular distribution based on rejection sampling | |
""" | |
def __init__(self, xmin, xmax, vertexValue, vertexPos=None): | |
""" | |
initializer | |
Parameters | |
xmin : min value | |
xmax : max value | |
vertexValue : distr value at vertex | |
vertexPos : vertex pposition | |
""" | |
self.xmin = xmin | |
self.xmax = xmax | |
self.vertexValue = vertexValue | |
if vertexPos: | |
assert vertexPos > xmin and vertexPos < xmax, "vertex position outside bound" | |
self.vertexPos = vertexPos | |
else: | |
self.vertexPos = 0.5 * (xmin + xmax) | |
self.s1 = vertexValue / (self.vertexPos - xmin) | |
self.s2 = vertexValue / (xmax - self.vertexPos) | |
def isNumeric(self): | |
return True | |
def sample(self): | |
""" | |
samples value | |
""" | |
done = False | |
samp = None | |
while not done: | |
x = randomFloat(self.xmin, self.xmax) | |
y = randomFloat(0.0, self.vertexValue) | |
f = (x - self.xmin) * self.s1 if x < self.vertexPos else (self.xmax - x) * self.s2 | |
if (y < f): | |
done = True | |
samp = x | |
return samp; | |
class NonParamRejectSampler: | |
""" | |
non parametric sampling using given distribution based on rejection sampling | |
""" | |
def __init__(self, xmin, binWidth, *values): | |
""" | |
initializer | |
Parameters | |
xmin : min value | |
binWidth : bin width | |
values : distr values | |
""" | |
self.values = values | |
if (len(self.values) == 1): | |
self.values = self.values[0] | |
self.xmin = xmin | |
self.xmax = xmin + binWidth * (len(self.values) - 1) | |
#print(self.xmin, self.xmax, binWidth) | |
self.binWidth = binWidth | |
self.fmax = 0 | |
for v in self.values: | |
if (v > self.fmax): | |
self.fmax = v | |
self.ymin = 0 | |
self.ymax = self.fmax | |
self.sampleAsInt = True | |
def isNumeric(self): | |
return True | |
def sampleAsFloat(self): | |
self.sampleAsInt = False | |
def sample(self): | |
""" | |
samples value | |
""" | |
done = False | |
samp = 0 | |
while not done: | |
if self.sampleAsInt: | |
x = random.randint(self.xmin, self.xmax) | |
y = random.randint(self.ymin, self.ymax) | |
else: | |
x = randomFloat(self.xmin, self.xmax) | |
y = randomFloat(self.ymin, self.ymax) | |
bin = int((x - self.xmin) / self.binWidth) | |
f = self.values[bin] | |
if (y < f): | |
done = True | |
samp = x | |
return samp | |
class JointNonParamRejectSampler: | |
""" | |
non parametric sampling using given distribution based on rejection sampling | |
""" | |
def __init__(self, xmin, xbinWidth, xnbin, ymin, ybinWidth, ynbin, *values): | |
""" | |
initializer | |
Parameters | |
xmin : min value for x | |
xbinWidth : bin width for x | |
xnbin : no of bins for x | |
ymin : min value for y | |
ybinWidth : bin width for y | |
ynbin : no of bins for y | |
values : distr values | |
""" | |
self.values = values | |
if (len(self.values) == 1): | |
self.values = self.values[0] | |
assert len(self.values) == xnbin * ynbin, "wrong number of values for joint distr" | |
self.xmin = xmin | |
self.xmax = xmin + xbinWidth * xnbin | |
self.xbinWidth = xbinWidth | |
self.ymin = ymin | |
self.ymax = ymin + ybinWidth * ynbin | |
self.ybinWidth = ybinWidth | |
self.pmax = max(self.values) | |
self.values = np.array(self.values).reshape(xnbin, ynbin) | |
def isNumeric(self): | |
return True | |
def sample(self): | |
""" | |
samples value | |
""" | |
done = False | |
samp = 0 | |
while not done: | |
x = randomFloat(self.xmin, self.xmax) | |
y = randomFloat(self.ymin, self.ymax) | |
xbin = int((x - self.xmin) / self.xbinWidth) | |
ybin = int((y - self.ymin) / self.ybinWidth) | |
ap = self.values[xbin][ybin] | |
sp = randomFloat(0.0, self.pmax) | |
if (sp < ap): | |
done = True | |
samp = [x,y] | |
return samp | |
class JointNormalSampler: | |
""" | |
joint normal sampler | |
""" | |
def __init__(self, *values): | |
""" | |
initializer | |
Parameters | |
values : 2 mean values followed by 4 values for covar matrix | |
""" | |
lvalues = list(values) | |
assert len(lvalues) == 6, "incorrect number of arguments for joint normal sampler" | |
mean = lvalues[:2] | |
self.mean = np.array(mean) | |
sd = lvalues[2:] | |
self.sd = np.array(sd).reshape(2,2) | |
def isNumeric(self): | |
return True | |
def sample(self): | |
""" | |
samples value | |
""" | |
return list(np.random.multivariate_normal(self.mean, self.sd)) | |
class MultiVarNormalSampler: | |
""" | |
muti variate normal sampler | |
""" | |
def __init__(self, numVar, *values): | |
""" | |
initializer | |
Parameters | |
numVar : no of variables | |
values : numVar mean values followed by numVar x numVar values for covar matrix | |
""" | |
lvalues = list(values) | |
assert len(lvalues) == numVar + numVar * numVar, "incorrect number of arguments for multi var normal sampler" | |
mean = lvalues[:numVar] | |
self.mean = np.array(mean) | |
sd = lvalues[numVar:] | |
self.sd = np.array(sd).reshape(numVar,numVar) | |
def isNumeric(self): | |
return True | |
def sample(self): | |
""" | |
samples value | |
""" | |
return list(np.random.multivariate_normal(self.mean, self.sd)) | |
class CategoricalRejectSampler: | |
""" | |
non parametric sampling for categorical attributes using given distribution based | |
on rejection sampling | |
""" | |
def __init__(self, *values): | |
""" | |
initializer | |
Parameters | |
values : list of tuples which contains a categorical value and the corresponsding distr value | |
""" | |
self.distr = values | |
if (len(self.distr) == 1): | |
self.distr = self.distr[0] | |
maxv = 0 | |
for t in self.distr: | |
if t[1] > maxv: | |
maxv = t[1] | |
self.maxv = maxv | |
def sample(self): | |
""" | |
samples value | |
""" | |
done = False | |
samp = "" | |
while not done: | |
t = self.distr[randint(0, len(self.distr)-1)] | |
d = randomFloat(0, self.maxv) | |
if (d <= t[1]): | |
done = True | |
samp = t[0] | |
return samp | |
class CategoricalSetSampler: | |
""" | |
non parametric sampling for categorical attributes using uniform distribution based for | |
sampling a set of values from all values | |
""" | |
def __init__(self, *values): | |
""" | |
initializer | |
Parameters | |
values : list which contains a categorical values | |
""" | |
self.values = values | |
if (len(self.values) == 1): | |
self.values = self.values[0] | |
self.sampled = list() | |
def sample(self): | |
""" | |
samples value only from previously unsamopled | |
""" | |
samp = selectRandomFromList(self.values) | |
while True: | |
if samp in self.sampled: | |
samp = selectRandomFromList(self.values) | |
else: | |
self.sampled.append(samp) | |
break | |
return samp | |
def setSampled(self, sampled): | |
""" | |
set already sampled | |
Parameters | |
sampled : already sampled list | |
""" | |
self.sampled = sampled | |
def unsample(self, sample=None): | |
""" | |
rempve from sample history | |
Parameters | |
sample : sample to be removed | |
""" | |
if sample is None: | |
self.sampled.clear() | |
else: | |
self.sampled.remove(sample) | |
class DistrMixtureSampler: | |
""" | |
distr mixture sampler | |
""" | |
def __init__(self, mixtureWtDistr, *compDistr): | |
""" | |
initializer | |
Parameters | |
mixtureWtDistr : sampler that returns index into sampler list | |
compDistr : sampler list | |
""" | |
self.mixtureWtDistr = mixtureWtDistr | |
self.compDistr = compDistr | |
if (len(self.compDistr) == 1): | |
self.compDistr = self.compDistr[0] | |
def isNumeric(self): | |
return True | |
def sample(self): | |
""" | |
samples value | |
""" | |
comp = self.mixtureWtDistr.sample() | |
#sample sampled comp distr | |
return self.compDistr[comp].sample() | |
class AncestralSampler: | |
""" | |
ancestral sampler using conditional distribution | |
""" | |
def __init__(self, parentDistr, childDistr, numChildren): | |
""" | |
initializer | |
Parameters | |
parentDistr : parent distr | |
childDistr : childdren distribution dictionary | |
numChildren : no of children | |
""" | |
self.parentDistr = parentDistr | |
self.childDistr = childDistr | |
self.numChildren = numChildren | |
def sample(self): | |
""" | |
samples value | |
""" | |
parent = self.parentDistr.sample() | |
#sample all children conditioned on parent | |
children = [] | |
for i in range(self.numChildren): | |
key = (parent, i) | |
child = self.childDistr[key].sample() | |
children.append(child) | |
return (parent, children) | |
class ClusterSampler: | |
""" | |
sample cluster and then sample member of sampled cluster | |
""" | |
def __init__(self, clusters, *clustDistr): | |
""" | |
initializer | |
Parameters | |
clusters : dictionary clusters | |
clustDistr : distr for clusters | |
""" | |
self.sampler = CategoricalRejectSampler(*clustDistr) | |
self.clusters = clusters | |
def sample(self): | |
""" | |
samples value | |
""" | |
cluster = self.sampler.sample() | |
member = random.choice(self.clusters[cluster]) | |
return (cluster, member) | |
class MetropolitanSampler: | |
""" | |
metropolitan sampler | |
""" | |
def __init__(self, propStdDev, min, binWidth, values): | |
""" | |
initializer | |
Parameters | |
propStdDev : proposal distr std dev | |
min : min domain value for target distr | |
binWidth : bin width | |
values : target distr values | |
""" | |
self.targetDistr = Histogram.createInitialized(min, binWidth, values) | |
self.propsalDistr = GaussianRejectSampler(0, propStdDev) | |
self.proposalMixture = False | |
# bootstrap sample | |
(minv, maxv) = self.targetDistr.getMinMax() | |
self.curSample = random.randint(minv, maxv) | |
self.curDistr = self.targetDistr.value(self.curSample) | |
self.transCount = 0 | |
def initialize(self): | |
""" | |
initialize | |
""" | |
(minv, maxv) = self.targetDistr.getMinMax() | |
self.curSample = random.randint(minv, maxv) | |
self.curDistr = self.targetDistr.value(self.curSample) | |
self.transCount = 0 | |
def setProposalDistr(self, propsalDistr): | |
""" | |
set custom proposal distribution | |
Parameters | |
propsalDistr : proposal distribution | |
""" | |
self.propsalDistr = propsalDistr | |
def setGlobalProposalDistr(self, globPropStdDev, proposalChoiceThreshold): | |
""" | |
set custom proposal distribution | |
Parameters | |
globPropStdDev : global proposal distr std deviation | |
proposalChoiceThreshold : threshold for using global proposal distribution | |
""" | |
self.globalProposalDistr = GaussianRejectSampler(0, globPropStdDev) | |
self.proposalChoiceThreshold = proposalChoiceThreshold | |
self.proposalMixture = True | |
def sample(self): | |
""" | |
samples value | |
""" | |
nextSample = self.proposalSample(1) | |
self.targetSample(nextSample) | |
return self.curSample; | |
def proposalSample(self, skip): | |
""" | |
sample from proposal distribution | |
Parameters | |
skip : no of samples to skip | |
""" | |
for i in range(skip): | |
if not self.proposalMixture: | |
#one proposal distr | |
nextSample = self.curSample + self.propsalDistr.sample() | |
nextSample = self.targetDistr.boundedValue(nextSample) | |
else: | |
#mixture of proposal distr | |
if random.random() < self.proposalChoiceThreshold: | |
nextSample = self.curSample + self.propsalDistr.sample() | |
else: | |
nextSample = self.curSample + self.globalProposalDistr.sample() | |
nextSample = self.targetDistr.boundedValue(nextSample) | |
return nextSample | |
def targetSample(self, nextSample): | |
""" | |
target sample | |
Parameters | |
nextSample : proposal distr sample | |
""" | |
nextDistr = self.targetDistr.value(nextSample) | |
transition = False | |
if nextDistr > self.curDistr: | |
transition = True | |
else: | |
distrRatio = float(nextDistr) / self.curDistr | |
if random.random() < distrRatio: | |
transition = True | |
if transition: | |
self.curSample = nextSample | |
self.curDistr = nextDistr | |
self.transCount += 1 | |
def subSample(self, skip): | |
""" | |
sub sample | |
Parameters | |
skip : no of samples to skip | |
""" | |
nextSample = self.proposalSample(skip) | |
self.targetSample(nextSample) | |
return self.curSample; | |
def setMixtureProposal(self, globPropStdDev, mixtureThreshold): | |
""" | |
mixture proposal | |
Parameters | |
globPropStdDev : global proposal distr std deviation | |
mixtureThreshold : threshold for using global proposal distribution | |
""" | |
self.globalProposalDistr = GaussianRejectSampler(0, globPropStdDev) | |
self.mixtureThreshold = mixtureThreshold | |
def samplePropsal(self): | |
""" | |
sample from proposal distr | |
""" | |
if self.globalPropsalDistr is None: | |
proposal = self.propsalDistr.sample() | |
else: | |
if random.random() < self.mixtureThreshold: | |
proposal = self.propsalDistr.sample() | |
else: | |
proposal = self.globalProposalDistr.sample() | |
return proposal | |
class PermutationSampler: | |
""" | |
permutation sampler by shuffling a list | |
""" | |
def __init__(self): | |
""" | |
initialize | |
""" | |
self.values = None | |
self.numShuffles = None | |
def createSamplerWithValues(values, *numShuffles): | |
""" | |
creator with values | |
Parameters | |
values : list data | |
numShuffles : no of shuffles or range of no of shuffles | |
""" | |
sampler = PermutationSampler() | |
sampler.values = values | |
sampler.numShuffles = numShuffles | |
return sampler | |
def createSamplerWithRange(minv, maxv, *numShuffles): | |
""" | |
creator with ramge min and max | |
Parameters | |
minv : min of range | |
maxv : max of range | |
numShuffles : no of shuffles or range of no of shuffles | |
""" | |
sampler = PermutationSampler() | |
sampler.values = list(range(minv, maxv + 1)) | |
sampler.numShuffles = numShuffles | |
return sampler | |
def sample(self): | |
""" | |
sample new permutation | |
""" | |
cloned = self.values.copy() | |
shuffle(cloned, *self.numShuffles) | |
return cloned | |
class SpikeyDataSampler: | |
""" | |
samples spikey data | |
""" | |
def __init__(self, intvMean, intvScale, distr, spikeValueMean, spikeValueStd, spikeMaxDuration, baseValue = 0): | |
""" | |
initializer | |
Parameters | |
intvMean : interval mean | |
intvScale : interval std dev | |
distr : type of distr for interval | |
spikeValueMean : spike value mean | |
spikeValueStd : spike value std dev | |
spikeMaxDuration : max duration for spike | |
baseValue : base or offset value | |
""" | |
if distr == "norm": | |
self.intvSampler = NormalSampler(intvMean, intvScale) | |
elif distr == "expo": | |
rate = 1.0 / intvScale | |
self.intvSampler = ExponentialSampler(rate) | |
else: | |
raise ValueError("invalid distribution") | |
self.spikeSampler = NormalSampler(spikeValueMean, spikeValueStd) | |
self.spikeMaxDuration = spikeMaxDuration | |
self.baseValue = baseValue | |
self.inSpike = False | |
self.spikeCount = 0 | |
self.baseCount = 0 | |
self.baseLength = int(self.intvSampler.sample()) | |
self.spikeValues = list() | |
self.spikeLength = None | |
def sample(self): | |
""" | |
sample new value | |
""" | |
if self.baseCount <= self.baseLength: | |
sampled = self.baseValue | |
self.baseCount += 1 | |
else: | |
if not self.inSpike: | |
#starting spike | |
spikeVal = self.spikeSampler.sample() | |
self.spikeLength = sampleUniform(1, self.spikeMaxDuration) | |
spikeMaxPos = 0 if self.spikeLength == 1 else sampleUniform(0, self.spikeLength-1) | |
self.spikeValues.clear() | |
for i in range(self.spikeLength): | |
if i < spikeMaxPos: | |
frac = (i + 1) / (spikeMaxPos + 1) | |
frac = sampleFloatFromBase(frac, 0.1 * frac) | |
elif i > spikeMaxPos: | |
frac = (self.spikeLength - i) / (self.spikeLength - spikeMaxPos) | |
frac = sampleFloatFromBase(frac, 0.1 * frac) | |
else: | |
frac = 1.0 | |
self.spikeValues.append(frac * spikeVal) | |
self.inSpike = True | |
self.spikeCount = 0 | |
sampled = self.spikeValues[self.spikeCount] | |
self.spikeCount += 1 | |
if self.spikeCount == self.spikeLength: | |
#ending spike | |
self.baseCount = 0 | |
self.baseLength = int(self.intvSampler.sample()) | |
self.inSpike = False | |
return sampled | |
class EventSampler: | |
""" | |
sample event | |
""" | |
def __init__(self, intvSampler, valSampler=None): | |
""" | |
initializer | |
Parameters | |
intvSampler : interval sampler | |
valSampler : value sampler | |
""" | |
self.intvSampler = intvSampler | |
self.valSampler = valSampler | |
self.trigger = int(self.intvSampler.sample()) | |
self.count = 0 | |
def reset(self): | |
""" | |
reset trigger | |
""" | |
self.trigger = int(self.intvSampler.sample()) | |
self.count = 0 | |
def sample(self): | |
""" | |
sample event | |
""" | |
if self.count == self.trigger: | |
sampled = self.valSampler.sample() if self.valSampler is not None else 1.0 | |
self.trigger = int(self.intvSampler.sample()) | |
self.count = 0 | |
else: | |
sample = 0.0 | |
self.count += 1 | |
return sampled | |
def createSampler(data): | |
""" | |
create sampler | |
Parameters | |
data : sampler description | |
""" | |
#print(data) | |
items = data.split(":") | |
size = len(items) | |
dtype = items[-1] | |
stype = items[-2] | |
#print("sampler data {}".format(data)) | |
#print("sampler {}".format(stype)) | |
sampler = None | |
if stype == "uniform": | |
if dtype == "int": | |
min = int(items[0]) | |
max = int(items[1]) | |
sampler = UniformNumericSampler(min, max) | |
elif dtype == "float": | |
min = float(items[0]) | |
max = float(items[1]) | |
sampler = UniformNumericSampler(min, max) | |
elif dtype == "categorical": | |
values = items[:-2] | |
sampler = UniformCategoricalSampler(values) | |
elif stype == "normal": | |
mean = float(items[0]) | |
sd = float(items[1]) | |
sampler = NormalSampler(mean, sd) | |
if dtype == "int": | |
sampler.sampleAsIntValue() | |
elif stype == "nonparam": | |
if dtype == "int" or dtype == "float": | |
min = int(items[0]) | |
binWidth = int(items[1]) | |
values = items[2:-2] | |
values = list(map(lambda v: int(v), values)) | |
sampler = NonParamRejectSampler(min, binWidth, values) | |
if dtype == "float": | |
sampler.sampleAsFloat() | |
elif dtype == "categorical": | |
values = list() | |
for i in range(0, size-2, 2): | |
cval = items[i] | |
dist = int(items[i+1]) | |
pair = (cval, dist) | |
values.append(pair) | |
sampler = CategoricalRejectSampler(values) | |
elif dtype == "scategorical": | |
vfpath = items[0] | |
values = getFileLines(vfpath, None) | |
sampler = CategoricalSetSampler(values) | |
elif stype == "discrete": | |
vmin = int(items[0]) | |
vmax = int(items[1]) | |
step = int(items[2]) | |
values = list(map(lambda i : int(items[i]), range(3, len(items)-2))) | |
sampler = DiscreteRejectSampler(vmin, vmax, step, values) | |
elif stype == "bernauli": | |
pr = float(items[0]) | |
events = None | |
if len(items) == 5: | |
events = list() | |
if dtype == "int": | |
events.append(int(items[1])) | |
events.append(int(items[2])) | |
elif dtype == "categorical": | |
events.append(items[1]) | |
events.append(items[2]) | |
sampler = BernoulliTrialSampler(pr, events) | |
else: | |
raise ValueError("invalid sampler type " + stype) | |
return sampler | |