File size: 6,097 Bytes
60cb352
 
 
 
 
 
 
 
 
 
 
 
 
8467cc8
 
60cb352
 
 
186a961
 
60cb352
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
186a961
60cb352
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8467cc8
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import streamlit as st
import time
import os
import logging
import torch
import json
import string
import re
import string
import nltk
import numpy as np
import torch.nn as nn
import transformers
nltk.download('wordnet')
nltk.download('stopwords')
from collections import Counter
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer

stop_words = set(stopwords.words('english'))

def preprocess_single_string(input_string: str, seq_len: int, vocab_to_int: dict):
    preprocessed_string = data_preprocessing(input_string)
    result_list = []
    for word in preprocessed_string.split():
        try:
            result_list.append(vocab_to_int[word])
        except KeyError as e:
            continue
    result_padded = padding([result_list], seq_len)[0]
    return torch.tensor(result_padded)



def padding(reviews_int: list, seq_len: int):
    features = np.zeros((len(reviews_int), seq_len), dtype = int)
    for i, review in enumerate(reviews_int):
        if len(review) <= seq_len:
            zeros = list(np.zeros(seq_len - len(review)))
            new = zeros + review
        else:
            new = review[: seq_len]
        features[i, :] = np.array(new)
    return features


def data_preprocessing(text: str):
    wn_lemmatizer = WordNetLemmatizer()
    text = text.lower()
    text = re.sub('<.*?>', '', text) 
    text = ''.join([c for c in text if c not in string.punctuation])
    text = [wn_lemmatizer.lemmatize(word) for word in text.split() if word not in stop_words]
    text = ' '.join(text)
    return text

with open('lstm_vocab_to_int.json') as json_file:
    vocab_to_int = json.load(json_file)

with open('lstm_embedding_matrix.npy', 'rb') as f:
    embedding_matrix = np.load(f)

embedding_layer = torch.nn.Embedding.from_pretrained(torch.FloatTensor(embedding_matrix))

class LSTMClassifier(nn.Module):
    def __init__(self, embedding_dim: int, seq_len:int, hidden_size:int = 32, dropout:int = 0, num_layers:int = 1) -> None:
        super().__init__()

        self.embedding_dim = embedding_dim
        self.hidden_size = hidden_size
        self.embedding = embedding_layer
        self.dropout = dropout
        self.num_layers = num_layers
        self.seq_len = seq_len
        self.lstm = nn.LSTM(
            input_size=self.embedding_dim,
            hidden_size=self.hidden_size,
            batch_first=True,
            bidirectional=True,
            dropout=self.dropout,
            num_layers=self.num_layers
        )
        self.linear    = nn.Sequential(
            nn.Linear(self.hidden_size * self.seq_len * 2, 128),
            nn.Linear(128, 1)
        )

    def forward(self, x):
        embeddings = self.embedding(x)
        output, _ = self.lstm(embeddings)
        output = output.contiguous().view(output.size(0), -1)
        out = self.linear(output.squeeze(0))
        return out
    
bert_model_class = transformers.DistilBertModel
bert_tokenizer_class = transformers.DistilBertTokenizer
bert_pretrained_weights = torch.load('basic_bert_weights.pt', map_location=torch.device('cpu'))
bert_tokenizer = bert_tokenizer_class.from_pretrained('distilbert-base-uncased')
bert_basic_model = bert_model_class.from_pretrained('distilbert-base-uncased')

class BertReviews(nn.Module):
    def __init__(self, model):
        super(BertReviews, self).__init__()
        self.bert = model
        for param in self.bert.parameters():
            param.requires_grad = False
        for i in range(6):
            self.bert.transformer.layer[i].output_layer_norm.weight.requires_grad = True
            self.bert.transformer.layer[i].output_layer_norm.bias.requires_grad = True
        self.fc  = nn.Linear(768, 1)

    def forward(self, samples, att_masks):

        embeddings = self.bert(samples, attention_mask=att_masks)
        model_out = self.fc(embeddings[0][:, 0, :])

        return embeddings, model_out
    
bert_model = BertReviews(bert_basic_model)
bert_model.load_state_dict(torch.load('bert_weights.pt', map_location=torch.device('cpu')))
bert_model.to('cpu').eval()

model_lstm = LSTMClassifier(embedding_dim=64, hidden_size=64, seq_len = 150, dropout=0.5, num_layers=4)
model_lstm.load_state_dict(torch.load('lstm_model_weights.pt', map_location=torch.device('cpu')))
model_lstm.to('cpu').eval()



def predict_sentence_lstm(text: str):
    start_time = time.time()
    text = preprocess_single_string(text, 150, vocab_to_int)
    res = int(torch.sigmoid(model_lstm(text.unsqueeze(0))).cpu().detach().numpy().round())
    end_time = time.time() 
    execution_time = end_time - start_time
    return res, execution_time

def predict_sentence_bert(text: str):
    start_time = time.time()
    text = bert_tokenizer.encode(text, add_special_tokens=True, truncation=True, max_length=200)
    text = np.array([text + [0]*(200-len(text))])
    attention_mask = torch.Tensor(np.where(text != 0, 1, 0)).to(torch.int64)
    text = torch.Tensor(text).to(torch.int64)
    # output = bert_model(text, attention_mask)[1]
    # res = output.squeeze().detach().numpy().round()

    res = int(torch.sigmoid(bert_model(text, attention_mask)[1]).cpu().detach().numpy().round())
    end_time = time.time() 
    execution_time = end_time - start_time
    return res, execution_time

reses = {0: 'negative', 1: 'positive'}

def process_text(input_text):
    res_lstm, time_lstm = predict_sentence_lstm(input_text)
    res_bert, time_bert = predict_sentence_bert(input_text)
    st.write('Results:')
    st.write(f'LSTM: {reses[res_lstm]}, execution time: {time_lstm:.2f} seconds.')
    st.write(f'Upgraded Bert: {reses[res_bert]}, execution time: {time_bert:.2f} seconds.')

st.title('Film reviews classifier')
st.write('Write a film review in a box below, and the application, powered by two NLP models (LSTM and upgraded Bert), will tell if it is a positive or a negative review.')

user_input = st.text_area("Enter your text:")
if st.button("Send a review for processing"):
    if user_input:
        processed_text = process_text(user_input)
    else:
        st.warning("Please enter some text before processing.")