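"""Wrapper around several pretrained Yelp sentiment models.

Loads a word-id dictionary plus a pretrained LSTM/BiLSTM (PyTorch), maximum-entropy,
or SVM model and exposes predict()/predict_maxent()/predict_svm() helpers that map
raw review text to a "Positive"/"Negative" label. _train()/_test() are kept for
reference but need the preprocessed data arrays re-enabled before use.
"""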
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import os
import re
from tqdm import tqdm
import nltk

from lstm_model_new import LSTM_model, BiLSTMModel
from max_ent_model import MaxEntropyModel
from svm_model import SVM

nltk.download('punkt')


class Trainer(object):
    def __init__(self, vocab_size, sequence_len, batch_size, nn_epochs, model_type):
        self.vocab_size = vocab_size
        self.vocab_sizeb = self.vocab_size + 1  # +1 so id 0 can serve as padding/unknown
        self.sequence_len = sequence_len
        self.model_type = model_type
        self.batch_size = batch_size
        self.nn_epochs = nn_epochs
        self.processed_data_folder = "../preprocessed_data/"
        self._load_data()
        self._get_model()
        # self._setup_optimizer()  # only needed when training; see _train()

    def _load_data(self):
        # The dictionary file maps word ids (array indices) to words; invert it
        # so raw tokens can be looked up during prediction.
        dict_fn = "yelp_dictionary.npy"
        id_to_word = np.load(dict_fn, allow_pickle=True)
        self.word_to_id = {id_to_word[idx]: idx for idx in range(len(id_to_word))}
        # The preprocessed train/test arrays are only needed by _train()/_test();
        # they are left disabled here for inference-only use.
        # x_train = np.load(os.path.join(self.processed_data_folder, "x_train.npy"))
        # y_train = np.load(os.path.join(self.processed_data_folder, "y_train.npy"))
        # x_test = np.load(os.path.join(self.processed_data_folder, "x_test.npy"))
        # y_test = np.load(os.path.join(self.processed_data_folder, "y_test.npy"))
        # self.x_train, self.y_train = x_train, y_train
        # self.x_test, self.y_test = x_test, y_test

    def _get_model(self):
        # Build the requested model architecture.
        if self.model_type == "lstm":
            self.model = LSTM_model(self.vocab_sizeb, 800)
        elif self.model_type == "bilstm":
            self.model = BiLSTMModel(self.vocab_sizeb, 800)
        elif self.model_type == "max_ent":
            self.model = MaxEntropyModel()
        elif self.model_type == "svm":
            self.model = SVM()
        else:
            raise ValueError("Model type not supported")

        # Restore pretrained parameters for the chosen model.
        if self.model_type in ['lstm', 'bilstm']:
            # self.model = self.model.cuda()  # enable for GPU training
            model_ckpt_fn = f"{self.model_type}.pth"
            self.model.load_state_dict(torch.load(model_ckpt_fn, map_location=torch.device('cpu')))
        elif self.model_type in ['max_ent']:
            model_ckpt_fn = f"{self.model_type}_ckpt.npy"
            model_params = np.load(model_ckpt_fn, allow_pickle=True).item()
            self.model.features = model_params["features"]
            self.model.weights = model_params["weights"]
            self.model.last_weights = model_params["weights"]
        elif self.model_type in ['svm']:
            model_ckpt_fn = f"{self.model_type}_weights.npy"
            model_params = np.load(model_ckpt_fn, allow_pickle=True).item()
            self.model.svm_model.w = model_params['w']
            self.model.svm_model.b = model_params['b']
        else:
            raise ValueError("Model type not supported")

    def _setup_optimizer(self):
        self.lr = 0.001
        self.opt = optim.Adam(self.model.parameters(), lr=self.lr)

    def _train(self):
        # Training requires the preprocessed arrays in _load_data() and the optimizer
        # from _setup_optimizer() to be enabled, and assumes a CUDA device is available.
        train_losses = []
        train_accs = []
        for epoch in range(self.nn_epochs):
            print(f"Epoch: {epoch}")
            self.model.train()
            nn_acc = 0
            nn_total = 0
            epoch_loss = 0.0
            num_batches = 0
            train_permutation_idxes = np.random.permutation(self.y_train.shape[0])
            for i in tqdm(range(0, len(self.y_train), self.batch_size)):
                batched_x = self.x_train[train_permutation_idxes[i: i + self.batch_size]]
                batched_y = self.y_train[train_permutation_idxes[i: i + self.batch_size]]
                data = torch.from_numpy(batched_x).long().cuda()
                target = torch.from_numpy(batched_y).float().cuda()
                self.opt.zero_grad()
                loss, predicted_labels = self.model(data, target)
                loss.backward()
                nn.utils.clip_grad_norm_(self.model.parameters(), 2.0)
                self.opt.step()
                # Scores >= 0 count as the positive class; targets are 0/1.
                predicted_labels = predicted_labels >= 0
                gts = target >= 0.5
                nn_acc += torch.sum((predicted_labels == gts).float()).item()
                epoch_loss += loss.item()
                nn_total += len(batched_y)
                num_batches += 1
            train_acc = float(nn_acc) / float(nn_total)
            train_loss = epoch_loss / float(num_batches)  # average loss per batch
            train_losses.append(train_loss)
            train_accs.append(train_acc)
            print(f"[Epoch {epoch}] Train Loss: {train_loss}, Train Acc: {train_acc}")
            self._test()

    def _process_text(self, input_text):
        # Remove escaped newlines and any character other than letters, spaces and
        # apostrophes, then lower-case and tokenize.
        text = re.sub('[^a-zA-Z \']', '', re.sub('\\\\n', ' ', ','.join(input_text))).lower()
        tokens = nltk.word_tokenize(text)
        # Map tokens to ids shifted by +1 so that 0 is reserved for unknown/padding;
        # ids beyond the vocabulary size are also mapped to 0.
        token_ids = [self.word_to_id.get(token, -1) + 1 for token in tokens]
        token_ids = np.array(token_ids)
        token_ids[token_ids > self.vocab_size] = 0
        # Randomly crop long sequences and zero-pad short ones to sequence_len.
        if token_ids.shape[0] > self.sequence_len:
            start_index = np.random.randint(token_ids.shape[0] - self.sequence_len + 1)
            token_ids = token_ids[start_index: (start_index + self.sequence_len)]
        else:
            token_ids = np.concatenate([token_ids, np.zeros(self.sequence_len - token_ids.shape[0])])
        return token_ids

    def _process_text_maxent(self, input_text):
        text = re.sub('[^a-zA-Z \']', '', re.sub('\\\\n', ' ', ','.join(input_text))).lower()
        tokens = nltk.word_tokenize(text)
        # The max-ent model consumes word ids as strings.
        token_ids = [self.word_to_id.get(token, -1) + 1 for token in tokens]
        token_ids = [str(word_idx) for word_idx in token_ids]
        return token_ids

    def _process_text_svm(self, input_text):
        text = re.sub('[^a-zA-Z \']', '', re.sub('\\\\n', ' ', ','.join(input_text))).lower()
        # Reuse the vectorizer fitted when the SVM was trained.
        tokens = self.model.vectorizer.transform([text]).toarray()
        return tokens

    def predict_maxent(self, input_text):
        text_ids = self._process_text_maxent(input_text)
        # calculate_probability is expected to return (probability, label) pairs;
        # sort so the most probable label comes first.
        prob = self.model.calculate_probability(text_ids)
        prob.sort(reverse=True)
        print(prob)
        max_prob_label = int(prob[0][1])
        # Label 2 is treated as the positive class.
        if max_prob_label == 2:
            return "Positive"
        else:
            return "Negative"

    def predict_svm(self, input_text):
        text_ids = self._process_text_svm(input_text)
        predicted_label = self.model.svm_model.predict(text_ids)
        # The SVM outputs a signed label; positive values mean positive sentiment.
        if float(predicted_label[0]) > 0:
            return "Positive"
        else:
            return "Negative"

    def predict(self, input_text):
        text_ids = self._process_text(input_text)
        data = torch.from_numpy(text_ids).long()
        data = data.unsqueeze(0)
        # The forward pass expects a target; pass a dummy one since only the
        # prediction is used here.
        target = torch.zeros((data.size(0), ), dtype=torch.float)
        with torch.no_grad():
            _, predicted_labels = self.model(data, target)
        predicted_labels = predicted_labels >= 0
        if predicted_labels.item():
            return "Positive"
        else:
            return "Negative"

    def _test(self):
        # Evaluate on the held-out test set; requires the test arrays in _load_data()
        # to be enabled and a CUDA device.
        self.model.eval()
        nn_acc = 0
        nn_total = 0
        test_permutation_idxes = np.random.permutation(self.y_test.shape[0])
        for i in tqdm(range(0, len(self.y_test), self.batch_size)):
            batched_x = self.x_test[test_permutation_idxes[i: i + self.batch_size]]
            batched_y = self.y_test[test_permutation_idxes[i: i + self.batch_size]]
            data = torch.from_numpy(batched_x).long().cuda()
            target = torch.from_numpy(batched_y).float().cuda()
            with torch.no_grad():
                loss, predicted_labels = self.model(data, target)
            predicted_labels = predicted_labels >= 0
            gts = target >= 0.5
            nn_acc += torch.sum((predicted_labels == gts).float()).item()
            nn_total += len(batched_y)
        acc = float(nn_acc) / float(nn_total)
        print(f"Test Acc: {acc}")


if __name__ == '__main__':
    vocab_size = 8000
    sequence_len = 150
    batch_size = 256
    nn_epochs = 20
    # model_type = "lstm"
    model_type = "bilstm"
    trainer = Trainer(vocab_size, sequence_len, batch_size, nn_epochs, model_type)
    # NOTE: _train() only works after re-enabling the data loading in _load_data()
    # and the self._setup_optimizer() call in __init__.
    trainer._train()
    # CUDA_VISIBLE_DEVICES=0 python trainer.py
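    # Illustrative inference-only usage (an assumption; the actual entry point,
    # e.g. the Space's app script, is not shown here):
    #     trainer = Trainer(8000, 150, 256, 20, "bilstm")
    #     print(trainer.predict("The food was great and the service was friendly."))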