Spaces:
Sleeping
Sleeping
import streamlit as st | |
import time | |
import os | |
import logging | |
import torch | |
import json | |
import string | |
import re | |
import string | |
import nltk | |
import numpy as np | |
import torch.nn as nn | |
import transformers | |
import lightgbm as lgb | |
import pickle | |
nltk.download('wordnet') | |
nltk.download('stopwords') | |
from collections import Counter | |
from nltk.corpus import stopwords | |
from nltk.stem import WordNetLemmatizer | |
from nltk.tokenize import RegexpTokenizer | |
from sklearn.feature_extraction.text import TfidfVectorizer | |
from sklearn.linear_model import LogisticRegression | |
# English stop words, held in a set so membership checks are constant-time.
stop_words = set(stopwords.words('english'))


def _unpickle(path):
    """Return the object deserialized from the pickle file at *path*."""
    # NOTE: unpickling runs arbitrary code embedded in the file; only load
    # artifacts that ship with this app.
    with open(path, 'rb') as handle:
        return pickle.load(handle)


# Classifier used via ``logreg.predict`` (presumably the fitted
# LogisticRegression -- confirm against the training code).
logreg = _unpickle('logreg.pkl')
# Vectorizer used via ``tf.transform`` (presumably the fitted
# TfidfVectorizer -- confirm against the training code).
tf = _unpickle('tf.pkl')
def classical_pipeline(text):
    """Clean a raw review and vectorize it for the classical model.

    The text is lowercased, digit runs are replaced by a space, ASCII
    punctuation and newline characters are deleted, every
    whitespace-separated token is lemmatized with WordNet, the result is
    re-tokenized into word-character runs, English stop words are dropped,
    and the cleaned string is passed to the module-level ``tf.transform``.

    Args:
        text: Raw review text.

    Returns:
        The value of ``tf.transform`` for a one-document list (presumably a
        single-row feature matrix -- confirm against the pickled vectorizer).
    """
    text = text.lower()
    text = re.sub(r'\d+', ' ', text)
    text = text.translate(str.maketrans('', '', string.punctuation))
    # Newlines are deleted rather than replaced by a space; kept as-is because
    # the vectorizer was presumably fitted on text cleaned the same way.
    text = re.sub(r'\n', '', text)
    lemmatizer = WordNetLemmatizer()
    text = ' '.join(lemmatizer.lemmatize(word) for word in text.split())
    # Raw string: '\w' in a normal literal is an invalid escape sequence.
    tokens = RegexpTokenizer(r'\w+').tokenize(text)
    # Reuse the module-level stop-word set instead of rebuilding a list (and
    # scanning it linearly) on every call.
    text = ' '.join(token for token in tokens if token not in stop_words)
    return tf.transform([text])
def preprocess_single_string(input_string: str, seq_len: int, vocab_to_int: dict):
    """Turn one raw string into a fixed-length tensor of vocabulary indices.

    The string is cleaned with ``data_preprocessing``; each resulting token
    that is a key of ``vocab_to_int`` is replaced by its index, and tokens
    missing from the vocabulary are skipped. The index list is then passed
    through ``padding`` so that it has exactly ``seq_len`` entries.

    Args:
        input_string: Raw text to encode.
        seq_len: Length of the returned sequence.
        vocab_to_int: Mapping from token to integer index.

    Returns:
        A 1-D ``torch.Tensor`` with ``seq_len`` integer entries.
    """
    cleaned = data_preprocessing(input_string)
    token_ids = [vocab_to_int[token] for token in cleaned.split() if token in vocab_to_int]
    padded_row = padding([token_ids], seq_len)[0]
    return torch.tensor(padded_row)
def padding(reviews_int: list, seq_len: int):
    """Left-pad or truncate integer sequences to a common length.

    Sequences shorter than ``seq_len`` are padded with zeros on the left;
    longer sequences keep only their first ``seq_len`` items.

    Args:
        reviews_int: List of integer sequences (one per review).
        seq_len: Number of columns in the result.

    Returns:
        An integer ``numpy.ndarray`` of shape ``(len(reviews_int), seq_len)``.
    """
    features = np.zeros((len(reviews_int), seq_len), dtype=int)
    for row, review in enumerate(reviews_int):
        kept = review[:seq_len]
        n_kept = len(kept)
        # The array is already zero-filled, so only the right-aligned tail
        # needs writing. Writing the ids straight into the int array avoids
        # the float64 round trip (and the precision loss it implies for very
        # large ids). The guard skips empty rows, where the slice start would
        # equal ``seq_len``.
        if n_kept:
            features[row, seq_len - n_kept:] = kept
    return features
def data_preprocessing(text: str):
    """Normalize raw text for the LSTM vocabulary lookup.

    Lowercases the text, deletes ``<...>`` tag-like spans, deletes ASCII
    punctuation, drops tokens found in the module-level ``stop_words`` set
    and lemmatizes the remaining tokens with WordNet.

    Args:
        text: Raw input text.

    Returns:
        The surviving lemmatized tokens joined by single spaces.
    """
    lowered = text.lower()
    without_tags = re.sub('<.*?>', '', lowered)
    without_punct = without_tags.translate(str.maketrans('', '', string.punctuation))

    lemmatizer = WordNetLemmatizer()
    lemmas = []
    for token in without_punct.split():
        if token in stop_words:
            continue
        lemmas.append(lemmatizer.lemmatize(token))
    return ' '.join(lemmas)
# Token -> integer index mapping used by preprocess_single_string.
with open('lstm_vocab_to_int.json') as vocab_file:
    vocab_to_int = json.load(vocab_file)

# Embedding weights stored as a NumPy array on disk.
embedding_matrix = np.load('lstm_embedding_matrix.npy')

# Embedding module initialized from the stored matrix (converted to float32).
embedding_layer = torch.nn.Embedding.from_pretrained(
    torch.FloatTensor(embedding_matrix)
)
class LSTMClassifier(nn.Module):
    """Bidirectional LSTM classifier producing a single logit per input.

    Token indices are embedded with the module-level ``embedding_layer``,
    run through a (possibly stacked) bidirectional LSTM, and the outputs of
    all time steps are flattened and fed to two stacked linear layers.

    Attribute names (``embedding``, ``lstm``, ``linear``) determine the
    state-dict keys, so they must stay in sync with saved checkpoints.

    Args:
        embedding_dim: Size of each embedding vector (LSTM input size).
        seq_len: Number of time steps in every input sequence; fixes the
            input width of the first linear layer.
        hidden_size: LSTM hidden size per direction.
        dropout: Dropout probability passed to ``nn.LSTM``.
        num_layers: Number of stacked LSTM layers.
    """

    def __init__(self, embedding_dim: int, seq_len: int, hidden_size: int = 32,
                 dropout: float = 0, num_layers: int = 1) -> None:
        super().__init__()
        self.embedding_dim = embedding_dim
        self.hidden_size = hidden_size
        self.embedding = embedding_layer
        self.dropout = dropout
        self.num_layers = num_layers
        self.seq_len = seq_len
        self.lstm = nn.LSTM(
            input_size=self.embedding_dim,
            hidden_size=self.hidden_size,
            batch_first=True,
            bidirectional=True,
            dropout=self.dropout,
            num_layers=self.num_layers
        )
        # "* 2" because the bidirectional LSTM concatenates both directions.
        self.linear = nn.Sequential(
            nn.Linear(self.hidden_size * self.seq_len * 2, 128),
            nn.Linear(128, 1)
        )

    def forward(self, x):
        """Return the classification logit(s) for a batch of index sequences.

        Args:
            x: Integer tensor of token indices, batch dimension first; each
                sequence must have ``seq_len`` entries to match the linear
                layer.

        Returns:
            The output of the linear head (pre-sigmoid).
        """
        embeddings = self.embedding(x)
        output, _ = self.lstm(embeddings)
        # Flatten all time steps of each sample into one feature vector.
        output = output.contiguous().view(output.size(0), -1)
        # NOTE(review): squeeze(0) drops the batch dimension only when the
        # batch size is 1, so the output rank differs between single-sample
        # and batched calls. Left unchanged because callers may rely on it.
        out = self.linear(output.squeeze(0))
        return out
# Aliases for the DistilBERT model and tokenizer classes.
bert_model_class = transformers.DistilBertModel
bert_tokenizer_class = transformers.DistilBertTokenizer

# NOTE(review): these weights are loaded but not referenced by any other
# statement visible in this file -- confirm whether they are still needed.
bert_pretrained_weights = torch.load(
    'basic_bert_weights.pt',
    map_location=torch.device('cpu'),
)

# Tokenizer and backbone are both created from the same pretrained checkpoint.
bert_tokenizer = bert_tokenizer_class.from_pretrained('distilbert-base-uncased')
bert_basic_model = bert_model_class.from_pretrained('distilbert-base-uncased')
class BertReviews(nn.Module):
    """Backbone model with a single-logit linear head on the first token.

    All backbone parameters are frozen except the ``output_layer_norm``
    weight and bias of the first six entries of ``transformer.layer``.
    """

    def __init__(self, model):
        """Wrap *model* and attach the classification head.

        Args:
            model: Backbone exposing ``transformer.layer[i].output_layer_norm``
                and returning hidden states of width 768 as the first element
                of its output.
        """
        super().__init__()
        self.bert = model

        # Freeze every backbone parameter first ...
        self.bert.requires_grad_(False)
        # ... then re-enable gradients for the output layer norms only.
        for block_idx in range(6):
            layer_norm = self.bert.transformer.layer[block_idx].output_layer_norm
            layer_norm.weight.requires_grad = True
            layer_norm.bias.requires_grad = True

        self.fc = nn.Linear(768, 1)

    def forward(self, samples, att_masks):
        """Run the backbone and the linear head.

        Args:
            samples: Token ids passed positionally to the backbone.
            att_masks: Attention mask forwarded as ``attention_mask``.

        Returns:
            A tuple ``(backbone_output, logits)`` where ``logits`` is the head
            applied to position 0 of the first backbone output element.
        """
        backbone_out = self.bert(samples, attention_mask=att_masks)
        first_token_state = backbone_out[0][:, 0, :]
        return backbone_out, self.fc(first_token_state)
# DistilBERT-based classifier: wrap the backbone, restore the saved weights
# on CPU and switch to inference mode.
bert_model = BertReviews(bert_basic_model)
_bert_state = torch.load('bert_weights.pt', map_location=torch.device('cpu'))
bert_model.load_state_dict(_bert_state)
bert_model.to('cpu')
bert_model.eval()

# LSTM classifier: hyperparameters must match the saved checkpoint; seq_len
# matches the length used by predict_sentence_lstm.
model_lstm = LSTMClassifier(
    embedding_dim=64,
    hidden_size=64,
    seq_len=150,
    dropout=0.5,
    num_layers=4,
)
_lstm_state = torch.load('lstm_model_weights.pt', map_location=torch.device('cpu'))
model_lstm.load_state_dict(_lstm_state)
model_lstm.to('cpu')
model_lstm.eval()
def predict_sentence_classical(text: str):
    """Classify *text* with the classical pipeline and time the call.

    Args:
        text: Raw review text.

    Returns:
        A tuple ``(label, seconds)``: the first element of
        ``logreg.predict`` for the vectorized text, and the elapsed time of
        preprocessing plus prediction in seconds.
    """
    # perf_counter is monotonic and high-resolution, unlike time.time(),
    # which can jump when the system clock is adjusted.
    start_time = time.perf_counter()
    features = classical_pipeline(text)
    res = logreg.predict(features)[0]
    execution_time = time.perf_counter() - start_time
    return res, execution_time
def predict_sentence_lstm(text: str):
    """Classify *text* with the LSTM model and time the call.

    Args:
        text: Raw review text.

    Returns:
        A tuple ``(label, seconds)`` where ``label`` is the rounded sigmoid
        of the model output as an ``int`` (0 or 1) and ``seconds`` is the
        elapsed time of preprocessing plus inference.
    """
    start_time = time.perf_counter()
    token_ids = preprocess_single_string(text, 150, vocab_to_int)
    # Inference only: skip building the autograd graph.
    with torch.no_grad():
        logit = model_lstm(token_ids.unsqueeze(0))
    # .item() extracts the single value explicitly; int() on a non-0-d NumPy
    # array is deprecated.
    res = int(torch.sigmoid(logit).round().item())
    execution_time = time.perf_counter() - start_time
    return res, execution_time
def predict_sentence_bert(text: str):
    """Classify *text* with the DistilBERT-based model and time the call.

    The text is encoded to at most 200 token ids (with special tokens),
    right-padded with zeros to exactly 200, and an attention mask is built
    that is 1 wherever the id is non-zero.

    Args:
        text: Raw review text.

    Returns:
        A tuple ``(label, seconds)`` where ``label`` is the rounded sigmoid
        of the model's logit as an ``int`` (0 or 1) and ``seconds`` is the
        elapsed time of tokenization plus inference.
    """
    max_length = 200
    start_time = time.perf_counter()
    token_ids = bert_tokenizer.encode(
        text, add_special_tokens=True, truncation=True, max_length=max_length
    )
    # Id 0 is used as padding (presumably the tokenizer's [PAD] id -- confirm).
    token_ids = token_ids + [0] * (max_length - len(token_ids))
    # Build integer tensors directly instead of going through float32.
    input_ids = torch.tensor([token_ids], dtype=torch.int64)
    attention_mask = (input_ids != 0).to(torch.int64)
    # Inference only: skip building the autograd graph.
    with torch.no_grad():
        logit = bert_model(input_ids, attention_mask)[1]
    # .item() extracts the single value explicitly; int() on a non-0-d NumPy
    # array is deprecated.
    res = int(torch.sigmoid(logit).round().item())
    execution_time = time.perf_counter() - start_time
    return res, execution_time
# Human-readable names for the integer class labels.
reses = {0: 'negative', 1: 'positive'}


def process_text(input_text):
    """Run all three models on *input_text* and write the results to the page.

    Args:
        input_text: Raw review text entered by the user.

    Returns:
        None. Results are emitted through ``st.write``.
    """
    res_classical, time_classical = predict_sentence_classical(input_text)
    res_lstm, time_lstm = predict_sentence_lstm(input_text)
    res_bert, time_bert = predict_sentence_bert(input_text)
    st.write('Results:')
    # Fixed: this line previously reported the LSTM label and timing.
    st.write(f'Logistic regression: {reses[res_classical]}, execution time: {time_classical:.2f} seconds.')
    st.write(f'LSTM: {reses[res_lstm]}, execution time: {time_lstm:.2f} seconds.')
    st.write(f'Upgraded Bert: {reses[res_bert]}, execution time: {time_bert:.2f} seconds.')
# Page layout: title, short description, input box and a submit button.
st.title('Film reviews classifier')
st.write('Write a film review in a box below, and the application, powered by three NLP models (logistic regression, LSTM and upgraded Bert), will tell if it is a positive or a negative review.')
user_input = st.text_area("Enter your text:")
if st.button("Send a review for processing"):
    # Treat whitespace-only input as empty so the models are not run on
    # text with no content.
    if user_input and user_input.strip():
        # process_text writes its output to the page and returns nothing.
        process_text(user_input)
    else:
        st.warning("Please enter some text before processing.")