import os import sys import matplotlib.pyplot as plt import numpy as np import random import jprops from random import randint from matumizi.util import * from matumizi.mlutil import * """ Markov chain classifier """ class MarkovChainClassifier(): def __init__(self, configFile): """ constructor Parameters configFile: config file path """ defValues = {} defValues["common.model.directory"] = ("model", None) defValues["common.model.file"] = (None, None) defValues["common.verbose"] = (False, None) defValues["common.states"] = (None, "missing state list") defValues["train.data.file"] = (None, "missing training data file") defValues["train.data.class.labels"] = (["F", "T"], None) defValues["train.data.key.len"] = (1, None) defValues["train.model.save"] = (False, None) defValues["train.score.method"] = ("accuracy", None) defValues["predict.data.file"] = (None, None) defValues["predict.use.saved.model"] = (True, None) defValues["predict.log.odds.threshold"] = (0, None) defValues["validate.data.file"] = (None, "missing validation data file") defValues["validate.use.saved.model"] = (False, None) defValues["valid.accuracy.metric"] = ("acc", None) self.config = Configuration(configFile, defValues) self.stTranPr = dict() self.clabels = self.config.getStringListConfig("train.data.class.labels")[0] self.states = self.config.getStringListConfig("common.states")[0] self.nstates = len(self.states) for cl in self.clabels: stp = np.ones((self.nstates,self.nstates)) self.stTranPr[cl] = stp def train(self): """ train model """ #state transition matrix tdfPath = self.config.getStringConfig("train.data.file")[0] klen = self.config.getIntConfig("train.data.key.len")[0] for rec in fileRecGen(tdfPath): cl = rec[klen] rlen = len(rec) for i in range(klen+1, rlen-1, 1): fst = self.states.index(rec[i]) tst = self.states.index(rec[i+1]) self.stTranPr[cl][fst][tst] += 1 #normalize to probability for cl in self.clabels: stp = self.stTranPr[cl] for i in range(self.nstates): s = stp[i].sum() r = stp[i] / s stp[i] = r #save if self.config.getBooleanConfig("train.model.save")[0]: mdPath = self.config.getStringConfig("common.model.directory")[0] assert os.path.exists(mdPath), "model save directory does not exist" mfPath = self.config.getStringConfig("common.model.file")[0] mfPath = os.path.join(mdPath, mfPath) with open(mfPath, "w") as fh: for cl in self.clabels: fh.write("label:" + cl +"\n") stp = self.stTranPr[cl] for r in stp: rs = ",".join(toStrList(r, 6)) + "\n" fh.write(rs) def validate(self): """ validate using model """ useSavedModel = self.config.getBooleanConfig("predict.use.saved.model")[0] if useSavedModel: self.__restoreModel() else: self.train() vdfPath = self.config.getStringConfig("validate.data.file")[0] accMetric = self.config.getStringConfig("valid.accuracy.metric")[0] yac, ypr = self.__getPrediction(vdfPath, True) if type(self.clabels[0]) == str: yac = self.__toIntClabel(yac) ypr = self.__toIntClabel(ypr) score = perfMetric(accMetric, yac, ypr) print(formatFloat(3, score, "perf score")) def predict(self): """ predict using model """ useSavedModel = self.config.getBooleanConfig("predict.use.saved.model")[0] if useSavedModel: self.__restoreModel() else: self.train() #predict pdfPath = self.config.getStringConfig("predict.data.file")[0] _ , ypr = self.__getPrediction(pdfPath) return ypr def __restoreModel(self): """ restore model """ mdPath = self.config.getStringConfig("common.model.directory")[0] assert os.path.exists(mdPath), "model save directory does not exist" mfPath = self.config.getStringConfig("common.model.file")[0] mfPath = os.path.join(mdPath, mfPath) stp = None cl = None for rec in fileRecGen(mfPath): if len(rec) == 1: if stp is not None: stp = np.array(stp) self.stTranPr[cl] = stp cl = rec[0].split(":")[1] stp = list() else: frec = asFloatList(rec) stp.append(frec) stp = np.array(stp) self.stTranPr[cl] = stp def __getPrediction(self, fpath, validate=False): """ get predictions Parameters fpath : data file path validate: True if validation """ nc = self.clabels[0] pc = self.clabels[1] thold = self.config.getFloatConfig("predict.log.odds.threshold")[0] klen = self.config.getIntConfig("train.data.key.len")[0] offset = klen+1 if validate else klen ypr = list() yac = list() for rec in fileRecGen(fpath): lodds = 0 rlen = len(rec) for i in range(offset, rlen-1, 1): fst = self.states.index(rec[i]) tst = self.states.index(rec[i+1]) odds = self.stTranPr[pc][fst][tst] / self.stTranPr[nc][fst][tst] lodds += math.log(odds) prc = pc if lodds > thold else nc ypr.append(prc) if validate: yac.append(rec[klen]) else: recp = prc + "\t" + ",".join(rec) print(recp) re = (yac, ypr) return re def __toIntClabel(self, labels): """ convert string class label to int Parameters labels : class label values """ return list(map(lambda l : self.clabels.index(l), labels))