import pandas as pd
import numpy as np
import random

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence

from seqeval.metrics import accuracy_score, f1_score, classification_report
from seqeval.scheme import IOB2

import sklearn_crfsuite
from sklearn_crfsuite import metrics

from sklearn.metrics.pairwise import cosine_similarity
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import LabelEncoder
from sklearn.base import BaseEstimator, ClassifierMixin, TransformerMixin
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split, GridSearchCV

from gensim.models import Word2Vec, KeyedVectors
import gensim.downloader as api

from itertools import product
from joblib import dump


class preprocess_sentences(BaseEstimator, TransformerMixin):
    def __init__(self):
        pass

    def fit(self, X, y=None):
        print('PREPROCESSING')
        return self

    def transform(self, X):
        # X is expected to be a pandas Series of token arrays (e.g. train['tokens'])
        sentences = X.apply(lambda x: x.tolist()).tolist()
        print('--> Preprocessing complete \n', flush=True)
        return sentences


EMBEDDING_DIM = 500
PAD_VALUE = -1
MAX_LENGTH = 376
BATCH_SIZE = 16


class Word2VecTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, vector_size=EMBEDDING_DIM, window=5, min_count=1, workers=1,
                 embedding_dim=EMBEDDING_DIM):
        self.model = None
        self.vector_size = vector_size
        self.window = window
        self.min_count = min_count
        self.workers = workers
        self.embedding_dim = embedding_dim

    def fit(self, X, y=None):
        # https://stackoverflow.com/questions/17242456/python-print-sys-stdout-write-not-visible-when-using-logging
        # https://stackoverflow.com/questions/230751/how-can-i-flush-the-output-of-the-print-function
        print('WORD2VEC:', flush=True)
        # Fit the Word2Vec model on the tokenised sentences
        self.model = Word2Vec(sentences=X, vector_size=self.vector_size, window=self.window,
                              min_count=self.min_count, workers=self.workers)
        print('--> Word2Vec Fitted', flush=True)
        return self

    def transform(self, X):
        # Embed each token; out-of-vocabulary words get a random vector
        embedded_sentences = []
        for sentence in X:
            sentence_vectors = []
            for word in sentence:
                if word in self.model.wv:
                    vec = self.model.wv[word]
                else:
                    vec = np.random.normal(scale=0.6, size=(self.embedding_dim,))
                sentence_vectors.append(vec)
            embedded_sentences.append(torch.tensor(sentence_vectors, dtype=torch.float32))
        print('--> Embeddings Complete \n', flush=True)
        return embedded_sentences
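# A minimal sketch (toy tokens, not the project dataset) of the contract that
# Word2VecTransformer, tfidfTransformer, GloveTransformer and Bio2VecTransformer share:
# transform() returns one float32 tensor of shape (sentence_length, embedding_dim) per
# sentence. (Word2VecTransformer_CRF below differs: it returns per-token feature dicts
# for sklearn-crfsuite instead of tensors.)
#
#   toy_sentences = [['the', 'cat', 'sat'], ['a', 'dog', 'barked', 'loudly']]
#   w2v = Word2VecTransformer().fit(toy_sentences)
#   vectors = w2v.transform(toy_sentences)
#   vectors[0].shape  # -> torch.Size([3, 500]) with the default vector_size=EMBEDDING_DIM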
class Word2VecTransformer_CRF(BaseEstimator, TransformerMixin):
    def __init__(self, vector_size=EMBEDDING_DIM, window=5, min_count=1, workers=1,
                 embedding_dim=EMBEDDING_DIM):
        self.model = None
        self.vector_size = vector_size
        self.window = window
        self.min_count = min_count
        self.workers = workers
        self.embedding_dim = embedding_dim

    def fit(self, X, y=None):
        # https://stackoverflow.com/questions/17242456/python-print-sys-stdout-write-not-visible-when-using-logging
        # https://stackoverflow.com/questions/230751/how-can-i-flush-the-output-of-the-print-function
        print('WORD2VEC:', flush=True)
        # Fit the Word2Vec model on the tokenised sentences
        self.model = Word2Vec(sentences=X, vector_size=self.vector_size, window=self.window,
                              min_count=self.min_count, workers=self.workers)
        print('--> Word2Vec Fitted', flush=True)
        return self

    def transform(self, X):
        # Build one feature dict per token: hand-crafted features plus the Word2Vec vector
        embedded_sentences = []
        for sentence in X:
            sentence_vectors = []
            for word in sentence:
                features = {
                    'bias': 1.0,
                    'word.lower()': word.lower(),
                    'word[-3:]': word[-3:],
                    'word[-2:]': word[-2:],
                    'word.isupper()': word.isupper(),
                    'word.istitle()': word.istitle(),
                    'word.isdigit()': word.isdigit(),
                }
                if word in self.model.wv:
                    vec = self.model.wv[word]
                else:
                    vec = np.random.normal(scale=0.6, size=(self.embedding_dim,))
                # https://stackoverflow.com/questions/58736548/how-to-use-word-embedding-as-features-for-crf-sklearn-crfsuite-model-training
                for index in range(len(vec)):
                    features[f"embedding_{index}"] = vec[index]
                sentence_vectors.append(features)
            embedded_sentences.append(sentence_vectors)
        print('--> Embeddings Complete \n', flush=True)
        return embedded_sentences


class tfidfTransformer(BaseEstimator, TransformerMixin):
    def __init__(self):
        self.model = None
        self.embedding_dim = None
        self.idf = None
        self.vocab_size = None
        self.vocab = None

    def fit(self, X, y=None):
        print('TFIDF:', flush=True)
        joined_sentences = [' '.join(tokens) for tokens in X]
        self.model = TfidfVectorizer()
        self.model.fit(joined_sentences)
        self.vocab = self.model.vocabulary_
        self.idf = self.model.idf_
        self.vocab_size = len(self.vocab)
        self.embedding_dim = self.vocab_size
        print('--> TFIDF Fitted', flush=True)
        return self

    def transform(self, X):
        embedded = []
        for sentence in X:
            sent_vecs = []
            token_counts = {}
            for word in sentence:
                token_counts[word] = token_counts.get(word, 0) + 1
            sent_len = len(sentence)
            for word in sentence:
                vec = np.zeros(self.vocab_size)
                if word in self.vocab:
                    tf = token_counts[word] / sent_len
                    token_idx = self.vocab[word]
                    vec[token_idx] = tf * self.idf[token_idx]
                sent_vecs.append(vec)
            embedded.append(torch.tensor(sent_vecs, dtype=torch.float32))
        print('--> Embeddings Complete \n', flush=True)
        return embedded


class GloveTransformer(BaseEstimator, TransformerMixin):
    def __init__(self):
        self.model = None
        self.embedding_dim = 300

    def fit(self, X, y=None):
        print('GLOVE', flush=True)
        self.model = api.load('glove-wiki-gigaword-300')
        print('--> Glove Downloaded', flush=True)
        return self

    def transform(self, X):
        # Embed each token with GloVe; out-of-vocabulary words get a random vector
        print('--> Beginning embeddings', flush=True)
        embedded_sentences = []
        for sentence in X:
            sentence_vectors = []
            for word in sentence:
                if word in self.model:
                    vec = self.model[word]
                else:
                    vec = np.random.normal(scale=0.6, size=(self.embedding_dim,))
                sentence_vectors.append(vec)
            embedded_sentences.append(torch.tensor(sentence_vectors, dtype=torch.float32))
        print('--> Embeddings Complete \n', flush=True)
        return embedded_sentences


class Bio2VecTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, vector_size=200, window=5, min_count=1, workers=1, embedding_dim=200):
        self.model = None
        self.vector_size = vector_size
        self.window = window
        self.min_count = min_count
        self.workers = workers
        self.embedding_dim = embedding_dim

    def fit(self, X, y=None):
        print('BIO2VEC:', flush=True)
        # https://stackoverflow.com/questions/58055415/how-to-load-bio2vec-in-gensim
        # Assumes `Bio2VecModel` is a gensim KeyedVectors object loaded elsewhere in the project
        self.model = Bio2VecModel
        print('--> BIO2VEC Fitted', flush=True)
        return self

    def transform(self, X):
        # Embed each token with Bio2Vec; out-of-vocabulary words get a random vector
        embedded_sentences = []
        for sentence in X:
            sentence_vectors = []
            for word in sentence:
                if word in self.model:
                    vec = self.model[word]
                else:
                    vec = np.random.normal(scale=0.6, size=(self.embedding_dim,))
                sentence_vectors.append(vec)
            embedded_sentences.append(torch.tensor(sentence_vectors, dtype=torch.float32))
        print('--> Embeddings Complete \n', flush=True)
        return embedded_sentences
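# `Bio2VecModel` is not defined in this file; it is assumed to be a pre-trained
# Bio2Vec/BioWordVec model loaded as gensim KeyedVectors elsewhere (see the Stack
# Overflow link referenced in Bio2VecTransformer.fit). A sketch, with a placeholder
# file name, not the project's actual path:
#
#   Bio2VecModel = KeyedVectors.load_word2vec_format('bio2vec_vectors.bin', binary=True)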
class BiLSTM_NER(nn.Module):
    def __init__(self, input_dim, hidden_dim, tagset_size):
        super(BiLSTM_NER, self).__init__()
        # Embedding layer (unused: inputs are already embedded by the transformers above).
        # freeze=False would let the pretrained embeddings be fine-tuned.
        # self.embedding = nn.Embedding.from_pretrained(embedding_matrix, freeze=False, padding_idx=-1)
        self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(hidden_dim * 2, tagset_size)

    def forward(self, sentences):
        # embeds = self.embedding(sentences)
        lstm_out, _ = self.lstm(sentences)
        tag_scores = self.fc(lstm_out)
        return tag_scores


def pad(batch):
    # batch is a list of (X, y) pairs
    X_batch, y_batch = zip(*batch)
    # Convert to tensors
    X_batch = [torch.tensor(seq, dtype=torch.float32) for seq in X_batch]
    y_batch = [torch.tensor(seq, dtype=torch.long) for seq in y_batch]
    # Pad sequences
    X_padded = pad_sequence(X_batch, batch_first=True, padding_value=PAD_VALUE)
    y_padded = pad_sequence(y_batch, batch_first=True, padding_value=PAD_VALUE)
    return X_padded, y_padded


def pred_pad(batch):
    X_batch = [torch.tensor(seq, dtype=torch.float32) for seq in batch]
    X_padded = pad_sequence(X_batch, batch_first=True, padding_value=PAD_VALUE)
    return X_padded


class Ner_Dataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]


class LSTM(BaseEstimator, ClassifierMixin):
    def __init__(self, embedding_dim=None, hidden_dim=128, epochs=5, learning_rate=0.001,
                 tag2idx=None):
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.epochs = epochs
        self.learning_rate = learning_rate
        self.tag2idx = tag2idx

    def fit(self, embedded, encoded_tags):
        # print('LSTM started:', flush=True)
        data = Ner_Dataset(embedded, encoded_tags)
        train_loader = DataLoader(data, batch_size=BATCH_SIZE, shuffle=True, collate_fn=pad)
        self.model = self.train_LSTM(train_loader)
        # print('--> Epochs: ', self.epochs, flush=True)
        # print('--> Learning Rate: ', self.learning_rate)
        return self

    def predict(self, X):
        # Switch to evaluation mode
        test_loader = DataLoader(X, batch_size=1, shuffle=False, collate_fn=pred_pad)
        self.model.eval()
        predictions = []
        # Iterate through test data
        with torch.no_grad():
            for X_batch in test_loader:
                X_batch = X_batch.to(torch.device('cuda' if torch.cuda.is_available() else 'cpu'))
                tag_scores = self.model(X_batch)
                _, predicted_tags = torch.max(tag_scores, dim=2)
                flattened_pred = predicted_tags.view(-1)
                predictions.append(list(flattened_pred.cpu().numpy()))
        # print('before concat', predictions)
        # predictions = np.concatenate(predictions)
        # print('after concat', predictions)
        tag_encoder = LabelEncoder()
        tag_encoder.fit(['B-AC', 'O', 'B-LF', 'I-LF'])
        str_pred = []
        for sentence in predictions:
            str_sentence = tag_encoder.inverse_transform(sentence)
            str_pred.append(list(str_sentence))
        return str_pred

    def train_LSTM(self, train_loader):
        input_dim = self.embedding_dim
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        # Instantiate the lstm_model
        lstm_model = BiLSTM_NER(input_dim, hidden_dim=self.hidden_dim, tagset_size=len(self.tag2idx))
        lstm_model.to(device)
        # Loss function and optimizer
        loss_function = nn.CrossEntropyLoss(ignore_index=PAD_VALUE)  # Ignore padding
        optimizer = optim.Adam(lstm_model.parameters(), lr=self.learning_rate)
        # print('--> Training LSTM')
        # Training loop
        for epoch in range(self.epochs):
            total_loss = 0
            total_correct = 0
            total_words = 0
            lstm_model.train()  # Set model to training mode
            for batch_idx, (X_batch, y_batch) in enumerate(train_loader):
                X_batch, y_batch = X_batch.to(device), y_batch.to(device)
                # Zero gradients
                optimizer.zero_grad()
                # Forward pass
                tag_scores = lstm_model(X_batch)
                # Reshape and compute loss (ignore padded values)
                loss = loss_function(tag_scores.view(-1, len(self.tag2idx)), y_batch.view(-1))
                # Backward pass and optimization
                loss.backward()
                optimizer.step()
                total_loss += loss.item()
                # Compute accuracy for this batch
                # Get the predicted tags (index of max score)
                _, predicted_tags = torch.max(tag_scores, dim=2)
                # Flatten the tensors to compare word-by-word
                flattened_pred = predicted_tags.view(-1)
                flattened_true = y_batch.view(-1)
                # Exclude padding tokens from the accuracy calculation
                mask = flattened_true != PAD_VALUE
                correct = (flattened_pred[mask] == flattened_true[mask]).sum().item()
                # Count the total words in the batch (ignoring padding)
                total_words_batch = mask.sum().item()
                # Update total correct and total words
                total_correct += correct
                total_words += total_words_batch
            avg_loss = total_loss / len(train_loader)
            avg_accuracy = total_correct / total_words * 100  # Accuracy in percentage
            # print(f' ==> Epoch {epoch + 1}/{self.epochs}, Loss: {avg_loss:.4f}, Accuracy: {avg_accuracy:.2f}%')
        return lstm_model
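# Note: LSTM.predict() (and FeedforwardNN.predict() below) decodes integer predictions with a
# LabelEncoder fitted on ['B-AC', 'O', 'B-LF', 'I-LF'], so the integer tags passed to fit(),
# and the ids in tag2idx, are assumed to follow the same alphabetical encoding. A sketch
# (`train_tag_sequences` is a hypothetical list of per-sentence string tag lists):
#
#   tag_encoder = LabelEncoder()
#   tag_encoder.fit(['B-AC', 'O', 'B-LF', 'I-LF'])
#   tag2idx = {tag: idx for idx, tag in enumerate(tag_encoder.classes_)}
#   encoded_tags = [list(tag_encoder.transform(tags)) for tags in train_tag_sequences]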
# Define the FeedForward NN Model
class FeedForwardNN_NER(nn.Module):
    def __init__(self, embedding_dim, hidden_dim, tagset_size):
        super(FeedForwardNN_NER, self).__init__()
        self.fc1 = nn.Linear(embedding_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_dim, tagset_size)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        logits = self.fc2(x)
        return logits


class FeedforwardNN(BaseEstimator, ClassifierMixin):
    def __init__(self, embedding_dim=None, hidden_dim=128, epochs=5, learning_rate=0.001,
                 tag2idx=None):
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.epochs = epochs
        self.learning_rate = learning_rate
        self.tag2idx = tag2idx

    def fit(self, embedded, encoded_tags):
        print('Feed Forward NN: ', flush=True)
        data = Ner_Dataset(embedded, encoded_tags)
        train_loader = DataLoader(data, batch_size=BATCH_SIZE, shuffle=True, collate_fn=pad)
        self.model = self.train_FF(train_loader)
        print('--> Feed Forward trained', flush=True)
        return self

    def predict(self, X):
        # Switch to evaluation mode
        test_loader = DataLoader(X, batch_size=1, shuffle=False, collate_fn=pred_pad)
        self.model.eval()
        predictions = []
        # Iterate through test data
        with torch.no_grad():
            for X_batch in test_loader:
                X_batch = X_batch.to(torch.device('cuda' if torch.cuda.is_available() else 'cpu'))
                tag_scores = self.model(X_batch)
                _, predicted_tags = torch.max(tag_scores, dim=2)
                # Flatten the tensors to compare word-by-word
                flattened_pred = predicted_tags.view(-1)
                predictions.append(flattened_pred.cpu().numpy())
        tag_encoder = LabelEncoder()
        tag_encoder.fit(['B-AC', 'O', 'B-LF', 'I-LF'])
        str_pred = []
        for sentence in predictions:
            str_sentence = tag_encoder.inverse_transform(sentence)
            str_pred.append(list(str_sentence))
        return str_pred

    def train_FF(self, train_loader):
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        # Instantiate the ff_model
        ff_model = FeedForwardNN_NER(self.embedding_dim, hidden_dim=self.hidden_dim,
                                     tagset_size=len(self.tag2idx))
        ff_model.to(device)
        # Loss function and optimizer
        loss_function = nn.CrossEntropyLoss(ignore_index=PAD_VALUE)  # Ignore padding
        optimizer = optim.Adam(ff_model.parameters(), lr=self.learning_rate)
        print('--> Training FF')
        # Training loop
        for epoch in range(self.epochs):
            total_loss = 0
            total_correct = 0
            total_words = 0
            ff_model.train()  # Set model to training mode
            for batch_idx, (X_batch, y_batch) in enumerate(train_loader):
                X_batch, y_batch = X_batch.to(device), y_batch.to(device)
                # Zero gradients
                optimizer.zero_grad()
                # Forward pass
                tag_scores = ff_model(X_batch)
                # Reshape and compute loss (ignore padded values)
                loss = loss_function(tag_scores.view(-1, len(self.tag2idx)), y_batch.view(-1))
                # Backward pass and optimization
                loss.backward()
                optimizer.step()
                total_loss += loss.item()
                # Compute accuracy for this batch
                # Get the predicted tags (index of max score)
                _, predicted_tags = torch.max(tag_scores, dim=2)
                # Flatten the tensors to compare word-by-word
                flattened_pred = predicted_tags.view(-1)
                flattened_true = y_batch.view(-1)
                # Exclude padding tokens from the accuracy calculation
                mask = flattened_true != PAD_VALUE
                correct = (flattened_pred[mask] == flattened_true[mask]).sum().item()
                # Count the total words in the batch (ignoring padding)
                total_words_batch = mask.sum().item()
                # Update total correct and total words
                total_correct += correct
                total_words += total_words_batch
            avg_loss = total_loss / len(train_loader)
            avg_accuracy = total_correct / total_words * 100  # Accuracy in percentage
            print(f' ==> Epoch {epoch + 1}/{self.epochs}, Loss: {avg_loss:.4f}, Accuracy: {avg_accuracy:.2f}%')
        return ff_model
crf = sklearn_crfsuite.CRF(
    algorithm='lbfgs',
    c1=0.1,
    c2=0.1,
    max_iterations=100,
    all_possible_transitions=True,
)
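# A minimal end-to-end sketch of how the pieces above are assumed to fit together.
# Only the tag set, the Series-of-token-arrays input format and the estimator interfaces
# come from the code above; the toy dataframe, the 'tokens'/'ner_tags' column names and
# the hyperparameter values here are illustrative assumptions, not the project's actual
# experiment setup.
if __name__ == '__main__':
    toy = pd.DataFrame({
        'tokens': [np.array(['EEG', 'means', 'electroencephalogram', '.']),
                   np.array(['The', 'MRI', 'scan', 'was', 'normal', '.'])],
        'ner_tags': [['B-AC', 'O', 'B-LF', 'O'],
                     ['O', 'B-AC', 'O', 'O', 'O', 'O']],
    })

    # Encode the string tags with the same LabelEncoder ordering that predict() assumes
    tag_encoder = LabelEncoder()
    tag_encoder.fit(['B-AC', 'O', 'B-LF', 'I-LF'])
    tag2idx = {tag: idx for idx, tag in enumerate(tag_encoder.classes_)}
    encoded_tags = [list(tag_encoder.transform(tags)) for tags in toy['ner_tags']]

    # BiLSTM route: token arrays -> Word2Vec embeddings -> BiLSTM tagger
    lstm_pipeline = Pipeline([
        ('preprocess', preprocess_sentences()),
        ('embed', Word2VecTransformer()),
        ('model', LSTM(embedding_dim=EMBEDDING_DIM, epochs=1, tag2idx=tag2idx)),
    ])
    lstm_pipeline.fit(toy['tokens'], encoded_tags)
    lstm_preds = lstm_pipeline.predict(toy['tokens'])
    print('BiLSTM F1:', f1_score(list(toy['ner_tags']), lstm_preds, average='macro'))

    # CRF route: token lists -> per-token feature dicts -> sklearn-crfsuite CRF
    token_lists = preprocess_sentences().fit(toy['tokens']).transform(toy['tokens'])
    crf_features = Word2VecTransformer_CRF().fit(token_lists).transform(token_lists)
    crf.fit(crf_features, list(toy['ner_tags']))
    crf_preds = crf.predict(crf_features)
    print('CRF F1:', f1_score(list(toy['ner_tags']), crf_preds, average='macro'))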