Spaces:
Sleeping
Sleeping
import os
import pickle
import re
import ssl
import time

import nltk
import numpy as np
from nltk.corpus import stopwords
from sklearn.feature_extraction.text import TfidfVectorizer
# from sklearn.linear_model import LogisticRegression
# from sklearn.svm import SVC
# Make the default HTTPS context an unverified one when this Python build
# exposes ssl._create_unverified_context.
# NOTE(review): this turns off TLS certificate verification for every HTTPS
# request made through the default context in this process. Presumably it is
# here so nltk.download() works behind a broken certificate chain -- confirm
# it is still needed.
if hasattr(ssl, "_create_unverified_context"):
    _create_unverified_https_context = ssl._create_unverified_context
    ssl._create_default_https_context = _create_unverified_https_context
# print(f"nltk version: {nltk.__version__}") | |
# nltk.download('stopwords') | |
# | |
class SVMModel:
    """Linear SVM trained with per-sample sub-gradient descent on the hinge loss.

    The decision function is ``sign(X @ w - b)``. Labels passed to ``fit`` are
    binarised: values ``<= 0`` become ``-1`` and everything else ``+1``.

    ``X_train``, ``X_test``, ``y_train`` and ``y_test`` are plain attributes
    that a caller may assign before ``fit``. When ``X_test`` and ``y_test``
    are set, ``fit`` evaluates on them after every epoch.
    """

    def __init__(self, learning_rate=0.01, lambda_param=0.01, n_iters=1000):
        """Store the hyper-parameters and reset the learned state.

        Args:
            learning_rate: Step size of every sub-gradient update.
            lambda_param: L2 regularisation strength. NOTE: ``fit`` replaces
                this value with ``1 / n_samples``, so the constructor value
                is not used for training.
            n_iters: Number of full passes (epochs) over the training data.
        """
        self.learning_rate = learning_rate
        self.lambda_param = lambda_param
        self.n_iters = n_iters
        self.w = None
        self.b = None
        self.X_train = None
        self.X_test = None
        self.y_train = None
        self.y_test = None

    def fit(self, X, y):
        """Train the weights ``w`` and bias ``b`` on ``X`` / ``y``.

        Args:
            X: Array-like of shape ``(n_samples, n_features)``.
            y: Array-like of ``n_samples`` labels; binarised to -1 / +1.

        Raises:
            ValueError: If ``X`` contains no samples.
        """
        X = np.asarray(X)
        y = np.asarray(y)
        n_samples, n_features = X.shape
        if n_samples == 0:
            raise ValueError("fit() needs at least one training sample")

        y_ = np.where(y <= 0, -1, 1)  # Convert labels to -1 and 1
        print(f"y_ max: {np.max(y_)}, y_ min: {np.min(y_)}")

        self.w = np.zeros(n_features)
        self.b = 0
        # Kept for backward compatibility: the regularisation strength is
        # always derived from the data size, overriding the constructor value.
        self.lambda_param = 1.0 / float(n_samples)

        # Per-epoch evaluation is only possible when the caller supplied
        # held-out data; otherwise fit() must still work on its own.
        can_evaluate = self.X_test is not None and self.y_test is not None

        for epoch in range(self.n_iters):
            print(f"Epoch: {epoch}")
            for idx, x_i in enumerate(X):
                margin_ok = y_[idx] * (np.dot(x_i, self.w) - self.b) >= 1
                if margin_ok:
                    # Outside the margin: only the regulariser contributes.
                    self.w = self.w - self.learning_rate * (2 * self.lambda_param * self.w)
                else:
                    # Margin violated: add the hinge-loss sub-gradient.
                    self.w = self.w - self.learning_rate * (
                        2 * self.lambda_param * self.w - x_i * y_[idx]
                    )
                    self.b = self.b - self.learning_rate * y_[idx]
            if can_evaluate:
                st_time = time.time()
                self.test()
                print(f"Time: {time.time() - st_time}")

    def predict(self, X):
        """Return ``sign(X @ w - b)`` for each row of the 2-D input ``X``.

        Raises:
            ValueError: If the model has not been fitted yet.
        """
        if self.w is None:
            raise ValueError("predict() called before fit()")
        linear_output = np.matmul(np.asarray(X), self.w[:, None]) - self.b
        return np.sign(linear_output[:, 0])

    def test(self, ):
        """Predict on ``self.X_test``, print the accuracy and return the predictions.

        Accuracy is the fraction of predictions equal to ``sign(self.y_test)``.

        Raises:
            ValueError: If ``X_test`` or ``y_test`` has not been assigned.
        """
        if self.X_test is None or self.y_test is None:
            raise ValueError("X_test and y_test must be set before calling test()")
        X_test = np.asarray(self.X_test)
        linear_output = self.predict(X_test)
        print(f"linear_output: {linear_output.shape}, self.X_test: {X_test.shape}")
        acc = np.mean((linear_output == np.sign(self.y_test)).astype(np.float32))
        print(f"Test Acc: {acc}")
        return linear_output

    def get_weights_dict(self, ):
        """Return the learned parameters as ``{'w': ..., 'b': ...}``."""
        weights_dict = {
            'w': self.w,
            'b': self.b
        }
        return weights_dict
class SVM:
    """Text-classification pipeline around ``SVMModel``.

    Workflow as implemented here:
      * ``__init__`` loads a pickled vectorizer from ``tfidf.pickle`` and
        creates the ``SVMModel``; it does not load any data.
      * ``_load_data`` reads ``train.csv`` / ``test.csv`` from ``data_folder``.
      * ``_preprocess_data`` fits the vectorizer and builds the feature arrays.
      * ``train_ours`` / ``test_ours`` train and evaluate the model.
    """

    # Matches a literal backslash followed by "n" (an escaped newline in the CSV).
    _ESCAPED_NEWLINE = re.compile(r"\\n")
    # Everything except ASCII letters, spaces and apostrophes is dropped.
    _NON_TEXT = re.compile(r"[^a-zA-Z ']")

    def __init__(self, ):
        """Load the pickled vectorizer and set up the SVM model."""
        self.x_train = []
        self.y_train = []
        self.x_test = []
        self.y_test = []
        # Feature matrices; filled in by _preprocess_data().
        self.X_train = None
        self.X_test = None
        self.data_folder = '.'
        print(f"Start loading data")
        print(f"Setting vectorizer")
        # SECURITY: unpickling executes arbitrary code -- only load a
        # "tfidf.pickle" that this project produced itself.
        with open("tfidf.pickle", "rb") as fh:
            self.vectorizer = pickle.load(fh)
        self.setup_model_ours()

    def _read_split(self, filename):
        """Parse one CSV file of ``data_folder`` into ``(texts, labels)``.

        The first comma-separated field is the sentiment; its second character
        is read as a digit and shifted by -1 to form the label. The remaining
        fields are re-joined, escaped newlines become spaces, characters
        outside ``[a-zA-Z ']`` are removed and the text is lower-cased.
        Blank lines are skipped.
        """
        texts = []
        labels = []
        with open(os.path.join(self.data_folder, filename), "r") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                fields = line.split(',')
                senti = fields[0]
                raw_text = ','.join(fields[1:])
                text = self._NON_TEXT.sub('', self._ESCAPED_NEWLINE.sub(' ', raw_text)).lower()
                texts.append(text)
                labels.append(int(senti[1]) - 1)
        return texts, labels

    def _load_data(self, ):
        """Read ``train.csv`` and ``test.csv`` into the ``x_*`` / ``y_*`` attributes."""
        self.x_train, y_train = self._read_split('train.csv')
        self.x_test, y_test = self._read_split('test.csv')
        self.y_train = np.array(y_train, dtype=np.int32)
        self.y_test = np.array(y_test, dtype=np.int32)
        if self.y_train.size:  # np.max / np.min raise on empty arrays
            print(f"max_y_train: {np.max(self.y_train)}, min_y_train: {np.min(self.y_train)}")

    def _preprocess_data(self, ):
        """Fit the vectorizer on the training texts and vectorize both splits.

        The fitted vectorizer is written back to ``tfidf.pickle``.
        """
        self.X_train = self.vectorizer.fit_transform(self.x_train).toarray()
        with open("tfidf.pickle", "wb") as fh:
            pickle.dump(self.vectorizer, fh)
        self.X_test = self.vectorizer.transform(self.x_test).toarray()

    def setup_model_ours(self, ):
        """Create a fresh ``SVMModel`` with its default hyper-parameters."""
        self.svm_model = SVMModel()

    def train_ours(self, ):
        """Rescale the labels with ``y * 2 - 1`` and train the SVM model.

        NOTE: the labels are rescaled in place on every call, so calling this
        method twice rescales them twice.

        Raises:
            RuntimeError: If the feature matrices have not been built yet.
        """
        if getattr(self, "X_train", None) is None or getattr(self, "X_test", None) is None:
            raise RuntimeError(
                "No features available: call _load_data() and _preprocess_data() "
                "before train_ours()"
            )
        self.y_train = np.asarray(self.y_train, dtype=np.float32) * 2 - 1.0
        self.y_test = np.asarray(self.y_test, dtype=np.float32) * 2 - 1.0
        print(f"max_y_train: {np.max(self.y_train)}, min_y_train: {np.min(self.y_train)}")
        self.svm_model.X_train = self.X_train
        self.svm_model.X_test = self.X_test
        self.svm_model.y_train = self.y_train
        self.svm_model.y_test = self.y_test
        self.svm_model.fit(self.X_train, self.y_train)

    def test_ours(self, ):
        """Evaluate on the test split and save the weights to ``svm_weights.npy``."""
        linear_output = self.svm_model.test()
        acc = np.mean((linear_output == np.sign(self.y_test)).astype(np.float32))
        print(f"Test Acc: {acc}")
        weights_dict = self.svm_model.get_weights_dict()
        # A dict is stored as a pickled object array; reading it back needs
        # np.load(..., allow_pickle=True).item().
        np.save("svm_weights.npy", weights_dict)
        print(f"svm weights saved to svm_weights.npy")
# def setup_model(self, ): | |
# self.svc = SVC() | |
# def train(self, ): | |
# self.svc.fit(self.X_train, self.y_train) | |
# def test(self, ): | |
# self.train_acc = self.svc.score(self.X_train, self.y_train) | |
# self.test_acc = self.svc.score(self.X_test, self.y_test) | |
# print(f'Train Acc: {self.train_acc * 100}\n', f'Test Acc: {self.test_acc * 100}\n') | |
# CUDA_VISIBLE_DEVICES=2 python log_reg.py | |
# y_train = np.asarray(y_train) | |
# y_test = np.asarray(y_test) | |
# print(f"After getting data") | |
# start_time = time.time() | |
# vectorizer = TfidfVectorizer(max_features=4000, min_df=7, max_df=0.8, stop_words=stopwords.words('english')) | |
# print(f"After setting the vectorizer") | |
# X_train = vectorizer.fit_transform(x_train).toarray() | |
# X_test = vectorizer.transform(x_test).toarray() | |
# print(f"X_train: {X_train.shape}, X_test: {X_test.shape}") | |
# # lr_classfier = LogisticRegression() | |
# # lr_classfier.fit(X_train,y_train) | |
# # train_acc = lr_classfier.score(X_train,y_train) | |
# # test_acc = lr_classfier.score(X_test,y_test) | |
# svc = SVC() | |
# svc.fit(X_train,y_train) | |
# train_acc = svc.score(X_train,y_train) | |
# test_acc = svc.score(X_test,y_test) | |
# print('Train Acc: %.2f' % float(train_acc*100), 'Test Acc: %.2f' % float(test_acc*100),'Time: %.4f' % float(time.time()-start_time)) | |
# # CUDA_VISIBLE_DEVICES=2 python log_reg.py | |