# Spaces: Sleeping Sleeping
import math
import os

import numpy as np
class MaxEntropyModel:
    """Maximum-entropy (log-linear) text classifier over (label, token) features.

    Training data is read from text files with one example per line in the
    form ``<label-field>,<review>``.  The first and last characters of the
    label field are dropped (presumably surrounding quotes -- confirm against
    the data files) and the remainder is parsed as an integer.  The review is
    split on single spaces into tokens.

    Weights are fitted with an iterative-scaling update: each iteration adds
    ``1/M * log(empirical / expected)`` to every feature weight, where ``M``
    is the largest token count of any training record.
    """

    def __init__(self):
        # Each training record is ``[label, token, token, ...]``.
        self.train_set = []
        # After load_data(): (label, token) -> occurrence count.
        # After initialize_parameters(): (label, token) -> index into weights.
        self.features = {}
        # Occurrence counts kept separately so that initialize_parameters()
        # can be re-run (e.g. by a second train() call) without reading
        # indices back as if they were counts.
        self._feature_counts = {}
        # Known labels; '1' and '2' are pre-seeded, load_data() adds the rest.
        self.labels = {'1': 1, '2': 1}

    @staticmethod
    def _parse_line(line):
        """Split one data line into ``(label_text, tokens)``.

        Returns ``None`` for a blank line.  Only the first comma separates the
        label from the review, so commas inside the review are preserved.
        Raises ``ValueError`` if the line contains no comma.
        """
        line = line.strip()
        if not line:
            return None
        label, review = line.split(',', 1)
        return label[1:-1], review.split(' ')

    def load_data(self, fn):
        """Read training examples from *fn* and accumulate feature counts.

        Blank lines and records whose review text is empty are skipped.
        Every token occurrence is counted, so the empirical counts use the
        same feature definition (token multiplicity) as the model expectation
        in get_expected_features() and the scaling constant ``M``.
        """
        with open(fn, "r") as rf:
            for line in rf:
                parsed = self._parse_line(line)
                if parsed is None:
                    continue
                raw_label, tokens = parsed
                if not any(tokens):
                    # Review text was empty: nothing to learn from.
                    continue
                label = str(int(raw_label))
                self.labels[label] = 1
                for token in tokens:
                    key = (label, token)
                    count = self._feature_counts.get(key, 0) + 1
                    self._feature_counts[key] = count
                    self.features[key] = count
                self.train_set.append([label] + tokens)

    def initialize_parameters(self):
        """Derive empirical expectations, the feature index and zero weights.

        Sets ``train_set_size``, ``M``, ``ep``, ``weights`` and
        ``last_weights``, and rewrites ``features`` to map each
        (label, token) pair to its position in ``weights``.

        Raises:
            ValueError: if no training records have been loaded.
        """
        if not self.train_set:
            raise ValueError("training set is empty; load data before training")

        counts = getattr(self, "_feature_counts", None)
        if not counts:
            # Features were populated without load_data(); snapshot them once
            # so later re-initialisation still sees counts, not indices.
            counts = dict(self.features)
            self._feature_counts = counts

        self.train_set_size = len(self.train_set)
        self.M = max(len(record) - 1 for record in self.train_set)

        size = float(self.train_set_size)
        self.ep = [0.0] * len(counts)
        index = {}
        for i_f, (feat, count) in enumerate(counts.items()):
            self.ep[i_f] = float(count) / size
            index[feat] = i_f
        self.features = index

        self.weights = [0.0] * len(index)
        # Independent copy: last_weights must not alias weights.
        self.last_weights = self.weights[:]

    def _score(self, features, label):
        """Return the summed weight of every known (label, token) feature."""
        total = 0.0
        for feat in features:
            idx = self.features.get((label, feat))
            if idx is not None:
                total += self.weights[idx]
        return total

    def get_prob_weight(self, features, label):
        """Return the unnormalised probability ``exp(score)`` for *label*.

        ``math.exp`` raises ``OverflowError`` for very large scores;
        calculate_probability() avoids that by shifting the scores first.
        """
        return math.exp(self._score(features, label))

    def get_expected_features(self):
        """Return the model's expected value of each feature on the training set.

        For every training record, each token occurrence contributes the
        predicted probability of each label (divided by the training-set
        size) to the matching (label, token) feature, if that feature exists.
        """
        expected_features = [0.0] * len(self.features)
        scale = 1.0 / self.train_set_size
        for record in self.train_set:
            tokens = record[1:]
            prob = self.calculate_probability(tokens)
            for token in tokens:
                for p, label in prob:
                    idx = self.features.get((label, token))
                    if idx is not None:
                        expected_features[idx] += p * scale
        return expected_features

    def calculate_probability(self, features):
        """Return ``[(probability, label), ...]`` for every known label.

        The largest score is subtracted before exponentiating, which leaves
        the normalised result unchanged but keeps ``math.exp`` from
        overflowing when weights grow large.
        """
        labels = list(self.labels)
        scores = [self._score(features, label) for label in labels]
        peak = max(scores)
        exps = [math.exp(score - peak) for score in scores]
        Z = sum(exps)
        return [(e / Z, label) for e, label in zip(exps, labels)]

    def train(self, max_iter=10000,
              test_data_path="../preprocessed_data/yelp_test.txt",
              eval_every=10):
        """Fit the weights for *max_iter* iterations.

        Args:
            max_iter: number of update iterations to run.
            test_data_path: file evaluated with test() during training;
                pass ``None`` to disable evaluation.
            eval_every: evaluate on iterations whose zero-based index is a
                multiple of this value (so the first iteration is evaluated);
                a falsy value disables evaluation.
        """
        self.initialize_parameters()
        for it in range(max_iter):
            print(f"[Training] iter {it + 1} ...")
            self.new_ep = self.get_expected_features()
            self.last_weights = self.weights[:]
            # A distinct index name keeps the iteration counter intact for
            # the evaluation check below.
            for idx, expected in enumerate(self.new_ep):
                if expected <= 0.0:
                    # The expectation underflowed to zero; log() would fail.
                    continue
                delta = 1.0 / self.M * math.log(self.ep[idx] / expected)
                self.weights[idx] = self.weights[idx] + delta
            if test_data_path is not None and eval_every and it % eval_every == 0:
                print(f"Start testing...")
                self.test(test_data_path)

    def test(self, test_data_path):
        """Evaluate on *test_data_path*, print and return the accuracy.

        Each non-blank line is classified and the most probable label is
        compared with the line's label as integers.  Returns ``0.0`` when the
        file contains no examples.
        """
        tot_test_nn = 0
        correct_test_nn = 0
        with open(test_data_path) as f:
            for line in f:
                parsed = self._parse_line(line)
                if parsed is None:
                    continue
                label, review = parsed
                prob = self.calculate_probability(review)
                # Descending by probability (ties broken by label text).
                prob.sort(reverse=True)
                print(label, prob)
                if int(prob[0][1]) == int(label):
                    correct_test_nn += 1
                tot_test_nn += 1
        acc = float(correct_test_nn) / float(tot_test_nn) if tot_test_nn else 0.0
        print(f"[Test] Acc: {acc}")
        return acc

    def save_ckpt(self, sv_ckpt_path):
        """Save the feature index and weights to *sv_ckpt_path* with ``np.save``.

        The checkpoint is a dict with keys ``'features'`` and ``'weights'``.
        """
        # NOTE(review): this stores last_weights (the weights from before the
        # most recent update), not self.weights -- confirm that is intended.
        sv_ckpt = {
            'features': self.features,
            'weights': self.last_weights
        }
        np.save(sv_ckpt_path, sv_ckpt)
        print(f"ckpt with features and weights saved to {sv_ckpt_path}")