import pandas as pd
import numpy as np
import random
import torch
import torch.nn as nn
import torch.optim as optim
from seqeval.metrics import accuracy_score, f1_score, classification_report
from seqeval.scheme import IOB2
import sklearn_crfsuite
from sklearn_crfsuite import metrics
from sklearn.metrics.pairwise import cosine_similarity
from gensim.models import Word2Vec, KeyedVectors
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from sklearn.base import BaseEstimator, ClassifierMixin, TransformerMixin
from sklearn.feature_extraction.text import TfidfVectorizer
import gensim.downloader as api
from itertools import product
from sklearn.model_selection import train_test_split, GridSearchCV
from joblib import dump

class preprocess_sentences():
    def __init__(self):
        pass

    def fit(self, X, y=None):
        print('PREPROCESSING')
        return self

    def transform(self, X):
        # X is expected to be a pandas Series of token arrays (e.g. train['tokens']);
        # convert it to a plain list of token lists (y is unused here).
        sentences = X.apply(lambda x: x.tolist()).tolist()
        print('--> Preprocessing complete \n', flush=True)
        return sentences

EMBEDDING_DIM = 500
PAD_VALUE = -1      # padding value, ignored by the loss via ignore_index
MAX_LENGTH = 376
BATCH_SIZE = 16

class Word2VecTransformer():
    def __init__(self, vector_size=EMBEDDING_DIM, window=5, min_count=1, workers=1, embedding_dim=EMBEDDING_DIM):
        self.model = None
        self.vector_size = vector_size
        self.window = window
        self.min_count = min_count
        self.workers = workers
        self.embedding_dim = embedding_dim

    def fit(self, X, y):
        # https://stackoverflow.com/questions/17242456/python-print-sys-stdout-write-not-visible-when-using-logging
        # https://stackoverflow.com/questions/230751/how-can-i-flush-the-output-of-the-print-function
        print('WORD2VEC:', flush=True)
        # Train the Word2Vec model on the tokenised sentences
        self.model = Word2Vec(sentences=X, vector_size=self.vector_size, window=self.window,
                              min_count=self.min_count, workers=self.workers)
        print('--> Word2Vec Fitted', flush=True)
        return self

    def transform(self, X):
        # Map each token to its Word2Vec vector; out-of-vocabulary words get a random vector
        embedded_sentences = []
        for sentence in X:
            sentence_vectors = []
            for word in sentence:
                if word in self.model.wv:
                    vec = self.model.wv[word]
                else:
                    vec = np.random.normal(scale=0.6, size=(self.embedding_dim,))
                sentence_vectors.append(vec)
            embedded_sentences.append(torch.tensor(np.array(sentence_vectors), dtype=torch.float32))
        print('--> Embeddings Complete \n', flush=True)
        return embedded_sentences

class Word2VecTransformer_CRF():
    def __init__(self, vector_size=EMBEDDING_DIM, window=5, min_count=1, workers=1, embedding_dim=EMBEDDING_DIM):
        self.model = None
        self.vector_size = vector_size
        self.window = window
        self.min_count = min_count
        self.workers = workers
        self.embedding_dim = embedding_dim

    def fit(self, X, y):
        # https://stackoverflow.com/questions/17242456/python-print-sys-stdout-write-not-visible-when-using-logging
        # https://stackoverflow.com/questions/230751/how-can-i-flush-the-output-of-the-print-function
        print('WORD2VEC:', flush=True)
        # Train the Word2Vec model on the tokenised sentences
        self.model = Word2Vec(sentences=X, vector_size=self.vector_size, window=self.window,
                              min_count=self.min_count, workers=self.workers)
        print('--> Word2Vec Fitted', flush=True)
        return self

    def transform(self, X):
        # Build a CRF feature dict per token: surface features plus the Word2Vec embedding dimensions
        embedded_sentences = []
        for sentence in X:
            sentence_vectors = []
            for word in sentence:
                features = {
                    'bias': 1.0,
                    'word.lower()': word.lower(),
                    'word[-3:]': word[-3:],
                    'word[-2:]': word[-2:],
                    'word.isupper()': word.isupper(),
                    'word.istitle()': word.istitle(),
                    'word.isdigit()': word.isdigit(),
                }
                if word in self.model.wv:
                    vec = self.model.wv[word]
                else:
                    vec = np.random.normal(scale=0.6, size=(self.embedding_dim,))
                # https://stackoverflow.com/questions/58736548/how-to-use-word-embedding-as-features-for-crf-sklearn-crfsuite-model-training
                for index in range(len(vec)):
                    features[f"embedding_{index}"] = vec[index]
                sentence_vectors.append(features)
            embedded_sentences.append(sentence_vectors)
        print('--> Embeddings Complete \n', flush=True)
        return embedded_sentences

class tfidfTransformer(BaseEstimator, TransformerMixin):
    def __init__(self):
        self.model = None
        self.embedding_dim = None
        self.idf = None
        self.vocab_size = None
        self.vocab = None

    def fit(self, X, y=None):
        print('TFIDF:', flush=True)
        joined_sentences = [' '.join(tokens) for tokens in X]
        self.model = TfidfVectorizer()
        self.model.fit(joined_sentences)
        self.vocab = self.model.vocabulary_
        self.idf = self.model.idf_
        self.vocab_size = len(self.vocab)
        self.embedding_dim = self.vocab_size
        print('--> TFIDF Fitted', flush=True)
        return self

    def transform(self, X):
        # One TF-IDF vector per token: tf is computed within the sentence,
        # idf comes from the fitted vectorizer
        embedded = []
        for sentence in X:
            sent_vecs = []
            token_counts = {}
            for word in sentence:
                token_counts[word] = token_counts.get(word, 0) + 1
            sent_len = len(sentence)
            for word in sentence:
                vec = np.zeros(self.vocab_size)
                # TfidfVectorizer lowercases its vocabulary by default, so match on the lowercased token
                key = word.lower()
                if key in self.vocab:
                    tf = token_counts[word] / sent_len
                    token_idx = self.vocab[key]
                    vec[token_idx] = tf * self.idf[token_idx]
                sent_vecs.append(vec)
            embedded.append(torch.tensor(np.array(sent_vecs), dtype=torch.float32))
        print('--> Embeddings Complete \n', flush=True)
        return embedded

class GloveTransformer(BaseEstimator, TransformerMixin):
    def __init__(self):
        self.model = None
        self.embedding_dim = 300

    def fit(self, X, y=None):
        print('GLOVE', flush=True)
        self.model = api.load('glove-wiki-gigaword-300')
        print('--> Glove Downloaded', flush=True)
        return self

    def transform(self, X):
        # Map each token to its GloVe vector; out-of-vocabulary words get a random vector
        print('--> Beginning embeddings', flush=True)
        embedded_sentences = []
        for sentence in X:
            sentence_vectors = []
            for word in sentence:
                if word in self.model:
                    vec = self.model[word]
                else:
                    vec = np.random.normal(scale=0.6, size=(self.embedding_dim,))
                sentence_vectors.append(vec)
            embedded_sentences.append(torch.tensor(np.array(sentence_vectors), dtype=torch.float32))
        print('--> Embeddings Complete \n', flush=True)
        return embedded_sentences

class Bio2VecTransformer():
    def __init__(self, vector_size=200, window=5, min_count=1, workers=1, embedding_dim=200):
        self.model = None
        self.vector_size = vector_size
        self.window = window
        self.min_count = min_count
        self.workers = workers
        self.embedding_dim = embedding_dim

    def fit(self, X, y):
        print('BIO2VEC:', flush=True)
        # https://stackoverflow.com/questions/58055415/how-to-load-bio2vec-in-gensim
        # Bio2VecModel is expected to be a pretrained gensim KeyedVectors instance
        # loaded elsewhere in this module.
        self.model = Bio2VecModel
        print('--> BIO2VEC Fitted', flush=True)
        return self

    def transform(self, X):
        # Map each token to its Bio2Vec vector; out-of-vocabulary words get a random vector
        embedded_sentences = []
        for sentence in X:
            sentence_vectors = []
            for word in sentence:
                if word in self.model:
                    vec = self.model[word]
                else:
                    vec = np.random.normal(scale=0.6, size=(self.embedding_dim,))
                sentence_vectors.append(vec)
            embedded_sentences.append(torch.tensor(np.array(sentence_vectors), dtype=torch.float32))
        print('--> Embeddings Complete \n', flush=True)
        return embedded_sentences

class BiLSTM_NER(nn.Module):
    def __init__(self, input_dim, hidden_dim, tagset_size):
        super(BiLSTM_NER, self).__init__()
        # No embedding layer is used because the inputs are already dense vectors;
        # freeze=False would let a pretrained embedding matrix be fine-tuned.
        # self.embedding = nn.Embedding.from_pretrained(embedding_matrix, freeze=False, padding_idx=-1)
        self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(hidden_dim * 2, tagset_size)

    def forward(self, sentences):
        # sentences: (batch, seq_len, input_dim) pre-embedded token vectors
        # embeds = self.embedding(sentences)
        lstm_out, _ = self.lstm(sentences)
        tag_scores = self.fc(lstm_out)
        return tag_scores

def pad(batch):
    # batch is a list of (X, y) pairs
    X_batch, y_batch = zip(*batch)
    # Convert to tensors
    X_batch = [torch.tensor(seq, dtype=torch.float32) for seq in X_batch]
    y_batch = [torch.tensor(seq, dtype=torch.long) for seq in y_batch]
    # Pad sequences to the longest sequence in the batch
    X_padded = pad_sequence(X_batch, batch_first=True, padding_value=PAD_VALUE)
    y_padded = pad_sequence(y_batch, batch_first=True, padding_value=PAD_VALUE)
    return X_padded, y_padded

def pred_pad(batch):
    # Same as pad() but for inference, where there are no labels
    X_batch = [torch.tensor(seq, dtype=torch.float32) for seq in batch]
    X_padded = pad_sequence(X_batch, batch_first=True, padding_value=PAD_VALUE)
    return X_padded

class Ner_Dataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

class LSTM(BaseEstimator, ClassifierMixin):
    def __init__(self, embedding_dim=None, hidden_dim=128, epochs=5, learning_rate=0.001, tag2idx=None):
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.epochs = epochs
        self.learning_rate = learning_rate
        self.tag2idx = tag2idx

    def fit(self, embedded, encoded_tags):
        # print('LSTM started:', flush=True)
        data = Ner_Dataset(embedded, encoded_tags)
        train_loader = DataLoader(data, batch_size=BATCH_SIZE, shuffle=True, collate_fn=pad)
        self.model = self.train_LSTM(train_loader)
        # print('--> Epochs: ', self.epochs, flush=True)
        # print('--> Learning Rate: ', self.learning_rate)
        return self

    def predict(self, X):
        # Switch to evaluation mode
        test_loader = DataLoader(X, batch_size=1, shuffle=False, collate_fn=pred_pad)
        self.model.eval()
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        predictions = []
        # Iterate through test data
        with torch.no_grad():
            for X_batch in test_loader:
                X_batch = X_batch.to(device)
                tag_scores = self.model(X_batch)
                _, predicted_tags = torch.max(tag_scores, dim=2)
                flattened_pred = predicted_tags.view(-1)
                predictions.append(list(flattened_pred.cpu().numpy()))
        # print('before concat', predictions)
        # predictions = np.concatenate(predictions)
        # print('after concat', predictions)
        # Decode integer predictions back to tag strings; this encoder must match
        # the label encoding used when the training tags were created
        tag_encoder = LabelEncoder()
        tag_encoder.fit(['B-AC', 'O', 'B-LF', 'I-LF'])
        str_pred = []
        for sentence in predictions:
            str_sentence = tag_encoder.inverse_transform(sentence)
            str_pred.append(list(str_sentence))
        return str_pred

    def train_LSTM(self, train_loader):
        input_dim = self.embedding_dim
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        # Instantiate the lstm_model
        lstm_model = BiLSTM_NER(input_dim, hidden_dim=self.hidden_dim, tagset_size=len(self.tag2idx))
        lstm_model.to(device)
        # Loss function and optimizer
        loss_function = nn.CrossEntropyLoss(ignore_index=PAD_VALUE)  # Ignore padding
        optimizer = optim.Adam(lstm_model.parameters(), lr=self.learning_rate)
        # print('--> Training LSTM')
        # Training loop
        for epoch in range(self.epochs):
            total_loss = 0
            total_correct = 0
            total_words = 0
            lstm_model.train()  # Set model to training mode
            for batch_idx, (X_batch, y_batch) in enumerate(train_loader):
                X_batch, y_batch = X_batch.to(device), y_batch.to(device)
                # Zero gradients
                optimizer.zero_grad()
                # Forward pass
                tag_scores = lstm_model(X_batch)
                # Reshape and compute loss (ignore padded values)
                loss = loss_function(tag_scores.view(-1, len(self.tag2idx)), y_batch.view(-1))
                # Backward pass and optimization
                loss.backward()
                optimizer.step()
                total_loss += loss.item()
                # Compute accuracy for this batch
                # Get the predicted tags (index of max score)
                _, predicted_tags = torch.max(tag_scores, dim=2)
                # Flatten the tensors to compare word-by-word
                flattened_pred = predicted_tags.view(-1)
                flattened_true = y_batch.view(-1)
                # Exclude padding tokens from the accuracy calculation
                mask = flattened_true != PAD_VALUE
                correct = (flattened_pred[mask] == flattened_true[mask]).sum().item()
                # Count the total words in the batch (ignoring padding)
                total_words_batch = mask.sum().item()
                # Update total correct and total words
                total_correct += correct
                total_words += total_words_batch
            avg_loss = total_loss / len(train_loader)
            avg_accuracy = total_correct / total_words * 100  # Accuracy in percentage
            # print(f' ==> Epoch {epoch + 1}/{self.epochs}, Loss: {avg_loss:.4f}, Accuracy: {avg_accuracy:.2f}%')
        return lstm_model

# Define the FeedForward NN Model
class FeedForwardNN_NER(nn.Module):
    def __init__(self, embedding_dim, hidden_dim, tagset_size):
        super(FeedForwardNN_NER, self).__init__()
        self.fc1 = nn.Linear(embedding_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_dim, tagset_size)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        logits = self.fc2(x)
        return logits

class FeedforwardNN(BaseEstimator, ClassifierMixin):
    def __init__(self, embedding_dim=None, hidden_dim=128, epochs=5, learning_rate=0.001, tag2idx=None):
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.epochs = epochs
        self.learning_rate = learning_rate
        self.tag2idx = tag2idx

    def fit(self, embedded, encoded_tags):
        print('Feed Forward NN: ', flush=True)
        data = Ner_Dataset(embedded, encoded_tags)
        train_loader = DataLoader(data, batch_size=BATCH_SIZE, shuffle=True, collate_fn=pad)
        self.model = self.train_FF(train_loader)
        print('--> Feed Forward trained', flush=True)
        return self

    def predict(self, X):
        # Switch to evaluation mode
        test_loader = DataLoader(X, batch_size=1, shuffle=False, collate_fn=pred_pad)
        self.model.eval()
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        predictions = []
        # Iterate through test data
        with torch.no_grad():
            for X_batch in test_loader:
                X_batch = X_batch.to(device)
                tag_scores = self.model(X_batch)
                _, predicted_tags = torch.max(tag_scores, dim=2)
                # Flatten the tensors to compare word-by-word
                flattened_pred = predicted_tags.view(-1)
                predictions.append(flattened_pred.cpu().numpy())
        # Decode integer predictions back to tag strings; this encoder must match
        # the label encoding used when the training tags were created
        tag_encoder = LabelEncoder()
        tag_encoder.fit(['B-AC', 'O', 'B-LF', 'I-LF'])
        str_pred = []
        for sentence in predictions:
            str_sentence = tag_encoder.inverse_transform(sentence)
            str_pred.append(list(str_sentence))
        return str_pred

    def train_FF(self, train_loader):
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        # Instantiate the feed-forward model
        ff_model = FeedForwardNN_NER(self.embedding_dim, hidden_dim=self.hidden_dim, tagset_size=len(self.tag2idx))
        ff_model.to(device)
        # Loss function and optimizer
        loss_function = nn.CrossEntropyLoss(ignore_index=PAD_VALUE)  # Ignore padding
        optimizer = optim.Adam(ff_model.parameters(), lr=self.learning_rate)
        print('--> Training FF')
        # Training loop
        for epoch in range(self.epochs):
            total_loss = 0
            total_correct = 0
            total_words = 0
            ff_model.train()  # Set model to training mode
            for batch_idx, (X_batch, y_batch) in enumerate(train_loader):
                X_batch, y_batch = X_batch.to(device), y_batch.to(device)
                # Zero gradients
                optimizer.zero_grad()
                # Forward pass
                tag_scores = ff_model(X_batch)
                # Reshape and compute loss (ignore padded values)
                loss = loss_function(tag_scores.view(-1, len(self.tag2idx)), y_batch.view(-1))
                # Backward pass and optimization
                loss.backward()
                optimizer.step()
                total_loss += loss.item()
                # Compute accuracy for this batch
                # Get the predicted tags (index of max score)
                _, predicted_tags = torch.max(tag_scores, dim=2)
                # Flatten the tensors to compare word-by-word
                flattened_pred = predicted_tags.view(-1)
                flattened_true = y_batch.view(-1)
                # Exclude padding tokens from the accuracy calculation
                mask = flattened_true != PAD_VALUE
                correct = (flattened_pred[mask] == flattened_true[mask]).sum().item()
                # Count the total words in the batch (ignoring padding)
                total_words_batch = mask.sum().item()
                # Update total correct and total words
                total_correct += correct
                total_words += total_words_batch
            avg_loss = total_loss / len(train_loader)
            avg_accuracy = total_correct / total_words * 100  # Accuracy in percentage
            print(f' ==> Epoch {epoch + 1}/{self.epochs}, Loss: {avg_loss:.4f}, Accuracy: {avg_accuracy:.2f}%')
        return ff_model

crf = sklearn_crfsuite.CRF(
    algorithm='lbfgs',
    c1=0.1,
    c2=0.1,
    max_iterations=100,
    all_possible_transitions=True)
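
# ---------------------------------------------------------------------------
# Illustrative sketch (not called anywhere): one way the components above could
# be wired together. The `train`/`dev` DataFrames and their column names
# ('tokens' for token arrays, 'labels' for per-token IOB tags) are assumptions
# about data prepared elsewhere in this project; only the tag set matches the
# one hard-coded in the predict() methods above.
# ---------------------------------------------------------------------------
def example_usage(train, dev):
    # Integer-encode the per-sentence tags for the neural models; LabelEncoder
    # sorts its classes, so this matches the inverse_transform used in predict()
    tag_encoder = LabelEncoder().fit(['B-AC', 'O', 'B-LF', 'I-LF'])
    tag2idx = {tag: idx for idx, tag in enumerate(tag_encoder.classes_)}
    y_train = [tag_encoder.transform(list(tags)) for tags in train['labels']]

    # Word2Vec embeddings feeding the BiLSTM tagger
    lstm_pipeline = Pipeline([
        ('preprocess', preprocess_sentences()),
        ('embed', Word2VecTransformer()),
        ('model', LSTM(embedding_dim=EMBEDDING_DIM, tag2idx=tag2idx)),
    ])
    lstm_pipeline.fit(train['tokens'], y_train)
    lstm_preds = lstm_pipeline.predict(dev['tokens'])

    # CRF trained on surface + embedding features; sklearn_crfsuite takes string labels directly
    sentences = preprocess_sentences().fit(train['tokens']).transform(train['tokens'])
    crf_embedder = Word2VecTransformer_CRF().fit(sentences, None)
    crf.fit(crf_embedder.transform(sentences), [list(tags) for tags in train['labels']])

    # Entity-level evaluation of the BiLSTM predictions with seqeval
    gold = [list(tags) for tags in dev['labels']]
    print(classification_report(gold, lstm_preds, mode='strict', scheme=IOB2))
    return lstm_preds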