# Deployment-Trial/customFunctions.py
import pandas as pd
import numpy as np
import random
import torch
import torch.nn as nn
import torch.optim as optim
from seqeval.metrics import accuracy_score, f1_score, classification_report
from seqeval.scheme import IOB2
import sklearn_crfsuite
from sklearn_crfsuite import metrics
from sklearn.metrics.pairwise import cosine_similarity
from gensim.models import Word2Vec, KeyedVectors
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from sklearn.base import BaseEstimator, ClassifierMixin, TransformerMixin
from sklearn.feature_extraction.text import TfidfVectorizer
import gensim.downloader as api
from itertools import product
from sklearn.model_selection import train_test_split, GridSearchCV
from joblib import dump
class preprocess_sentences():
def __init__(self):
pass
def fit(self, X, y=None):
print('PREPROCESSING')
return self
def transform(self, X):
        # X is expected to be a pandas Series of token arrays (e.g. train['tokens'])
sentences = X.apply(lambda x: x.tolist()).tolist()
print('--> Preprocessing complete \n', flush=True)
return sentences
EMBEDDING_DIM = 500  # default embedding dimensionality for the Word2Vec transformers
PAD_VALUE = -1       # padding value for sequences; also the ignore_index for the loss
MAX_LENGTH = 376     # maximum sentence length
BATCH_SIZE = 16      # batch size for the PyTorch DataLoaders
class Word2VecTransformer():
def __init__(self, vector_size = EMBEDDING_DIM, window = 5, min_count = 1, workers = 1, embedding_dim=EMBEDDING_DIM):
self.model = None
self.vector_size = vector_size
self.window = window
self.min_count = min_count
self.workers = workers
self.embedding_dim = embedding_dim
def fit(self, X, y):
# https://stackoverflow.com/questions/17242456/python-print-sys-stdout-write-not-visible-when-using-logging
# https://stackoverflow.com/questions/230751/how-can-i-flush-the-output-of-the-print-function
print('WORD2VEC:', flush=True)
# This fits the word2vec model
self.model = Word2Vec(sentences = X, vector_size=self.vector_size, window=self.window
, min_count=self.min_count, workers=self.workers)
print('--> Word2Vec Fitted', flush=True)
return self
def transform(self, X):
# This bit should transform the sentences
embedded_sentences = []
for sentence in X:
sentence_vectors = []
for word in sentence:
if word in self.model.wv:
vec = self.model.wv[word]
else:
vec = np.random.normal(scale=0.6, size=(self.embedding_dim,))
sentence_vectors.append(vec)
            embedded_sentences.append(torch.tensor(np.array(sentence_vectors), dtype=torch.float32))
print('--> Embeddings Complete \n', flush=True)
return embedded_sentences
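# Word2VecTransformer.transform yields one float32 tensor of shape
# (sentence_length, EMBEDDING_DIM) per sentence; out-of-vocabulary tokens are
# given random normal vectors (std 0.6) so every token still has an embedding.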
class Word2VecTransformer_CRF():
def __init__(self, vector_size = EMBEDDING_DIM, window = 5, min_count = 1, workers = 1, embedding_dim=EMBEDDING_DIM):
self.model = None
self.vector_size = vector_size
self.window = window
self.min_count = min_count
self.workers = workers
self.embedding_dim = embedding_dim
def fit(self, X, y):
# https://stackoverflow.com/questions/17242456/python-print-sys-stdout-write-not-visible-when-using-logging
# https://stackoverflow.com/questions/230751/how-can-i-flush-the-output-of-the-print-function
print('WORD2VEC:', flush=True)
# This fits the word2vec model
self.model = Word2Vec(sentences = X, vector_size=self.vector_size, window=self.window
, min_count=self.min_count, workers=self.workers)
print('--> Word2Vec Fitted', flush=True)
return self
def transform(self, X):
# This bit should transform the sentences
embedded_sentences = []
for sentence in X:
sentence_vectors = []
for word in sentence:
features = {
'bias': 1.0,
'word.lower()': word.lower(),
'word[-3:]': word[-3:],
'word[-2:]': word[-2:],
'word.isupper()': word.isupper(),
'word.istitle()': word.istitle(),
'word.isdigit()': word.isdigit(),
}
if word in self.model.wv:
vec = self.model.wv[word]
else:
vec = np.random.normal(scale=0.6, size=(self.embedding_dim,))
# https://stackoverflow.com/questions/58736548/how-to-use-word-embedding-as-features-for-crf-sklearn-crfsuite-model-training
for index in range(len(vec)):
features[f"embedding_{index}"] = vec[index]
sentence_vectors.append(features)
embedded_sentences.append(sentence_vectors)
print('--> Embeddings Complete \n', flush=True)
return embedded_sentences
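# Word2VecTransformer_CRF.transform yields one list of feature dicts per sentence
# (surface features plus each Word2Vec dimension as an 'embedding_i' key), which is
# the input format expected by sklearn_crfsuite.CRF.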
class tfidfTransformer(BaseEstimator, TransformerMixin):
def __init__(self):
self.model = None
self.embedding_dim = None
self.idf = None
self.vocab_size = None
self.vocab = None
def fit(self, X, y = None):
print('TFIDF:', flush=True)
joined_sentences = [' '.join(tokens) for tokens in X]
self.model = TfidfVectorizer()
self.model.fit(joined_sentences)
self.vocab = self.model.vocabulary_
self.idf = self.model.idf_
self.vocab_size = len(self.vocab)
self.embedding_dim = self.vocab_size
print('--> TFIDF Fitted', flush=True)
return self
def transform(self, X):
embedded = []
for sentence in X:
sent_vecs = []
token_counts = {}
for word in sentence:
token_counts[word] = token_counts.get(word, 0) + 1
sent_len = len(sentence)
            for word in sentence:
                vec = np.zeros(self.vocab_size)
                # TfidfVectorizer lowercases tokens by default, so look up the lowercased form
                token = word.lower()
                if token in self.vocab:
                    tf = token_counts[word] / sent_len
                    token_idx = self.vocab[token]
                    vec[token_idx] = tf * self.idf[token_idx]
sent_vecs.append(vec)
            embedded.append(torch.tensor(np.array(sent_vecs), dtype=torch.float32))
print('--> Embeddings Complete \n', flush=True)
return embedded
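# Each token is represented by a vocab_size-dimensional vector holding its
# sentence-level term frequency times the fitted IDF weight, so the per-token
# embedding dimension equals the full TF-IDF vocabulary size.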
class GloveTransformer(BaseEstimator, TransformerMixin):
def __init__(self):
self.model = None
self.embedding_dim = 300
def fit(self, X, y=None):
print('GLOVE', flush = True)
self.model = api.load('glove-wiki-gigaword-300')
print('--> Glove Downloaded', flush=True)
return self
def transform(self, X):
# This bit should transform the sentences
print('--> Beginning embeddings', flush=True)
embedded_sentences = []
for sentence in X:
sentence_vectors = []
            for word in sentence:
                if word in self.model:
                    vec = self.model[word]
                # the gigaword GloVe vectors are lowercased, so also try the lowercased token
                elif word.lower() in self.model:
                    vec = self.model[word.lower()]
                else:
                    vec = np.random.normal(scale=0.6, size=(self.embedding_dim,))
sentence_vectors.append(vec)
            embedded_sentences.append(torch.tensor(np.array(sentence_vectors), dtype=torch.float32))
print('--> Embeddings Complete \n', flush=True)
return embedded_sentences
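# fit() fetches the pretrained 300-dimensional 'glove-wiki-gigaword-300' vectors via
# gensim's downloader, so the first call downloads the model and may take a while.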
class Bio2VecTransformer():
def __init__(self, vector_size = 200, window = 5, min_count = 1, workers = 1, embedding_dim=200):
self.model = None
self.vector_size = vector_size
self.window = window
self.min_count = min_count
self.workers = workers
self.embedding_dim = embedding_dim
def fit(self, X, y):
print('BIO2VEC:', flush=True)
        # https://stackoverflow.com/questions/58055415/how-to-load-bio2vec-in-gensim
        # NOTE: Bio2VecModel is not defined in this module; it is expected to be a gensim
        # KeyedVectors object loaded elsewhere (see the link above) before fit() is called.
        self.model = Bio2VecModel
print('--> BIO2VEC Fitted', flush=True)
return self
def transform(self, X):
# This bit should transform the sentences
embedded_sentences = []
for sentence in X:
sentence_vectors = []
for word in sentence:
if word in self.model:
vec = self.model[word]
else:
vec = np.random.normal(scale=0.6, size=(self.embedding_dim,))
sentence_vectors.append(vec)
            embedded_sentences.append(torch.tensor(np.array(sentence_vectors), dtype=torch.float32))
print('--> Embeddings Complete \n', flush=True)
return embedded_sentences
class BiLSTM_NER(nn.Module):
def __init__(self,input_dim, hidden_dim, tagset_size):
super(BiLSTM_NER, self).__init__()
        # Embedding layer (disabled): inputs are already dense embedding vectors.
        # freeze=False would let a pretrained embedding matrix be fine-tuned.
        #self.embedding = nn.Embedding.from_pretrained(embedding_matrix, freeze = False, padding_idx=-1)
self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True, bidirectional=True)
self.fc = nn.Linear(hidden_dim*2, tagset_size)
def forward(self, sentences):
#embeds = self.embedding(sentences)
lstm_out, _ = self.lstm(sentences)
tag_scores = self.fc(lstm_out)
return tag_scores
def pad(batch):
# batch is a list of (X, y) pairs
X_batch, y_batch = zip(*batch)
    # Convert to tensors (as_tensor leaves sequences that are already tensors untouched)
    X_batch = [torch.as_tensor(seq, dtype=torch.float32) for seq in X_batch]
    y_batch = [torch.as_tensor(seq, dtype=torch.long) for seq in y_batch]
# Pad sequences
X_padded = pad_sequence(X_batch, batch_first=True, padding_value=PAD_VALUE)
y_padded = pad_sequence(y_batch, batch_first=True, padding_value=PAD_VALUE)
return X_padded, y_padded
def pred_pad(batch):
    X_batch = [torch.as_tensor(seq, dtype=torch.float32) for seq in batch]
X_padded = pad_sequence(X_batch, batch_first=True, padding_value=PAD_VALUE)
return X_padded
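# pad/pred_pad give the DataLoaders (batch, max_len, embedding_dim) inputs and
# (batch, max_len) label tensors, with PAD_VALUE marking padded positions so the
# loss and accuracy calculations can ignore them.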
class Ner_Dataset(Dataset):
def __init__(self, X, y):
self.X = X
self.y = y
def __len__(self):
return len(self.X)
def __getitem__(self, idx):
return self.X[idx], self.y[idx]
class LSTM(BaseEstimator, ClassifierMixin):
def __init__(self, embedding_dim = None, hidden_dim = 128, epochs = 5, learning_rate = 0.001, tag2idx = None):
self.embedding_dim = embedding_dim
self.hidden_dim = hidden_dim
self.epochs = epochs
self.learning_rate = learning_rate
self.tag2idx = tag2idx
def fit(self, embedded, encoded_tags):
#print('LSTM started:', flush=True)
data = Ner_Dataset(embedded, encoded_tags)
train_loader = DataLoader(data, batch_size=BATCH_SIZE, shuffle=True, collate_fn=pad)
self.model = self.train_LSTM(train_loader)
#print('--> Epochs: ', self.epochs, flush=True)
#print('--> Learning Rate: ', self.learning_rate)
return self
def predict(self, X):
# Switch to evaluation mode
test_loader = DataLoader(X, batch_size=1, shuffle=False, collate_fn=pred_pad)
self.model.eval()
predictions = []
# Iterate through test data
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        with torch.no_grad():
            for X_batch in test_loader:
                X_batch = X_batch.to(device)
tag_scores = self.model(X_batch)
_, predicted_tags = torch.max(tag_scores, dim=2)
flattened_pred = predicted_tags.view(-1)
predictions.append(list(flattened_pred.cpu().numpy()))
#print('before concat',predictions)
#predictions = np.concatenate(predictions)
#print('after concat',predictions)
        # Decode integer predictions back to tag strings. This assumes the training labels
        # were encoded with a LabelEncoder fit on the same four tags (classes are sorted
        # alphabetically, so the mapping is reproducible).
        tag_encoder = LabelEncoder()
        tag_encoder.fit(['B-AC', 'O', 'B-LF', 'I-LF'])
str_pred = []
for sentence in predictions:
str_sentence = tag_encoder.inverse_transform(sentence)
str_pred.append(list(str_sentence))
return str_pred
def train_LSTM(self, train_loader):
input_dim = self.embedding_dim
# Instantiate the lstm_model
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        lstm_model = BiLSTM_NER(input_dim, hidden_dim=self.hidden_dim, tagset_size=len(self.tag2idx))
        lstm_model.to(device)
# Loss function and optimizer
loss_function = nn.CrossEntropyLoss(ignore_index=PAD_VALUE) # Ignore padding
optimizer = optim.Adam(lstm_model.parameters(), lr=self.learning_rate)
#print('--> Training LSTM')
# Training loop
for epoch in range(self.epochs):
total_loss = 0
total_correct = 0
total_words = 0
lstm_model.train() # Set model to training mode
for batch_idx, (X_batch, y_batch) in enumerate(train_loader):
                X_batch, y_batch = X_batch.to(device), y_batch.to(device)
# Zero gradients
optimizer.zero_grad()
# Forward pass
tag_scores = lstm_model(X_batch)
# Reshape and compute loss (ignore padded values)
loss = loss_function(tag_scores.view(-1, len(self.tag2idx)), y_batch.view(-1))
# Backward pass and optimization
loss.backward()
optimizer.step()
total_loss += loss.item()
# Compute accuracy for this batch
# Get the predicted tags (index of max score)
_, predicted_tags = torch.max(tag_scores, dim=2)
# Flatten the tensors to compare word-by-word
flattened_pred = predicted_tags.view(-1)
flattened_true = y_batch.view(-1)
# Exclude padding tokens from the accuracy calculation
mask = flattened_true != PAD_VALUE
correct = (flattened_pred[mask] == flattened_true[mask]).sum().item()
# Count the total words in the batch (ignoring padding)
total_words_batch = mask.sum().item()
# Update total correct and total words
total_correct += correct
total_words += total_words_batch
avg_loss = total_loss / len(train_loader)
avg_accuracy = total_correct / total_words * 100 # Accuracy in percentage
#print(f' ==> Epoch {epoch + 1}/{self.epochs}, Loss: {avg_loss:.4f}, Accuracy: {avg_accuracy:.2f}%')
return lstm_model
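# LSTM.fit consumes the output of one of the embedding transformers above together
# with integer-encoded tag sequences; predict returns one list of string tags per sentence.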
# Define the FeedForward NN Model
class FeedForwardNN_NER(nn.Module):
def __init__(self, embedding_dim, hidden_dim, tagset_size):
super(FeedForwardNN_NER, self).__init__()
self.fc1 = nn.Linear(embedding_dim, hidden_dim)
self.relu = nn.ReLU()
self.fc2 = nn.Linear(hidden_dim, tagset_size)
def forward(self, x):
x = self.fc1(x)
x = self.relu(x)
logits = self.fc2(x)
return logits
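# Unlike the BiLSTM, this network scores every token from its own embedding alone
# (no sentence context): both linear layers are applied position-wise over the sequence.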
class FeedforwardNN(BaseEstimator, ClassifierMixin):
def __init__(self, embedding_dim = None, hidden_dim = 128, epochs = 5, learning_rate = 0.001, tag2idx = None):
self.embedding_dim = embedding_dim
self.hidden_dim = hidden_dim
self.epochs = epochs
self.learning_rate = learning_rate
self.tag2idx = tag2idx
def fit(self, embedded, encoded_tags):
print('Feed Forward NN: ', flush=True)
data = Ner_Dataset(embedded, encoded_tags)
train_loader = DataLoader(data, batch_size=BATCH_SIZE, shuffle=True, collate_fn=pad)
self.model = self.train_FF(train_loader)
print('--> Feed Forward trained', flush=True)
return self
def predict(self, X):
# Switch to evaluation mode
test_loader = DataLoader(X, batch_size=1, shuffle=False, collate_fn=pred_pad)
self.model.eval()
predictions = []
# Iterate through test data
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        with torch.no_grad():
            for X_batch in test_loader:
                X_batch = X_batch.to(device)
tag_scores = self.model(X_batch)
_, predicted_tags = torch.max(tag_scores, dim=2)
# Flatten the tensors to compare word-by-word
flattened_pred = predicted_tags.view(-1)
predictions.append(flattened_pred.cpu().numpy())
        # Decode integer predictions back to tag strings. This assumes the training labels
        # were encoded with a LabelEncoder fit on the same four tags (classes are sorted
        # alphabetically, so the mapping is reproducible).
        tag_encoder = LabelEncoder()
        tag_encoder.fit(['B-AC', 'O', 'B-LF', 'I-LF'])
str_pred = []
for sentence in predictions:
str_sentence = tag_encoder.inverse_transform(sentence)
str_pred.append(list(str_sentence))
return str_pred
def train_FF(self, train_loader):
        # Instantiate the feed-forward model
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        ff_model = FeedForwardNN_NER(self.embedding_dim, hidden_dim=self.hidden_dim, tagset_size=len(self.tag2idx))
        ff_model.to(device)
# Loss function and optimizer
loss_function = nn.CrossEntropyLoss(ignore_index=PAD_VALUE) # Ignore padding
optimizer = optim.Adam(ff_model.parameters(), lr=self.learning_rate)
print('--> Training FF')
# Training loop
for epoch in range(self.epochs):
total_loss = 0
total_correct = 0
total_words = 0
ff_model.train() # Set model to training mode
for batch_idx, (X_batch, y_batch) in enumerate(train_loader):
                X_batch, y_batch = X_batch.to(device), y_batch.to(device)
# Zero gradients
optimizer.zero_grad()
# Forward pass
tag_scores = ff_model(X_batch)
# Reshape and compute loss (ignore padded values)
loss = loss_function(tag_scores.view(-1, len(self.tag2idx)), y_batch.view(-1))
# Backward pass and optimization
loss.backward()
optimizer.step()
total_loss += loss.item()
# Compute accuracy for this batch
# Get the predicted tags (index of max score)
_, predicted_tags = torch.max(tag_scores, dim=2)
# Flatten the tensors to compare word-by-word
flattened_pred = predicted_tags.view(-1)
flattened_true = y_batch.view(-1)
# Exclude padding tokens from the accuracy calculation
mask = flattened_true != PAD_VALUE
correct = (flattened_pred[mask] == flattened_true[mask]).sum().item()
# Count the total words in the batch (ignoring padding)
total_words_batch = mask.sum().item()
# Update total correct and total words
total_correct += correct
total_words += total_words_batch
avg_loss = total_loss / len(train_loader)
avg_accuracy = total_correct / total_words * 100 # Accuracy in percentage
print(f' ==> Epoch {epoch + 1}/{self.epochs}, Loss: {avg_loss:.4f}, Accuracy: {avg_accuracy:.2f}%')
return ff_model
# Baseline CRF from sklearn_crfsuite: L-BFGS training with L1 (c1) and L2 (c2) regularisation
crf = sklearn_crfsuite.CRF(
algorithm='lbfgs',
c1=0.1,
c2=0.1,
max_iterations=100,
all_possible_transitions=True)
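
# Illustrative usage sketch (not part of the deployed pipeline): shows one way the
# pieces above could be wired together, here with Word2VecTransformer feeding the
# feed-forward tagger. The toy sentences, tags and hyperparameters are assumptions
# made purely for demonstration.
if __name__ == '__main__':
    toy_sentences = [['EGFR', 'epidermal', 'growth', 'factor', 'receptor'],
                     ['The', 'patient', 'received', 'TKI', 'therapy']]
    toy_tags = [['B-AC', 'B-LF', 'I-LF', 'I-LF', 'I-LF'],
                ['O', 'O', 'O', 'B-AC', 'O']]
    # Encode string tags to integers; LabelEncoder sorts classes alphabetically
    tag_encoder = LabelEncoder()
    tag_encoder.fit(['B-AC', 'O', 'B-LF', 'I-LF'])
    encoded_tags = [tag_encoder.transform(t) for t in toy_tags]
    tag2idx = {tag: idx for idx, tag in enumerate(tag_encoder.classes_)}
    # Embed the tokens, then train and run the feed-forward tagger
    w2v = Word2VecTransformer().fit(toy_sentences, encoded_tags)
    embedded = w2v.transform(toy_sentences)
    ff = FeedforwardNN(embedding_dim=EMBEDDING_DIM, epochs=1, tag2idx=tag2idx)
    ff.fit(embedded, encoded_tags)
    print(ff.predict(embedded))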