xymeow7's picture
Update svm_model.py
2b0e8d0 verified
import numpy as np
import re
import time
from nltk.corpus import stopwords
from sklearn.feature_extraction.text import TfidfVectorizer
# from sklearn.linear_model import LogisticRegression
# from sklearn.svm import SVC
import ssl
import os
import nltk
try:
_create_unverified_https_context = ssl._create_unverified_context
except AttributeError:
pass
else:
ssl._create_default_https_context = _create_unverified_https_context
# print(f"nltk version: {nltk.__version__}")
# nltk.download('stopwords')
#
class SVMModel:
def __init__(self, learning_rate=0.01, lambda_param=0.01, n_iters=1000):
self.learning_rate = learning_rate
self.lambda_param = lambda_param
self.n_iters = n_iters
self.w = None
self.b = None
self.X_train = None
self.X_test = None
self.y_train = None
self.y_test = None
def fit(self, X, y):
n_samples, n_features = X.shape
y_ = np.where(y <= 0, -1, 1) # Convert labels to -1 and 1
print(f"y_ max: {np.max(y_)}, y_ min: {np.min(y_)}")
self.w = np.zeros(n_features)
self.b = 0
self.lambda_param = 1.0 / float(n_samples)
for _ in range(self.n_iters):
print(f"Epoch: {_}")
for idx, x_i in enumerate(X):
condition = y_[idx] * (np.dot(x_i, self.w) - self.b) >= 1
if condition:
self.w = self.w - self.learning_rate * (2 * self.lambda_param * self.w)
else:
self.w = self.w - self.learning_rate * (2 * self.lambda_param * self.w - np.dot(x_i, y_[idx]))
self.b = self.b - self.learning_rate * y_[idx]
if _ % 1 == 0:
# print(f"Iteration: {_}")
st_time = time.time()
self.test()
print(f"Time: {time.time() - st_time}")
def predict(self, X):
linear_output = np.matmul(X, self.w[:, None]) - self.b # []
return np.sign(linear_output[:, 0])
def test(self, ):
# test_ours(self, ):
linear_output = self.predict(self.X_test)
print(f"linear_output: {linear_output.shape}, self.X_test: {self.X_test.shape}")
acc = np.mean((linear_output == np.sign(self.y_test)).astype(np.float32))
print(f"Test Acc: {acc}")
return linear_output
# weights_dict = self.svm_model.get_weights_dict()
def get_weights_dict(self, ):
weights_dict = {
'w': self.w,
'b': self.b
}
return weights_dict
class SVM:
def __init__(self, ):
# file_path =
self.x_train = []
self.y_train = []
self.x_test = []
self.y_test = []
self.data_folder = '.'
print(f"Start loading data")
# self._load_data()
print(f"Setting vectorizer")
# self.vectorizer = TfidfVectorizer(max_features=4000, min_df=7, max_df=0.8, stop_words=stopwords.words('english'))
# parmas_dict = np.load("svm_vectorizer.npy", allow_pickle=True).item()
# print(f"parmas_dict: {parmas_dict.keys()}")
# self.vectorizer.set_params(**parmas_dict)
import pickle
self.vectorizer = pickle.load(open("tfidf.pickle", "rb"))
# print(f"Start preprocessing data")
# self._preprocess_data()
# self.setup_model()
self.setup_model_ours()
pass
def _load_data(self, ):
file_path = '.'
x_train = []
y_train = []
with open(os.path.join(self.data_folder, 'train.csv'), "r") as f:
for line in f:
l = line.strip().split(',')
senti, text = l[0], re.sub('[^a-zA-Z \']', '', re.sub('\\\\n', ' ', ','.join(l[1:]))).lower()
x_train.append(text)
y_train.append(int(senti[1]) - 1)
f.close()
x_test = []
y_test = []
with open(os.path.join(self.data_folder, 'test.csv'), "r") as f:
for line in f:
l = line.strip().split(',')
senti, text = l[0], re.sub('[^a-zA-Z \']', '', re.sub('\\\\n', ' ', ','.join(l[1:]))).lower()
x_test.append(text)
y_test.append(int(senti[1]) - 1)
f.close()
self.x_train = x_train
self.x_test = x_test
self.y_train = np.array(y_train, dtype=np.int32)
self.y_test = np.array(y_test, dtype=np.int32)
print(f"max_y_train: {np.max(self.y_train)}, min_y_train: {np.min(self.y_train)}")
def _preprocess_data(self, ):
self.X_train = self.vectorizer.fit_transform(self.x_train).toarray()
import pickle
# self.vectorizer_params = self.vectorizer.get_params()
# np.save("svm_vectorizer.npy", self.vectorizer_params)
pickle.dump(self.vectorizer, open("tfidf.pickle", "wb"))
self.X_test = self.vectorizer.transform(self.x_test).toarray()
# self.X_train = self.vectorizer.transform
def setup_model_ours(self, ):
self.svm_model = SVMModel()
def train_ours(self, ):
self.y_train = self.y_train.astype(np.float32)
self.y_test = self.y_test.astype(np.float32)
self.y_train = self.y_train * 2 - 1.0
self.y_test = self.y_test * 2 - 1.0
print(f"max_y_train: {np.max(self.y_train)}, min_y_train: {np.min(self.y_train)}")
self.svm_model.X_train = self.X_train
self.svm_model.X_test = self.X_test
self.svm_model.y_train = self.y_train
self.svm_model.y_test = self.y_test
self.svm_model.fit(self.X_train, self.y_train)
def test_ours(self, ):
linear_output = self.svm_model.test()
acc = np.mean((linear_output == np.sign(self.y_test)).astype(np.float32))
print(f"Test Acc: {acc}")
weights_dict = self.svm_model.get_weights_dict()
np.save("svm_weights.npy", weights_dict)
print(f"svm weights saved to svm_weights.npy")
# def setup_model(self, ):
# self.svc = SVC()
# def train(self, ):
# self.svc.fit(self.X_train, self.y_train)
# def test(self, ):
# self.train_acc = self.svc.score(self.X_train, self.y_train)
# self.test_acc = self.svc.score(self.X_test, self.y_test)
# print(f'Train Acc: {self.train_acc * 100}\n', f'Test Acc: {self.test_acc * 100}\n')
# CUDA_VISIBLE_DEVICES=2 python log_reg.py
# y_train = np.asarray(y_train)
# y_test = np.asarray(y_test)
# print(f"After getting data")
# start_time = time.time()
# vectorizer = TfidfVectorizer(max_features=4000, min_df=7, max_df=0.8, stop_words=stopwords.words('english'))
# print(f"After setting the vectorizer")
# X_train = vectorizer.fit_transform(x_train).toarray()
# X_test = vectorizer.transform(x_test).toarray()
# print(f"X_train: {X_train.shape}, X_test: {X_test.shape}")
# # lr_classfier = LogisticRegression()
# # lr_classfier.fit(X_train,y_train)
# # train_acc = lr_classfier.score(X_train,y_train)
# # test_acc = lr_classfier.score(X_test,y_test)
# svc = SVC()
# svc.fit(X_train,y_train)
# train_acc = svc.score(X_train,y_train)
# test_acc = svc.score(X_test,y_test)
# print('Train Acc: %.2f' % float(train_acc*100), 'Test Acc: %.2f' % float(test_acc*100),'Time: %.4f' % float(time.time()-start_time))
# # CUDA_VISIBLE_DEVICES=2 python log_reg.py