# -*- coding: utf-8 -*-
"""JointmBERT.ipynb

Automatically generated by Colaboratory.

Original file is located at
    https://colab.research.google.com/drive/17r68TnVYmGnoeNYZhhbEqOeGxCn3fN8x
"""

# from google.colab import drive
# drive.mount('/content/drive')

# !nvidia-smi

#!pip install seqeval -qqq
# TODO: update this notebook to work with the latest version of transformers
#!pip install -q transformers==2.11.0

import tensorflow as tf

tf.__version__

"""#Importing Libraries"""

import os
import warnings

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import streamlit as st
from tqdm import tqdm

from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

from transformers import BertTokenizer, TFBertModel, TFAutoModel

from tensorflow.keras.layers import Dropout, Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from tensorflow.keras.metrics import SparseCategoricalAccuracy
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, LearningRateScheduler, ModelCheckpoint, TensorBoard

from seqeval.metrics import classification_report

warnings.filterwarnings('ignore')

"""#Loading Data"""

uploaded_file = st.file_uploader("translated_swa_test_slot_labels.xlsx")
df = pd.read_excel(uploaded_file)

df.shape

df.head(2)

df.rename(columns={'utterance_swa ': 'words',
                   'slot_labels_swa': 'word_labels',
                   'intent_swa': 'intent_label'}, inplace=True)
# df.head(2)

# The rename above targets 'utterance_swa ' (with a trailing space); when the
# column name has no trailing space, copy it over to 'words' explicitly.
df["words"] = df["utterance_swa"]
del df["utterance_swa"]

df.head(2)

df_train, df_valid = train_test_split(df, test_size=0.2)

df_train.head(2)

df_valid.head(2)

"""## A First Model: Intent Classification (Sentence Level)

Let's ignore the slot filling task for now and build a sentence-level classifier by fine-tuning a pre-trained Transformer-based model, using the `huggingface/transformers` package that provides both TF2/Keras and PyTorch APIs.

### The BERT Tokenizer

First, let's load a pre-trained tokenizer and test it on a sentence from the training set:
"""

from transformers import BertTokenizer

model_name = "bert-base-multilingual-cased"
tokenizer = BertTokenizer.from_pretrained(model_name)

"""#Testing word tokenization using BERT"""

first_sentence = df_train.iloc[0]["words"]
first_sentence

encoding = tokenizer.encode(first_sentence)
print(tokenizer.convert_ids_to_tokens(encoding))

"""Notice that BERT uses subword tokens, so the tokenized sentence is usually longer than the number of words in the sentence.

Remarks:

- The first token `[CLS]` is used by the pre-training task for sequence classification.
- The last token `[SEP]` is a separator for the pre-training task that classifies whether a pair of sentences are consecutive in a corpus or not (next sentence prediction).
"""
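"""As a quick sanity check of the subword behaviour described above, here is a minimal sketch (reusing `first_sentence` and `tokenizer` from the cells above) that compares whitespace-separated words with the WordPiece tokens BERT actually sees:"""

# Sanity-check sketch: word count vs. subword count for the first sentence.
word_list = first_sentence.split()
subwords = tokenizer.tokenize(first_sentence)
print(len(word_list), "words ->", len(subwords), "subword tokens before adding [CLS]/[SEP]")
# Continuation pieces are prefixed with "##"; the label-alignment code later in
# this notebook has to account for them.
print([t for t in subwords if t.startswith("##")])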
"""#Data Preprocessing

Checking the length of the sequences after tokenization, so that we can pad them all to fixed-length vectors for the training set.
"""

train_sequence_lengths = [len(tokenizer.encode(text)) for text in df_train["words"]]
plt.hist(train_sequence_lengths, bins=30)
plt.title(f"max sequence length: {max(train_sequence_lengths)}")

"""The token-to-id mapping can be introspected in the `tokenizer.vocab` attribute.

### Encoding the Dataset with the Tokenizer

Encode the full train / valid (and test) sets with the BERT tokenizer to get padded integer numpy arrays:
"""

import numpy as np


def encode_dataset(tokenizer, text_sequences, max_length):
    # Assumes every encoded sequence fits in max_length (checked with the
    # histogram above).
    token_ids = np.zeros(shape=(len(text_sequences), max_length), dtype=np.int32)
    for i, text_sequence in enumerate(text_sequences):
        encoded = tokenizer.encode(text_sequence)
        token_ids[i, 0:len(encoded)] = encoded
    # transformers expects the key "attention_mask" (singular) when the inputs
    # are passed as a dict; with any other key the mask is silently ignored.
    attention_masks = (token_ids != 0).astype(np.int32)
    return {"input_ids": token_ids, "attention_mask": attention_masks}


encoded_train = encode_dataset(tokenizer, df_train["words"], 60)
encoded_train["input_ids"]

encoded_train["attention_mask"]

encoded_valid = encode_dataset(tokenizer, df_valid["words"], 60)
# encoded_test = encode_dataset(tokenizer, df_test["words"], 90)

"""### Encoding the Intent and Slot Targets

To do so we build simple mappings from the label columns:
"""

seq_out_tokenizer = Tokenizer(filters='!"#$%&()*+,/:;<=>?@[\\]^`{|}~\t\n',
                              oov_token="UNK", lower=False)

## Preprocessing: fill missing values with the "O" (outside) label
df_train.fillna('O', inplace=True)
df_valid.fillna('O', inplace=True)

seq_out_tokenizer.fit_on_texts(df_train["word_labels"].tolist())
seq_out_tokenizer.fit_on_texts(df_valid["word_labels"].tolist())

seq_out_word_to_index = seq_out_tokenizer.word_index
len(seq_out_word_to_index)

seq_out_word_to_index

intent_names = set(df_train.intent_label)
intent_map = dict((label, idx) for idx, label in enumerate(intent_names))
intent_map

intent_train = df_train["intent_label"].map(intent_map).values
intent_valid = df_valid["intent_label"].map(intent_map).values

"""### Loading and Feeding a Pretrained BERT Model

Loading a pretrained multilingual BERT model using the [huggingface transformers](https://github.com/huggingface/transformers) package:
"""

from transformers import TFAutoModel, AutoConfig

config = AutoConfig.from_pretrained("bert-base-multilingual-cased")
large_bert_model = TFAutoModel.from_pretrained("bert-base-multilingual-cased")

large_bert_model.summary()

encoded_valid

outputs = large_bert_model(encoded_valid)

len(outputs)

"""The **first output** of the BERT model is a tensor with shape `(batch_size, seq_len, output_dim)` which computes **features for each token in the input sequence**:"""

outputs[0].shape

"""The **second output** of the BERT model is a tensor with shape `(batch_size, output_dim)` which is the vector representation of the special token `[CLS]`. This vector is typically used as a **pooled representation for the sequence as a whole**."""

outputs[1].shape

"""#Mapping slots to the corresponding indexes"""

# Skip the "UNK" OOV entry and reserve index 0 for the "[PAD]" label.
slot_names = ["[PAD]"]
slot_names += list(seq_out_word_to_index.keys())[1:]
slot_names

slot_map = {}
for label in slot_names:
    slot_map[label] = len(slot_map)
slot_map
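"""One caveat with the intent mapping built above: `set(df_train.intent_label)` has no guaranteed ordering, so the label-to-index assignment can change between runs. An optional sketch (the name `stable_intent_map` is just for illustration, not part of the original pipeline) that sorts the label names first to make the mapping reproducible:"""

# Optional: a run-to-run stable intent mapping built from sorted label names.
stable_intent_map = {label: idx
                     for idx, label in enumerate(sorted(set(df_train.intent_label)))}
stable_intent_map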
"""The following function generates token-aligned integer labels from the BIO word-level annotations. In particular, if a word is split into several subword tokens, we expand its label to all the tokens of that word, taking care to use the "B-" label only for the first token and the matching "I-" label for the subsequent tokens of the same word:"""


def encode_token_labels(text_sequences, slot_names, tokenizer, slot_map, max_length):
    encoded = np.zeros(shape=(len(text_sequences), max_length), dtype=np.int32)
    for i, (text_sequence, word_labels) in enumerate(
            zip(text_sequences, slot_names)):
        encoded_labels = []
        for word, word_label in zip(text_sequence.split(), word_labels.split()):
            tokens = tokenizer.tokenize(word)
            encoded_labels.append(slot_map[word_label])
            expand_label = word_label.replace("B-", "I-")
            if expand_label not in slot_map:
                expand_label = word_label
            encoded_labels.extend([slot_map[expand_label]] * (len(tokens) - 1))
        # Position 0 is left as 0 for the [CLS] token.
        encoded[i, 1:len(encoded_labels) + 1] = encoded_labels
    return encoded


slot_train = encode_token_labels(
    df_train["words"], df_train["word_labels"], tokenizer, slot_map, 60)
slot_valid = encode_token_labels(
    df_valid["words"], df_valid["word_labels"], tokenizer, slot_map, 60)
# slot_test = encode_token_labels(
#     df_test["words"], df_test["word_labels"], tokenizer, slot_map, 90)

######### Debugging: inspect a few encoded label rows
slot_train[0]

slot_valid[0]

slot_map['O']

"""Note that the special tokens such as "[PAD]" and "[SEP]" and all padded positions receive a 0 label."""
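"""To make the alignment rule concrete, here is a small illustrative sketch (the word and slot label below are made up and may not appear in this dataset; the exact split depends on the mBERT vocabulary): a word that splits into several WordPiece tokens keeps its "B-" label on the first piece and the corresponding "I-" label on the remaining pieces."""

# Illustration only: hypothetical word / label pair, not taken from the data.
example_word = "ninapenda"
example_label = "B-location"
pieces = tokenizer.tokenize(example_word)
aligned = [example_label] + [example_label.replace("B-", "I-")] * (len(pieces) - 1)
print(list(zip(pieces, aligned)))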
"""#Joint Intent And Slot Filling Model"""


class JointIntentAndSlotFillingModel(tf.keras.Model):

    def __init__(self, intent_num_labels=None, slot_num_labels=None,
                 model_name="bert-base-multilingual-cased", dropout_prob=0.1):
        super().__init__(name="joint_intent_slot")
        self.bert = TFAutoModel.from_pretrained(model_name)
        self.dropout = Dropout(dropout_prob)
        self.intent_classifier = Dense(intent_num_labels,
                                       name="intent_classifier")
        self.slot_classifier = Dense(slot_num_labels,
                                     name="slot_classifier")

    def call(self, inputs, training=False):
        sequence_output, pooled_output = self.bert(inputs, training=training)

        # The first output of the main BERT layer has shape:
        # (batch_size, max_length, output_dim)
        sequence_output = self.dropout(sequence_output, training=training)
        slot_logits = self.slot_classifier(sequence_output)

        # The second output of the main BERT layer has shape:
        # (batch_size, output_dim)
        # and gives a "pooled" representation of the full sequence from the
        # hidden state that corresponds to the "[CLS]" token.
        pooled_output = self.dropout(pooled_output, training=training)
        intent_logits = self.intent_classifier(pooled_output)

        return slot_logits, intent_logits


joint_model = JointIntentAndSlotFillingModel(
    intent_num_labels=len(intent_map), slot_num_labels=len(slot_map))

# joint_model.compile(optimizer=Adam(learning_rate=3e-5, epsilon=1e-08), loss=losses)

opt = Adam(learning_rate=3e-5, epsilon=1e-08)
losses = [SparseCategoricalCrossentropy(from_logits=True),
          SparseCategoricalCrossentropy(from_logits=True)]
metrics = [SparseCategoricalAccuracy('accuracy')]
joint_model.compile(optimizer=opt, loss=losses, metrics=metrics)

#########
# checkpoint_path = "/content/drive/MyDrive/JP Morgan/Data/model_ester_swa/cp.ckpt"
# checkpoint_dir = os.path.dirname(checkpoint_path)

# # Create a callback that saves the model's weights
# cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_path,
#                                                  save_weights_only=True,
#                                                  verbose=1)

#### Loading model weights for training ################
# joint_model.load_weights('/content/drive/MyDrive/JP Morgan/Data/model_ester_swa/cp.ckpt')

history = joint_model.fit(
    encoded_train, (slot_train, intent_train),
    validation_data=(encoded_valid, (slot_valid, intent_valid)),
    epochs=15, batch_size=8)

"""### Save the model"""

# model = joint_model
# model.save("/content/drive/MyDrive/JP Morgan/Data/model_ester_swa/best_working")
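"""This notebook pins `transformers==2.11.0`, where the BERT layer returns a plain `(sequence_output, pooled_output)` tuple, which is what `call` above unpacks. If the notebook is updated to a recent `transformers` release (see the TODO at the top), the layer returns an output object instead, so the unpacking would need to change. A hedged sketch of what that could look like, defined here but not called, since it only applies after upgrading:"""

def call_with_recent_transformers(model, inputs, training=False):
    # Sketch only: equivalent of JointIntentAndSlotFillingModel.call for
    # transformers >= 4.x, where self.bert(...) returns an output object
    # exposing .last_hidden_state and .pooler_output instead of a tuple.
    bert_output = model.bert(inputs, training=training)
    sequence_output = model.dropout(bert_output.last_hidden_state, training=training)
    slot_logits = model.slot_classifier(sequence_output)
    pooled_output = model.dropout(bert_output.pooler_output, training=training)
    intent_logits = model.intent_classifier(pooled_output)
    return slot_logits, intent_logits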
"""#Making a prediction on a single text sequence and displaying both the sequence-wise intent and the token-wise slot labels"""


def show_predictions(text, tokenizer, model, intent_names, slot_names):
    inputs = tf.constant(tokenizer.encode(text))[None, :]  # batch_size = 1
    outputs = model(inputs)
    slot_logits, intent_logits = outputs
    # Drop the [CLS] and [SEP] positions before decoding the slot labels.
    slot_ids = slot_logits.numpy().argmax(axis=-1)[0, 1:-1]
    intent_id = intent_logits.numpy().argmax(axis=-1)[0]
    intent = [k for k, v in intent_names.items() if v == intent_id]

    slot_pred = []
    text_split = text.split(' ')
    tokens_ids = [(i, j) for i, j in zip(tokenizer.tokenize(text), slot_ids)]
    # Drop "##" continuation tokens so that one prediction remains per word.
    tokens_ids_ = [x for x in tokens_ids if '##' not in x[0]]
    for token, slot_id in zip(text_split, tokens_ids_):
        slot_pred.append(slot_names[slot_id[1]])

    return intent, slot_pred


# slot_names

df_valid.iloc[55]["words"], df_valid.iloc[55]["word_labels"], df_valid.iloc[55]["intent_label"]

pred_intent, pred_slot = show_predictions(df_valid.iloc[55]["words"], tokenizer,
                                          joint_model, intent_map, slot_names)

pred_intent, pred_slot

"""#Evaluating intent and slot predictions on the validation set"""

slot_avg_f1_score = []
slot_avg_accuracy_score = []
intent_avg_f1_score = []
intent_avg_accuracy_score = []

for idx in range(len(df_valid)):
    try:
        pred_intent, pred_slot = show_predictions(df_valid.iloc[idx]["words"],
                                                  tokenizer, joint_model,
                                                  intent_map, slot_names)
        true_label_slot = df_valid.iloc[idx]["word_labels"].split()
        true_label_intent = df_valid.iloc[idx]["intent_label"].split()

        f1_slot = f1_score(true_label_slot, pred_slot, average="weighted")
        slot_accuracy_score = accuracy_score(true_label_slot, pred_slot)
        f1_intent = f1_score(true_label_intent, pred_intent, average="weighted")
        intent_accuracy_score = accuracy_score(true_label_intent, pred_intent)

        slot_avg_f1_score.append(f1_slot)
        slot_avg_accuracy_score.append(slot_accuracy_score)
        intent_avg_f1_score.append(f1_intent)
        intent_avg_accuracy_score.append(intent_accuracy_score)
    except Exception:
        # Skip examples where the predicted and gold slot sequences do not
        # have the same length (subword / whitespace mismatches).
        pass

print("F1 Score: ", np.mean(slot_avg_f1_score),
      "\nAccuracy:", np.mean(slot_avg_accuracy_score))

print("F1 Score Intent: ", np.mean(intent_avg_f1_score),
      "\nAccuracy Intent:", np.mean(intent_avg_accuracy_score))

# text = st.text_area("Please enter a text")

# if(text):
#     output = show_predictions(text, tokenizer, joint_model, intent_map, slot_names)
#     st.json(output)
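"""Finally, `classification_report` from `seqeval` is imported at the top but never used. An optional sketch of how it could give an entity-level, per-slot breakdown on the validation set (it expects lists of label sequences, so only examples where the predicted and gold sequences line up are kept):"""

# Optional sketch: entity-level slot evaluation with seqeval.
y_true_slots, y_pred_slots = [], []
for idx in range(len(df_valid)):
    try:
        _, pred_slot = show_predictions(df_valid.iloc[idx]["words"], tokenizer,
                                        joint_model, intent_map, slot_names)
        true_slot = df_valid.iloc[idx]["word_labels"].split()
        if len(true_slot) == len(pred_slot):
            y_true_slots.append(true_slot)
            y_pred_slots.append(pred_slot)
    except Exception:
        pass

print(classification_report(y_true_slots, y_pred_slots))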