Spaces:
Runtime error
Runtime error
#!/usr/local/bin/python3 | |
# avenir-python: Machine Learning | |
# Author: Pranab Ghosh | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); you | |
# may not use this file except in compliance with the License. You may | |
# obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
# implied. See the License for the specific language governing | |
# permissions and limitations under the License. | |
# Package imports | |
import os | |
import sys | |
import matplotlib.pyplot as plt | |
import numpy as np | |
import sklearn as sk | |
from sklearn.neighbors import KDTree | |
import matplotlib | |
import random | |
import jprops | |
from random import randint | |
import statistics | |
sys.path.append(os.path.abspath("../lib")) | |
from util import * | |
from mlutil import * | |
from tnn import * | |
from stats import * | |
""" | |
neural model calibration | |
""" | |
class ModelCalibration(object):
    """
    calibration and sharpness diagnostics for a network validated through
    FeedForwardNetwork. A prediction above the configured probability threshold
    is treated as class 1 and a sample is counted as a hit only when it is
    predicted as class 1 and its actual value is 1. All public methods are
    static and report through console output and plots.
    """
    def __init__(self):
        pass

    @staticmethod
    def _binPredictions(yPred, yActual, nBins):
        """
        groups (predicted, actual) pairs into equal width bins of predicted value

        Parameters
            yPred : flat array of predicted values, must support min() and max()
            yActual : flat sequence of actual values aligned with yPred
            nBins : number of bins

        Returns
            (bins, minConf, bsize) where bins is a list of nBins lists of
            (predicted, actual) pairs, a bin without samples being an empty list
        """
        minConf = yPred.min()
        maxConf = yPred.max()
        bsize = (maxConf - minConf) / nBins
        bins = [list() for _ in range(nBins)]
        for yp, ya in zip(yPred, yActual):
            if bsize == 0:
                # all predictions are identical, so there is only one meaningful bin
                indx = 0
            else:
                # the maximum prediction lands exactly on the upper edge, fold it into the last bin
                indx = min(int((yp - minConf) / bsize), nBins - 1)
            bins[indx].append((yp, ya))
        return bins, minConf, bsize

    @staticmethod
    def _truePositiveCount(plist, prThreshhold):
        """
        number of (predicted, actual) pairs predicted above threshold with actual value 1
        """
        return sum(1 for yp, ya in plist if yp > prThreshhold and ya == 1)

    @staticmethod
    def _calibrationCurve(yPred, yActual, nBins, prThreshhold):
        """
        per bin confidence and accuracy along with calibration errors

        Returns
            (x, y, ece, mce) where x is per bin mean confidence (bin mid point for
            an empty bin), y is per bin accuracy (0 for an empty bin), ece is the
            sample weighted mean of abs(confidence - accuracy) and mce its maximum.
            Empty bins do not contribute to ece or mce
        """
        bins, minConf, bsize = ModelCalibration._binPredictions(yPred, yActual, nBins)
        x = list()
        y = list()
        ece = 0
        mce = 0
        for b, plist in enumerate(bins):
            if plist:
                #confidence
                ypm = statistics.mean([p[0] for p in plist])
                #accuracy
                acc = ModelCalibration._truePositiveCount(plist, prThreshhold) / len(plist)
                ce = abs(ypm - acc)
                ece += len(plist) * ce
                if ce > mce:
                    mce = ce
            else:
                # nothing observed in this bin, plot it at the bin mid point with zero accuracy
                ypm = minConf + (b + 0.5) * bsize
                acc = 0
            x.append(ypm)
            y.append(acc)
        ece /= len(yActual)
        return x, y, ece, mce

    @staticmethod
    def _sharpnessCurve(yPred, yActual, nBins, prThreshhold):
        """
        per bin accuracy compared with global accuracy

        Returns
            (y, ypgcount, accg, contrast) where y is per bin accuracy (0 for an
            empty bin), ypgcount the total hit count, accg the global accuracy and
            contrast the mean of abs(bin accuracy - accg) over non empty bins
        """
        bins, _, _ = ModelCalibration._binPredictions(yPred, yActual, nBins)
        hits = [ModelCalibration._truePositiveCount(plist, prThreshhold) for plist in bins]
        y = [h / len(plist) if plist else 0 for h, plist in zip(hits, bins)]
        ypgcount = sum(hits)
        accg = ypgcount / len(yActual)
        # empty bins carry no evidence, so they are left out of the contrast
        contrast = statistics.mean([abs(acc - accg) for acc, plist in zip(y, bins) if plist])
        return y, ypgcount, accg, contrast

    @staticmethod
    def _localCalibration(yPred, yActual, ind, prThreshhold, neighborCnt):
        """
        per sample mean confidence and accuracy over its nearest neighbors

        Parameters
            ind : for each sample the indexes of its neighbors

        Returns
            list of (sample index, confidence, accuracy) sorted by descending
            abs(confidence - accuracy)
        """
        calibs = list()
        for si, ni in enumerate(ind):
            conf = sum(yPred[i] for i in ni) / neighborCnt
            pairs = [(yPred[i], yActual[i]) for i in ni]
            acc = ModelCalibration._truePositiveCount(pairs, prThreshhold) / neighborCnt
            calibs.append((si, conf, acc))
        return sorted(calibs, key=lambda c : abs(c[1] - c[2]), reverse=True)

    @staticmethod
    def findModelCalibration(model):
        """
        model calibration based on equal width binning of predicted values. Draws
        per bin accuracy against per bin mean confidence and prints the per bin
        table, expected calibration error and maximum calibration error

        Parameters
            model : model object passed to FeedForwardNetwork.prepValidate() and
            FeedForwardNetwork.validateModel(); yPred, validOutData and config are
            read from it. Config keys used are calibrate.num.bins and
            calibrate.pred.prob.thresh
        """
        FeedForwardNetwork.prepValidate(model)
        FeedForwardNetwork.validateModel(model)

        yPred = model.yPred.flatten()
        yActual = model.validOutData.flatten()
        nBins = model.config.getIntConfig("calibrate.num.bins")[0]
        prThreshhold = model.config.getFloatConfig("calibrate.pred.prob.thresh")[0]

        x, y, ece, mce = ModelCalibration._calibrationCurve(yPred, yActual, nBins, prThreshhold)

        #calibration plot, the ideal curve has accuracy equal to confidence
        yideal = list(x)
        drawPairPlot(x, y, yideal, "confidence", "accuracy", "actual", "ideal")

        print("confidence\taccuracy")
        for z in zip(x, y):
            print("{:.3f}\t{:.3f}".format(z[0], z[1]))

        print("expected calibration error\t{:.3f}".format(ece))
        print("maximum calibration error\t{:.3f}".format(mce))

    @staticmethod
    def findModelCalibrationLocal(model, nworst=19):
        """
        model calibration based on k nearest neighbors. For each validation sample
        confidence and accuracy are averaged over its nearest neighbors in feature
        space and the samples with the largest gap between the two are printed

        Parameters
            model : model object passed to FeedForwardNetwork.prepValidate() and
            FeedForwardNetwork.validateModel(); yPred, validOutData, validFeatData
            and config are read from it. Config keys used are
            calibrate.num.nearest.neighbors and calibrate.pred.prob.thresh
            nworst : maximum number of worst calibrated samples to print
        """
        FeedForwardNetwork.prepValidate(model)
        FeedForwardNetwork.validateModel(model)

        yPred = model.yPred.flatten()
        yActual = model.validOutData.flatten()
        neighborCnt = model.config.getIntConfig("calibrate.num.nearest.neighbors")[0]
        prThreshhold = model.config.getFloatConfig("calibrate.pred.prob.thresh")[0]

        fData = model.validFeatData.numpy()
        tree = KDTree(fData, leaf_size=4)
        dist, ind = tree.query(fData, k=neighborCnt)

        calibs = ModelCalibration._localCalibration(yPred, yActual, ind, prThreshhold, neighborCnt)

        print("local calibration")
        print("conf\taccu\trecord")
        # slicing keeps this safe when there are fewer samples than nworst
        for si, conf, acc in calibs[:nworst]:
            rec = toStrFromList(fData[si], 3)
            print("{:.3f}\t{:.3f}\t{}".format(conf, acc, rec))

    @staticmethod
    def findModelSharpness(model):
        """
        model sharpness. Draws per bin accuracy against global accuracy and prints
        contrast, the mean absolute difference between the two

        Parameters
            model : model object passed to FeedForwardNetwork.prepValidate() and
            FeedForwardNetwork.validateModel(); yPred, validOutData and config are
            read from it. Config keys used are calibrate.num.bins and
            calibrate.pred.prob.thresh
        """
        FeedForwardNetwork.prepValidate(model)
        FeedForwardNetwork.validateModel(model)

        yPred = model.yPred.flatten()
        yActual = model.validOutData.flatten()
        nsamp = len(yActual)
        nBins = model.config.getIntConfig("calibrate.num.bins")[0]
        prThreshhold = model.config.getFloatConfig("calibrate.pred.prob.thresh")[0]

        y, ypgcount, accg, contrast = ModelCalibration._sharpnessCurve(yPred, yActual, nBins, prThreshhold)

        print("{} {}".format(ypgcount, nsamp))
        accgl = [accg] * nBins
        x = list(range(nBins))
        drawPairPlot(x, y, accgl, "discretized confidence", "accuracy", "local", "global")

        print("contrast {:.3f}".format(contrast))
""" | |
neural model robustness | |
""" | |
class ModelRobustness(object):
    """
    robustness check of a model through its performance in randomly
    sampled local neighborhoods of the feature space
    """
    def __init__(self):
        pass

    def localPerformance(self, model, fpath, nsamp, neighborCnt):
        """
        samples records at random, validates the model on the nearest neighbors of
        each sampled record, then reports the score distribution and the sampled
        records with the lowest scores

        Parameters
            model : model object passed to FeedForwardNetwork.prepData() and
            FeedForwardNetwork.validateModel(); its validation data is replaced
            through setValidationData() for every sample
            fpath : path of the data file
            nsamp : number of records to sample
            neighborCnt : number of nearest neighbors forming a neighborhood
        """
        #load data
        features, outputs = FeedForwardNetwork.prepData(model, fpath)
        numRows = features.shape[0]
        numCols = features.shape[1]

        #nearest neighbor index over all records
        kdTree = KDTree(features, leaf_size=4)

        sampled = list()
        for _ in range(nsamp):
            center = randomInt(0, numRows - 1)
            query = np.reshape(features[center], (1, numCols))
            _, nearest = kdTree.query(query, k=neighborCnt)
            nearest = nearest[0]

            #validate with the neighborhood only
            neighborhood = (features[nearest], outputs[nearest])
            model.setValidationData(neighborhood, False)
            sampled.append((center, FeedForwardNetwork.validateModel(model)))

        #performance distribution
        scores = [sc for _, sc in sampled]
        mean, sdev = basicStat(scores)
        print("model performance: mean {:.3f}\tstd dev {:.3f}".format(mean, sdev))
        drawHist(scores, "model accuracy", "accuracy", "frequency")

        #worst performance, lowest score first
        worst = sorted(sampled, key=lambda pair : pair[1])[:5]
        print(worst)

        lines = getFileLines(fpath, None)
        print("worst performing features regions")
        for rowIndx, sc in worst:
            print("score {:.3f}\t{}".format(sc, lines[rowIndx]))
""" | |
conformal prediction for regression | |
""" | |
class ConformalRegressionPrediction(object):
    """
    conformal prediction for regression. The conformal score is the absolute
    difference between predicted and actual value. Calibration state is held in
    the calibration dictionary under the keys scoreConfBound, ymin and ymax
    """
    def __init__(self):
        self.calibration = dict()

    @staticmethod
    def _confBoundScore(cscores, confBound):
        """
        score at the confBound fraction of the ascending sorted score list
        """
        # a confBound of 1.0 would index one past the end, so use the largest score
        cbi = min(int(confBound * len(cscores)), len(cscores) - 1)
        return cscores[cbi]

    def calibrate(self, ypair, confBound):
        """
        calibration for conformal prediction. Draws the conformal score
        distribution, stores score bound and actual value range in the
        calibration dictionary and prints it

        Parameters
            ypair : iterable of (predicted, actual) value pairs
            confBound : fraction (0 to 1) of the sorted conformal scores that
            defines the score bound

        Raises
            IndexError : if ypair is empty
        """
        pairs = list(ypair)
        cscores = sorted(abs(yp - ya) for yp, ya in pairs)
        actuals = [ya for _, ya in pairs]
        ymin = min(actuals, default=None)
        ymax = max(actuals, default=None)

        drawHist(cscores, "conformal score distribution", "conformal score", "frequency", 20)

        self.calibration["scoreConfBound"] = ConformalRegressionPrediction._confBoundScore(cscores, confBound)
        self.calibration["ymin"] = ymin
        self.calibration["ymax"] = ymax
        print(self.calibration)

    def saveCalib(self, fPath):
        """
        saves conformal score calibration

        Parameters
            fPath : path of the file to save to
        """
        saveObject(self.calibration, fPath)

    def restoreCalib(self, fPath):
        """
        restores conformal score calibration and prints it

        Parameters
            fPath : path of the file to restore from
        """
        self.calibration = restoreObject(fPath)
        print(self.calibration)

    def getPredRange(self, yp, nstep=100):
        """
        gets prediction range and related data. Candidate actual values are
        scanned from ymin up to but excluding ymax in nstep steps and the range
        is made of the candidates whose conformal score is below the score bound

        Parameters
            yp : predicted value
            nstep : number of steps in the scan of the actual value range

        Returns
            dictionary with predRangeMin, predRangeMax, status ("accepted" when
            yp lies within the range, "rejected" otherwise) and confidence
            (1 - range width / calibrated actual value range). When no candidate
            is within the score bound the range limits are None, the status is
            "rejected" and the confidence is 0.0
        """
        ymin = self.calibration["ymin"]
        ymax = self.calibration["ymax"]
        scoreConfBound = self.calibration["scoreConfBound"]
        step = (ymax - ymin) / nstep

        rmin = None
        rmax = None
        for ya in np.arange(ymin, ymax, step):
            if abs(yp - ya) < scoreConfBound:
                if rmin is None:
                    #lower bound
                    rmin = ya
                #candidates are ascending, so the latest one is the upper bound
                rmax = ya
            elif rmin is not None:
                #past upper bound, the score only grows from here
                break

        res = dict()
        res["predRangeMin"] = rmin
        res["predRangeMax"] = rmax
        if rmin is None:
            # prediction is farther than the score bound from every calibrated actual value
            res["status"] = "rejected"
            res["confidence"] = 0.0
        else:
            accepted = yp >= rmin and yp <= rmax
            res["status"] = "accepted" if accepted else "rejected"
            res["confidence"] = 1.0 - (rmax - rmin) / (ymax - ymin)
        return res