import numpy as np
import re
import time
import os
import ssl
import pickle

import nltk
from nltk.corpus import stopwords
from sklearn.feature_extraction.text import TfidfVectorizer
# from sklearn.linear_model import LogisticRegression
# from sklearn.svm import SVC

# Work around unverifiable SSL certificates when downloading NLTK data.
try:
    _create_unverified_https_context = ssl._create_unverified_context
except AttributeError:
    pass
else:
    ssl._create_default_https_context = _create_unverified_https_context

# print(f"nltk version: {nltk.__version__}")
# nltk.download('stopwords')


class SVMModel:
    """Linear SVM trained by subgradient descent on the L2-regularized hinge loss."""

    def __init__(self, learning_rate=0.01, lambda_param=0.01, n_iters=1000):
        self.learning_rate = learning_rate
        self.lambda_param = lambda_param
        self.n_iters = n_iters
        self.w = None
        self.b = None
        self.X_train = None
        self.X_test = None
        self.y_train = None
        self.y_test = None

    def fit(self, X, y):
        n_samples, n_features = X.shape
        y_ = np.where(y <= 0, -1, 1)  # convert labels to {-1, +1}
        print(f"y_ max: {np.max(y_)}, y_ min: {np.min(y_)}")
        self.w = np.zeros(n_features)
        self.b = 0
        # Note: this overrides the constructor's lambda_param with 1/n_samples.
        self.lambda_param = 1.0 / float(n_samples)
        for epoch in range(self.n_iters):
            print(f"Epoch: {epoch}")
            for idx, x_i in enumerate(X):
                # Sample on the correct side of the margin: only the
                # regularization term contributes to the subgradient.
                condition = y_[idx] * (np.dot(x_i, self.w) - self.b) >= 1
                if condition:
                    self.w = self.w - self.learning_rate * (2 * self.lambda_param * self.w)
                else:
                    # Margin violated: the hinge-loss term is active as well.
                    self.w = self.w - self.learning_rate * (2 * self.lambda_param * self.w - y_[idx] * x_i)
                    self.b = self.b - self.learning_rate * y_[idx]
            # Evaluate on the test split after every epoch.
            st_time = time.time()
            self.test()
            print(f"Time: {time.time() - st_time}")

    def predict(self, X):
        linear_output = np.matmul(X, self.w[:, None]) - self.b  # shape (n_samples, 1)
        return np.sign(linear_output[:, 0])

    def test(self):
        linear_output = self.predict(self.X_test)
        print(f"linear_output: {linear_output.shape}, self.X_test: {self.X_test.shape}")
        acc = np.mean((linear_output == np.sign(self.y_test)).astype(np.float32))
        print(f"Test Acc: {acc}")
        return linear_output

    def get_weights_dict(self):
        return {'w': self.w, 'b': self.b}


class SVM:
    def __init__(self):
        self.x_train = []
        self.y_train = []
        self.x_test = []
        self.y_test = []
        self.data_folder = '.'
        print("Start loading data")
        # self._load_data()
        print("Setting vectorizer")
        # self.vectorizer = TfidfVectorizer(max_features=4000, min_df=7, max_df=0.8,
        #                                   stop_words=stopwords.words('english'))
        # params_dict = np.load("svm_vectorizer.npy", allow_pickle=True).item()
        # print(f"params_dict: {params_dict.keys()}")
        # self.vectorizer.set_params(**params_dict)
        with open("tfidf.pickle", "rb") as f:
            self.vectorizer = pickle.load(f)
        # print(f"Start preprocessing data")
        # self._preprocess_data()
        # self.setup_model()
        self.setup_model_ours()

    def _load_data(self):
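        # Assumed input format (inferred from the parsing below, not stated in
        # the original): a Yelp/Amazon-polarity-style CSV where each row looks
        # like
        #   "1","the review text ... \n more text"
        # so l[0] is the quoted label ("1" or "2"), senti[1] extracts the
        # digit, and labels land in {0, 1} here; train_ours() later remaps
        # them to {-1, +1}.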
        x_train = []
        y_train = []
        with open(os.path.join(self.data_folder, 'train.csv'), "r") as f:
            for line in f:
                l = line.strip().split(',')
                # Rejoin everything after the label, replace literal "\n"
                # escapes with spaces, then keep only letters, spaces, and
                # apostrophes.
                senti = l[0]
                text = re.sub("[^a-zA-Z ']", '', re.sub(r'\\n', ' ', ','.join(l[1:]))).lower()
                x_train.append(text)
                y_train.append(int(senti[1]) - 1)  # "1"/"2" -> 0/1

        x_test = []
        y_test = []
        with open(os.path.join(self.data_folder, 'test.csv'), "r") as f:
            for line in f:
                l = line.strip().split(',')
                senti = l[0]
                text = re.sub("[^a-zA-Z ']", '', re.sub(r'\\n', ' ', ','.join(l[1:]))).lower()
                x_test.append(text)
                y_test.append(int(senti[1]) - 1)

        self.x_train = x_train
        self.x_test = x_test
        self.y_train = np.array(y_train, dtype=np.int32)
        self.y_test = np.array(y_test, dtype=np.int32)
        print(f"max_y_train: {np.max(self.y_train)}, min_y_train: {np.min(self.y_train)}")

    def _preprocess_data(self):
        # Fit TF-IDF on the training text, persist the fitted vectorizer, and
        # transform both splits to dense feature matrices.
        self.X_train = self.vectorizer.fit_transform(self.x_train).toarray()
        # self.vectorizer_params = self.vectorizer.get_params()
        # np.save("svm_vectorizer.npy", self.vectorizer_params)
        with open("tfidf.pickle", "wb") as f:
            pickle.dump(self.vectorizer, f)
        self.X_test = self.vectorizer.transform(self.x_test).toarray()

    def setup_model_ours(self):
        self.svm_model = SVMModel()

    def train_ours(self):
        # Remap {0, 1} labels to {-1, +1} for the hinge-loss formulation.
        self.y_train = self.y_train.astype(np.float32) * 2 - 1.0
        self.y_test = self.y_test.astype(np.float32) * 2 - 1.0
        print(f"max_y_train: {np.max(self.y_train)}, min_y_train: {np.min(self.y_train)}")
        self.svm_model.X_train = self.X_train
        self.svm_model.X_test = self.X_test
        self.svm_model.y_train = self.y_train
        self.svm_model.y_test = self.y_test
        self.svm_model.fit(self.X_train, self.y_train)

    def test_ours(self):
        linear_output = self.svm_model.test()
        acc = np.mean((linear_output == np.sign(self.y_test)).astype(np.float32))
        print(f"Test Acc: {acc}")
        weights_dict = self.svm_model.get_weights_dict()
        np.save("svm_weights.npy", weights_dict)
        print("svm weights saved to svm_weights.npy")

    # Earlier scikit-learn baseline, kept for reference:
    # def setup_model(self):
    #     self.svc = SVC()
    #
    # def train(self):
    #     self.svc.fit(self.X_train, self.y_train)
    #
    # def test(self):
    #     self.train_acc = self.svc.score(self.X_train, self.y_train)
    #     self.test_acc = self.svc.score(self.X_test, self.y_test)
    #     print(f'Train Acc: {self.train_acc * 100}\n', f'Test Acc: {self.test_acc * 100}\n')


# Original standalone script version, kept for reference:
# CUDA_VISIBLE_DEVICES=2 python log_reg.py
# y_train = np.asarray(y_train)
# y_test = np.asarray(y_test)
# print(f"After getting data")
# start_time = time.time()
# vectorizer = TfidfVectorizer(max_features=4000, min_df=7, max_df=0.8, stop_words=stopwords.words('english'))
# print(f"After setting the vectorizer")
# X_train = vectorizer.fit_transform(x_train).toarray()
# X_test = vectorizer.transform(x_test).toarray()
# print(f"X_train: {X_train.shape}, X_test: {X_test.shape}")
# lr_classifier = LogisticRegression()
# lr_classifier.fit(X_train, y_train)
# train_acc = lr_classifier.score(X_train, y_train)
# test_acc = lr_classifier.score(X_test, y_test)
# svc = SVC()
# svc.fit(X_train, y_train)
# train_acc = svc.score(X_train, y_train)
# test_acc = svc.score(X_test, y_test)
# print('Train Acc: %.2f' % float(train_acc * 100),
#       'Test Acc: %.2f' % float(test_acc * 100),
#       'Time: %.4f' % float(time.time() - start_time))
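

# --- Usage sketch (an assumption; the original script has no entry point) ---
# The block below shows one plausible way to drive the pipeline end to end.
# Caveat: SVM.__init__ unconditionally loads "tfidf.pickle", so on a first
# run you would instead re-enable the commented TfidfVectorizer construction
# in __init__; _preprocess_data() then (re)fits and re-pickles the vectorizer.
if __name__ == '__main__':
    svm = SVM()
    svm._load_data()        # read train.csv / test.csv from data_folder
    svm._preprocess_data()  # fit TF-IDF and build dense X_train / X_test
    svm.train_ours()        # remap labels to {-1, +1}, run subgradient descent
    svm.test_ours()         # final accuracy; dumps weights to svm_weights.npy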