import os
import math

import numpy as np


class MaxEntropyModel:
    """Maximum-entropy text classifier trained by iterative scaling.

    A feature is a ``(label, token)`` pair. After ``load_data`` the
    ``features`` dict maps each pair to the number of training records with
    that label that contain the token; ``initialize_parameters`` then
    rewrites the values to be indices into ``weights``.
    """

    def __init__(self, ):
        """Create an empty model with the two default labels '1' and '2'."""
        self.train_set = []  # each record is [label, token, token, ...]
        self.features = {}   # (label, token) -> count, later -> weight index
        self.labels = {'1': 1, '2': 1}  # used as an ordered set of labels

    def load_data(self, fn):
        """Read training records from ``fn`` and accumulate feature counts.

        Every non-blank line must look like ``"<int label>",<review text>``:
        the first and last characters of the label field are dropped before
        it is parsed as an integer, and the review is split on single spaces.
        Only the first comma separates label from review, so commas inside
        the review text are kept as part of the tokens. Blank lines and
        records whose review text is empty are skipped.

        Raises:
            ValueError: if a non-blank line has no comma or a non-integer
                label.
        """
        with open(fn, "r") as rf:
            for raw_line in rf:
                line = raw_line.strip()
                if not line:
                    continue
                label_field, review_text = line.split(',', 1)
                label = str(int(label_field[1:-1]))
                if not review_text:
                    # The original guard compared the token *list* with '',
                    # which was always true; the evident intent was to drop
                    # empty reviews.
                    continue
                tokens = review_text.split(' ')
                self.labels[label] = 1
                # NOTE(review): counts here use each distinct token once per
                # record, while scoring and get_expected_features iterate
                # over every occurrence. Kept as in the original; confirm
                # whether repeated tokens should be counted consistently.
                for token in set(tokens):
                    key = (label, token)
                    self.features[key] = self.features.get(key, 0) + 1
                self.train_set.append([label] + tokens)

    def initialize_parameters(self, ):
        """Prepare training state from the loaded records.

        Sets ``train_set_size``, ``M`` (the largest token count of any
        record), the empirical feature expectations ``ep`` and zeroed
        ``weights``; replaces each value in ``features`` by the feature's
        index into ``ep`` / ``weights``.

        Raises:
            ValueError: if no training records have been loaded.
        """
        if not self.train_set:
            raise ValueError(
                "training set is empty; call load_data() before training"
            )
        self.train_set_size = len(self.train_set)
        self.M = max(len(record) - 1 for record in self.train_set)
        n_features = len(self.features)
        self.ep = [0.0] * n_features
        for index, feat in enumerate(self.features):
            self.ep[index] = (
                float(self.features[feat]) / float(self.train_set_size)
            )
            # Reassigning the value of an existing key is safe while
            # iterating; from here on the dict stores indices, not counts.
            self.features[feat] = index
        self.weights = [0.0] * n_features
        # Independent copy so the two lists never alias each other.
        self.last_weights = list(self.weights)

    def _raw_score(self, features, label):
        """Return the sum of the weights of all known (label, token) pairs."""
        score = 0.0
        for feat in features:
            index = self.features.get((label, feat))
            if index is not None:
                score += self.weights[index]
        return score

    def get_prob_weight(self, features, label):
        """Return ``exp`` of the summed weights for ``label`` on ``features``.

        This is the unnormalised probability weight of ``label``. It can
        raise ``OverflowError`` for very large weight sums; use
        ``calculate_probability`` for a numerically stable normalised result.
        """
        return math.exp(self._raw_score(features, label))

    def get_expected_features(self, ):
        """Return the model's expected value for every feature.

        For each training record and each of its tokens, the predicted
        probability of every label is added, scaled by ``1 / train_set_size``,
        to the slot of the matching ``(label, token)`` feature when that
        feature is known.
        """
        expected_features = [0.0] * len(self.features)
        scale = 1.0 / self.train_set_size
        for record in self.train_set:
            tokens = record[1:]
            label_probs = self.calculate_probability(tokens)
            for token in tokens:
                for prob, label in label_probs:
                    index = self.features.get((label, token))
                    if index is not None:
                        expected_features[index] += prob * scale
        return expected_features

    def calculate_probability(self, features):
        """Return ``[(probability, label), ...]`` for every known label.

        The largest raw score is subtracted before exponentiating. This
        leaves the normalised probabilities mathematically unchanged but
        prevents ``math.exp`` from overflowing once weights become large.
        """
        scored = [
            (self._raw_score(features, label), label) for label in self.labels
        ]
        peak = max(score for score, _ in scored)
        unnormalised = [
            (math.exp(score - peak), label) for score, label in scored
        ]
        total = sum(weight for weight, _ in unnormalised)
        return [(weight / total, label) for weight, label in unnormalised]

    def train(self, max_iter=10000,
              test_data_path="../preprocessed_data/yelp_test.txt",
              test_interval=10):
        """Fit the weights with an iterative-scaling update.

        Each iteration applies
        ``w_i += (1 / M) * log(ep_i / new_ep_i)`` to every weight, where
        ``new_ep`` is the output of ``get_expected_features``.

        Args:
            max_iter: number of update iterations to run.
            test_data_path: file evaluated with ``test`` during training;
                pass ``None`` to disable the periodic evaluation.
            test_interval: evaluate on iterations whose zero-based index is
                a multiple of this value; non-positive values disable it.
        """
        self.initialize_parameters()
        for iteration in range(max_iter):
            print(f"[Training] iter {iteration + 1} ...")
            self.new_ep = self.get_expected_features()
            self.last_weights = self.weights[:]
            # A separate index name is essential here: reusing the iteration
            # variable made the periodic-evaluation check below look at the
            # last feature index instead of the iteration number.
            for idx, expected in enumerate(self.new_ep):
                if expected <= 0.0:
                    # Probabilities underflowed to zero for this feature;
                    # skip the update rather than divide by zero.
                    continue
                delta = 1.0 / self.M * math.log(self.ep[idx] / expected)
                self.weights[idx] = self.weights[idx] + delta
            if (test_data_path is not None and test_interval > 0
                    and iteration % test_interval == 0):
                print("Start testing...")
                self.test(test_data_path)

    def test(self, test_data_path):
        """Evaluate the model on ``test_data_path`` and return the accuracy.

        Lines use the same ``"<int label>",<review text>`` layout as the
        training file. For each line the gold label and the sorted
        ``(probability, label)`` list are printed; the final accuracy is
        printed and returned. An input without any records yields ``0.0``.
        """
        total = 0
        correct = 0
        with open(file=test_data_path) as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line:
                    continue
                label_field, review_text = line.split(',', 1)
                label = label_field[1:-1]
                tokens = review_text.split(' ')
                prob = self.calculate_probability(tokens)
                # Highest probability first; ties fall back to label order.
                prob.sort(reverse=True)
                print(label, prob)
                if int(prob[0][1]) == int(label):
                    correct += 1
                total += 1
        acc = float(correct) / float(total) if total else 0.0
        print(f"[Test] Acc: {acc}")
        return acc

    def save_ckpt(self, sv_ckpt_path):
        """Save the feature index map and weights with ``numpy.save``.

        The checkpoint is a dict with keys ``'features'`` and ``'weights'``.
        NumPy appends ``.npy`` to the path when it is missing, and the dict
        is stored as a pickled object, so loading needs
        ``np.load(path, allow_pickle=True).item()``.
        """
        # NOTE(review): this stores ``last_weights`` (the weights from before
        # the most recent update), as the original did; confirm whether
        # ``weights`` was intended.
        sv_ckpt = {
            'features': self.features,
            'weights': self.last_weights,
        }
        np.save(sv_ckpt_path, sv_ckpt)
        print(f"ckpt with features and weights saved to {sv_ckpt_path}")