#!/usr/local/bin/python3 # avenir-python: Machine Learning # Author: Pranab Ghosh # # Licensed under the Apache License, Version 2.0 (the "License"); you # may not use this file except in compliance with the License. You may # obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. See the License for the specific language governing # permissions and limitations under the License. # Package imports import os import sys import matplotlib.pyplot as plt import numpy as np import sklearn as sk import matplotlib import random import jprops from io import StringIO from sklearn.model_selection import cross_val_score import joblib from random import randint from io import StringIO sys.path.append(os.path.abspath("../lib")) from util import * from mlutil import * from pasearch import * #base classifier class class BaseClassifier(object): def __init__(self, configFile, defValues, mname): self.config = Configuration(configFile, defValues) self.subSampleRate = None self.featData = None self.clsData = None self.classifier = None self.trained = False self.verbose = self.config.getBooleanConfig("common.verbose")[0] logFilePath = self.config.getStringConfig("common.logging.file")[0] logLevName = self.config.getStringConfig("common.logging.level")[0] self.logger = createLogger(mname, logFilePath, logLevName) self.logger.info("********* starting session") def initConfig(self, configFile, defValues): """ initialize config """ self.config = Configuration(configFile, defValues) def getConfig(self): """ get config object """ return self.config def setConfigParam(self, name, value): """ set config param """ self.config.setParam(name, value) def getMode(self): """ get mode """ return self.config.getStringConfig("common.mode")[0] def getSearchParamStrategy(self): """ get search parameter """ return self.config.getStringConfig("train.search.param.strategy")[0] def train(self): """ train model """ #build model self.buildModel() # training data if self.featData is None: (featData, clsData) = self.prepTrainingData() (self.featData, self.clsData) = (featData, clsData) else: (featData, clsData) = (self.featData, self.clsData) if self.subSampleRate is not None: (featData, clsData) = subSample(featData, clsData, self.subSampleRate, False) self.logger.info("subsample size " + str(featData.shape[0])) # parameters modelSave = self.config.getBooleanConfig("train.model.save")[0] #train self.logger.info("...training model") self.classifier.fit(featData, clsData) score = self.classifier.score(featData, clsData) successCriterion = self.config.getStringConfig("train.success.criterion")[0] result = None if successCriterion == "accuracy": self.logger.info("accuracy with training data {:06.3f}".format(score)) result = score elif successCriterion == "error": error = 1.0 - score self.logger.info("error with training data {:06.3f}".format(error)) result = error else: raise ValueError("invalid success criterion") if modelSave: self.logger.info("...saving model") modelFilePath = self.getModelFilePath() joblib.dump(self.classifier, modelFilePath) self.trained = True return result def trainValidate(self): """ train with k fold validation """ #build model self.buildModel() # training data (featData, clsData) = self.prepTrainingData() #parameter validation = self.config.getStringConfig("train.validation")[0] numFolds = self.config.getIntConfig("train.num.folds")[0] successCriterion = self.config.getStringConfig("train.success.criterion")[0] scoreMethod = self.config.getStringConfig("train.score.method")[0] #train with validation self.logger.info("...training and kfold cross validating model") scores = cross_val_score(self.classifier, featData, clsData, cv=numFolds,scoring=scoreMethod) avScore = np.mean(scores) result = self.reportResult(avScore, successCriterion, scoreMethod) return result def trainValidateSearch(self): """ train with k fold validation and search parameter space for optimum """ self.logger.info("...starting train validate with parameter search") searchStrategyName = self.getSearchParamStrategy() if searchStrategyName is not None: if searchStrategyName == "grid": searchStrategy = GuidedParameterSearch(self.verbose) elif searchStrategyName == "random": searchStrategy = RandomParameterSearch(self.verbose) maxIter = self.config.getIntConfig("train.search.max.iterations")[0] searchStrategy.setMaxIter(maxIter) elif searchStrategyName == "simuan": searchStrategy = SimulatedAnnealingParameterSearch(self.verbose) maxIter = self.config.getIntConfig("train.search.max.iterations")[0] searchStrategy.setMaxIter(maxIter) temp = self.config.getFloatConfig("train.search.sa.temp")[0] searchStrategy.setTemp(temp) tempRedRate = self.config.getFloatConfig("train.search.sa.temp.red.rate")[0] searchStrategy.setTempReductionRate(tempRedRate) else: raise ValueError("invalid paramtere search strategy") else: raise ValueError("missing search strategy") # add search params searchParams = self.config.getStringConfig("train.search.params")[0].split(",") searchParamNames = [] extSearchParamNames = [] if searchParams is not None: for searchParam in searchParams: paramItems = searchParam.split(":") extSearchParamNames.append(paramItems[0]) #get rid name component search paramNameItems = paramItems[0].split(".") del paramNameItems[1] paramItems[0] = ".".join(paramNameItems) searchStrategy.addParam(paramItems) searchParamNames.append(paramItems[0]) else: raise ValueError("missing search parameter list") # add search param data list for each param for (searchParamName,extSearchParamName) in zip(searchParamNames,extSearchParamNames): searchParamData = self.config.getStringConfig(extSearchParamName)[0].split(",") searchStrategy.addParamVaues(searchParamName, searchParamData) # train and validate for various param value combination searchStrategy.prepare() paramValues = searchStrategy.nextParamValues() searchResults = [] while paramValues is not None: self.logger.info("...next parameter set") paramStr = "" for paramValue in paramValues: self.setConfigParam(paramValue[0], str(paramValue[1])) paramStr = paramStr + paramValue[0] + "=" + str(paramValue[1]) + " " result = self.trainValidate() searchStrategy.setCost(result) searchResults.append((paramStr, result)) paramValues = searchStrategy.nextParamValues() # output self.logger.info("all parameter search results") for searchResult in searchResults: self.logger.info("{}\t{06.3f}".format(searchResult[0], searchResult[1])) self.logger.info("best parameter search result") bestSolution = searchStrategy.getBestSolution() paramStr = "" for paramValue in bestSolution[0]: paramStr = paramStr + paramValue[0] + "=" + str(paramValue[1]) + " " self.logger.info("{}\t{:06.3f}".format(paramStr, bestSolution[1])) return bestSolution def validate(self): """ predict """ # create model useSavedModel = self.config.getBooleanConfig("validate.use.saved.model")[0] if useSavedModel: # load saved model self.logger.info("...loading model") modelFilePath = self.getModelFilePath() self.classifier = joblib.load(modelFilePath) else: # train model if not self.trained: self.train() # prepare test data (featData, clsDataActual) = self.prepValidationData() #predict self.logger.info("...predicting") clsDataPred = self.classifier.predict(featData) self.logger.info("...validating") #print clsData scoreMethod = self.config.getStringConfig("validate.score.method")[0] if scoreMethod == "accuracy": accuracy = sk.metrics.accuracy_score(clsDataActual, clsDataPred) self.logger.info("accuracy:") self.logger.info(accuracy) elif scoreMethod == "confusionMatrix": confMatrx = sk.metrics.confusion_matrix(clsDataActual, clsDataPred) self.logger.info("confusion matrix:") self.logger.info(confMatrx) def predictx(self): """ predict """ # create model self.prepModel() # prepare test data featData = self.prepPredictData() #predict self.logger.info("...predicting") clsData = self.classifier.predict(featData) self.logger.info(clsData) def predict(self, recs=None): """ predict with in memory data """ # create model self.prepModel() #input record if recs: #passed record featData = self.prepStringPredictData(recs) if (featData.ndim == 1): featData = featData.reshape(1, -1) else: #file featData = self.prepPredictData() #predict self.logger.info("...predicting") clsData = self.classifier.predict(featData) return clsData def predictProb(self, recs): """ predict probability with in memory data """ raise ValueError("can not predict class probability") def prepModel(self): """ preparing model """ useSavedModel = self.config.getBooleanConfig("predict.use.saved.model")[0] if (useSavedModel and not self.classifier): # load saved model self.logger.info("...loading saved model") modelFilePath = self.getModelFilePath() self.classifier = joblib.load(modelFilePath) else: # train model if not self.trained: self.train() def prepTrainingData(self): """ loads and prepares training data """ # parameters dataFile = self.config.getStringConfig("train.data.file")[0] fieldIndices = self.config.getStringConfig("train.data.fields")[0] if not fieldIndices is None: fieldIndices = strToIntArray(fieldIndices, ",") featFieldIndices = self.config.getStringConfig("train.data.feature.fields")[0] if not featFieldIndices is None: featFieldIndices = strToIntArray(featFieldIndices, ",") classFieldIndex = self.config.getIntConfig("train.data.class.field")[0] #training data (data, featData) = loadDataFile(dataFile, ",", fieldIndices, featFieldIndices) if (self.config.getStringConfig("common.preprocessing")[0] == "scale"): scalingMethod = self.config.getStringConfig("common.scaling.method")[0] featData = scaleData(featData, scalingMethod) clsData = extrColumns(data, classFieldIndex) clsData = np.array([int(a) for a in clsData]) return (featData, clsData) def prepValidationData(self): """ loads and prepares training data """ # parameters dataFile = self.config.getStringConfig("validate.data.file")[0] fieldIndices = self.config.getStringConfig("validate.data.fields")[0] if not fieldIndices is None: fieldIndices = strToIntArray(fieldIndices, ",") featFieldIndices = self.config.getStringConfig("validate.data.feature.fields")[0] if not featFieldIndices is None: featFieldIndices = strToIntArray(featFieldIndices, ",") classFieldIndex = self.config.getIntConfig("validate.data.class.field")[0] #training data (data, featData) = loadDataFile(dataFile, ",", fieldIndices, featFieldIndices) if (self.config.getStringConfig("common.preprocessing")[0] == "scale"): scalingMethod = self.config.getStringConfig("common.scaling.method")[0] featData = scaleData(featData, scalingMethod) clsData = extrColumns(data, classFieldIndex) clsData = [int(a) for a in clsData] return (featData, clsData) def prepPredictData(self): """ loads and prepares training data """ # parameters dataFile = self.config.getStringConfig("predict.data.file")[0] if dataFile is None: raise ValueError("missing prediction data file") fieldIndices = self.config.getStringConfig("predict.data.fields")[0] if not fieldIndices is None: fieldIndices = strToIntArray(fieldIndices, ",") featFieldIndices = self.config.getStringConfig("predict.data.feature.fields")[0] if not featFieldIndices is None: featFieldIndices = strToIntArray(featFieldIndices, ",") #training data (data, featData) = loadDataFile(dataFile, ",", fieldIndices, featFieldIndices) if (self.config.getStringConfig("common.preprocessing")[0] == "scale"): scalingMethod = self.config.getStringConfig("common.scaling.method")[0] featData = scaleData(featData, scalingMethod) return featData def prepStringPredictData(self, recs): """ prepare string predict data """ frecs = StringIO(recs) featData = np.loadtxt(frecs, delimiter=',') return featData def getModelFilePath(self): """ get model file path """ modelDirectory = self.config.getStringConfig("common.model.directory")[0] modelFile = self.config.getStringConfig("common.model.file")[0] if modelFile is None: raise ValueError("missing model file name") modelFilePath = modelDirectory + "/" + modelFile return modelFilePath def reportResult(self, score, successCriterion, scoreMethod): """ report result """ if successCriterion == "accuracy": self.logger.info("average " + scoreMethod + " with k fold cross validation {:06.3f}".format(score)) result = score elif successCriterion == "error": error = 1.0 - score self.logger.info("average error with k fold cross validation {:06.3f}".format(error)) result = error else: raise ValueError("invalid success criterion") return result def autoTrain(self): """ auto train """ maxTestErr = self.config.getFloatConfig("train.auto.max.test.error")[0] maxErr = self.config.getFloatConfig("train.auto.max.error")[0] maxErrDiff = self.config.getFloatConfig("train.auto.max.error.diff")[0] self.config.setParam("train.model.save", "False") #train, validate and serach optimum parameter result = self.trainValidateSearch() testError = result[1] #subsample training size to match train size for k fold validation numFolds = self.config.getIntConfig("train.num.folds")[0] self.subSampleRate = float(numFolds - 1) / numFolds #train only with optimum parameter values for paramValue in result[0]: pName = paramValue[0] pValue = paramValue[1] self.logger.info(pName + " " + pValue) self.setConfigParam(pName, pValue) trainError = self.train() if testError < maxTestErr: # criteria based on test error only self.logger.info("Successfullt trained. Low test error level") status = 1 else: # criteria based on bias error and generalization error avError = (trainError + testError) / 2 diffError = testError - trainError self.logger.info("Auto training completed: training error {:06.3f} test error: {:06.3f}".format(trainError, testError)) self.logger.info("Average of test and training error: {:06.3f} test and training error diff: {:06.3f}".format(avError, diffError)) if diffError > maxErrDiff: # high generalization error if avError > maxErr: # high bias error self.logger.info("High generalization error and high error. Need larger training data set and increased model complexity") status = 4 else: # low bias error self.logger.info("High generalization error. Need larger training data set") status = 3 else: # low generalization error if avError > maxErr: # high bias error self.logger.info("Converged, but with high error rate. Need to increase model complexity") status = 2 else: # low bias error self.logger.info("Successfullt trained. Low generalization error and low bias error level") status = 1 if status == 1: #train final model, use all data and save model self.logger.info("...training the final model") self.config.setParam("train.model.save", "True") self.subSampleRate = None trainError = self.train() self.logger.info("training error in final model {:06.3f}".format(trainError)) return status