import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.layers import Dense, Flatten, Conv2D
from tensorflow.keras import Model
from matplotlib import pyplot as plt
from PIL import Image, ImageFont, ImageDraw
from skimage.transform import rescale, resize
import numpy as np
import re
import math
import copy
import random
import time
import data
import cv2
rng = np.random.RandomState(int(time.time()))

#### training setup parameters ####
lambda_val = 1e-4   # L2 regularization weight (see get_loss below)
gamma_val = 1       # alpha (attention) regularization weight (see get_cost below)
num_classes = 130
## Utility functions used to initialize variables
def norm_weight(fan_in, fan_out):
    # Glorot/Xavier uniform initialization for dense weights
    W_bound = np.sqrt(6.0 / (fan_in + fan_out))
    return np.asarray(rng.uniform(low=-W_bound, high=W_bound, size=(fan_in, fan_out)), dtype=np.float32)

def conv_norm_weight(nin, nout, kernel_size):
    # Glorot/Xavier uniform initialization for conv kernels
    filter_shape = (kernel_size[0], kernel_size[1], nin, nout)
    fan_in = kernel_size[0] * kernel_size[1] * nin
    fan_out = kernel_size[0] * kernel_size[1] * nout
    W_bound = np.sqrt(6. / (fan_in + fan_out))
    W = np.asarray(rng.uniform(low=-W_bound, high=W_bound, size=filter_shape), dtype=np.float32)
    return W.astype('float32')

def ortho_weight(ndim):
    # Orthogonal initialization: left singular vectors of a random Gaussian matrix
    W = np.random.randn(ndim, ndim)
    u, s, v = np.linalg.svd(W)
    return u.astype('float32')
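
# A minimal usage sketch of the initializers above (the shapes are illustrative
# assumptions, not values used by the model):
#   >>> norm_weight(256, 512).shape
#   (256, 512)
#   >>> conv_norm_weight(1, 48, (7, 7)).shape
#   (7, 7, 1, 48)
#   >>> W = ortho_weight(8)
#   >>> np.allclose(W @ W.T, np.eye(8), atol=1e-5)
#   True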
#####
class DenseEncoder(layers.Layer):
    def __init__(self, blocks,                  # number of dense blocks
                 level,                         # number of levels in each block
                 growth_rate,                   # growth rate in the DenseNet paper: k
                 istraining,
                 dropout_rate=0.2,              # drop rate of the dropout layers
                 dense_channels=0,              # filter count of the transition layer's input
                 transition=0.5,                # compression rate
                 input_conv_filters=48,         # filter count of the conv2d before the dense blocks
                 input_conv_stride=(2, 2),      # stride of the conv2d before the dense blocks
                 input_conv_kernel=(7, 7),      # kernel size of the conv2d before the dense blocks
                 **kwargs):
        super(DenseEncoder, self).__init__(**kwargs)
        self.blocks = blocks
        self.growth_rate = growth_rate
        self.training = istraining
        self.dense_channels = dense_channels
        self.level = level
        self.dropout_rate = dropout_rate
        self.transition = transition
        self.input_conv_kernel = input_conv_kernel
        self.input_conv_stride = input_conv_stride
        self.input_conv_filters = input_conv_filters
        self.limit = self.bound(1, self.input_conv_filters, self.input_conv_kernel)
        self.conv1 = tf.keras.layers.Conv2D(
            filters=self.input_conv_filters, kernel_size=self.input_conv_kernel,
            strides=self.input_conv_stride, padding='same', data_format='channels_last',
            use_bias=False, kernel_initializer=tf.random_uniform_initializer(-self.limit, self.limit))
        self.batch_norm = tf.keras.layers.BatchNormalization(
            trainable=self.training, momentum=0.9, scale=True,
            gamma_initializer=tf.random_uniform_initializer(
                -1.0 / math.sqrt(self.input_conv_filters), 1.0 / math.sqrt(self.input_conv_filters)),
            epsilon=0.0001)
        self.relu = tf.keras.layers.ReLU()
        self.maxpool = tf.keras.layers.MaxPool2D(pool_size=(2, 2), strides=(2, 2), padding='same')
        self.dropout = tf.keras.layers.Dropout(rate=self.dropout_rate)
        self.avgpool = tf.keras.layers.AveragePooling2D(pool_size=(2, 2), strides=(2, 2), padding='same')
        self.conv = []
        self.conv2 = []
        self.batchnorm = []
        self.batchnorm2 = []
        self.dense_channels += self.input_conv_filters
        for i in range(self.blocks):
            for j in range(self.level):
                limit = self.bound(self.dense_channels, 4 * self.growth_rate, [1, 1])
                self.conv.append(tf.keras.layers.Conv2D(
                    filters=4 * self.growth_rate, kernel_size=(1, 1), strides=(1, 1),
                    padding='valid', data_format='channels_last', use_bias=False,
                    kernel_initializer=tf.random_uniform_initializer(-limit, limit)))
                self.batchnorm.append(tf.keras.layers.BatchNormalization(
                    trainable=self.training, momentum=0.9, scale=True,
                    gamma_initializer=tf.random_uniform_initializer(
                        -1.0 / math.sqrt(4 * self.growth_rate), 1.0 / math.sqrt(4 * self.growth_rate)),
                    epsilon=0.0001))
                limit = self.bound(4 * self.growth_rate, self.growth_rate, [3, 3])
                self.conv.append(tf.keras.layers.Conv2D(
                    filters=self.growth_rate, kernel_size=(3, 3), strides=(1, 1),
                    padding='same', data_format='channels_last', use_bias=False,
                    kernel_initializer=tf.random_uniform_initializer(-limit, limit)))
                self.batchnorm.append(tf.keras.layers.BatchNormalization(
                    trainable=self.training, momentum=0.9, scale=True,
                    gamma_initializer=tf.random_uniform_initializer(
                        -1.0 / math.sqrt(self.growth_rate), 1.0 / math.sqrt(self.growth_rate)),
                    epsilon=0.0001))
                self.dense_channels += self.growth_rate
            if i < self.blocks - 1:
                compressed_channels = int(self.dense_channels * self.transition)
                #### new dense channels for the next dense block ####
                self.dense_channels = compressed_channels
                limit = self.bound(self.dense_channels, compressed_channels, [1, 1])
                self.conv2.append(tf.keras.layers.Conv2D(
                    compressed_channels, kernel_size=(1, 1), strides=(1, 1),
                    padding='valid', data_format='channels_last', use_bias=False, activation=None,
                    kernel_initializer=tf.random_uniform_initializer(-limit, limit)))
                self.batchnorm2.append(tf.keras.layers.BatchNormalization(
                    trainable=self.training, momentum=0.9, scale=True,
                    gamma_initializer=tf.random_uniform_initializer(
                        -1.0 / math.sqrt(self.dense_channels), 1.0 / math.sqrt(self.dense_channels)),
                    epsilon=0.0001))

    def bound(self, nin, nout, kernel):
        # Glorot-uniform bound for a conv kernel with the given fan-in/fan-out
        fin = nin * kernel[0] * kernel[1]
        fout = nout * kernel[0] * kernel[1]
        return np.sqrt(6. / (fin + fout))
    def dense_net(self, input_x, mask_x):
        #### before flowing into the dense blocks ####
        input_x = tf.expand_dims(input=input_x, axis=3)
        x = input_x
        x = self.conv1(x)
        mask_x = mask_x[:, 0::2, 0::2]   # conv1 has stride 2: downsample the mask to match
        x = self.batch_norm(x)
        x = self.relu(x)
        x = self.maxpool(x)
        input_pre = x
        mask_x = mask_x[:, 0::2, 0::2]   # maxpool has stride 2: downsample the mask again
        dense_out = x
        cind = 0
        bind = 0
        cind2 = 0
        bind2 = 0
        #### flowing into the dense blocks and transition layers ####
        for i in range(self.blocks):
            for j in range(self.level):
                #### [1, 1] convolution part of the bottleneck ####
                x = self.conv[cind](x)
                cind += 1
                x = self.batchnorm[bind](x)
                bind += 1
                x = self.relu(x)
                x = self.dropout(x, training=self.training)
                #### [3, 3] convolution part: the regular convolution ####
                x = self.conv[cind](x)
                cind += 1
                x = self.batchnorm[bind](x)
                bind += 1
                x = self.relu(x)
                x = self.dropout(x, training=self.training)
                dense_out = tf.concat([dense_out, x], axis=3)
                x = dense_out
            #### transition layer compresses the dense block's output ####
            if i < self.blocks - 1:
                x = self.conv2[cind2](x)
                cind2 += 1
                x = self.batchnorm2[bind2](x)
                bind2 += 1
                x = self.relu(x)
                x = self.dropout(x, training=self.training)
                x = self.avgpool(x)
                dense_out = x
                mask_x = mask_x[:, 0::2, 0::2]   # avgpool has stride 2
        return dense_out, mask_x
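
# With the configuration used below (blocks=3, level=16, growth_rate=24,
# input_conv_filters=48, transition=0.5), the encoder's channel count evolves as
# 48 -> 48 + 16*24 = 432 -> 216 (transition) -> 216 + 384 = 600 -> 300 (transition)
# -> 300 + 384 = 684, which is why the attention and decoder code below uses
# context_dim = 684. Spatial resolution is downsampled 16x along each axis
# (conv1 stride 2, maxpool, and two transition avgpools).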
'''
ContextualAttention class implements the contextual attention mechanism.
'''
class ContextualAttention(layers.Layer):
    def __init__(self, channels,              # output channels of DenseEncoder | [batch, h, w, channels]
                 dim_decoder, dim_attend,     # decoder hidden state: $h_{t-1}$ | [batch, dec_dim]
                 **kwargs):
        super(ContextualAttention, self).__init__(**kwargs)
        self.channels = channels
        self.coverage_kernel = [11, 11]       # kernel size of $Q$
        self.coverage_filters = dim_attend    # filter count of $Q$ | 512
        self.dim_decoder = dim_decoder        # 256
        self.dim_attend = dim_attend          # unified dim of the three terms of $e_{ti}$, i.e.
                                              # $Q \beta_t$, $U_a a_i$, $W_a h_{t-1}$ | 512
        self.U_f = tf.Variable(norm_weight(self.coverage_filters, self.dim_attend), name='U_f')  # $U_f f_i$ | [cov_filters, dim_attend]
        self.U_f_b = tf.Variable(np.zeros((self.dim_attend,)).astype('float32'), name='U_f_b')   # $U_f f_i + U_f_b$ | [dim_attend, ]
        self.U_a = tf.Variable(norm_weight(self.channels, self.dim_attend), name='U_a')          # $U_a a_i$ | [annotation_channels, dim_attend]
        self.U_a_b = tf.Variable(np.zeros((self.dim_attend,)).astype('float32'), name='U_a_b')   # $U_a a_i + U_a_b$ | [dim_attend, ]
        self.W_a = tf.Variable(norm_weight(self.dim_decoder, self.dim_attend), name='W_a')       # $W_a h_{t-1}$ | [dec_dim, dim_attend]
        self.W_a_b = tf.Variable(np.zeros((self.dim_attend,)).astype('float32'), name='W_a_b')   # $W_a h_{t-1} + W_a_b$ | [dim_attend, ]
        self.V_a = tf.Variable(norm_weight(self.dim_attend, 1), name='V_a')                      # $V_a \tanh(A + B + C)$ | [dim_attend, 1]
        self.V_a_b = tf.Variable(np.zeros((1,)).astype('float32'), name='V_a_b')                 # $V_a \tanh(A + B + C) + V_a_b$ | [1, ]
        self.alpha_past_filter = tf.Variable(conv_norm_weight(1, self.dim_attend, self.coverage_kernel), name='alpha_past_filter')
    def get_context(self, annotation4ctx, h_t_1, alpha_past4ctx, a_mask):
        #### calculate $U_f f_i$ (coverage term) ####
        alpha_past_4d = alpha_past4ctx[:, :, :, None]
        Ft = tf.nn.conv2d(alpha_past_4d, filters=self.alpha_past_filter, strides=[1, 1, 1, 1], padding='SAME')
        coverage_vector = tf.tensordot(Ft, self.U_f, axes=1)                  # [batch, h, w, dim_attend]
        #### calculate $U_a a_i$ ####
        dense_encoder_vector = tf.tensordot(annotation4ctx, self.U_a, axes=1) # [batch, h, w, dim_attend]
        #### calculate $W_a h_{t-1}$ ####
        speller_vector = tf.tensordot(h_t_1, self.W_a, axes=1)                # [batch, dim_attend]
        speller_vector = speller_vector[:, None, None, :]                     # [batch, 1, 1, dim_attend]
        # a single shared bias (U_f_b) is applied inside the tanh instead of per-term biases
        tanh_vector = tf.tanh(coverage_vector + dense_encoder_vector + speller_vector + self.U_f_b)  # [batch, h, w, dim_attend]
        e_ti = tf.tensordot(tanh_vector, self.V_a, axes=1) + self.V_a_b       # [batch, h, w, 1]
        alpha = tf.exp(e_ti)
        alpha = tf.squeeze(alpha, axis=3)
        if a_mask is not None:
            alpha = alpha * a_mask
        alpha = alpha / tf.reduce_sum(alpha, axis=[1, 2], keepdims=True)      # normalized weights | [batch, h, w]
        alpha_past4ctx += alpha                                               # accumulated weights matrix | [batch, h, w]
        context = tf.reduce_sum(annotation4ctx * alpha[:, :, :, None], axis=[1, 2])  # context vector | [batch, feature_channels]
        return context, alpha, alpha_past4ctx
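
# For reference, get_context computes masked-softmax attention with coverage
# (notation follows the comments above):
#   F_t      = Q * sum_{k<t} alpha_k                        (11x11 conv over past attention)
#   e_ti     = V_a^T tanh(U_a a_i + W_a h_{t-1} + U_f f_ti + U_f_b) + V_a_b
#   alpha_ti = exp(e_ti) m_i / sum_j exp(e_tj) m_j          (m = annotation mask)
#   c_t      = sum_i alpha_ti a_i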
'''
Decoder class implements a two-layer GRU decoder, which decodes an input image
into a sequence of characters using the attention mechanism.
'''
class Decoder(layers.Layer):
    def __init__(self, hidden_dim, word_dim, contextual_attention, context_dim, **kwargs):
        super(Decoder, self).__init__(**kwargs)
        self.contextual_attention = contextual_attention  # inner instance of ContextualAttention that provides the context
        self.context_dim = context_dim                    # context dim | 684
        self.hidden_dim = hidden_dim                      # dim of hidden state | 256
        self.word_dim = word_dim                          # dim of word embedding | 256
        ## GRU 1 weights initialization starts here
        self.W_yz_yr = tf.Variable(np.concatenate(
            [norm_weight(self.word_dim, self.hidden_dim), norm_weight(self.word_dim, self.hidden_dim)], axis=1), name='W_yz_yr')  # [dim_word, 2 * dim_decoder]
        self.b_yz_yr = tf.Variable(np.zeros((2 * self.hidden_dim,)).astype('float32'), name='b_yz_yr')
        self.U_hz_hr = tf.Variable(np.concatenate(
            [ortho_weight(self.hidden_dim), ortho_weight(self.hidden_dim)], axis=1), name='U_hz_hr')  # [dim_hidden, 2 * dim_hidden]
        self.W_yh = tf.Variable(norm_weight(self.word_dim, self.hidden_dim), name='W_yh')
        self.b_yh = tf.Variable(np.zeros((self.hidden_dim,)).astype('float32'), name='b_yh')  # [dim_decoder, ]
        self.U_rh = tf.Variable(ortho_weight(self.hidden_dim), name='U_rh')  # [dim_hidden, dim_hidden]
        ## GRU 2 weights initialization starts here
        self.U_hz_hr_nl = tf.Variable(np.concatenate(
            [ortho_weight(self.hidden_dim), ortho_weight(self.hidden_dim)], axis=1), name='U_hz_hr_nl')  # [dim_hidden, 2 * dim_hidden] non-linear
        self.b_hz_hr_nl = tf.Variable(np.zeros((2 * self.hidden_dim,)).astype('float32'), name='b_hz_hr_nl')  # [2 * dim_hidden, ]
        self.W_c_z_r = tf.Variable(norm_weight(self.context_dim, 2 * self.hidden_dim), name='W_c_z_r')
        self.U_rh_nl = tf.Variable(ortho_weight(self.hidden_dim), name='U_rh_nl')
        self.b_rh_nl = tf.Variable(np.zeros((self.hidden_dim,)).astype('float32'), name='b_rh_nl')
        self.W_c_h_nl = tf.Variable(norm_weight(self.context_dim, self.hidden_dim), name='W_c_h_nl')
    def get_ht_ctx(self, emb_y, target_hidden_state_0, annotations, a_m, y_m):
        # scan over the timestep axis of emb_y / y_m, threading the hidden state,
        # context, attention weights and accumulated attention through time
        res = tf.scan(self.one_time_step, elems=(emb_y, y_m),
                      initializer=(target_hidden_state_0,
                                   tf.zeros([tf.shape(annotations)[0], self.context_dim]),
                                   tf.zeros([tf.shape(annotations)[0], tf.shape(annotations)[1], tf.shape(annotations)[2]]),
                                   tf.zeros([tf.shape(annotations)[0], tf.shape(annotations)[1], tf.shape(annotations)[2]]),
                                   annotations, a_m))
        return res
    def one_time_step(self, tuple_h0_ctx_alpha_alpha_past_annotation, tuple_emb_mask):
        target_hidden_state_0 = tuple_h0_ctx_alpha_alpha_past_annotation[0]
        alpha_past_one = tuple_h0_ctx_alpha_alpha_past_annotation[3]
        annotation_one = tuple_h0_ctx_alpha_alpha_past_annotation[4]
        a_mask = tuple_h0_ctx_alpha_alpha_past_annotation[5]
        emb_y, y_mask = tuple_emb_mask
        # GRU 1 starts here
        emb_y_z_r_vector = tf.tensordot(emb_y, self.W_yz_yr, axes=1) + self.b_yz_yr    # [batch, 2 * dim_decoder]
        hidden_z_r_vector = tf.tensordot(target_hidden_state_0, self.U_hz_hr, axes=1)  # [batch, 2 * dim_decoder]
        pre_z_r_vector = tf.sigmoid(emb_y_z_r_vector + hidden_z_r_vector)              # [batch, 2 * dim_decoder]
        r1 = pre_z_r_vector[:, :self.hidden_dim]  # reset gate | [batch, dim_decoder]
        z1 = pre_z_r_vector[:, self.hidden_dim:]  # update gate | [batch, dim_decoder]
        emb_y_h_vector = tf.tensordot(emb_y, self.W_yh, axes=1) + self.b_yh            # [batch, dim_decoder]
        hidden_r_h_vector = tf.tensordot(target_hidden_state_0, self.U_rh, axes=1)     # [batch, dim_decoder]
        hidden_r_h_vector *= r1
        pre_h_proposal = tf.tanh(hidden_r_h_vector + emb_y_h_vector)
        pre_h = z1 * target_hidden_state_0 + (1. - z1) * pre_h_proposal
        if y_mask is not None:
            pre_h = y_mask[:, None] * pre_h + (1. - y_mask)[:, None] * target_hidden_state_0
        context, alpha, alpha_past_one = self.contextual_attention.get_context(annotation_one, pre_h, alpha_past_one, a_mask)  # [batch, dim_ctx]
        # GRU 2 starts here
        emb_y_z_r_nl_vector = tf.tensordot(pre_h, self.U_hz_hr_nl, axes=1) + self.b_hz_hr_nl
        context_z_r_vector = tf.tensordot(context, self.W_c_z_r, axes=1)
        z_r_vector = tf.sigmoid(emb_y_z_r_nl_vector + context_z_r_vector)
        r2 = z_r_vector[:, :self.hidden_dim]
        z2 = z_r_vector[:, self.hidden_dim:]
        emb_y_h_nl_vector = tf.tensordot(pre_h, self.U_rh_nl, axes=1)
        emb_y_h_nl_vector *= r2
        emb_y_h_nl_vector = emb_y_h_nl_vector + self.b_rh_nl  # bias added after the point-wise multiplication with r2
        context_h_vector = tf.tensordot(context, self.W_c_h_nl, axes=1)
        h_proposal = tf.tanh(emb_y_h_nl_vector + context_h_vector)
        h = z2 * pre_h + (1. - z2) * h_proposal
        if y_mask is not None:
            h = y_mask[:, None] * h + (1. - y_mask)[:, None] * pre_h
        return h, context, alpha, alpha_past_one, annotation_one, a_mask
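
# For reference, each decoder step computes two stacked GRU updates (notation
# matches the variables above; note the gate ordering [r, z] follows the slicing
# in the code):
#   GRU 1:  [r1, z1] = sigmoid(W_yz_yr y + b_yz_yr + U_hz_hr h_{t-1})
#           h~  = tanh(W_yh y + b_yh + r1 * (U_rh h_{t-1}))
#           s   = z1 * h_{t-1} + (1 - z1) * h~        (pre_h, used to query attention)
#   GRU 2:  [r2, z2] = sigmoid(U_hz_hr_nl s + b_hz_hr_nl + W_c_z_r c_t)
#           h~  = tanh(r2 * (U_rh_nl s) + b_rh_nl + W_c_h_nl c_t)
#           h_t = z2 * s + (1 - z2) * h~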
'''
CALText class is the main class. It uses the three classes below:
1) DenseEncoder (encoder)
2) ContextualAttention (contextual attention mechanism)
3) Decoder (two-layer GRU decoder)
CALText implements get_cost and get_word, which are used for cost calculation
and for single-step decoding (driven by the module-level get_sample below).
'''
class CALText(layers.Layer):
    def __init__(self, dense_encoder, contextual_attention, decoder, hidden_dim, word_dim, context_dim, target_dim, istraining, **kwargs):
        super(CALText, self).__init__(**kwargs)
        self.hidden_dim = hidden_dim
        self.word_dim = word_dim
        self.context_dim = context_dim
        self.target_dim = target_dim
        self.embed_matrix = tf.Variable(norm_weight(self.target_dim, self.word_dim), name='embed')
        self.dense_encoder = dense_encoder
        self.contextual_attention = contextual_attention
        self.decoder = decoder
        self.Wa2h = tf.Variable(norm_weight(self.context_dim, self.hidden_dim), name='Wa2h')
        self.ba2h = tf.Variable(np.zeros((self.hidden_dim,)).astype('float32'), name='ba2h')
        self.Wc = tf.Variable(norm_weight(self.context_dim, self.word_dim), name='Wc')
        self.bc = tf.Variable(np.zeros((self.word_dim,)).astype('float32'), name='bc')
        self.Wh = tf.Variable(norm_weight(self.hidden_dim, self.word_dim), name='Wh')
        self.bh = tf.Variable(np.zeros((self.word_dim,)).astype('float32'), name='bh')
        self.Wy = tf.Variable(norm_weight(self.word_dim, self.word_dim), name='Wy')
        self.by = tf.Variable(np.zeros((self.word_dim,)).astype('float32'), name='by')
        self.Wo = tf.Variable(norm_weight(self.word_dim // 2, self.target_dim), name='Wo')
        self.bo = tf.Variable(np.zeros((self.target_dim,)).astype('float32'), name='bo')
        self.training = istraining
        self.dropout = tf.keras.layers.Dropout(rate=0.2)
    def get_cost(self, cost_annotation, cost_y, a_m, y_m, alpha_reg):
        #### step 1: preparation of the embeddings of the label sequences ####
        timesteps = tf.shape(cost_y)[0]
        batch_size = tf.shape(cost_y)[1]
        emb_y = tf.nn.embedding_lookup(self.embed_matrix, tf.reshape(cost_y, [-1]))
        emb_y = tf.reshape(emb_y, [timesteps, batch_size, self.word_dim])
        emb_pad = tf.fill((1, batch_size, self.word_dim), 0.0)
        emb_shift = tf.concat([emb_pad, tf.strided_slice(emb_y, [0, 0, 0], [-1, batch_size, self.word_dim], [1, 1, 1])], axis=0)
        new_emb_y = emb_shift
        #### step 2: calculation of h_0 ####
        anno_mean = tf.reduce_sum(cost_annotation * a_m[:, :, :, None], axis=[1, 2]) / tf.reduce_sum(a_m, axis=[1, 2])[:, None]
        h_0 = tf.tensordot(anno_mean, self.Wa2h, axes=1) + self.ba2h  # [batch, hidden_dim]
        h_0 = tf.tanh(h_0)
        #### step 3: calculation of h_t and c_t at all time steps ####
        ret = self.decoder.get_ht_ctx(new_emb_y, h_0, cost_annotation, a_m, y_m)
        h_t = ret[0]    # h_t of all timesteps | [timesteps, batch, hidden_dim]
        c_t = ret[1]    # c_t of all timesteps | [timesteps, batch, context_dim]
        alpha = ret[2]  # alpha of all timesteps | [timesteps, batch, h, w]
        #### step 4: calculation of the cost using h_t, c_t and y_{t-1} ####
        y_t_1 = new_emb_y  # shifted y | [1:] = [:-1]
        logit_gru = tf.tensordot(h_t, self.Wh, axes=1)
        logit_ctx = tf.tensordot(c_t, self.Wc, axes=1)
        logit_pre = tf.tensordot(y_t_1, self.Wy, axes=1)
        logit = logit_pre + logit_ctx + logit_gru + self.bh  # a single shared bias is used
        #### maxout (max pooling over pairs) on the word_dim-sized vector ####
        shape = tf.shape(logit)
        logit = tf.reshape(logit, [shape[0], -1, shape[2] // 2, 2])
        logit = tf.reduce_max(logit, axis=3)
        logit = self.dropout(logit, training=self.training)
        logit = tf.tensordot(logit, self.Wo, axes=1) + self.bo
        logit_shape = tf.shape(logit)
        logit = tf.reshape(logit, [-1, logit_shape[2]])
        cost = tf.nn.softmax_cross_entropy_with_logits(logits=logit, labels=tf.one_hot(tf.reshape(cost_y, [-1]), depth=self.target_dim))
        cost = tf.multiply(cost, tf.reshape(y_m, [-1]))  # mask out padding positions
        cost = tf.reshape(cost, [shape[0], shape[1]])
        cost = tf.reduce_sum(cost, axis=0)
        cost = tf.reduce_mean(cost)
        #### alpha L1 regularization ####
        alpha_sum = tf.reduce_mean(tf.reduce_sum(tf.reduce_sum(tf.abs(alpha), axis=[2, 3]), axis=0))
        cost = tf.cond(tf.cast(alpha_reg > 0, tf.bool), lambda: cost + (alpha_reg * alpha_sum), lambda: cost)
        return cost
    def get_word(self, sample_y, sample_h_pre, alpha_past_pre, sample_annotation, training_mode):
        # a negative token id marks the start of decoding: feed a zero embedding
        emb = tf.cond(pred=sample_y[0] < 0,
                      true_fn=lambda: tf.fill((1, self.word_dim), 0.0),
                      false_fn=lambda: tf.nn.embedding_lookup(params=self.embed_matrix, ids=sample_y))
        # the code below unrolls Decoder.one_time_step for a single step without masks
        emb_y_z_r_vector = tf.tensordot(emb, self.decoder.W_yz_yr, axes=1) + self.decoder.b_yz_yr  # [batch, 2 * dim_decoder]
        hidden_z_r_vector = tf.tensordot(sample_h_pre, self.decoder.U_hz_hr, axes=1)               # [batch, 2 * dim_decoder]
        pre_z_r_vector = tf.sigmoid(emb_y_z_r_vector + hidden_z_r_vector)                          # [batch, 2 * dim_decoder]
        r1 = pre_z_r_vector[:, :self.decoder.hidden_dim]  # [batch, dim_decoder]
        z1 = pre_z_r_vector[:, self.decoder.hidden_dim:]  # [batch, dim_decoder]
        emb_y_h_vector = tf.tensordot(emb, self.decoder.W_yh, axes=1) + self.decoder.b_yh          # [batch, dim_decoder]
        hidden_r_h_vector = tf.tensordot(sample_h_pre, self.decoder.U_rh, axes=1)                  # [batch, dim_decoder]
        hidden_r_h_vector *= r1
        pre_h_proposal = tf.tanh(hidden_r_h_vector + emb_y_h_vector)
        pre_h = z1 * sample_h_pre + (1. - z1) * pre_h_proposal
        context, alphacc, alpha_past = self.decoder.contextual_attention.get_context(sample_annotation, pre_h, alpha_past_pre, None)  # [batch, dim_ctx]
        emb_y_z_r_nl_vector = tf.tensordot(pre_h, self.decoder.U_hz_hr_nl, axes=1) + self.decoder.b_hz_hr_nl
        context_z_r_vector = tf.tensordot(context, self.decoder.W_c_z_r, axes=1)
        z_r_vector = tf.sigmoid(emb_y_z_r_nl_vector + context_z_r_vector)
        r2 = z_r_vector[:, :self.decoder.hidden_dim]
        z2 = z_r_vector[:, self.decoder.hidden_dim:]
        emb_y_h_nl_vector = tf.tensordot(pre_h, self.decoder.U_rh_nl, axes=1)
        emb_y_h_nl_vector *= r2
        emb_y_h_nl_vector = emb_y_h_nl_vector + self.decoder.b_rh_nl  # bias after the point-wise multiplication, matching one_time_step
        context_h_vector = tf.tensordot(context, self.decoder.W_c_h_nl, axes=1)
        h_proposal = tf.tanh(emb_y_h_nl_vector + context_h_vector)
        h = z2 * pre_h + (1. - z2) * h_proposal
        h_t = h
        c_t = context
        alpha_past_t = alpha_past
        y_t_1 = emb
        logit_gru = tf.tensordot(h_t, self.Wh, axes=1)
        logit_ctx = tf.tensordot(c_t, self.Wc, axes=1)
        logit_pre = tf.tensordot(y_t_1, self.Wy, axes=1)
        logit = logit_pre + logit_ctx + logit_gru + self.bh  # [batch, word_dim]
        shape = tf.shape(input=logit)
        logit = tf.reshape(logit, [-1, shape[1] // 2, 2])    # maxout, as in get_cost
        logit = tf.reduce_max(input_tensor=logit, axis=2)
        logit = self.dropout(logit, training=training_mode)
        logit = tf.tensordot(logit, self.Wo, axes=1) + self.bo
        next_probs = tf.nn.softmax(logits=logit)
        # note: tf.random.categorical expects logits; passing probabilities here
        # (as the original code does) still runs but skews the sampled distribution
        next_word = tf.reduce_max(input_tensor=tf.random.categorical(logits=next_probs, num_samples=1), axis=1)
        return next_probs, next_word, h_t, alpha_past_t, alphacc
class CALText_Model(tf.keras.Model):  # subclass of tf.keras.Model
    def __init__(self, training):  # define all variables and other configuration here
        super(CALText_Model, self).__init__()
        #### encoder setup parameters ####
        self.dense_blocks = 3
        self.levels_count = 16
        self.growth = 24
        #### decoder setup parameters ####
        self.hidden_dim = 256
        self.word_dim = 256
        self.dim_attend = 512
        self.dense_encoder = DenseEncoder(blocks=self.dense_blocks, level=self.levels_count, growth_rate=self.growth, istraining=training)
        self.contextual_attention = ContextualAttention(684, self.hidden_dim, self.dim_attend)
        self.decoder = Decoder(self.hidden_dim, self.word_dim, self.contextual_attention, 684)  # annotation.shape.as_list()[3] = 684
        self.caltext = CALText(self.dense_encoder, self.contextual_attention, self.decoder, self.hidden_dim, self.word_dim, 684, num_classes, istraining=training)

    def call(self, x, x_mask, y=None, y_mask=None, training=True):  # forward pass
        annotation, anno_mask = self.dense_encoder.dense_net(x, x_mask)
        if y is None:
            return annotation
        else:
            cost = self.caltext.get_cost(annotation, y, anno_mask, y_mask, gamma_val)
            return cost, annotation

    def get_hidden_state_0(self, anno):
        hidden_state_0 = tf.tanh(tf.tensordot(tf.reduce_mean(input_tensor=anno, axis=[1, 2]), self.caltext.Wa2h, axes=1) + self.caltext.ba2h)  # [batch, hidden_dim]
        return hidden_state_0
########## apply L2 regularization on weights ##########
def get_loss(loss, model):
    # add lambda_val * sum(w^2) for every trainable weight except conv kernels
    for layer in model.trainable_weights:
        arr = (layer.name).split("/")
        if not arr[len(arr) - 1].startswith('conv2d'):
            loss += lambda_val * tf.reduce_sum(input_tensor=tf.pow(layer, 2))
    return loss
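
# A minimal usage sketch (the surrounding training-step code is an illustrative
# assumption, not part of this file): the cross-entropy cost from the model is
# passed through get_loss before taking gradients, e.g.
#   >>> with tf.GradientTape() as tape:
#   ...     cost, _ = model(x, x_mask, y, y_mask, training=True)
#   ...     cost = get_loss(cost, model)
#   >>> grads = tape.gradient(cost, model.trainable_weights)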
###################
def get_word(next_w, next_state, next_alpha_past, ctx, model, training):
    return model.caltext.get_word(next_w, next_state, next_alpha_past, ctx, training)
def get_sample(ctx0, h_0, k, maxlen, stochastic, training, model):
    sample = []
    sample_score = []
    sample_att = []
    live_k = 1
    dead_k = 0
    hyp_samples = [[]] * 1
    hyp_scores = np.zeros(live_k).astype('float32')
    hyp_states = []
    next_alpha_past = np.zeros((ctx0.shape[0], ctx0.shape[1], ctx0.shape[2])).astype('float32')
    next_w = -1 * np.ones((1,)).astype('int64')  # -1 marks the start token (zero embedding)
    next_state = h_0
    for ii in range(maxlen):
        ctx = np.tile(ctx0, [live_k, 1, 1, 1])
        next_p, next_w, next_state, next_alpha_past, contexVec = get_word(next_w, next_state, next_alpha_past, ctx, model, training)
        sample_att.append(contexVec[0, :, :])
        if stochastic:
            nw = next_w[0]
            sample.append(nw)
            sample_score.append(next_p[0, nw])  # record the probability of the sampled word
            if nw == 0:
                break
        else:
            #### beam search: expand every live hypothesis by the k best words ####
            cand_scores = hyp_scores[:, None] - np.log(next_p)
            cand_flat = cand_scores.flatten()
            ranks_flat = cand_flat.argsort()[:(k - dead_k)]
            voc_size = next_p.shape[1]
            assert voc_size == num_classes
            trans_indices = ranks_flat // voc_size
            word_indices = ranks_flat % voc_size
            costs = cand_flat[ranks_flat]
            new_hyp_samples = []
            new_hyp_scores = np.zeros(k - dead_k).astype('float32')
            new_hyp_states = []
            new_hyp_alpha_past = []
            for idx, [ti, wi] in enumerate(zip(trans_indices, word_indices)):
                new_hyp_samples.append(hyp_samples[ti] + [wi])
                new_hyp_scores[idx] = copy.copy(costs[idx])
                new_hyp_states.append(copy.copy(next_state[ti]))
                new_hyp_alpha_past.append(copy.copy(next_alpha_past[ti]))
            new_live_k = 0
            hyp_samples = []
            hyp_scores = []
            hyp_states = []
            hyp_alpha_past = []
            for idx in range(len(new_hyp_samples)):
                if new_hyp_samples[idx][-1] == 0:  # <eol>: the hypothesis is finished
                    sample.append(new_hyp_samples[idx])
                    sample_score.append(new_hyp_scores[idx])
                    dead_k += 1
                else:
                    new_live_k += 1
                    hyp_samples.append(new_hyp_samples[idx])
                    hyp_scores.append(new_hyp_scores[idx])
                    hyp_states.append(new_hyp_states[idx])
                    hyp_alpha_past.append(new_hyp_alpha_past[idx])
            hyp_scores = np.array(hyp_scores)
            live_k = new_live_k
            if new_live_k < 1:
                break
            if dead_k >= k:
                break
            next_w = np.array([w1[-1] for w1 in hyp_samples])
            next_state = np.array(hyp_states)
            next_alpha_past = np.array(hyp_alpha_past)
    if not stochastic:
        # dump every remaining live hypothesis
        if live_k > 0:
            for idx in range(live_k):
                sample.append(hyp_samples[idx])
                sample_score.append(hyp_scores[idx])
    return sample, sample_score, sample_att
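
# Note on scoring: beam search accumulates the negative log-likelihood
# (hyp_scores[:, None] - np.log(next_p)), so lower is better; in predict()
# below the winning hypothesis is the one with the lowest length-normalized
# score (score.argmin()).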
####################### Predict #######################
def execute_model(xx, xx_mask, CALTEXT):
    # run only the encoder (y=None) and compute the decoder's initial state
    anno = CALTEXT(xx, xx_mask, training=False)
    hidden_state_0 = CALTEXT.get_hidden_state_0(anno)
    return anno, hidden_state_0
def predict(CALTEXT, images, x_mask):
    # training=False is only needed if there are layers that behave differently
    # during training versus inference (e.g. Dropout).
    for img_ind in range(len(images)):
        xx = images[img_ind][tf.newaxis, ...]
        xx_mask = x_mask[img_ind][tf.newaxis, ...]
        anno, hidden_state_0 = execute_model(xx, xx_mask, CALTEXT)
        sample, score, hypalpha = get_sample(anno, hidden_state_0, 10, 130, False, False, CALTEXT)
        score = score / np.array([len(s) for s in sample])  # length normalization
        ss = sample[score.argmin()]
    # note: only the decoding of the last image is visualized below
    #### output string ####
    ind = 0
    outstr = u''
    frames = []
    font = ImageFont.truetype("Jameel Noori Nastaleeq.ttf", 60)
    worddicts_r = data.load_dict_picklefile("vocabulary.pkl")
    while ind < len(ss) - 1:
        k = (len(ss) - 2) - ind  # characters are decoded in reverse order
        outstr = outstr + worddicts_r[int(ss[k])]
        textimg = Image.new('RGB', (1400, 100), (255, 255, 255))
        drawtext = ImageDraw.Draw(textimg)
        drawtext.text((20, 20), outstr, (0, 0, 0), font=font)
        fig, axes = plt.subplots(2, 1)
        axes[0].imshow(textimg)
        axes[0].axis('off')
        axes[1].axis('off')
        axes[1].imshow(xx[0, :, :], cmap='gray')
        visualization = resize(hypalpha[k], (100, 800), anti_aliasing=True)
        axes[1].imshow(255 - (255 * visualization), alpha=0.2)
        plt.axis('off')
        plt.savefig('res.png')
        plt.close(fig)  # avoid accumulating open figures across iterations
        frames.append(Image.fromarray(cv2.cvtColor(cv2.imread('res.png'), cv2.COLOR_BGR2RGB), 'RGB'))  # OpenCV loads BGR
        ind = ind + 1
    frame_one = frames[0]
    frame_one.save("vis.gif", format="GIF", append_images=frames, save_all=True, duration=300, loop=0)
    gif_image = "vis.gif"
    return outstr, gif_image
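
# A minimal smoke-test sketch for the forward pass (all shapes below are
# illustrative assumptions: grayscale images of shape [batch, H, W] with a
# matching pixel mask, and int label sequences of shape [timesteps, batch];
# this is not part of the training or inference pipeline).
if __name__ == "__main__":
    model = CALText_Model(training=True)
    x = tf.random.uniform((2, 64, 256))        # dummy image batch
    x_mask = tf.ones((2, 64, 256))             # all pixels valid
    y = tf.zeros((5, 2), dtype=tf.int32)       # dummy label sequences
    y_mask = tf.ones((5, 2))                   # all tokens valid
    cost, annotation = model(x, x_mask, y, y_mask, training=True)
    print("cost:", float(cost), "| annotation shape:", annotation.shape)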