import tensorflow as tf

from library.Multihead_attention import MultiHeadAttention, point_wise_feed_forward_network
from library.self_attention import positional_encoding_1d, positional_encoding_2d
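

# EncoderLayer: a single Transformer encoder block. Multi-head self-attention is followed
# by a point-wise feed-forward network; each sub-layer is wrapped with dropout, a residual
# connection, and layer normalization (post-norm).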
class EncoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super(EncoderLayer, self).__init__()

        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = point_wise_feed_forward_network(d_model, dff)

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)

    def call(self, x, training, mask=None):
        # Multi-head self-attention sub-layer.
        attn_output, _ = self.mha(x, x, x, mask)  # (batch_size, input_seq_len, d_model)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(x + attn_output)

        # Point-wise feed-forward sub-layer.
        ffn_output = self.ffn(out1)  # (batch_size, input_seq_len, d_model)
        ffn_output = self.dropout2(ffn_output, training=training)
        out2 = self.layernorm2(out1 + ffn_output)

        return out2
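

# DecoderLayer: a single Transformer decoder block. Masked self-attention over the target
# sequence, cross-attention over the encoder output, then a point-wise feed-forward
# network; each sub-layer uses dropout, a residual connection, and layer normalization.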
class DecoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super(DecoderLayer, self).__init__()

        self.mha1 = MultiHeadAttention(d_model, num_heads)
        self.mha2 = MultiHeadAttention(d_model, num_heads)

        self.ffn = point_wise_feed_forward_network(d_model, dff)

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)
        self.dropout3 = tf.keras.layers.Dropout(rate)

    def call(self, x, enc_output, training,
             look_ahead_mask=None, padding_mask=None):
        # Masked self-attention over the decoder input.
        attn1, attn_weights_block1 = self.mha1(x, x, x, look_ahead_mask)
        attn1 = self.dropout1(attn1, training=training)
        out1 = self.layernorm1(attn1 + x)

        # Cross-attention over the encoder output.
        attn2, attn_weights_block2 = self.mha2(
            enc_output, enc_output, out1, padding_mask)
        attn2 = self.dropout2(attn2, training=training)
        out2 = self.layernorm2(attn2 + out1)

        # Point-wise feed-forward sub-layer.
        ffn_output = self.ffn(out2)
        ffn_output = self.dropout3(ffn_output, training=training)
        out3 = self.layernorm3(ffn_output + out2)

        return out3, attn_weights_block1, attn_weights_block2
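

# Encoder: projects the input features to d_model with a ReLU Dense layer, adds a 2-D
# positional encoding over a row_size x col_size grid (e.g. spatial image features),
# applies dropout, and runs the result through a stack of EncoderLayers.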
class Encoder(tf.keras.layers.Layer):
    def __init__(self, num_layers, d_model, num_heads, dff,
                 row_size, col_size, rate=0.1):
        super(Encoder, self).__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = tf.keras.layers.Dense(self.d_model, activation='relu')
        self.pos_encoding = positional_encoding_2d(row_size, col_size,
                                                   self.d_model)

        self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate)
                           for _ in range(num_layers)]

        self.dropout = tf.keras.layers.Dropout(rate)

    def call(self, x, training, mask=None):
        seq_len = tf.shape(x)[1]

        # Project the input features to d_model and add the 2-D positional encoding.
        x = self.embedding(x)  # (batch_size, input_seq_len, d_model)
        x += self.pos_encoding[:, :seq_len, :]

        x = self.dropout(x, training=training)

        for i in range(self.num_layers):
            x = self.enc_layers[i](x, training, mask)

        return x  # (batch_size, input_seq_len, d_model)
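

# Decoder: token embedding scaled by sqrt(d_model) plus a 1-D positional encoding,
# followed by a stack of DecoderLayers; the attention weights of every layer are
# collected so they can be inspected or visualized.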
class Decoder(tf.keras.layers.Layer):
    def __init__(self, num_layers, d_model, num_heads, dff, target_vocab_size,
                 maximum_position_encoding, rate=0.1):
        super(Decoder, self).__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = tf.keras.layers.Embedding(target_vocab_size, d_model)
        self.pos_encoding = positional_encoding_1d(maximum_position_encoding, d_model)

        self.dec_layers = [DecoderLayer(d_model, num_heads, dff, rate)
                           for _ in range(num_layers)]
        self.dropout = tf.keras.layers.Dropout(rate)

    def call(self, x, enc_output, training,
             look_ahead_mask=None, padding_mask=None):
        seq_len = tf.shape(x)[1]
        attention_weights = {}

        # Embed the target tokens, scale by sqrt(d_model), and add the positional encoding.
        x = self.embedding(x)  # (batch_size, target_seq_len, d_model)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]

        x = self.dropout(x, training=training)

        for i in range(self.num_layers):
            x, block1, block2 = self.dec_layers[i](x, enc_output, training,
                                                   look_ahead_mask, padding_mask)

            attention_weights['decoder_layer{}_block1'.format(i+1)] = block1
            attention_weights['decoder_layer{}_block2'.format(i+1)] = block2

        return x, attention_weights  # x: (batch_size, target_seq_len, d_model)
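

# Minimal shape check (an illustrative sketch, not part of the library). It assumes that
# MultiHeadAttention and the positional_encoding_* helpers behave like the standard
# TensorFlow Transformer tutorial implementations (e.g. positional_encoding_2d returning a
# (1, row_size*col_size, d_model) tensor); the sizes below are arbitrary examples.
if __name__ == "__main__":
    # A batch of 2 inputs: an 8x8 grid of 2048-d features and a 10-token target sequence.
    sample_features = tf.random.uniform((2, 64, 2048))
    sample_tokens = tf.random.uniform((2, 10), maxval=5000, dtype=tf.int32)

    encoder = Encoder(num_layers=2, d_model=128, num_heads=8, dff=512,
                      row_size=8, col_size=8)
    decoder = Decoder(num_layers=2, d_model=128, num_heads=8, dff=512,
                      target_vocab_size=5000, maximum_position_encoding=5000)

    enc_out = encoder(sample_features, training=False)                # (2, 64, 128)
    dec_out, attn = decoder(sample_tokens, enc_out, training=False)   # (2, 10, 128)
    print(enc_out.shape, dec_out.shape, sorted(attn.keys()))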