Priyanka-Kumavat-At-TE's picture
Upload 7 files
4610f7a
raw
history blame
30.7 kB
#!/usr/local/bin/python3
# avenir-python: Machine Learning
# Author: Pranab Ghosh
#
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You may
# obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.
import sys
import random
import time
import math
import random
import numpy as np
from scipy import stats
from random import randint
from .util import *
from .stats import Histogram
def randomFloat(low, high):
"""
sample float within range
Parameters
low : low valuee
high : high valuee
"""
return random.random() * (high-low) + low
def randomInt(minv, maxv):
"""
sample int within range
Parameters
minv : low valuee
maxv : high valuee
"""
return randint(minv, maxv)
def randIndex(lData):
"""
random index of a list
Parameters
lData : list data
"""
return randint(0, len(lData)-1)
def randomUniformSampled(low, high):
"""
sample float within range
Parameters
low : low value
high : high value
"""
return np.random.uniform(low, high)
def randomUniformSampledList(low, high, size):
"""
sample floats within range to create list
Parameters
low : low value
high : high value
size ; size of list to be returned
"""
return np.random.uniform(low, high, size)
def randomNormSampled(mean, sd):
"""
sample float from normal
Parameters
mean : mean
sd : std deviation
"""
return np.random.normal(mean, sd)
def randomNormSampledList(mean, sd, size):
"""
sample float list from normal
Parameters
mean : mean
sd : std deviation
size : size of list to be returned
"""
return np.random.normal(mean, sd, size)
def randomSampledList(sampler, size):
"""
sample list from given sampler
Parameters
sampler : sampler object
size : size of list to be returned
"""
return list(map(lambda i : sampler.sample(), range(size)))
def minLimit(val, minv):
"""
min limit
Parameters
val : value
minv : min limit
"""
if (val < minv):
val = minv
return val
def rangeLimit(val, minv, maxv):
"""
range limit
Parameters
val : value
minv : min limit
maxv : max limit
"""
if (val < minv):
val = minv
elif (val > maxv):
val = maxv
return val
def sampleUniform(minv, maxv):
"""
sample int within range
Parameters
minv ; int min limit
maxv : int max limit
"""
return randint(minv, maxv)
def sampleFromBase(value, dev):
"""
sample int wrt base
Parameters
value : base value
dev : deviation
"""
return randint(value - dev, value + dev)
def sampleFloatFromBase(value, dev):
"""
sample float wrt base
Parameters
value : base value
dev : deviation
"""
return randomFloat(value - dev, value + dev)
def distrUniformWithRanndom(total, numItems, noiseLevel):
"""
uniformly distribute with some randomness and preserves total
Parameters
total : total count
numItems : no of bins
noiseLevel : noise level fraction
"""
perItem = total / numItems
var = perItem * noiseLevel
items = []
for i in range(numItems):
item = perItem + randomFloat(-var, var)
items.append(item)
#adjust last item
sm = sum(items[:-1])
items[-1] = total - sm
return items
def isEventSampled(threshold, maxv=100):
"""
sample event which occurs if sampled below threshold
Parameters
threshold : threshold for sampling
maxv : maximum values
"""
return randint(0, maxv) < threshold
def sampleBinaryEvents(events, probPercent):
"""
sample binary events
Parameters
events : two events
probPercent : probability as percentage
"""
if (randint(0, 100) < probPercent):
event = events[0]
else:
event = events[1]
return event
def addNoiseNum(value, sampler):
"""
add noise to numeric value
Parameters
value : base value
sampler : sampler for noise
"""
return value * (1 + sampler.sample())
def addNoiseCat(value, values, noise):
"""
add noise to categorical value i.e with some probability change value
Parameters
value : cat value
values : cat values
noise : noise level fraction
"""
newValue = value
threshold = int(noise * 100)
if (isEventSampled(threshold)):
newValue = selectRandomFromList(values)
while newValue == value:
newValue = selectRandomFromList(values)
return newValue
def sampleWithReplace(data, sampSize):
"""
sample with replacement
Parameters
data : array
sampSize : sample size
"""
sampled = list()
le = len(data)
if sampSize is None:
sampSize = le
for i in range(sampSize):
j = random.randint(0, le - 1)
sampled.append(data[j])
return sampled
class CumDistr:
"""
cumulative distr
"""
def __init__(self, data, numBins = None):
"""
initializer
Parameters
data : array
numBins : no of bins
"""
if not numBins:
numBins = int(len(data) / 5)
res = stats.cumfreq(data, numbins=numBins)
self.cdistr = res.cumcount / len(data)
self.loLim = res.lowerlimit
self.upLim = res.lowerlimit + res.binsize * res.cumcount.size
self.binWidth = res.binsize
def getDistr(self, value):
"""
get cumulative distribution
Parameters
value : value
"""
if value <= self.loLim:
d = 0.0
elif value >= self.upLim:
d = 1.0
else:
bin = int((value - self.loLim) / self.binWidth)
d = self.cdistr[bin]
return d
class BernoulliTrialSampler:
"""
bernoulli trial sampler return True or False
"""
def __init__(self, pr, events=None):
"""
initializer
Parameters
pr : probability
events : event values
"""
self.pr = pr
self.retEvent = False if events is None else True
self.events = events
def sample(self):
"""
samples value
"""
res = random.random() < self.pr
if self.retEvent:
res = self.events[0] if res else self.events[1]
return res
class PoissonSampler:
"""
poisson sampler returns number of events
"""
def __init__(self, rateOccur, maxSamp):
"""
initializer
Parameters
rateOccur : rate of occurence
maxSamp : max limit on no of samples
"""
self.rateOccur = rateOccur
self.maxSamp = int(maxSamp)
self.pmax = self.calculatePr(rateOccur)
def calculatePr(self, numOccur):
"""
calulates probability
Parameters
numOccur : no of occurence
"""
p = (self.rateOccur ** numOccur) * math.exp(-self.rateOccur) / math.factorial(numOccur)
return p
def sample(self):
"""
samples value
"""
done = False
samp = 0
while not done:
no = randint(0, self.maxSamp)
sp = randomFloat(0.0, self.pmax)
ap = self.calculatePr(no)
if sp < ap:
done = True
samp = no
return samp
class ExponentialSampler:
"""
returns interval between events
"""
def __init__(self, rateOccur, maxSamp = None):
"""
initializer
Parameters
rateOccur : rate of occurence
maxSamp : max limit on interval
"""
self.interval = 1.0 / rateOccur
self.maxSamp = int(maxSamp) if maxSamp is not None else None
def sample(self):
"""
samples value
"""
sampled = np.random.exponential(scale=self.interval)
if self.maxSamp is not None:
while sampled > self.maxSamp:
sampled = np.random.exponential(scale=self.interval)
return sampled
class UniformNumericSampler:
"""
uniform sampler for numerical values
"""
def __init__(self, minv, maxv):
"""
initializer
Parameters
minv : min value
maxv : max value
"""
self.minv = minv
self.maxv = maxv
def isNumeric(self):
"""
returns true
"""
return True
def sample(self):
"""
samples value
"""
samp = sampleUniform(self.minv, self.maxv) if isinstance(self.minv, int) else randomFloat(self.minv, self.maxv)
return samp
class UniformCategoricalSampler:
"""
uniform sampler for categorical values
"""
def __init__(self, cvalues):
"""
initializer
Parameters
cvalues : categorical value list
"""
self.cvalues = cvalues
def isNumeric(self):
return False
def sample(self):
"""
samples value
"""
return selectRandomFromList(self.cvalues)
class NormalSampler:
"""
normal sampler
"""
def __init__(self, mean, stdDev):
"""
initializer
Parameters
mean : mean
stdDev : std deviation
"""
self.mean = mean
self.stdDev = stdDev
self.sampleAsInt = False
def isNumeric(self):
return True
def sampleAsIntValue(self):
"""
set True to sample as int
"""
self.sampleAsInt = True
def sample(self):
"""
samples value
"""
samp = np.random.normal(self.mean, self.stdDev)
if self.sampleAsInt:
samp = int(samp)
return samp
class LogNormalSampler:
"""
log normal sampler
"""
def __init__(self, mean, stdDev):
"""
initializer
Parameters
mean : mean
stdDev : std deviation
"""
self.mean = mean
self.stdDev = stdDev
def isNumeric(self):
return True
def sample(self):
"""
samples value
"""
return np.random.lognormal(self.mean, self.stdDev)
class NormalSamplerWithTrendCycle:
"""
normal sampler with cycle and trend
"""
def __init__(self, mean, stdDev, dmean, cycle, step=1):
"""
initializer
Parameters
mean : mean
stdDev : std deviation
dmean : trend delta
cycle : cycle values wrt base mean
step : adjustment step for cycle and trend
"""
self.mean = mean
self.cmean = mean
self.stdDev = stdDev
self.dmean = dmean
self.cycle = cycle
self.clen = len(cycle) if cycle is not None else 0
self.step = step
self.count = 0
def isNumeric(self):
return True
def sample(self):
"""
samples value
"""
s = np.random.normal(self.cmean, self.stdDev)
self.count += 1
if self.count % self.step == 0:
cy = 0
if self.clen > 1:
coff = self.count % self.clen
cy = self.cycle[coff]
tr = self.count * self.dmean
self.cmean = self.mean + tr + cy
return s
class ParetoSampler:
"""
pareto sampler
"""
def __init__(self, mode, shape):
"""
initializer
Parameters
mode : mode
shape : shape
"""
self.mode = mode
self.shape = shape
def isNumeric(self):
return True
def sample(self):
"""
samples value
"""
return (np.random.pareto(self.shape) + 1) * self.mode
class GammaSampler:
"""
pareto sampler
"""
def __init__(self, shape, scale):
"""
initializer
Parameters
shape : shape
scale : scale
"""
self.shape = shape
self.scale = scale
def isNumeric(self):
return True
def sample(self):
"""
samples value
"""
return np.random.gamma(self.shape, self.scale)
class GaussianRejectSampler:
"""
gaussian sampling based on rejection sampling
"""
def __init__(self, mean, stdDev):
"""
initializer
Parameters
mean : mean
stdDev : std deviation
"""
self.mean = mean
self.stdDev = stdDev
self.xmin = mean - 3 * stdDev
self.xmax = mean + 3 * stdDev
self.ymin = 0.0
self.fmax = 1.0 / (math.sqrt(2.0 * 3.14) * stdDev)
self.ymax = 1.05 * self.fmax
self.sampleAsInt = False
def isNumeric(self):
return True
def sampleAsIntValue(self):
"""
sample as int value
"""
self.sampleAsInt = True
def sample(self):
"""
samples value
"""
done = False
samp = 0
while not done:
x = randomFloat(self.xmin, self.xmax)
y = randomFloat(self.ymin, self.ymax)
f = self.fmax * math.exp(-(x - self.mean) * (x - self.mean) / (2.0 * self.stdDev * self.stdDev))
if (y < f):
done = True
samp = x
if self.sampleAsInt:
samp = int(samp)
return samp
class DiscreteRejectSampler:
"""
non parametric sampling for discrete values using given distribution based
on rejection sampling
"""
def __init__(self, xmin, xmax, step, *values):
"""
initializer
Parameters
xmin : min value
xmax : max value
step : discrete step
values : distr values
"""
self.xmin = xmin
self.xmax = xmax
self.step = step
self.distr = values
if (len(self.distr) == 1):
self.distr = self.distr[0]
numSteps = int((self.xmax - self.xmin) / self.step)
#print("{:.3f} {:.3f} {:.3f} {}".format(self.xmin, self.xmax, self.step, numSteps))
assert len(self.distr) == numSteps + 1, "invalid number of distr values expected {}".format(numSteps + 1)
self.ximin = 0
self.ximax = numSteps
self.pmax = float(max(self.distr))
def isNumeric(self):
return True
def sample(self):
"""
samples value
"""
done = False
samp = None
while not done:
xi = randint(self.ximin, self.ximax)
#print(formatAny(xi, "xi"))
ps = randomFloat(0.0, self.pmax)
pa = self.distr[xi]
if ps < pa:
samp = self.xmin + xi * self.step
done = True
return samp
class TriangularRejectSampler:
"""
non parametric sampling using triangular distribution based on rejection sampling
"""
def __init__(self, xmin, xmax, vertexValue, vertexPos=None):
"""
initializer
Parameters
xmin : min value
xmax : max value
vertexValue : distr value at vertex
vertexPos : vertex pposition
"""
self.xmin = xmin
self.xmax = xmax
self.vertexValue = vertexValue
if vertexPos:
assert vertexPos > xmin and vertexPos < xmax, "vertex position outside bound"
self.vertexPos = vertexPos
else:
self.vertexPos = 0.5 * (xmin + xmax)
self.s1 = vertexValue / (self.vertexPos - xmin)
self.s2 = vertexValue / (xmax - self.vertexPos)
def isNumeric(self):
return True
def sample(self):
"""
samples value
"""
done = False
samp = None
while not done:
x = randomFloat(self.xmin, self.xmax)
y = randomFloat(0.0, self.vertexValue)
f = (x - self.xmin) * self.s1 if x < self.vertexPos else (self.xmax - x) * self.s2
if (y < f):
done = True
samp = x
return samp;
class NonParamRejectSampler:
"""
non parametric sampling using given distribution based on rejection sampling
"""
def __init__(self, xmin, binWidth, *values):
"""
initializer
Parameters
xmin : min value
binWidth : bin width
values : distr values
"""
self.values = values
if (len(self.values) == 1):
self.values = self.values[0]
self.xmin = xmin
self.xmax = xmin + binWidth * (len(self.values) - 1)
#print(self.xmin, self.xmax, binWidth)
self.binWidth = binWidth
self.fmax = 0
for v in self.values:
if (v > self.fmax):
self.fmax = v
self.ymin = 0
self.ymax = self.fmax
self.sampleAsInt = True
def isNumeric(self):
return True
def sampleAsFloat(self):
self.sampleAsInt = False
def sample(self):
"""
samples value
"""
done = False
samp = 0
while not done:
if self.sampleAsInt:
x = random.randint(self.xmin, self.xmax)
y = random.randint(self.ymin, self.ymax)
else:
x = randomFloat(self.xmin, self.xmax)
y = randomFloat(self.ymin, self.ymax)
bin = int((x - self.xmin) / self.binWidth)
f = self.values[bin]
if (y < f):
done = True
samp = x
return samp
class JointNonParamRejectSampler:
"""
non parametric sampling using given distribution based on rejection sampling
"""
def __init__(self, xmin, xbinWidth, xnbin, ymin, ybinWidth, ynbin, *values):
"""
initializer
Parameters
xmin : min value for x
xbinWidth : bin width for x
xnbin : no of bins for x
ymin : min value for y
ybinWidth : bin width for y
ynbin : no of bins for y
values : distr values
"""
self.values = values
if (len(self.values) == 1):
self.values = self.values[0]
assert len(self.values) == xnbin * ynbin, "wrong number of values for joint distr"
self.xmin = xmin
self.xmax = xmin + xbinWidth * xnbin
self.xbinWidth = xbinWidth
self.ymin = ymin
self.ymax = ymin + ybinWidth * ynbin
self.ybinWidth = ybinWidth
self.pmax = max(self.values)
self.values = np.array(self.values).reshape(xnbin, ynbin)
def isNumeric(self):
return True
def sample(self):
"""
samples value
"""
done = False
samp = 0
while not done:
x = randomFloat(self.xmin, self.xmax)
y = randomFloat(self.ymin, self.ymax)
xbin = int((x - self.xmin) / self.xbinWidth)
ybin = int((y - self.ymin) / self.ybinWidth)
ap = self.values[xbin][ybin]
sp = randomFloat(0.0, self.pmax)
if (sp < ap):
done = True
samp = [x,y]
return samp
class JointNormalSampler:
"""
joint normal sampler
"""
def __init__(self, *values):
"""
initializer
Parameters
values : 2 mean values followed by 4 values for covar matrix
"""
lvalues = list(values)
assert len(lvalues) == 6, "incorrect number of arguments for joint normal sampler"
mean = lvalues[:2]
self.mean = np.array(mean)
sd = lvalues[2:]
self.sd = np.array(sd).reshape(2,2)
def isNumeric(self):
return True
def sample(self):
"""
samples value
"""
return list(np.random.multivariate_normal(self.mean, self.sd))
class MultiVarNormalSampler:
"""
muti variate normal sampler
"""
def __init__(self, numVar, *values):
"""
initializer
Parameters
numVar : no of variables
values : numVar mean values followed by numVar x numVar values for covar matrix
"""
lvalues = list(values)
assert len(lvalues) == numVar + numVar * numVar, "incorrect number of arguments for multi var normal sampler"
mean = lvalues[:numVar]
self.mean = np.array(mean)
sd = lvalues[numVar:]
self.sd = np.array(sd).reshape(numVar,numVar)
def isNumeric(self):
return True
def sample(self):
"""
samples value
"""
return list(np.random.multivariate_normal(self.mean, self.sd))
class CategoricalRejectSampler:
"""
non parametric sampling for categorical attributes using given distribution based
on rejection sampling
"""
def __init__(self, *values):
"""
initializer
Parameters
values : list of tuples which contains a categorical value and the corresponsding distr value
"""
self.distr = values
if (len(self.distr) == 1):
self.distr = self.distr[0]
maxv = 0
for t in self.distr:
if t[1] > maxv:
maxv = t[1]
self.maxv = maxv
def sample(self):
"""
samples value
"""
done = False
samp = ""
while not done:
t = self.distr[randint(0, len(self.distr)-1)]
d = randomFloat(0, self.maxv)
if (d <= t[1]):
done = True
samp = t[0]
return samp
class CategoricalSetSampler:
"""
non parametric sampling for categorical attributes using uniform distribution based for
sampling a set of values from all values
"""
def __init__(self, *values):
"""
initializer
Parameters
values : list which contains a categorical values
"""
self.values = values
if (len(self.values) == 1):
self.values = self.values[0]
self.sampled = list()
def sample(self):
"""
samples value only from previously unsamopled
"""
samp = selectRandomFromList(self.values)
while True:
if samp in self.sampled:
samp = selectRandomFromList(self.values)
else:
self.sampled.append(samp)
break
return samp
def setSampled(self, sampled):
"""
set already sampled
Parameters
sampled : already sampled list
"""
self.sampled = sampled
def unsample(self, sample=None):
"""
rempve from sample history
Parameters
sample : sample to be removed
"""
if sample is None:
self.sampled.clear()
else:
self.sampled.remove(sample)
class DistrMixtureSampler:
"""
distr mixture sampler
"""
def __init__(self, mixtureWtDistr, *compDistr):
"""
initializer
Parameters
mixtureWtDistr : sampler that returns index into sampler list
compDistr : sampler list
"""
self.mixtureWtDistr = mixtureWtDistr
self.compDistr = compDistr
if (len(self.compDistr) == 1):
self.compDistr = self.compDistr[0]
def isNumeric(self):
return True
def sample(self):
"""
samples value
"""
comp = self.mixtureWtDistr.sample()
#sample sampled comp distr
return self.compDistr[comp].sample()
class AncestralSampler:
"""
ancestral sampler using conditional distribution
"""
def __init__(self, parentDistr, childDistr, numChildren):
"""
initializer
Parameters
parentDistr : parent distr
childDistr : childdren distribution dictionary
numChildren : no of children
"""
self.parentDistr = parentDistr
self.childDistr = childDistr
self.numChildren = numChildren
def sample(self):
"""
samples value
"""
parent = self.parentDistr.sample()
#sample all children conditioned on parent
children = []
for i in range(self.numChildren):
key = (parent, i)
child = self.childDistr[key].sample()
children.append(child)
return (parent, children)
class ClusterSampler:
"""
sample cluster and then sample member of sampled cluster
"""
def __init__(self, clusters, *clustDistr):
"""
initializer
Parameters
clusters : dictionary clusters
clustDistr : distr for clusters
"""
self.sampler = CategoricalRejectSampler(*clustDistr)
self.clusters = clusters
def sample(self):
"""
samples value
"""
cluster = self.sampler.sample()
member = random.choice(self.clusters[cluster])
return (cluster, member)
class MetropolitanSampler:
"""
metropolitan sampler
"""
def __init__(self, propStdDev, min, binWidth, values):
"""
initializer
Parameters
propStdDev : proposal distr std dev
min : min domain value for target distr
binWidth : bin width
values : target distr values
"""
self.targetDistr = Histogram.createInitialized(min, binWidth, values)
self.propsalDistr = GaussianRejectSampler(0, propStdDev)
self.proposalMixture = False
# bootstrap sample
(minv, maxv) = self.targetDistr.getMinMax()
self.curSample = random.randint(minv, maxv)
self.curDistr = self.targetDistr.value(self.curSample)
self.transCount = 0
def initialize(self):
"""
initialize
"""
(minv, maxv) = self.targetDistr.getMinMax()
self.curSample = random.randint(minv, maxv)
self.curDistr = self.targetDistr.value(self.curSample)
self.transCount = 0
def setProposalDistr(self, propsalDistr):
"""
set custom proposal distribution
Parameters
propsalDistr : proposal distribution
"""
self.propsalDistr = propsalDistr
def setGlobalProposalDistr(self, globPropStdDev, proposalChoiceThreshold):
"""
set custom proposal distribution
Parameters
globPropStdDev : global proposal distr std deviation
proposalChoiceThreshold : threshold for using global proposal distribution
"""
self.globalProposalDistr = GaussianRejectSampler(0, globPropStdDev)
self.proposalChoiceThreshold = proposalChoiceThreshold
self.proposalMixture = True
def sample(self):
"""
samples value
"""
nextSample = self.proposalSample(1)
self.targetSample(nextSample)
return self.curSample;
def proposalSample(self, skip):
"""
sample from proposal distribution
Parameters
skip : no of samples to skip
"""
for i in range(skip):
if not self.proposalMixture:
#one proposal distr
nextSample = self.curSample + self.propsalDistr.sample()
nextSample = self.targetDistr.boundedValue(nextSample)
else:
#mixture of proposal distr
if random.random() < self.proposalChoiceThreshold:
nextSample = self.curSample + self.propsalDistr.sample()
else:
nextSample = self.curSample + self.globalProposalDistr.sample()
nextSample = self.targetDistr.boundedValue(nextSample)
return nextSample
def targetSample(self, nextSample):
"""
target sample
Parameters
nextSample : proposal distr sample
"""
nextDistr = self.targetDistr.value(nextSample)
transition = False
if nextDistr > self.curDistr:
transition = True
else:
distrRatio = float(nextDistr) / self.curDistr
if random.random() < distrRatio:
transition = True
if transition:
self.curSample = nextSample
self.curDistr = nextDistr
self.transCount += 1
def subSample(self, skip):
"""
sub sample
Parameters
skip : no of samples to skip
"""
nextSample = self.proposalSample(skip)
self.targetSample(nextSample)
return self.curSample;
def setMixtureProposal(self, globPropStdDev, mixtureThreshold):
"""
mixture proposal
Parameters
globPropStdDev : global proposal distr std deviation
mixtureThreshold : threshold for using global proposal distribution
"""
self.globalProposalDistr = GaussianRejectSampler(0, globPropStdDev)
self.mixtureThreshold = mixtureThreshold
def samplePropsal(self):
"""
sample from proposal distr
"""
if self.globalPropsalDistr is None:
proposal = self.propsalDistr.sample()
else:
if random.random() < self.mixtureThreshold:
proposal = self.propsalDistr.sample()
else:
proposal = self.globalProposalDistr.sample()
return proposal
class PermutationSampler:
"""
permutation sampler by shuffling a list
"""
def __init__(self):
"""
initialize
"""
self.values = None
self.numShuffles = None
@staticmethod
def createSamplerWithValues(values, *numShuffles):
"""
creator with values
Parameters
values : list data
numShuffles : no of shuffles or range of no of shuffles
"""
sampler = PermutationSampler()
sampler.values = values
sampler.numShuffles = numShuffles
return sampler
@staticmethod
def createSamplerWithRange(minv, maxv, *numShuffles):
"""
creator with ramge min and max
Parameters
minv : min of range
maxv : max of range
numShuffles : no of shuffles or range of no of shuffles
"""
sampler = PermutationSampler()
sampler.values = list(range(minv, maxv + 1))
sampler.numShuffles = numShuffles
return sampler
def sample(self):
"""
sample new permutation
"""
cloned = self.values.copy()
shuffle(cloned, *self.numShuffles)
return cloned
class SpikeyDataSampler:
"""
samples spikey data
"""
def __init__(self, intvMean, intvScale, distr, spikeValueMean, spikeValueStd, spikeMaxDuration, baseValue = 0):
"""
initializer
Parameters
intvMean : interval mean
intvScale : interval std dev
distr : type of distr for interval
spikeValueMean : spike value mean
spikeValueStd : spike value std dev
spikeMaxDuration : max duration for spike
baseValue : base or offset value
"""
if distr == "norm":
self.intvSampler = NormalSampler(intvMean, intvScale)
elif distr == "expo":
rate = 1.0 / intvScale
self.intvSampler = ExponentialSampler(rate)
else:
raise ValueError("invalid distribution")
self.spikeSampler = NormalSampler(spikeValueMean, spikeValueStd)
self.spikeMaxDuration = spikeMaxDuration
self.baseValue = baseValue
self.inSpike = False
self.spikeCount = 0
self.baseCount = 0
self.baseLength = int(self.intvSampler.sample())
self.spikeValues = list()
self.spikeLength = None
def sample(self):
"""
sample new value
"""
if self.baseCount <= self.baseLength:
sampled = self.baseValue
self.baseCount += 1
else:
if not self.inSpike:
#starting spike
spikeVal = self.spikeSampler.sample()
self.spikeLength = sampleUniform(1, self.spikeMaxDuration)
spikeMaxPos = 0 if self.spikeLength == 1 else sampleUniform(0, self.spikeLength-1)
self.spikeValues.clear()
for i in range(self.spikeLength):
if i < spikeMaxPos:
frac = (i + 1) / (spikeMaxPos + 1)
frac = sampleFloatFromBase(frac, 0.1 * frac)
elif i > spikeMaxPos:
frac = (self.spikeLength - i) / (self.spikeLength - spikeMaxPos)
frac = sampleFloatFromBase(frac, 0.1 * frac)
else:
frac = 1.0
self.spikeValues.append(frac * spikeVal)
self.inSpike = True
self.spikeCount = 0
sampled = self.spikeValues[self.spikeCount]
self.spikeCount += 1
if self.spikeCount == self.spikeLength:
#ending spike
self.baseCount = 0
self.baseLength = int(self.intvSampler.sample())
self.inSpike = False
return sampled
class EventSampler:
"""
sample event
"""
def __init__(self, intvSampler, valSampler=None):
"""
initializer
Parameters
intvSampler : interval sampler
valSampler : value sampler
"""
self.intvSampler = intvSampler
self.valSampler = valSampler
self.trigger = int(self.intvSampler.sample())
self.count = 0
def reset(self):
"""
reset trigger
"""
self.trigger = int(self.intvSampler.sample())
self.count = 0
def sample(self):
"""
sample event
"""
if self.count == self.trigger:
sampled = self.valSampler.sample() if self.valSampler is not None else 1.0
self.trigger = int(self.intvSampler.sample())
self.count = 0
else:
sample = 0.0
self.count += 1
return sampled
def createSampler(data):
"""
create sampler
Parameters
data : sampler description
"""
#print(data)
items = data.split(":")
size = len(items)
dtype = items[-1]
stype = items[-2]
#print("sampler data {}".format(data))
#print("sampler {}".format(stype))
sampler = None
if stype == "uniform":
if dtype == "int":
min = int(items[0])
max = int(items[1])
sampler = UniformNumericSampler(min, max)
elif dtype == "float":
min = float(items[0])
max = float(items[1])
sampler = UniformNumericSampler(min, max)
elif dtype == "categorical":
values = items[:-2]
sampler = UniformCategoricalSampler(values)
elif stype == "normal":
mean = float(items[0])
sd = float(items[1])
sampler = NormalSampler(mean, sd)
if dtype == "int":
sampler.sampleAsIntValue()
elif stype == "nonparam":
if dtype == "int" or dtype == "float":
min = int(items[0])
binWidth = int(items[1])
values = items[2:-2]
values = list(map(lambda v: int(v), values))
sampler = NonParamRejectSampler(min, binWidth, values)
if dtype == "float":
sampler.sampleAsFloat()
elif dtype == "categorical":
values = list()
for i in range(0, size-2, 2):
cval = items[i]
dist = int(items[i+1])
pair = (cval, dist)
values.append(pair)
sampler = CategoricalRejectSampler(values)
elif dtype == "scategorical":
vfpath = items[0]
values = getFileLines(vfpath, None)
sampler = CategoricalSetSampler(values)
elif stype == "discrete":
vmin = int(items[0])
vmax = int(items[1])
step = int(items[2])
values = list(map(lambda i : int(items[i]), range(3, len(items)-2)))
sampler = DiscreteRejectSampler(vmin, vmax, step, values)
elif stype == "bernauli":
pr = float(items[0])
events = None
if len(items) == 5:
events = list()
if dtype == "int":
events.append(int(items[1]))
events.append(int(items[2]))
elif dtype == "categorical":
events.append(items[1])
events.append(items[2])
sampler = BernoulliTrialSampler(pr, events)
else:
raise ValueError("invalid sampler type " + stype)
return sampler