import pandas as pd
import numpy as np
import random
import torch
import torch.nn as nn
import torch.optim as optim
from seqeval.metrics import accuracy_score, f1_score, classification_report
from seqeval.scheme import IOB2
import sklearn_crfsuite
from sklearn_crfsuite import metrics
from sklearn.metrics.pairwise import cosine_similarity
from gensim.models import Word2Vec, KeyedVectors
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from sklearn.base import BaseEstimator, ClassifierMixin, TransformerMixin
from sklearn.feature_extraction.text import TfidfVectorizer
import gensim.downloader as api
from itertools import product
from sklearn.model_selection import train_test_split, GridSearchCV
from joblib import dump
class preprocess_sentences(BaseEstimator, TransformerMixin):
    def __init__(self):
        pass

    def fit(self, X, y=None):
        print('PREPROCESSING')
        return self

    def transform(self, X):
        # X is expected to be a pandas Series of token arrays (e.g. train['tokens'])
        sentences = X.apply(lambda x: x.tolist()).tolist()
        print('--> Preprocessing complete \n', flush=True)
        return sentences
EMBEDDING_DIM = 500
PAD_VALUE = -1
MAX_LENGTH = 376
BATCH_SIZE = 16
class Word2VecTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, vector_size=EMBEDDING_DIM, window=5, min_count=1, workers=1, embedding_dim=EMBEDDING_DIM):
        self.model = None
        self.vector_size = vector_size
        self.window = window
        self.min_count = min_count
        self.workers = workers
        self.embedding_dim = embedding_dim

    def fit(self, X, y=None):
        # https://stackoverflow.com/questions/17242456/python-print-sys-stdout-write-not-visible-when-using-logging
        # https://stackoverflow.com/questions/230751/how-can-i-flush-the-output-of-the-print-function
        print('WORD2VEC:', flush=True)
        # Fit the Word2Vec model on the tokenised sentences
        self.model = Word2Vec(sentences=X, vector_size=self.vector_size, window=self.window,
                              min_count=self.min_count, workers=self.workers)
        print('--> Word2Vec Fitted', flush=True)
        return self

    def transform(self, X):
        # Map each token to its Word2Vec vector; unseen tokens get a random vector
        embedded_sentences = []
        for sentence in X:
            sentence_vectors = []
            for word in sentence:
                if word in self.model.wv:
                    vec = self.model.wv[word]
                else:
                    vec = np.random.normal(scale=0.6, size=(self.embedding_dim,))
                sentence_vectors.append(vec)
            embedded_sentences.append(torch.tensor(np.array(sentence_vectors), dtype=torch.float32))
        print('--> Embeddings Complete \n', flush=True)
        return embedded_sentences
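# Usage sketch (illustrative only, not part of the original script): `df` and the
# 'tokens' column name are assumptions; the column is expected to hold arrays of
# token strings, as preprocess_sentences above requires.
def _example_word2vec_usage(df):
    """Chain preprocess_sentences and Word2VecTransformer on a tokens column."""
    sentences = preprocess_sentences().fit(df['tokens']).transform(df['tokens'])
    embedder = Word2VecTransformer().fit(sentences)
    embedded = embedder.transform(sentences)
    # embedded[i] is a float32 tensor of shape (len(sentences[i]), EMBEDDING_DIM)
    return embedded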
class Word2VecTransformer_CRF(BaseEstimator, TransformerMixin):
    def __init__(self, vector_size=EMBEDDING_DIM, window=5, min_count=1, workers=1, embedding_dim=EMBEDDING_DIM):
        self.model = None
        self.vector_size = vector_size
        self.window = window
        self.min_count = min_count
        self.workers = workers
        self.embedding_dim = embedding_dim

    def fit(self, X, y=None):
        # https://stackoverflow.com/questions/17242456/python-print-sys-stdout-write-not-visible-when-using-logging
        # https://stackoverflow.com/questions/230751/how-can-i-flush-the-output-of-the-print-function
        print('WORD2VEC:', flush=True)
        # Fit the Word2Vec model on the tokenised sentences
        self.model = Word2Vec(sentences=X, vector_size=self.vector_size, window=self.window,
                              min_count=self.min_count, workers=self.workers)
        print('--> Word2Vec Fitted', flush=True)
        return self

    def transform(self, X):
        # Build one feature dict per token: hand-crafted features plus the embedding values
        embedded_sentences = []
        for sentence in X:
            sentence_vectors = []
            for word in sentence:
                features = {
                    'bias': 1.0,
                    'word.lower()': word.lower(),
                    'word[-3:]': word[-3:],
                    'word[-2:]': word[-2:],
                    'word.isupper()': word.isupper(),
                    'word.istitle()': word.istitle(),
                    'word.isdigit()': word.isdigit(),
                }
                if word in self.model.wv:
                    vec = self.model.wv[word]
                else:
                    vec = np.random.normal(scale=0.6, size=(self.embedding_dim,))
                # https://stackoverflow.com/questions/58736548/how-to-use-word-embedding-as-features-for-crf-sklearn-crfsuite-model-training
                for index in range(len(vec)):
                    features[f"embedding_{index}"] = vec[index]
                sentence_vectors.append(features)
            embedded_sentences.append(sentence_vectors)
        print('--> Embeddings Complete \n', flush=True)
        return embedded_sentences
class tfidfTransformer(BaseEstimator, TransformerMixin):
    def __init__(self):
        self.model = None
        self.embedding_dim = None
        self.idf = None
        self.vocab_size = None
        self.vocab = None

    def fit(self, X, y=None):
        print('TFIDF:', flush=True)
        joined_sentences = [' '.join(tokens) for tokens in X]
        self.model = TfidfVectorizer()
        self.model.fit(joined_sentences)
        self.vocab = self.model.vocabulary_
        self.idf = self.model.idf_
        self.vocab_size = len(self.vocab)
        self.embedding_dim = self.vocab_size
        print('--> TFIDF Fitted', flush=True)
        return self

    def transform(self, X):
        embedded = []
        for sentence in X:
            sent_vecs = []
            token_counts = {}
            for word in sentence:
                token_counts[word] = token_counts.get(word, 0) + 1
            sent_len = len(sentence)
            for word in sentence:
                vec = np.zeros(self.vocab_size)
                if word in self.vocab:
                    tf = token_counts[word] / sent_len
                    token_idx = self.vocab[word]
                    vec[token_idx] = tf * self.idf[token_idx]
                sent_vecs.append(vec)
            embedded.append(torch.tensor(np.array(sent_vecs), dtype=torch.float32))
        print('--> Embeddings Complete \n', flush=True)
        return embedded
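# Note (illustrative sketch): with tfidfTransformer each token vector has length
# equal to the fitted vocabulary size, so a downstream model's embedding_dim
# should be read from the fitted transformer rather than from EMBEDDING_DIM.
def _example_tfidf_dims(sentences):
    """Show that the tf-idf embedding dimension is only known after fitting."""
    tfidf = tfidfTransformer().fit(sentences)
    embedded = tfidf.transform(sentences)
    # embedded[i] has shape (len(sentences[i]), tfidf.embedding_dim)
    return tfidf.embedding_dim, embedded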
class GloveTransformer(BaseEstimator, TransformerMixin):
    def __init__(self):
        self.model = None
        self.embedding_dim = 300

    def fit(self, X, y=None):
        print('GLOVE', flush=True)
        self.model = api.load('glove-wiki-gigaword-300')
        print('--> Glove Downloaded', flush=True)
        return self

    def transform(self, X):
        # Map each token to its GloVe vector; unseen tokens get a random vector
        print('--> Beginning embeddings', flush=True)
        embedded_sentences = []
        for sentence in X:
            sentence_vectors = []
            for word in sentence:
                if word in self.model:
                    vec = self.model[word]
                else:
                    vec = np.random.normal(scale=0.6, size=(self.embedding_dim,))
                sentence_vectors.append(vec)
            embedded_sentences.append(torch.tensor(np.array(sentence_vectors), dtype=torch.float32))
        print('--> Embeddings Complete \n', flush=True)
        return embedded_sentences
class Bio2VecTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, vector_size=200, window=5, min_count=1, workers=1, embedding_dim=200):
        self.model = None
        self.vector_size = vector_size
        self.window = window
        self.min_count = min_count
        self.workers = workers
        self.embedding_dim = embedding_dim

    def fit(self, X, y=None):
        print('BIO2VEC:', flush=True)
        # https://stackoverflow.com/questions/58055415/how-to-load-bio2vec-in-gensim
        # Bio2VecModel is expected to be a pre-loaded gensim KeyedVectors instance;
        # it is not defined in this file (see the loading sketch below).
        self.model = Bio2VecModel
        print('--> BIO2VEC Fitted', flush=True)
        return self

    def transform(self, X):
        # Map each token to its Bio2Vec vector; unseen tokens get a random vector
        embedded_sentences = []
        for sentence in X:
            sentence_vectors = []
            for word in sentence:
                if word in self.model:
                    vec = self.model[word]
                else:
                    vec = np.random.normal(scale=0.6, size=(self.embedding_dim,))
                sentence_vectors.append(vec)
            embedded_sentences.append(torch.tensor(np.array(sentence_vectors), dtype=torch.float32))
        print('--> Embeddings Complete \n', flush=True)
        return embedded_sentences
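# Loading sketch for Bio2Vec (illustrative): Bio2VecTransformer.fit references a
# module-level Bio2VecModel that is not created in this file. Bio2Vec is shipped
# in word2vec binary format, so gensim's KeyedVectors loader applies (see the
# Stack Overflow link above). The file path below is a placeholder assumption.
def _load_bio2vec(path='bio2vec_model.bin'):
    """Return a gensim KeyedVectors instance usable as Bio2VecModel."""
    return KeyedVectors.load_word2vec_format(path, binary=True)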
class BiLSTM_NER(nn.Module):
    def __init__(self, input_dim, hidden_dim, tagset_size):
        super(BiLSTM_NER, self).__init__()
        # Embedding layer (unused: pre-computed embedding vectors are fed in directly)
        # freeze=False would mean the pretrained embeddings get fine-tuned
        # self.embedding = nn.Embedding.from_pretrained(embedding_matrix, freeze=False, padding_idx=-1)
        self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(hidden_dim * 2, tagset_size)

    def forward(self, sentences):
        # embeds = self.embedding(sentences)
        lstm_out, _ = self.lstm(sentences)
        tag_scores = self.fc(lstm_out)
        return tag_scores
def pad(batch):
    # batch is a list of (X, y) pairs
    X_batch, y_batch = zip(*batch)
    # Convert to tensors (the embeddings may already be tensors, so as_tensor avoids a copy warning)
    X_batch = [torch.as_tensor(seq, dtype=torch.float32) for seq in X_batch]
    y_batch = [torch.as_tensor(seq, dtype=torch.long) for seq in y_batch]
    # Pad sequences
    X_padded = pad_sequence(X_batch, batch_first=True, padding_value=PAD_VALUE)
    y_padded = pad_sequence(y_batch, batch_first=True, padding_value=PAD_VALUE)
    return X_padded, y_padded

def pred_pad(batch):
    X_batch = [torch.as_tensor(seq, dtype=torch.float32) for seq in batch]
    X_padded = pad_sequence(X_batch, batch_first=True, padding_value=PAD_VALUE)
    return X_padded
class Ner_Dataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]
class LSTM(BaseEstimator, ClassifierMixin):
    def __init__(self, embedding_dim=None, hidden_dim=128, epochs=5, learning_rate=0.001, tag2idx=None):
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.epochs = epochs
        self.learning_rate = learning_rate
        self.tag2idx = tag2idx

    def fit(self, embedded, encoded_tags):
        # print('LSTM started:', flush=True)
        data = Ner_Dataset(embedded, encoded_tags)
        train_loader = DataLoader(data, batch_size=BATCH_SIZE, shuffle=True, collate_fn=pad)
        self.model = self.train_LSTM(train_loader)
        # print('--> Epochs: ', self.epochs, flush=True)
        # print('--> Learning Rate: ', self.learning_rate)
        return self

    def predict(self, X):
        # Switch to evaluation mode
        test_loader = DataLoader(X, batch_size=1, shuffle=False, collate_fn=pred_pad)
        self.model.eval()
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        predictions = []
        # Iterate through test data
        with torch.no_grad():
            for X_batch in test_loader:
                X_batch = X_batch.to(device)
                tag_scores = self.model(X_batch)
                _, predicted_tags = torch.max(tag_scores, dim=2)
                flattened_pred = predicted_tags.view(-1)
                predictions.append(list(flattened_pred.cpu().numpy()))
        # print('before concat', predictions)
        # predictions = np.concatenate(predictions)
        # print('after concat', predictions)
        # Map predicted indices back to tag strings
        tag_encoder = LabelEncoder()
        tag_encoder.fit(['B-AC', 'O', 'B-LF', 'I-LF'])
        str_pred = []
        for sentence in predictions:
            str_sentence = tag_encoder.inverse_transform(sentence)
            str_pred.append(list(str_sentence))
        return str_pred

    def train_LSTM(self, train_loader):
        input_dim = self.embedding_dim
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        # Instantiate the lstm_model
        lstm_model = BiLSTM_NER(input_dim, hidden_dim=self.hidden_dim, tagset_size=len(self.tag2idx))
        lstm_model.to(device)
        # Loss function and optimizer
        loss_function = nn.CrossEntropyLoss(ignore_index=PAD_VALUE)  # Ignore padding
        optimizer = optim.Adam(lstm_model.parameters(), lr=self.learning_rate)
        # print('--> Training LSTM')
        # Training loop
        for epoch in range(self.epochs):
            total_loss = 0
            total_correct = 0
            total_words = 0
            lstm_model.train()  # Set model to training mode
            for batch_idx, (X_batch, y_batch) in enumerate(train_loader):
                X_batch, y_batch = X_batch.to(device), y_batch.to(device)
                # Zero gradients
                optimizer.zero_grad()
                # Forward pass
                tag_scores = lstm_model(X_batch)
                # Reshape and compute loss (ignore padded values)
                loss = loss_function(tag_scores.view(-1, len(self.tag2idx)), y_batch.view(-1))
                # Backward pass and optimization
                loss.backward()
                optimizer.step()
                total_loss += loss.item()
                # Compute accuracy for this batch
                # Get the predicted tags (index of max score)
                _, predicted_tags = torch.max(tag_scores, dim=2)
                # Flatten the tensors to compare word-by-word
                flattened_pred = predicted_tags.view(-1)
                flattened_true = y_batch.view(-1)
                # Exclude padding tokens from the accuracy calculation
                mask = flattened_true != PAD_VALUE
                correct = (flattened_pred[mask] == flattened_true[mask]).sum().item()
                # Count the total words in the batch (ignoring padding)
                total_words_batch = mask.sum().item()
                # Update total correct and total words
                total_correct += correct
                total_words += total_words_batch
            avg_loss = total_loss / len(train_loader)
            avg_accuracy = total_correct / total_words * 100  # Accuracy in percentage
            # print(f' ==> Epoch {epoch + 1}/{self.epochs}, Loss: {avg_loss:.4f}, Accuracy: {avg_accuracy:.2f}%')
        return lstm_model
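# Pipeline assembly sketch (illustrative): train_tokens is assumed to be a pandas
# Series of token arrays and train_tags a parallel list of tag-string sequences;
# neither name comes from this file. Labels are encoded up front because the
# Pipeline passes y through to the final estimator unchanged.
def _example_lstm_pipeline(train_tokens, train_tags):
    """Fit a Word2Vec + BiLSTM NER pipeline on tokenised sentences."""
    tag_encoder = LabelEncoder().fit(['B-AC', 'O', 'B-LF', 'I-LF'])
    encoded_tags = [tag_encoder.transform(tags) for tags in train_tags]
    tag2idx = {tag: idx for idx, tag in enumerate(tag_encoder.classes_)}
    pipeline = Pipeline([
        ('preprocess', preprocess_sentences()),
        ('embed', Word2VecTransformer()),
        ('lstm', LSTM(embedding_dim=EMBEDDING_DIM, tag2idx=tag2idx)),
    ])
    pipeline.fit(train_tokens, encoded_tags)
    return pipeline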
# Define the FeedForward NN Model
class FeedForwardNN_NER(nn.Module):
    def __init__(self, embedding_dim, hidden_dim, tagset_size):
        super(FeedForwardNN_NER, self).__init__()
        self.fc1 = nn.Linear(embedding_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_dim, tagset_size)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        logits = self.fc2(x)
        return logits
class FeedforwardNN(BaseEstimator, ClassifierMixin):
    def __init__(self, embedding_dim=None, hidden_dim=128, epochs=5, learning_rate=0.001, tag2idx=None):
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.epochs = epochs
        self.learning_rate = learning_rate
        self.tag2idx = tag2idx

    def fit(self, embedded, encoded_tags):
        print('Feed Forward NN: ', flush=True)
        data = Ner_Dataset(embedded, encoded_tags)
        train_loader = DataLoader(data, batch_size=BATCH_SIZE, shuffle=True, collate_fn=pad)
        self.model = self.train_FF(train_loader)
        print('--> Feed Forward trained', flush=True)
        return self

    def predict(self, X):
        # Switch to evaluation mode
        test_loader = DataLoader(X, batch_size=1, shuffle=False, collate_fn=pred_pad)
        self.model.eval()
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        predictions = []
        # Iterate through test data
        with torch.no_grad():
            for X_batch in test_loader:
                X_batch = X_batch.to(device)
                tag_scores = self.model(X_batch)
                _, predicted_tags = torch.max(tag_scores, dim=2)
                # Flatten the tensors to compare word-by-word
                flattened_pred = predicted_tags.view(-1)
                predictions.append(flattened_pred.cpu().numpy())
        # Map predicted indices back to tag strings
        tag_encoder = LabelEncoder()
        tag_encoder.fit(['B-AC', 'O', 'B-LF', 'I-LF'])
        str_pred = []
        for sentence in predictions:
            str_sentence = tag_encoder.inverse_transform(sentence)
            str_pred.append(list(str_sentence))
        return str_pred

    def train_FF(self, train_loader):
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        # Instantiate the feed-forward model
        ff_model = FeedForwardNN_NER(self.embedding_dim, hidden_dim=self.hidden_dim, tagset_size=len(self.tag2idx))
        ff_model.to(device)
        # Loss function and optimizer
        loss_function = nn.CrossEntropyLoss(ignore_index=PAD_VALUE)  # Ignore padding
        optimizer = optim.Adam(ff_model.parameters(), lr=self.learning_rate)
        print('--> Training FF')
        # Training loop
        for epoch in range(self.epochs):
            total_loss = 0
            total_correct = 0
            total_words = 0
            ff_model.train()  # Set model to training mode
            for batch_idx, (X_batch, y_batch) in enumerate(train_loader):
                X_batch, y_batch = X_batch.to(device), y_batch.to(device)
                # Zero gradients
                optimizer.zero_grad()
                # Forward pass
                tag_scores = ff_model(X_batch)
                # Reshape and compute loss (ignore padded values)
                loss = loss_function(tag_scores.view(-1, len(self.tag2idx)), y_batch.view(-1))
                # Backward pass and optimization
                loss.backward()
                optimizer.step()
                total_loss += loss.item()
                # Compute accuracy for this batch
                # Get the predicted tags (index of max score)
                _, predicted_tags = torch.max(tag_scores, dim=2)
                # Flatten the tensors to compare word-by-word
                flattened_pred = predicted_tags.view(-1)
                flattened_true = y_batch.view(-1)
                # Exclude padding tokens from the accuracy calculation
                mask = flattened_true != PAD_VALUE
                correct = (flattened_pred[mask] == flattened_true[mask]).sum().item()
                # Count the total words in the batch (ignoring padding)
                total_words_batch = mask.sum().item()
                # Update total correct and total words
                total_correct += correct
                total_words += total_words_batch
            avg_loss = total_loss / len(train_loader)
            avg_accuracy = total_correct / total_words * 100  # Accuracy in percentage
            print(f' ==> Epoch {epoch + 1}/{self.epochs}, Loss: {avg_loss:.4f}, Accuracy: {avg_accuracy:.2f}%')
        return ff_model
crf = sklearn_crfsuite.CRF(
    algorithm='lbfgs',
    c1=0.1,
    c2=0.1,
    max_iterations=100,
    all_possible_transitions=True)
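# Training/evaluation sketch for the CRF above (illustrative): the variable names
# train_tokens, train_tags, test_tokens and test_tags are assumptions, with tags
# given as lists of IOB strings per sentence, as sklearn-crfsuite expects.
def _example_crf_usage(train_tokens, train_tags, test_tokens, test_tags):
    """Fit the sklearn-crfsuite CRF on embedding-augmented token features."""
    train_sents = preprocess_sentences().fit(train_tokens).transform(train_tokens)
    featurizer = Word2VecTransformer_CRF().fit(train_sents)
    X_train = featurizer.transform(train_sents)
    crf.fit(X_train, train_tags)
    test_sents = preprocess_sentences().fit(test_tokens).transform(test_tokens)
    X_test = featurizer.transform(test_sents)
    y_pred = crf.predict(X_test)
    print(metrics.flat_classification_report(test_tags, y_pred), flush=True)
    return y_pred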