# -*- coding: utf-8 -*-
"""JointmBERT.ipynb
Automatically generated by Colaboratory.
Original file is located at
https://colab.research.google.com/drive/17r68TnVYmGnoeNYZhhbEqOeGxCn3fN8x
"""
# from google.colab import drive
# drive.mount('/content/drive')
# !nvidia-smi
#!pip install seqeval -qqq
# TODO: update this notebook to work with the latest version of transformers
#!pip install -q transformers==2.11.0
import tensorflow as tf
tf.__version__
"""#Importing Libraries"""
import os
import streamlit as st
from tqdm import tqdm
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder
from transformers import BertTokenizer, TFBertModel
import warnings
warnings.filterwarnings('ignore')
from tensorflow.keras.layers import Dropout, Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from tensorflow.keras.metrics import SparseCategoricalAccuracy
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, LearningRateScheduler, ModelCheckpoint, TensorBoard
from seqeval.metrics import classification_report
from transformers import TFAutoModel
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
"""#Loading Data""" | |
uploaded_file = st.file_uploader(“translated_swa_test_slot_labels.xlsx”) | |
df = pd.read_excel(uploaded_file) | |
df.shape | |
df.head(2) | |
df.rename(columns = {'utterance_swa ':'words','slot_labels_swa':'word_labels','intent_swa':'intent_label'}, inplace = True) | |
# df.head(2) | |
df["words"] = df["utterance_swa"] | |
del df["utterance_swa"] | |
df.head(2) | |
# df.head(2) | |
df_train, df_valid = train_test_split(df, test_size=0.2) | |
df_train.head(2) | |
df_valid.head(2) | |
""" | |
## A First Model: Intent Classification (Sentence Level) | |
Let's ignore the slot filling task for now and let's try to build a sentence level classifier by fine-tuning a pre-trained Transformer-based model using the `huggingface/transformers` package that provides both TF2/Keras and Pytorch APIs. | |
### The BERT Tokenizer | |
First let's load a pre-trained tokenizer and test it on a test sentence from the training set:""" | |
from transformers import BertTokenizer
model_name = "bert-base-multilingual-cased"
tokenizer = BertTokenizer.from_pretrained(model_name)
"""#Testing word tokenization using Bert"""
first_sentence = df_train.iloc[0]["words"]
first_sentence
encoding = tokenizer.encode(first_sentence)
print(tokenizer.convert_ids_to_tokens(encoding))
"""It can be noticed that BERT uses subword tokens so the length of the tokenized sentence is likely to be larger than the number of words in the sentence. | |
Remarks: | |
- The first token `[CLS]` is used by the pre-training task for sequence classification. | |
- The last token `[SEP]` is a separator for the pre-training task that classifiies if a pair of sentences are consecutive in a corpus or not (next sentence prediction). | |
#Data Preprocessing | |
Checking the length of sequences after tokenization, so that we could assign them to equal dummy vectors in the training set | |
""" | |
train_sequence_lengths = [len(tokenizer.encode(text))
                          for text in df_train["words"]]
plt.hist(train_sequence_lengths, bins=30)
plt.title(f"max sequence length: {max(train_sequence_lengths)}")
"""[link text](https://)The mapping can be introspected in the `tokenizer.vocab` attribute: | |
### Encoding the Dataset with the Tokenizer | |
Encoding the full train / valid and test sets with Bert tokenizer to get a padded integer numpy arrays: | |
""" | |
import numpy as np
def encode_dataset(tokenizer, text_sequences, max_length):
    token_ids = np.zeros(shape=(len(text_sequences), max_length),
                         dtype=np.int32)
    for i, text_sequence in enumerate(text_sequences):
        encoded = tokenizer.encode(text_sequence)
        token_ids[i, 0:len(encoded)] = encoded
    attention_masks = (token_ids != 0).astype(np.int32)
    return {"input_ids": token_ids, "attention_masks": attention_masks}
encoded_train = encode_dataset(tokenizer, df_train["words"], 60)
encoded_train["input_ids"]
encoded_train["attention_masks"]
encoded_valid = encode_dataset(tokenizer, df_valid["words"], 60)
# encoded_test = encode_dataset(tokenizer, df_test["words"], 90)
"""### Encoding the Sequence Classification Targets | |
To do so we build a simple mapping from the auxiliary files: | |
""" | |
seq_out_tokenizer = Tokenizer(filters='!"#$%&()*+,/:;<=>?@[\\]^`{|}~\t\n', oov_token="UNK",lower=False) | |
##Preprocessing | |
df_train.fillna('O',inplace = True) | |
df_valid.fillna('O',inplace = True) | |
seq_out_tokenizer.fit_on_texts(df_train["word_labels"].tolist()) | |
seq_out_tokenizer.fit_on_texts(df_valid["word_labels"].tolist()) | |
seq_out_word_to_index = seq_out_tokenizer.word_index | |
len(seq_out_word_to_index) | |
seq_out_word_to_index | |
intent_names = set(df_train.intent_label) | |
intent_map = dict((label, idx) for idx, label in enumerate(intent_names)) | |
intent_map | |
intent_train = df_train["intent_label"].map(intent_map).values | |
intent_valid = df_valid["intent_label"].map(intent_map).values | |
"""### Loading and Feeding a Pretrained BERT model | |
Loading a pretrained BERT Large model using the [huggingface transformers](https://github.com/huggingface/transformers) package: | |
""" | |
from transformers import TFAutoModel, AutoConfig | |
config = AutoConfig.from_pretrained('bert-base-multilingual-uncased') | |
large_bert_model = TFAutoModel.from_pretrained("bert-base-multilingual-cased") | |
large_bert_model.summary() | |
encoded_valid
outputs = large_bert_model(encoded_valid)
len(outputs)
"""The **first output** of the BERT model is a tensor with shape `(batch_size, seq_len, output_dim)` that holds **features for each token in the input sequence**:"""
outputs[0].shape
"""The **second output** of the BERT model is a tensor with shape `(batch_size, output_dim)`, which is the vector representation of the special token `[CLS]`. This vector is typically used as a **pooled representation for the sequence as a whole**."""
outputs[1].shape
"""#Mapping slots to the corresponding indexes""" | |
slot_names = ["[PAD]"] | |
slot_names += list(seq_out_word_to_index.keys())[1:] | |
slot_names | |
slot_map = {} | |
for label in slot_names: | |
slot_map[label] = len(slot_map) | |
slot_map | |
"""The following function generates token-aligned integer labels from the BIO word-level annotations. In particular, if a specific word is too long to be represented as a single token, we expand its label for all the tokens of that word while taking care of using "B-" labels only for the first token and then use "I-" for the matching slot type for subsequent tokens of the same word:""" | |
def encode_token_labels(text_sequences, slot_names, tokenizer, slot_map, max_length):
    encoded = np.zeros(shape=(len(text_sequences), max_length), dtype=np.int32)
    for i, (text_sequence, word_labels) in enumerate(
            zip(text_sequences, slot_names)):
        encoded_labels = []
        for word, word_label in zip(text_sequence.split(), word_labels.split()):
            tokens = tokenizer.tokenize(word)
            encoded_labels.append(slot_map[word_label])
            expand_label = word_label.replace("B-", "I-")
            if expand_label not in slot_map:
                expand_label = word_label
            encoded_labels.extend([slot_map[expand_label]] * (len(tokens) - 1))
        encoded[i, 1:len(encoded_labels) + 1] = encoded_labels
    return encoded
slot_train = encode_token_labels(
    df_train["words"], df_train["word_labels"], tokenizer, slot_map, 60)
slot_valid = encode_token_labels(
    df_valid["words"], df_valid["word_labels"], tokenizer, slot_map, 60)
# slot_test = encode_token_labels(
#     df_test["words"], df_test["word_labels"], tokenizer, slot_map, 90)
########################
######### Debugging process
slot_train[0]
slot_valid[0]
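"""To see the B-/I- expansion in context, the subword tokens of the first training utterance can be lined up with the integer slot labels just produced for positions 1..n (i.e. after the [CLS] position)."""
# Pair each subword token of the first training example with its slot label id.
example_tokens = tokenizer.tokenize(df_train.iloc[0]["words"])
print(list(zip(example_tokens, slot_train[0][1:len(example_tokens) + 1])))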
# slot_map
# # def encode_token_labels(text_sequences, slot_names, tokenizer, slot_map,max_length):
# text_sequences = df_train["words"]
# slot_names = df_train["word_labels"]
# slot_map = slot_map
# max_length = 63
# encoded = np.zeros(shape=(len(text_sequences), max_length), dtype=np.int32)
# for i, (text_sequence, word_labels) in enumerate(
#         zip(text_sequences, slot_names)):
#     encoded_labels = []
#     for word, word_label in zip(text_sequence.split(), word_labels.split()):
#         tokens = tokenizer.tokenize(word)
#         # print(word_label)
#         encoded_labels.append(slot_map[word_label])
#         expand_label = word_label.replace("B-", "I-")
#         if not expand_label in slot_map:
#             expand_label = word_label
#         encoded_labels.extend([slot_map[expand_label]] * (len(tokens) - 1))
#     encoded[i, 1:len(encoded_labels) + 1] = encoded_labels
# # print(encoded)
# slot_train = encode_token_labels(
#     df_train["words"], df_train["word_labels"], tokenizer, slot_map, 63)
# slot_valid = encode_token_labels(
#     df_valid["words"], df_valid["word_labels"], tokenizer, slot_map, 63)
# slot_test = encode_token_labels(
#     df_test["words"], df_test["word_labels"], tokenizer, slot_map, 63)
# df_train["words"][0]
# df_train["word_labels"][0]
slot_map['O']
"""Note that the special tokens such as "[PAD]" and "[SEP]" and all padded positions recieve a 0 label. | |
#Joint Intent And Slot Filling Model | |
""" | |
class JointIntentAndSlotFillingModel(tf.keras.Model):
    def __init__(self, intent_num_labels=None, slot_num_labels=None,
                 model_name="bert-base-multilingual-cased", dropout_prob=0.1):
        super().__init__(name="joint_intent_slot")
        self.bert = TFAutoModel.from_pretrained(model_name)
        self.dropout = Dropout(dropout_prob)
        self.intent_classifier = Dense(intent_num_labels,
                                       name="intent_classifier")
        self.slot_classifier = Dense(slot_num_labels,
                                     name="slot_classifier")
    def call(self, inputs, training=False):
        sequence_output, pooled_output = self.bert(inputs, training=training)
        # The first output of the main BERT layer has shape:
        # (batch_size, max_length, output_dim)
        sequence_output = self.dropout(sequence_output, training=training)
        slot_logits = self.slot_classifier(sequence_output)
        # The second output of the main BERT layer has shape:
        # (batch_size, output_dim)
        # and gives a "pooled" representation for the full sequence from the
        # hidden state that corresponds to the "[CLS]" token.
        pooled_output = self.dropout(pooled_output, training=training)
        intent_logits = self.intent_classifier(pooled_output)
        return slot_logits, intent_logits
joint_model = JointIntentAndSlotFillingModel(intent_num_labels=len(intent_map), slot_num_labels=len(slot_map))
# joint_model.compile(optimizer=Adam(learning_rate=3e-5, epsilon=1e-08),loss=losses)
opt = Adam(learning_rate=3e-5, epsilon=1e-08)
losses = [SparseCategoricalCrossentropy(from_logits=True),
          SparseCategoricalCrossentropy(from_logits=True)]
metrics = [SparseCategoricalAccuracy('accuracy')]
joint_model.compile(optimizer=opt, loss=losses, metrics=metrics)
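"""Keras pairs the two losses with the model's two outputs in order (slot logits first, intent logits second), matching the `(slot_train, intent_train)` target tuple passed to `fit` below. If one task should weigh more than the other, loss weights could be added at compile time; the 3.0/1.0 split below is an illustrative assumption, not a tuned value."""
# Hypothetical variant (not used here): emphasise the slot-filling loss.
# joint_model.compile(optimizer=opt, loss=losses, metrics=metrics,
#                     loss_weights=[3.0, 1.0])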
#########
# checkpoint_path = "/content/drive/MyDrive/JP Morgan/Data/model_ester_swa/cp.ckpt"
# checkpoint_dir = os.path.dirname(checkpoint_path)
# # Create a callback that saves the model's weights
# cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_path,
#                                                  save_weights_only=True,
#                                                  verbose=1)
#### Loading Model weights for training
################
# joint_model.load_weights('/content/drive/MyDrive/JP Morgan/Data/model_ester_swa/cp.ckpt')
history = joint_model.fit(
    encoded_train,
    (slot_train, intent_train),
    validation_data=(encoded_valid, (slot_valid, intent_valid)),
    epochs=15,
    batch_size=8,
)
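"""matplotlib is imported above but so far only used for the length histogram; plotting the curves stored in `history` is a quick way to spot over- or under-fitting. The "loss"/"val_loss" keys are Keras's defaults for the total (summed) loss."""
# Plot the total training vs. validation loss across epochs.
plt.figure()
plt.plot(history.history["loss"], label="train loss")
plt.plot(history.history["val_loss"], label="valid loss")
plt.xlabel("epoch")
plt.ylabel("loss")
plt.legend()
plt.show()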
"""### Save the model""" | |
# model = joint_model | |
# model.save("/content/drive/MyDrive/JP Morgan/Data/model_ester_swa/best_working") | |
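"""Since `JointIntentAndSlotFillingModel` is a subclassed Keras model, saving and restoring just the weights is the simplest round trip; the path below is a placeholder, and the snippet is left commented out like the save call above."""
# weights_path = "joint_mbert_weights/ckpt"  # hypothetical local path
# joint_model.save_weights(weights_path)
# ...later, after rebuilding the model with the same constructor arguments:
# joint_model.load_weights(weights_path)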
"""#Making prediction on a single text sequence and displaying both the sequence-wise and the token-wise class labels""" | |
def show_predictions(text, tokenizer, model, intent_names, slot_names): | |
inputs = tf.constant(tokenizer.encode(text))[None, :] # batch_size = 1 | |
# print(inputs) | |
outputs = model(inputs) | |
slot_logits, intent_logits = outputs | |
# print(outputs) | |
slot_ids = slot_logits.numpy().argmax(axis=-1)[0, 1:-1] | |
# print(slot_ids) | |
intent_id = intent_logits.numpy().argmax(axis=-1)[0] | |
intent = [k for k, v in intent_names.items() if v == intent_id] | |
# print(slot_ids) | |
# print("## Intent:", intent) | |
# print("## Slots:") | |
slot_pred = [] | |
#### | |
text_split = text.split(' ') | |
tokens_ids = [(i,j) for i,j in zip(tokenizer.tokenize(text),slot_ids)] | |
#### Removing ## Tokens | |
tokens_ids_ = [x for x in tokens_ids if not '##' in x[0]] | |
for token, slot_id in zip(text_split, tokens_ids_): | |
slot_pred.append(slot_names[slot_id[1]]) | |
return intent, slot_pred | |
# slot_names
df_valid.iloc[55]["words"], df_valid.iloc[55]["word_labels"], df_valid.iloc[55]["intent_label"]
pred_intent, pred_slot = show_predictions(df_valid.iloc[55]["words"], tokenizer, joint_model, intent_map, slot_names)
pred_intent, pred_slot
# # def evaluate()
# print(idx, len(true_label), len(pred_slot), len(df_valid.iloc[idx]["words"].split()))
# print("Added", num_add, ["PAD"]*num_add)
# print(df_valid.iloc[idx]["words"])
# print(true_label)
# print(pred_slot)
# # break
# pred_slot
# from sklearn.metrics import f1_score
slot_avg_f1_score = []
slot_avg_accuracy_score = []
intent_avg_f1_score = []
intent_avg_accuracy_score = []
count = 0
for idx in range(len(df_valid)):
    try:
        pred_intent, pred_slot = show_predictions(df_valid.iloc[idx]["words"], tokenizer, joint_model, intent_map, slot_names)
        true_label_slot = df_valid.iloc[idx]["word_labels"].split()
        true_label_intent = df_valid.iloc[idx]["intent_label"].split()
        # if(len(true_label) > len(pred_slot)):
        #     num_add = len(true_label) - len(pred_slot)
        #     pred_slot.extend(["[PAD]"]*num_add)
        #     count += 1
        # elif(len(pred_slot) > len(true_label)):
        #     num_add = len(pred_slot) - len(true_label)
        #     pred_slot.extend(["O"]*num_add)
        #     count += 1
        f1_slot = f1_score(true_label_slot, pred_slot, average="weighted")
        slot_accuracy_score = accuracy_score(true_label_slot, pred_slot)
        f1_intent = f1_score(true_label_intent, pred_intent, average="weighted")
        intent_accuracy_score = accuracy_score(true_label_intent, pred_intent)
        slot_avg_f1_score.append(f1_slot)
        slot_avg_accuracy_score.append(slot_accuracy_score)
        intent_avg_f1_score.append(f1_intent)
        intent_avg_accuracy_score.append(intent_accuracy_score)
        # print(len(df_valid.iloc[idx]["word_labels"].split()), len(pred_slot))
    except:
        # Skip rows where prediction and reference cannot be compared
        # (e.g. length mismatches after subword re-alignment).
        pass
        # print(idx)
        # print(df_valid.iloc[idx]["words"])
        # print(df_valid.iloc[idx]["word_labels"].split())
        # print(pred_slot)
        # print(len(df_valid.iloc[idx]["word_labels"].split()), len(pred_slot))
        # break
# df_valid.iloc[0]["word_labels"].split(" "), pred_slot
print("F1 Score Slot: ", np.mean(slot_avg_f1_score), "\nAccuracy Slot:", np.mean(slot_avg_accuracy_score))
print("F1 Score Intent: ", np.mean(intent_avg_f1_score), "\nAccuracy Intent:", np.mean(intent_avg_accuracy_score))
# Slot-only version of the evaluation loop above.
from sklearn.metrics import f1_score
avg_f1_score = []
avg_accuracy_score = []
count = 0
for idx in range(len(df_valid)):
    try:
        pred_intent, pred_slot = show_predictions(df_valid.iloc[idx]["words"], tokenizer, joint_model, intent_map, slot_names)
        true_label = df_valid.iloc[idx]["word_labels"].split()
        # if(len(true_label) > len(pred_slot)):
        #     num_add = len(true_label) - len(pred_slot)
        #     pred_slot.extend(["[PAD]"]*num_add)
        #     count += 1
        # elif(len(pred_slot) > len(true_label)):
        #     num_add = len(pred_slot) - len(true_label)
        #     pred_slot.extend(["O"]*num_add)
        #     count += 1
        f1 = f1_score(true_label, pred_slot, average="weighted")
        accuracy = accuracy_score(true_label, pred_slot)
        avg_f1_score.append(f1)
        avg_accuracy_score.append(accuracy)
        # print(len(df_valid.iloc[idx]["word_labels"].split()), len(pred_slot))
    except:
        pass
        # print(idx)
        # print(df_valid.iloc[idx]["words"])
        # print(df_valid.iloc[idx]["word_labels"].split())
        # print(pred_slot)
        # print(len(df_valid.iloc[idx]["word_labels"].split()), len(pred_slot))
        # break
# df_valid.iloc[0]["word_labels"].split(" "), pred_slot
print("F1 Score: ", np.mean(avg_f1_score), "\nAccuracy:", np.mean(avg_accuracy_score))
# text = st.text_area("Please enter a text")
# if(text):
#     output = show_predictions(text, tokenizer, joint_model, intent_map, slot_names)
#     st.json(output)