"""Transformer-encoder building blocks plus BERT-style and regression heads.

The attention used here accepts, in addition to the usual mask, an
``adjoin_matrix`` that is added to the attention logits before the softmax.
Positional encoding is not applied anywhere in this module.
"""

import time

import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf


def gelu(x):
    """Return the exact (erf-based) GELU activation of ``x``.

    Computes ``0.5 * x * (1 + erf(x / sqrt(2)))`` element-wise.
    """
    return 0.5 * x * (1.0 + tf.math.erf(x / tf.sqrt(2.)))


def scaled_dot_product_attention(q, k, v, mask, adjoin_matrix):
    """Calculate the attention weights and the attended values.

    q, k, v must have matching leading dimensions.
    k, v must have matching penultimate dimension, i.e.: seq_len_k = seq_len_v.
    The mask has different shapes depending on its type (padding or look
    ahead) but it must be broadcastable for addition.

    Args:
      q: query shape == (..., seq_len_q, depth)
      k: key shape == (..., seq_len_k, depth)
      v: value shape == (..., seq_len_v, depth_v)
      mask: Float tensor with shape broadcastable
            to (..., seq_len_q, seq_len_k), or None. Positions where the mask
            is 1 receive a -1e9 logit offset.
      adjoin_matrix: Tensor broadcastable to (..., seq_len_q, seq_len_k), or
            None. It is added to the scaled logits as-is; no conversion or
            thresholding is applied here.

    Returns:
      output, attention_weights
    """
    matmul_qk = tf.matmul(q, k, transpose_b=True)  # (..., seq_len_q, seq_len_k)

    # Scale by sqrt(depth) so the logits' magnitude does not grow with depth.
    dk = tf.cast(tf.shape(k)[-1], tf.float32)
    scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)

    # Masked positions get a very large negative logit, i.e. ~0 after softmax.
    if mask is not None:
        scaled_attention_logits += (mask * -1e9)

    # The adjacency term acts as an additive bias on the logits.
    if adjoin_matrix is not None:
        scaled_attention_logits += adjoin_matrix

    # softmax is normalized on the last axis (seq_len_k) so that the scores
    # add up to 1.
    attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)  # (..., seq_len_q, seq_len_k)

    output = tf.matmul(attention_weights, v)  # (..., seq_len_q, depth_v)

    return output, attention_weights


def _add_head_axis(adjoin_matrix):
    """Insert an axis at position 1 of ``adjoin_matrix``; pass ``None`` through.

    The inserted axis lines up with the ``num_heads`` axis of the attention
    logits so the same matrix broadcasts over every head.
    Assumes a rank-3 input such as (batch, seq_len, seq_len) -- TODO confirm
    against callers.
    """
    if adjoin_matrix is None:
        # The attention function already treats None as "no adjacency bias".
        return None
    return adjoin_matrix[:, tf.newaxis, :, :]


class MultiHeadAttention(tf.keras.layers.Layer):
    """Multi-head attention with an optional additive adjacency term."""

    def __init__(self, d_model, num_heads):
        """Create the q/k/v and output projections.

        Args:
          d_model: model width; must be divisible by ``num_heads``.
          num_heads: number of attention heads.
        """
        super().__init__()
        self.num_heads = num_heads
        self.d_model = d_model

        assert d_model % self.num_heads == 0, \
            "d_model must be divisible by num_heads"

        self.depth = d_model // self.num_heads

        self.wq = tf.keras.layers.Dense(d_model)
        self.wk = tf.keras.layers.Dense(d_model)
        self.wv = tf.keras.layers.Dense(d_model)

        self.dense = tf.keras.layers.Dense(d_model)

    def split_heads(self, x, batch_size):
        """Split the last dimension into (num_heads, depth).

        Transpose the result such that the shape is
        (batch_size, num_heads, seq_len, depth).
        """
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask, adjoin_matrix):
        """Run attention over all heads.

        Note the argument order: value, key, query.

        Returns:
          output: (batch_size, seq_len_q, d_model)
          attention_weights: (batch_size, num_heads, seq_len_q, seq_len_k)
        """
        batch_size = tf.shape(q)[0]

        q = self.wq(q)  # (batch_size, seq_len, d_model)
        k = self.wk(k)  # (batch_size, seq_len, d_model)
        v = self.wv(v)  # (batch_size, seq_len, d_model)

        q = self.split_heads(q, batch_size)  # (batch_size, num_heads, seq_len_q, depth)
        k = self.split_heads(k, batch_size)  # (batch_size, num_heads, seq_len_k, depth)
        v = self.split_heads(v, batch_size)  # (batch_size, num_heads, seq_len_v, depth)

        # scaled_attention.shape == (batch_size, num_heads, seq_len_q, depth)
        # attention_weights.shape == (batch_size, num_heads, seq_len_q, seq_len_k)
        scaled_attention, attention_weights = scaled_dot_product_attention(
            q, k, v, mask, adjoin_matrix)

        # Move heads next to depth so they can be merged back into d_model.
        scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])  # (batch_size, seq_len_q, num_heads, depth)

        concat_attention = tf.reshape(scaled_attention,
                                      (batch_size, -1, self.d_model))  # (batch_size, seq_len_q, d_model)

        output = self.dense(concat_attention)  # (batch_size, seq_len_q, d_model)

        return output, attention_weights


def point_wise_feed_forward_network(d_model, dff):
    """Build the position-wise feed-forward block: Dense(dff, gelu) -> Dense(d_model)."""
    return tf.keras.Sequential([
        tf.keras.layers.Dense(dff, activation=gelu),  # (batch_size, seq_len, dff)
        tf.keras.layers.Dense(d_model)  # (batch_size, seq_len, d_model)
    ])


class EncoderLayer(tf.keras.layers.Layer):
    """One post-norm encoder layer: self-attention and feed-forward sub-blocks.

    Each sub-block is followed by dropout, a residual add and layer
    normalization.
    """

    def __init__(self, d_model, num_heads, dff, rate=0.1):
        """Create the sub-layers.

        Args:
          d_model: model width.
          num_heads: number of attention heads.
          dff: hidden width of the feed-forward block.
          rate: dropout rate applied after each sub-block.
        """
        super().__init__()

        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = point_wise_feed_forward_network(d_model, dff)

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)

    def call(self, x, training, mask, adjoin_matrix):
        """Apply the layer to ``x``.

        Returns:
          out2: (batch_size, input_seq_len, d_model)
          attention_weights: as returned by ``MultiHeadAttention``.
        """
        attn_output, attention_weights = self.mha(x, x, x, mask, adjoin_matrix)  # (batch_size, input_seq_len, d_model)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(x + attn_output)  # (batch_size, input_seq_len, d_model)

        ffn_output = self.ffn(out1)  # (batch_size, input_seq_len, d_model)
        ffn_output = self.dropout2(ffn_output, training=training)
        out2 = self.layernorm2(out1 + ffn_output)  # (batch_size, input_seq_len, d_model)

        return out2, attention_weights


class Encoder(tf.keras.Model):
    """Embedding followed by a stack of ``EncoderLayer``s; returns the final states only."""

    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size,
                 maximum_position_encoding, rate=0.1):
        """Create the embedding, the encoder stack and the input dropout.

        Args:
          num_layers: number of encoder layers.
          d_model: model width.
          num_heads: number of attention heads per layer.
          dff: hidden width of each feed-forward block.
          input_vocab_size: size of the token vocabulary for the embedding.
          maximum_position_encoding: accepted for interface compatibility but
            not used; no positional encoding is applied.
          rate: dropout rate.
        """
        super().__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = tf.keras.layers.Embedding(input_vocab_size, d_model)

        self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate)
                           for _ in range(num_layers)]

        self.dropout = tf.keras.layers.Dropout(rate)

    def call(self, x, training, mask, adjoin_matrix):
        """Encode token ids ``x``.

        Args:
          x: integer token ids, (batch_size, input_seq_len).
          training: whether dropout is active.
          mask: attention mask forwarded to every layer, or None.
          adjoin_matrix: additive attention term, or None. When given, a head
            axis is inserted at position 1 before it is used.

        Returns:
          Tensor of shape (batch_size, input_seq_len, d_model).
        """
        adjoin_matrix = _add_head_axis(adjoin_matrix)

        x = self.embedding(x)  # (batch_size, input_seq_len, d_model)
        # Scale embeddings by sqrt(d_model).
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))

        x = self.dropout(x, training=training)

        for enc_layer in self.enc_layers:
            x, _ = enc_layer(x, training, mask, adjoin_matrix)

        return x  # (batch_size, input_seq_len, d_model)


class Encoder_test(tf.keras.Model):
    """Same stack as ``Encoder`` but also returns per-layer weights and outputs."""

    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size,
                 maximum_position_encoding, rate=0.1):
        """Create the embedding, the encoder stack and the input dropout.

        Args:
          num_layers: number of encoder layers.
          d_model: model width.
          num_heads: number of attention heads per layer.
          dff: hidden width of each feed-forward block.
          input_vocab_size: size of the token vocabulary for the embedding.
          maximum_position_encoding: accepted for interface compatibility but
            not used; no positional encoding is applied.
          rate: dropout rate.
        """
        super().__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = tf.keras.layers.Embedding(input_vocab_size, d_model)

        self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate)
                           for _ in range(num_layers)]

        self.dropout = tf.keras.layers.Dropout(rate)

    def call(self, x, training, mask, adjoin_matrix):
        """Encode token ids ``x`` and collect intermediate results.

        Args:
          x: integer token ids, (batch_size, input_seq_len).
          training: whether dropout is active.
          mask: attention mask forwarded to every layer, or None.
          adjoin_matrix: additive attention term, or None. When given, a head
            axis is inserted at position 1 before it is used.

        Returns:
          x: final layer output, (batch_size, input_seq_len, d_model).
          attention_weights_list: attention weights of each layer, in order.
          xs: output of each layer, in order (the last entry equals ``x``).
        """
        adjoin_matrix = _add_head_axis(adjoin_matrix)

        x = self.embedding(x)  # (batch_size, input_seq_len, d_model)
        # Scale embeddings by sqrt(d_model).
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))

        x = self.dropout(x, training=training)

        attention_weights_list = []
        xs = []

        for enc_layer in self.enc_layers:
            x, attention_weights = enc_layer(x, training, mask, adjoin_matrix)
            attention_weights_list.append(attention_weights)
            xs.append(x)

        return x, attention_weights_list, xs


class BertModel_test(tf.keras.Model):
    """Token-prediction model that also exposes attention weights and layer outputs."""

    def __init__(self, num_layers=6, d_model=256, dff=512, num_heads=8,
                 vocab_size=17, dropout_rate=0.1):
        """Create the encoder and the per-token vocabulary head."""
        super().__init__()
        self.encoder = Encoder_test(num_layers=num_layers, d_model=d_model,
                                    num_heads=num_heads, dff=dff,
                                    input_vocab_size=vocab_size,
                                    maximum_position_encoding=200,
                                    rate=dropout_rate)
        self.fc1 = tf.keras.layers.Dense(d_model, activation=gelu)
        # The sole positional argument of LayerNormalization is the axis.
        self.layernorm = tf.keras.layers.LayerNormalization(axis=-1)
        self.fc2 = tf.keras.layers.Dense(vocab_size)

    def call(self, x, adjoin_matrix, mask, training=False):
        """Return per-token vocabulary logits plus encoder diagnostics.

        Returns:
          x: logits, (batch_size, input_seq_len, vocab_size).
          att: list of per-layer attention weights.
          xs: list of per-layer encoder outputs.
        """
        x, att, xs = self.encoder(x, training=training, mask=mask,
                                  adjoin_matrix=adjoin_matrix)
        x = self.fc1(x)
        x = self.layernorm(x)
        x = self.fc2(x)
        return x, att, xs


class BertModel(tf.keras.Model):
    """Token-prediction model: encoder followed by a per-token vocabulary head."""

    def __init__(self, num_layers=6, d_model=256, dff=512, num_heads=8,
                 vocab_size=17, dropout_rate=0.1):
        """Create the encoder and the per-token vocabulary head."""
        super().__init__()
        self.encoder = Encoder(num_layers=num_layers, d_model=d_model,
                               num_heads=num_heads, dff=dff,
                               input_vocab_size=vocab_size,
                               maximum_position_encoding=200,
                               rate=dropout_rate)
        self.fc1 = tf.keras.layers.Dense(d_model, activation=gelu)
        # The sole positional argument of LayerNormalization is the axis.
        self.layernorm = tf.keras.layers.LayerNormalization(axis=-1)
        self.fc2 = tf.keras.layers.Dense(vocab_size)

    def call(self, x, adjoin_matrix, mask, training=False):
        """Return per-token vocabulary logits, (batch_size, input_seq_len, vocab_size)."""
        x = self.encoder(x, training=training, mask=mask,
                         adjoin_matrix=adjoin_matrix)
        x = self.fc1(x)
        x = self.layernorm(x)
        x = self.fc2(x)
        return x


class PredictModel(tf.keras.Model):
    """Scalar-output model: encoder, then a dense head on the first position."""

    def __init__(self, num_layers=8, d_model=256, dff=512, num_heads=8,
                 vocab_size=17, dropout_rate=0.1, dense_dropout=0.1):
        """Create the encoder and the three-layer dense head."""
        super().__init__()
        self.encoder = Encoder(num_layers=num_layers, d_model=d_model,
                               num_heads=num_heads, dff=dff,
                               input_vocab_size=vocab_size,
                               maximum_position_encoding=200,
                               rate=dropout_rate)

        self.fc1 = tf.keras.layers.Dense(
            256, activation=tf.keras.layers.LeakyReLU(0.25))
        self.fc2 = tf.keras.layers.Dense(
            256, activation=tf.keras.layers.LeakyReLU(0.25))
        self.dropout = tf.keras.layers.Dropout(dense_dropout)
        self.fc3 = tf.keras.layers.Dense(1)

    def call(self, x, adjoin_matrix, mask, training=False):
        """Return one value per example, shape (batch_size, 1)."""
        x = self.encoder(x, training=training, mask=mask,
                         adjoin_matrix=adjoin_matrix)
        # Only the representation at sequence position 0 feeds the head.
        x = x[:, 0, :]
        x = self.fc1(x)
        # Dropout is applied after fc1 only.
        x = self.dropout(x, training=training)
        x = self.fc2(x)
        x = self.fc3(x)
        return x


class PredictModel_test(tf.keras.Model):
    """Scalar-output model that also exposes attention weights and layer outputs."""

    def __init__(self, num_layers=6, d_model=256, dff=512, num_heads=8,
                 vocab_size=17, dropout_rate=0.1, dense_dropout=0.5):
        """Create the encoder and the two-layer dense head."""
        super().__init__()
        self.encoder = Encoder_test(num_layers=num_layers, d_model=d_model,
                                    num_heads=num_heads, dff=dff,
                                    input_vocab_size=vocab_size,
                                    maximum_position_encoding=200,
                                    rate=dropout_rate)

        self.fc1 = tf.keras.layers.Dense(
            256, activation=tf.keras.layers.LeakyReLU(0.1))
        self.dropout = tf.keras.layers.Dropout(dense_dropout)
        self.fc2 = tf.keras.layers.Dense(1)

    def call(self, x, adjoin_matrix, mask, training=False):
        """Return the prediction plus encoder diagnostics.

        Returns:
          x: one value per example, shape (batch_size, 1).
          att: list of per-layer attention weights.
          xs: list of per-layer encoder outputs.
        """
        x, att, xs = self.encoder(x, training=training, mask=mask,
                                  adjoin_matrix=adjoin_matrix)
        # Only the representation at sequence position 0 feeds the head.
        x = x[:, 0, :]
        x = self.fc1(x)
        x = self.dropout(x, training=training)
        x = self.fc2(x)
        return x, att, xs