import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.layers import Dense, Flatten, Conv2D
from tensorflow.keras import Model
from matplotlib import pyplot as plt
from PIL import Image, ImageFont, ImageDraw
from skimage.transform import rescale, resize
import numpy as np
import re
import math
import copy
import random
import time
import data
import cv2

rng = np.random.RandomState(int(time.time()))

#### training setup parameters ####
lambda_val = 1e-4   # L2 regularization weight (see get_loss)
gamma_val = 1       # attention (alpha) L1 regularization weight (see get_cost)
num_classes = 130

## Utility functions used to initialize variables
def norm_weight(fan_in, fan_out):
    W_bound = np.sqrt(6.0 / (fan_in + fan_out))
    return np.asarray(rng.uniform(low=-W_bound, high=W_bound, size=(fan_in, fan_out)), dtype=np.float32)

def conv_norm_weight(nin, nout, kernel_size):
    filter_shape = (kernel_size[0], kernel_size[1], nin, nout)
    fan_in = kernel_size[0] * kernel_size[1] * nin
    fan_out = kernel_size[0] * kernel_size[1] * nout
    W_bound = np.sqrt(6. / (fan_in + fan_out))
    W = np.asarray(rng.uniform(low=-W_bound, high=W_bound, size=filter_shape), dtype=np.float32)
    return W.astype('float32')

def ortho_weight(ndim):
    W = np.random.randn(ndim, ndim)
    u, s, v = np.linalg.svd(W)
    return u.astype('float32')
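
# A minimal sanity sketch (not part of the original pipeline): the initializers
# above follow the Glorot/Xavier uniform scheme, bound = sqrt(6 / (fan_in + fan_out)),
# plus an SVD-based orthogonal init for the recurrent matrices.
def _check_initializers():
    w = conv_norm_weight(nin=1, nout=48, kernel_size=(7, 7))
    assert w.shape == (7, 7, 1, 48)
    assert np.abs(w).max() <= np.sqrt(6.0 / (7 * 7 * 1 + 7 * 7 * 48))
    u = ortho_weight(256)
    # An orthogonal matrix times its transpose is (numerically) the identity.
    assert np.allclose(u @ u.T, np.eye(256), atol=1e-4)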
#####
class DenseEncoder(layers.Layer):
    def __init__(self,
                 blocks,                    # number of dense blocks
                 level,                     # number of levels in each block
                 growth_rate,               # growth rate in the DenseNet paper: k
                 istraining,
                 dropout_rate=0.2,          # rate of the dropout layer
                 dense_channels=0,          # filter count of the transition layer's input
                 transition=0.5,            # compression rate
                 input_conv_filters=48,     # filter count of the conv2d before the dense blocks
                 input_conv_stride=(2, 2),  # stride of the conv2d before the dense blocks
                 input_conv_kernel=(7, 7),  # kernel size of the conv2d before the dense blocks
                 **kwargs):
        super(DenseEncoder, self).__init__(**kwargs)
        self.blocks = blocks
        self.growth_rate = growth_rate
        self.training = istraining
        self.dense_channels = dense_channels
        self.level = level
        self.dropout_rate = dropout_rate
        self.transition = transition
        self.input_conv_kernel = input_conv_kernel
        self.input_conv_stride = input_conv_stride
        self.input_conv_filters = input_conv_filters

        self.limit = self.bound(1, self.input_conv_filters, self.input_conv_kernel)
        self.conv1 = tf.keras.layers.Conv2D(filters=self.input_conv_filters, kernel_size=self.input_conv_kernel,
                                            strides=self.input_conv_stride, padding='same',
                                            data_format='channels_last', use_bias=False,
                                            kernel_initializer=tf.random_uniform_initializer(-self.limit, self.limit))
        self.batch_norm = tf.keras.layers.BatchNormalization(trainable=self.training, momentum=0.9, scale=True,
                                                             gamma_initializer=tf.random_uniform_initializer(
                                                                 -1.0 / math.sqrt(self.input_conv_filters),
                                                                 1.0 / math.sqrt(self.input_conv_filters)),
                                                             epsilon=0.0001)
        self.relu = tf.keras.layers.ReLU()
        self.maxpool = tf.keras.layers.MaxPool2D(pool_size=(2, 2), strides=(2, 2), padding='same')
        self.dropout = tf.keras.layers.Dropout(rate=self.dropout_rate)
        self.avgpool = tf.keras.layers.AveragePooling2D(pool_size=(2, 2), strides=(2, 2), padding='same')

        self.conv = []
        self.conv2 = []
        self.batchnorm = []
        self.batchnorm2 = []
        self.dense_channels += self.input_conv_filters
        for i in range(self.blocks):
            for j in range(self.level):
                #### [1, 1] bottleneck convolution ####
                limit = self.bound(self.dense_channels, 4 * self.growth_rate, [1, 1])
                self.conv.append(tf.keras.layers.Conv2D(filters=4 * self.growth_rate, kernel_size=(1, 1),
                                                        strides=(1, 1), padding='valid', data_format='channels_last',
                                                        use_bias=False,
                                                        kernel_initializer=tf.random_uniform_initializer(-limit, limit)))
                self.batchnorm.append(tf.keras.layers.BatchNormalization(trainable=self.training, momentum=0.9, scale=True,
                                                                         gamma_initializer=tf.random_uniform_initializer(
                                                                             -1.0 / math.sqrt(4 * self.growth_rate),
                                                                             1.0 / math.sqrt(4 * self.growth_rate)),
                                                                         epsilon=0.0001))
                #### [3, 3] convolution ####
                limit = self.bound(4 * self.growth_rate, self.growth_rate, [3, 3])
                self.conv.append(tf.keras.layers.Conv2D(filters=self.growth_rate, kernel_size=(3, 3),
                                                        strides=(1, 1), padding='same', data_format='channels_last',
                                                        use_bias=False,
                                                        kernel_initializer=tf.random_uniform_initializer(-limit, limit)))
                self.batchnorm.append(tf.keras.layers.BatchNormalization(trainable=self.training, momentum=0.9, scale=True,
                                                                         gamma_initializer=tf.random_uniform_initializer(
                                                                             -1.0 / math.sqrt(self.growth_rate),
                                                                             1.0 / math.sqrt(self.growth_rate)),
                                                                         epsilon=0.0001))
                self.dense_channels += self.growth_rate
            if i < self.blocks - 1:
                compressed_channels = int(self.dense_channels * self.transition)
                limit = self.bound(self.dense_channels, compressed_channels, [1, 1])  # Glorot bound from the pre-compression fan-in
                #### new dense channels for the next dense block ####
                self.dense_channels = compressed_channels
                self.conv2.append(tf.keras.layers.Conv2D(compressed_channels, kernel_size=(1, 1), strides=(1, 1),
                                                         padding='valid', data_format='channels_last', use_bias=False,
                                                         activation=None,
                                                         kernel_initializer=tf.random_uniform_initializer(-limit, limit)))
                self.batchnorm2.append(tf.keras.layers.BatchNormalization(trainable=self.training, momentum=0.9, scale=True,
                                                                          gamma_initializer=tf.random_uniform_initializer(
                                                                              -1.0 / math.sqrt(self.dense_channels),
                                                                              1.0 / math.sqrt(self.dense_channels)),
                                                                          epsilon=0.0001))

    def bound(self, nin, nout, kernel):
        fin = nin * kernel[0] * kernel[1]
        fout = nout * kernel[0] * kernel[1]
        return np.sqrt(6. / (fin + fout))

    def dense_net(self, input_x, mask_x):
        #### before flowing into the dense blocks ####
        input_x = tf.expand_dims(input=input_x, axis=3)
        x = input_x
        x = self.conv1(x)
        mask_x = mask_x[:, 0::2, 0::2]
        x = self.batch_norm(x)
        x = self.relu(x)
        x = self.maxpool(x)
        mask_x = mask_x[:, 0::2, 0::2]
        dense_out = x
        cind = 0
        bind = 0
        cind2 = 0
        bind2 = 0
        #### flowing through the dense blocks and transition layers ####
        for i in range(self.blocks):
            for j in range(self.level):
                #### [1, 1] convolution part for the bottleneck ####
                x = self.conv[cind](x)
                cind += 1
                x = self.batchnorm[bind](x)
                bind += 1
                x = self.relu(x)
                x = self.dropout(x, training=self.training)
                #### [3, 3] convolution part for the regular convolve operation ####
                x = self.conv[cind](x)
                cind += 1
                x = self.batchnorm[bind](x)
                bind += 1
                x = self.relu(x)
                x = self.dropout(x, training=self.training)
                dense_out = tf.concat([dense_out, x], axis=3)
                x = dense_out
            #### transition layer compresses the dense block's output ####
            if i < self.blocks - 1:
                x = self.conv2[cind2](x)
                cind2 += 1
                x = self.batchnorm2[bind2](x)
                bind2 += 1
                x = self.relu(x)
                x = self.dropout(x, training=self.training)
                x = self.avgpool(x)
                dense_out = x
                mask_x = mask_x[:, 0::2, 0::2]
        return dense_out, mask_x
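
# Hedged usage sketch (illustrative, assuming TF 2.x eager mode): run the encoder
# on a dummy batch to see the expected shapes. With blocks=3, level=16 and
# growth_rate=24 the channel count grows 48 -> 432 -> 216 -> 600 -> 300 -> 684,
# and the spatial dims (and the mask) are downsampled 16x overall.
def _demo_dense_encoder():
    enc = DenseEncoder(blocks=3, level=16, growth_rate=24, istraining=False)
    x = tf.zeros([2, 64, 256])   # [batch, height, width] grayscale input
    m = tf.ones([2, 64, 256])    # pixel-level mask of the same size
    feats, mask = enc.dense_net(x, m)
    print(feats.shape, mask.shape)   # (2, 4, 16, 684), (2, 4, 16)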
'''
ContextualAttention class implements the contextual attention mechanism.
'''
class ContextualAttention(layers.Layer):
    def __init__(self,
                 channels,     # output channels of DenseEncoder | [batch, h, w, channels]
                 dim_decoder,  # dim of the decoder hidden state $h_{t-1}$ | [batch, dec_dim]
                 dim_attend,
                 **kwargs):
        super(ContextualAttention, self).__init__(**kwargs)
        self.channels = channels
        self.coverage_kernel = [11, 11]     # kernel size of $Q$
        self.coverage_filters = dim_attend  # filter count of $Q$ | 512
        self.dim_decoder = dim_decoder      # 256
        self.dim_attend = dim_attend        # unified dim of the three terms of $e_{ti}$,
                                            # i.e. $Q \beta_t$, $U_a a_i$, $W_a h_{t-1}$ | 512
        self.U_f = tf.Variable(norm_weight(self.coverage_filters, self.dim_attend), name='U_f')  # $U_f f_i$ | [cov_filters, dim_attend]
        self.U_f_b = tf.Variable(np.zeros((self.dim_attend,)).astype('float32'), name='U_f_b')   # $U_f f_i + U_f_b$ | [dim_attend, ]
        self.U_a = tf.Variable(norm_weight(self.channels, self.dim_attend), name='U_a')          # $U_a a_i$ | [annotation_channels, dim_attend]
        self.U_a_b = tf.Variable(np.zeros((self.dim_attend,)).astype('float32'), name='U_a_b')   # $U_a a_i + U_a_b$ | [dim_attend, ] (unused below)
        self.W_a = tf.Variable(norm_weight(self.dim_decoder, self.dim_attend), name='W_a')       # $W_a h_{t-1}$ | [dec_dim, dim_attend]
        self.W_a_b = tf.Variable(np.zeros((self.dim_attend,)).astype('float32'), name='W_a_b')   # $W_a h_{t-1} + W_a_b$ | [dim_attend, ] (unused below)
        self.V_a = tf.Variable(norm_weight(self.dim_attend, 1), name='V_a')                      # $V_a \tanh(A + B + C)$ | [dim_attend, 1]
        self.V_a_b = tf.Variable(np.zeros((1,)).astype('float32'), name='V_a_b')                 # $V_a \tanh(A + B + C) + V_a_b$ | [1, ]
        self.alpha_past_filter = tf.Variable(conv_norm_weight(1, self.dim_attend, self.coverage_kernel), name='alpha_past_filter')

    def get_context(self, annotation4ctx, h_t_1, alpha_past4ctx, a_mask):
        #### calculate $U_f f_i$ (coverage of the accumulated attention) ####
        alpha_past_4d = alpha_past4ctx[:, :, :, None]
        Ft = tf.nn.conv2d(alpha_past_4d, filters=self.alpha_past_filter, strides=[1, 1, 1, 1], padding='SAME')
        coverage_vector = tf.tensordot(Ft, self.U_f, axes=1)                   # [batch, h, w, dim_attend]; bias folded into the tanh below
        #### calculate $U_a a_i$ ####
        dense_encoder_vector = tf.tensordot(annotation4ctx, self.U_a, axes=1)  # [batch, h, w, dim_attend]
        #### calculate $W_a h_{t-1}$ ####
        speller_vector = tf.tensordot(h_t_1, self.W_a, axes=1)                 # [batch, dim_attend]
        speller_vector = speller_vector[:, None, None, :]                      # [batch, 1, 1, dim_attend]
        tanh_vector = tf.tanh(coverage_vector + dense_encoder_vector + speller_vector + self.U_f_b)  # [batch, h, w, dim_attend]
        e_ti = tf.tensordot(tanh_vector, self.V_a, axes=1) + self.V_a_b        # [batch, h, w, 1]
        alpha = tf.exp(e_ti)
        alpha = tf.squeeze(alpha, axis=3)
        if a_mask is not None:
            alpha = alpha * a_mask
        alpha = alpha / tf.reduce_sum(alpha, axis=[1, 2], keepdims=True)       # normalized weights | [batch, h, w]
        alpha_past4ctx += alpha                                                # accumulated weights matrix | [batch, h, w]
        context = tf.reduce_sum(annotation4ctx * alpha[:, :, :, None], axis=[1, 2])  # context vector | [batch, feature_channels]
        return context, alpha, alpha_past4ctx
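
# Hedged sketch (illustrative): a single attention step over a dummy annotation
# grid. 684 is the encoder's output channel count; 256 and 512 are the decoder
# and attention dims used later in CALText_Model.
def _demo_attention():
    att = ContextualAttention(channels=684, dim_decoder=256, dim_attend=512)
    annotation = tf.random.normal([2, 4, 16, 684])   # encoder output
    h_prev = tf.zeros([2, 256])                      # previous decoder hidden state
    alpha_past = tf.zeros([2, 4, 16])                # accumulated attention so far
    a_mask = tf.ones([2, 4, 16])
    ctx, alpha, alpha_past = att.get_context(annotation, h_prev, alpha_past, a_mask)
    print(ctx.shape, alpha.shape)   # (2, 684), (2, 4, 16)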
'''
Decoder class implements a 2-layered GRU decoder which decodes an encoded input
image into a sequence of characters using the attention mechanism.
'''
class Decoder(layers.Layer):
    def __init__(self, hidden_dim, word_dim, contextual_attention, context_dim, **kwargs):
        super(Decoder, self).__init__(**kwargs)
        self.contextual_attention = contextual_attention  # inner instance of ContextualAttention that provides the context
        self.context_dim = context_dim                    # context dim | 684
        self.hidden_dim = hidden_dim                      # dim of the hidden state | 256
        self.word_dim = word_dim                          # dim of the word embedding | 256

        ## GRU 1 weight initialization starts here
        self.W_yz_yr = tf.Variable(np.concatenate(
            [norm_weight(self.word_dim, self.hidden_dim), norm_weight(self.word_dim, self.hidden_dim)], axis=1),
            name='W_yz_yr')   # [dim_word, 2 * dim_decoder]
        self.b_yz_yr = tf.Variable(np.zeros((2 * self.hidden_dim,)).astype('float32'), name='b_yz_yr')
        self.U_hz_hr = tf.Variable(np.concatenate(
            [ortho_weight(self.hidden_dim), ortho_weight(self.hidden_dim)], axis=1),
            name='U_hz_hr')   # [dim_hidden, 2 * dim_hidden]
        self.W_yh = tf.Variable(norm_weight(self.word_dim, self.hidden_dim), name='W_yh')
        self.b_yh = tf.Variable(np.zeros((self.hidden_dim,)).astype('float32'), name='b_yh')  # [dim_decoder, ]
        self.U_rh = tf.Variable(ortho_weight(self.hidden_dim), name='U_rh')                   # [dim_hidden, dim_hidden]

        ## GRU 2 weight initialization starts here
        self.U_hz_hr_nl = tf.Variable(np.concatenate(
            [ortho_weight(self.hidden_dim), ortho_weight(self.hidden_dim)], axis=1),
            name='U_hz_hr_nl')   # [dim_hidden, 2 * dim_hidden] non-linear
        self.b_hz_hr_nl = tf.Variable(np.zeros((2 * self.hidden_dim,)).astype('float32'), name='b_hz_hr_nl')  # [2 * dim_hidden, ]
        self.W_c_z_r = tf.Variable(norm_weight(self.context_dim, 2 * self.hidden_dim), name='W_c_z_r')
        self.U_rh_nl = tf.Variable(ortho_weight(self.hidden_dim), name='U_rh_nl')
        self.b_rh_nl = tf.Variable(np.zeros((self.hidden_dim,)).astype('float32'), name='b_rh_nl')
        self.W_c_h_nl = tf.Variable(norm_weight(self.context_dim, self.hidden_dim), name='W_c_h_nl')

    def get_ht_ctx(self, emb_y, target_hidden_state_0, annotations, a_m, y_m):
        res = tf.scan(self.one_time_step, elems=(emb_y, y_m),
                      initializer=(target_hidden_state_0,
                                   tf.zeros([tf.shape(annotations)[0], self.context_dim]),
                                   tf.zeros([tf.shape(annotations)[0], tf.shape(annotations)[1], tf.shape(annotations)[2]]),
                                   tf.zeros([tf.shape(annotations)[0], tf.shape(annotations)[1], tf.shape(annotations)[2]]),
                                   annotations,
                                   a_m))
        return res

    def one_time_step(self, tuple_h0_ctx_alpha_alpha_past_annotation, tuple_emb_mask):
        target_hidden_state_0 = tuple_h0_ctx_alpha_alpha_past_annotation[0]
        alpha_past_one = tuple_h0_ctx_alpha_alpha_past_annotation[3]
        annotation_one = tuple_h0_ctx_alpha_alpha_past_annotation[4]
        a_mask = tuple_h0_ctx_alpha_alpha_past_annotation[5]
        emb_y, y_mask = tuple_emb_mask

        # GRU 1 starts here
        emb_y_z_r_vector = tf.tensordot(emb_y, self.W_yz_yr, axes=1) + self.b_yz_yr    # [batch, 2 * dim_decoder]
        hidden_z_r_vector = tf.tensordot(target_hidden_state_0, self.U_hz_hr, axes=1)  # [batch, 2 * dim_decoder]
        pre_z_r_vector = tf.sigmoid(emb_y_z_r_vector + hidden_z_r_vector)              # [batch, 2 * dim_decoder]
        r1 = pre_z_r_vector[:, :self.hidden_dim]                                       # [batch, dim_decoder]
        z1 = pre_z_r_vector[:, self.hidden_dim:]                                       # [batch, dim_decoder]
        emb_y_h_vector = tf.tensordot(emb_y, self.W_yh, axes=1) + self.b_yh            # [batch, dim_decoder]
        hidden_r_h_vector = tf.tensordot(target_hidden_state_0, self.U_rh, axes=1)     # [batch, dim_decoder]
        hidden_r_h_vector *= r1
        pre_h_proposal = tf.tanh(hidden_r_h_vector + emb_y_h_vector)
        pre_h = z1 * target_hidden_state_0 + (1. - z1) * pre_h_proposal
        if y_mask is not None:
            pre_h = y_mask[:, None] * pre_h + (1. - y_mask)[:, None] * target_hidden_state_0

        context, alpha, alpha_past_one = self.contextual_attention.get_context(
            annotation_one, pre_h, alpha_past_one, a_mask)   # [batch, dim_ctx]

        # GRU 2 starts here
        emb_y_z_r_nl_vector = tf.tensordot(pre_h, self.U_hz_hr_nl, axes=1) + self.b_hz_hr_nl
        context_z_r_vector = tf.tensordot(context, self.W_c_z_r, axes=1)
        z_r_vector = tf.sigmoid(emb_y_z_r_nl_vector + context_z_r_vector)
        r2 = z_r_vector[:, :self.hidden_dim]
        z2 = z_r_vector[:, self.hidden_dim:]
        emb_y_h_nl_vector = tf.tensordot(pre_h, self.U_rh_nl, axes=1)
        emb_y_h_nl_vector *= r2
        emb_y_h_nl_vector = emb_y_h_nl_vector + self.b_rh_nl   # bias added after the pointwise multiplication with r2
        context_h_vector = tf.tensordot(context, self.W_c_h_nl, axes=1)
        h_proposal = tf.tanh(emb_y_h_nl_vector + context_h_vector)
        h = z2 * pre_h + (1. - z2) * h_proposal
        if y_mask is not None:
            h = y_mask[:, None] * h + (1. - y_mask)[:, None] * pre_h
        return h, context, alpha, alpha_past_one, annotation_one, a_mask
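
# Hedged sketch (illustrative): unroll the two-layer GRU decoder with tf.scan
# over a short dummy embedding sequence via get_ht_ctx.
def _demo_decoder():
    att = ContextualAttention(channels=684, dim_decoder=256, dim_attend=512)
    dec = Decoder(hidden_dim=256, word_dim=256, contextual_attention=att, context_dim=684)
    emb_y = tf.zeros([5, 2, 256])   # [timesteps, batch, word_dim]
    y_mask = tf.ones([5, 2])
    h0 = tf.zeros([2, 256])
    annotations = tf.random.normal([2, 4, 16, 684])
    a_mask = tf.ones([2, 4, 16])
    h_t, c_t, alpha, _, _, _ = dec.get_ht_ctx(emb_y, h0, annotations, a_mask, y_mask)
    print(h_t.shape, c_t.shape, alpha.shape)   # (5, 2, 256), (5, 2, 684), (5, 2, 4, 16)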
'''
CALText class is the main class. It makes use of the three classes above:
  1) DenseEncoder (encoder)
  2) ContextualAttention (contextual attention mechanism)
  3) Decoder (2-layered GRU decoder)
CALText implements get_cost and get_word, which are used for cost calculation
and for decoding (see get_sample below).
'''
class CALText(layers.Layer):
    def __init__(self, dense_encoder, contextual_attention, decoder,
                 hidden_dim, word_dim, context_dim, target_dim, istraining, **kwargs):
        super(CALText, self).__init__(**kwargs)
        self.hidden_dim = hidden_dim
        self.word_dim = word_dim
        self.context_dim = context_dim
        self.target_dim = target_dim
        self.embed_matrix = tf.Variable(norm_weight(self.target_dim, self.word_dim), name='embed')
        self.dense_encoder = dense_encoder
        self.contextual_attention = contextual_attention
        self.decoder = decoder
        self.Wa2h = tf.Variable(norm_weight(self.context_dim, self.hidden_dim), name='Wa2h')
        self.ba2h = tf.Variable(np.zeros((self.hidden_dim,)).astype('float32'), name='ba2h')
        self.Wc = tf.Variable(norm_weight(self.context_dim, self.word_dim), name='Wc')
        self.bc = tf.Variable(np.zeros((self.word_dim,)).astype('float32'), name='bc')
        self.Wh = tf.Variable(norm_weight(self.hidden_dim, self.word_dim), name='Wh')
        self.bh = tf.Variable(np.zeros((self.word_dim,)).astype('float32'), name='bh')
        self.Wy = tf.Variable(norm_weight(self.word_dim, self.word_dim), name='Wy')
        self.by = tf.Variable(np.zeros((self.word_dim,)).astype('float32'), name='by')
        self.Wo = tf.Variable(norm_weight(self.word_dim // 2, self.target_dim), name='Wo')
        self.bo = tf.Variable(np.zeros((self.target_dim,)).astype('float32'), name='bo')
        self.training = istraining
        self.dropout = tf.keras.layers.Dropout(rate=0.2)

    def get_cost(self, cost_annotation, cost_y, a_m, y_m, alpha_reg):
        #### step 1: preparation of the embedding of the label sequences ####
        timesteps = tf.shape(cost_y)[0]
        batch_size = tf.shape(cost_y)[1]
        emb_y = tf.nn.embedding_lookup(self.embed_matrix, tf.reshape(cost_y, [-1]))
        emb_y = tf.reshape(emb_y, [timesteps, batch_size, self.word_dim])
        emb_pad = tf.fill((1, batch_size, self.word_dim), 0.0)
        emb_shift = tf.concat([emb_pad,
                               tf.strided_slice(emb_y, [0, 0, 0], [-1, batch_size, self.word_dim], [1, 1, 1])], axis=0)
        new_emb_y = emb_shift

        #### step 2: calculation of h_0 ####
        anno_mean = tf.reduce_sum(cost_annotation * a_m[:, :, :, None], axis=[1, 2]) / tf.reduce_sum(a_m, axis=[1, 2])[:, None]
        h_0 = tf.tensordot(anno_mean, self.Wa2h, axes=1) + self.ba2h   # [batch, hidden_dim]
        h_0 = tf.tanh(h_0)

        #### step 3: calculation of h_t and c_t at all time steps ####
        ret = self.decoder.get_ht_ctx(new_emb_y, h_0, cost_annotation, a_m, y_m)
        h_t = ret[0]    # h_t of all timesteps | [timesteps, batch, hidden_dim]
        c_t = ret[1]    # c_t of all timesteps | [timesteps, batch, context_dim]
        alpha = ret[2]  # alpha of all timesteps | [timesteps, batch, h, w]

        #### step 4: calculation of the cost from h_t, c_t and y_{t-1} ####
        y_t_1 = new_emb_y   # shifted y | [1:] = [:-1]
        logit_gru = tf.tensordot(h_t, self.Wh, axes=1)
        logit_ctx = tf.tensordot(c_t, self.Wc, axes=1)
        logit_pre = tf.tensordot(y_t_1, self.Wy, axes=1)
        logit = logit_pre + logit_ctx + logit_gru + self.bh   # single shared bias; bc and by are defined but unused here
        #### maxout on the vector of size word_dim ####
        shape = tf.shape(logit)
        logit = tf.reshape(logit, [shape[0], -1, shape[2] // 2, 2])
        logit = tf.reduce_max(logit, axis=3)
        logit = self.dropout(logit, training=self.training)
        logit = tf.tensordot(logit, self.Wo, axes=1) + self.bo
        logit_shape = tf.shape(logit)
        logit = tf.reshape(logit, [-1, logit_shape[2]])
        cost = tf.nn.softmax_cross_entropy_with_logits(logits=logit,
                                                       labels=tf.one_hot(tf.reshape(cost_y, [-1]), depth=self.target_dim))
        cost = tf.multiply(cost, tf.reshape(y_m, [-1]))
        cost = tf.reshape(cost, [shape[0], shape[1]])
        cost = tf.reduce_sum(cost, axis=0)
        cost = tf.reduce_mean(cost)

        #### alpha L1 regularization ####
        alpha_sum = tf.reduce_mean(tf.reduce_sum(tf.reduce_sum(tf.abs(alpha), axis=[2, 3]), axis=0))
        cost = tf.cond(tf.cast(alpha_reg > 0, tf.bool),
                       lambda: cost + (alpha_reg * alpha_sum),
                       lambda: cost)
        return cost
    def get_word(self, sample_y, sample_h_pre, alpha_past_pre, sample_annotation, training_mode):
        #### one decoding step: embed the previous symbol (zeros for the start token) ####
        emb = tf.cond(pred=sample_y[0] < 0,
                      true_fn=lambda: tf.fill((1, self.word_dim), 0.0),
                      false_fn=lambda: tf.nn.embedding_lookup(params=self.embed_matrix, ids=sample_y))

        # GRU 1 (same computation as Decoder.one_time_step, without the masks)
        emb_y_z_r_vector = tf.tensordot(emb, self.decoder.W_yz_yr, axes=1) + self.decoder.b_yz_yr  # [batch, 2 * dim_decoder]
        hidden_z_r_vector = tf.tensordot(sample_h_pre, self.decoder.U_hz_hr, axes=1)               # [batch, 2 * dim_decoder]
        pre_z_r_vector = tf.sigmoid(emb_y_z_r_vector + hidden_z_r_vector)                          # [batch, 2 * dim_decoder]
        r1 = pre_z_r_vector[:, :self.decoder.hidden_dim]                                           # [batch, dim_decoder]
        z1 = pre_z_r_vector[:, self.decoder.hidden_dim:]                                           # [batch, dim_decoder]
        emb_y_h_vector = tf.tensordot(emb, self.decoder.W_yh, axes=1) + self.decoder.b_yh          # [batch, dim_decoder]
        hidden_r_h_vector = tf.tensordot(sample_h_pre, self.decoder.U_rh, axes=1)                  # [batch, dim_decoder]
        hidden_r_h_vector *= r1
        pre_h_proposal = tf.tanh(hidden_r_h_vector + emb_y_h_vector)
        pre_h = z1 * sample_h_pre + (1. - z1) * pre_h_proposal

        context, alphacc, alpha_past = self.decoder.contextual_attention.get_context(
            sample_annotation, pre_h, alpha_past_pre, None)   # [batch, dim_ctx]

        # GRU 2
        emb_y_z_r_nl_vector = tf.tensordot(pre_h, self.decoder.U_hz_hr_nl, axes=1) + self.decoder.b_hz_hr_nl
        context_z_r_vector = tf.tensordot(context, self.decoder.W_c_z_r, axes=1)
        z_r_vector = tf.sigmoid(emb_y_z_r_nl_vector + context_z_r_vector)
        r2 = z_r_vector[:, :self.decoder.hidden_dim]
        z2 = z_r_vector[:, self.decoder.hidden_dim:]
        emb_y_h_nl_vector = tf.tensordot(pre_h, self.decoder.U_rh_nl, axes=1)
        emb_y_h_nl_vector *= r2
        emb_y_h_nl_vector = emb_y_h_nl_vector + self.decoder.b_rh_nl   # bias added after the pointwise multiplication with r2, as in one_time_step
        context_h_vector = tf.tensordot(context, self.decoder.W_c_h_nl, axes=1)
        h_proposal = tf.tanh(emb_y_h_nl_vector + context_h_vector)
        h = z2 * pre_h + (1. - z2) * h_proposal

        h_t = h
        c_t = context
        alpha_past_t = alpha_past
        y_t_1 = emb
        logit_gru = tf.tensordot(h_t, self.Wh, axes=1)
        logit_ctx = tf.tensordot(c_t, self.Wc, axes=1)
        logit_pre = tf.tensordot(y_t_1, self.Wy, axes=1)
        logit = logit_pre + logit_ctx + logit_gru + self.bh   # [batch, word_dim]
        #### maxout, dropout and output projection, as in get_cost ####
        shape = tf.shape(input=logit)
        logit = tf.reshape(logit, [-1, shape[1] // 2, 2])
        logit = tf.reduce_max(input_tensor=logit, axis=2)
        logit = self.dropout(logit, training=training_mode)
        logit = tf.tensordot(logit, self.Wo, axes=1) + self.bo
        next_probs = tf.nn.softmax(logits=logit)
        next_word = tf.reduce_max(input_tensor=tf.random.categorical(logits=logit, num_samples=1), axis=1)  # sample the next symbol (stochastic mode)
        return next_probs, next_word, h_t, alpha_past_t, alphacc
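
# Hedged note (illustrative): the reshape / reduce_max pair inside get_cost and
# get_word is a maxout layer -- each adjacent pair of units in the word_dim-sized
# projection collapses to its maximum, halving the dimension before Wo.
def _demo_maxout():
    v = tf.constant([[1.0, 4.0, 2.0, 3.0]])               # [batch, 4]
    m = tf.reduce_max(tf.reshape(v, [-1, 2, 2]), axis=2)  # [batch, 2]
    print(m.numpy())                                      # [[4. 3.]]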
class CALText_Model(tf.keras.Model):   # subclass of tf.keras.Model
    def __init__(self, training):
        # Define all variables and other configuration here.
        super(CALText_Model, self).__init__()
        self.dense_blocks = 3
        self.levels_count = 16
        self.growth = 24
        #### decoder setup parameters ####
        self.hidden_dim = 256
        self.word_dim = 256
        self.dim_attend = 512
        self.dense_encoder = DenseEncoder(blocks=self.dense_blocks, level=self.levels_count,
                                          growth_rate=self.growth, istraining=training)
        self.contextual_attention = ContextualAttention(684, self.hidden_dim, self.dim_attend)
        self.decoder = Decoder(self.hidden_dim, self.word_dim, self.contextual_attention, 684)  # annotation.shape.as_list()[3] = 684
        self.caltext = CALText(self.dense_encoder, self.contextual_attention, self.decoder,
                               self.hidden_dim, self.word_dim, 684, num_classes, istraining=training)

    def call(self, x, x_mask, y=None, y_mask=None, training=True):
        # Forward prop: encode the image; when labels are given, also return the cost.
        annotation, anno_mask = self.dense_encoder.dense_net(x, x_mask)
        if y is None:
            return annotation
        else:
            cost = self.caltext.get_cost(annotation, y, anno_mask, y_mask, gamma_val)
            return cost, annotation

    def get_hidden_state_0(self, anno):
        hidden_state_0 = tf.tanh(tf.tensordot(tf.reduce_mean(input_tensor=anno, axis=[1, 2]),
                                              self.caltext.Wa2h, axes=1) + self.caltext.ba2h)  # [batch, hidden_dim]
        return hidden_state_0

########## apply L2 regularization on the weights ##########
def get_loss(loss, model):
    for layer in model.trainable_weights:
        arr = layer.name.split("/")
        if not arr[-1].startswith('conv2d'):
            loss += lambda_val * tf.reduce_sum(input_tensor=tf.pow(layer, 2))
    return loss

###################
@tf.function(experimental_relax_shapes=True)
def get_word(next_w, next_state, next_alpha_past, ctx, model, training):
    return model.caltext.get_word(next_w, next_state, next_alpha_past, ctx, training)

def get_sample(ctx0, h_0, k, maxlen, stochastic, training, model):
    #### beam search (or stochastic sampling) over the decoder ####
    sample = []
    sample_score = []
    sample_att = []
    live_k = 1
    dead_k = 0
    hyp_samples = [[]]
    hyp_scores = np.zeros(live_k).astype('float32')
    hyp_states = []
    next_alpha_past = np.zeros((ctx0.shape[0], ctx0.shape[1], ctx0.shape[2])).astype('float32')
    next_w = -1 * np.ones((1,)).astype('int64')   # -1 marks the start token
    next_state = h_0
    for ii in range(maxlen):
        ctx = np.tile(ctx0, [live_k, 1, 1, 1])
        next_p, next_w, next_state, next_alpha_past, contexVec = get_word(next_w, next_state, next_alpha_past, ctx, model, training)
        sample_att.append(contexVec[0, :, :])
        if stochastic:
            nw = int(next_w[0])
            sample.append(nw)
            sample_score.append(float(next_p[0, nw]))
            if nw == 0:
                break
        else:
            cand_scores = hyp_scores[:, None] - np.log(next_p)
            cand_flat = cand_scores.flatten()
            ranks_flat = cand_flat.argsort()[:(k - dead_k)]
            voc_size = next_p.shape[1]
            assert voc_size == num_classes
            trans_indices = ranks_flat // voc_size
            word_indices = ranks_flat % voc_size
            costs = cand_flat[ranks_flat]
            new_hyp_samples = []
            new_hyp_scores = np.zeros(k - dead_k).astype('float32')
            new_hyp_states = []
            new_hyp_alpha_past = []
            for idx, [ti, wi] in enumerate(zip(trans_indices, word_indices)):
                new_hyp_samples.append(hyp_samples[ti] + [wi])
                new_hyp_scores[idx] = copy.copy(costs[idx])
                new_hyp_states.append(copy.copy(next_state[ti]))
                new_hyp_alpha_past.append(copy.copy(next_alpha_past[ti]))
            new_live_k = 0
            hyp_samples = []
            hyp_scores = []
            hyp_states = []
            hyp_alpha_past = []
            for idx in range(len(new_hyp_samples)):
                if new_hyp_samples[idx][-1] == 0:   # symbol 0 ends a hypothesis
                    sample.append(new_hyp_samples[idx])
                    sample_score.append(new_hyp_scores[idx])
                    dead_k += 1
                else:
                    new_live_k += 1
                    hyp_samples.append(new_hyp_samples[idx])
                    hyp_scores.append(new_hyp_scores[idx])
                    hyp_states.append(new_hyp_states[idx])
                    hyp_alpha_past.append(new_hyp_alpha_past[idx])
            hyp_scores = np.array(hyp_scores)
            live_k = new_live_k
            if new_live_k < 1:
                break
            if dead_k >= k:
                break
            next_w = np.array([w1[-1] for w1 in hyp_samples])
            next_state = np.array(hyp_states)
            next_alpha_past = np.array(hyp_alpha_past)
    if not stochastic:
        # dump every remaining live hypothesis
        if live_k > 0:
            for idx in range(live_k):
                sample.append(hyp_samples[idx])
                sample_score.append(hyp_scores[idx])
    return sample, sample_score, sample_att
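
# Hedged usage sketch (illustrative; assumes TF 2.x eager mode): end-to-end
# decoding of one dummy image with beam width 10, mirroring the predict() loop
# below. Symbol 0 is treated as the end-of-sequence token.
def _demo_decode_one():
    model = CALText_Model(training=False)
    xx = tf.zeros([1, 64, 256])
    xx_mask = tf.ones([1, 64, 256])
    anno = model(xx, xx_mask, training=False)
    h0 = model.get_hidden_state_0(anno)
    sample, score, att = get_sample(anno, h0, k=10, maxlen=130,
                                    stochastic=False, training=False, model=model)
    score = np.array(score) / np.array([len(s) for s in sample])
    print(sample[score.argmin()])   # best (lowest-cost) label-index sequence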
####################### Predict #######################
@tf.function(experimental_relax_shapes=True)
def execute_model(xx, xx_mask, CALTEXT):
    anno = CALTEXT(xx, xx_mask, training=False)
    hidden_state_0 = CALTEXT.get_hidden_state_0(anno)
    return anno, hidden_state_0

def predict(CALTEXT, images, x_mask):
    # training=False is only needed if there are layers with different
    # behavior during training versus inference (e.g. Dropout).
    batch_loss = 0
    for img_ind in range(len(images)):
        xx = images[img_ind][tf.newaxis, ...]
        xx_mask = x_mask[img_ind][tf.newaxis, ...]
        anno, hidden_state_0 = execute_model(xx, xx_mask, CALTEXT)
        sample, score, hypalpha = get_sample(anno, hidden_state_0, 10, 130, False, False, CALTEXT)
        score = np.array(score) / np.array([len(s) for s in sample])
        ss = sample[score.argmin()]   # best (lowest-cost) hypothesis
        num = int(len(ss) / 2)
        #### output string ####
        ind = 0
        outstr = u''
        frames = []
        font = ImageFont.truetype("Jameel Noori Nastaleeq.ttf", 60)
        worddicts_r = data.load_dict_picklefile("vocabulary.pkl")
        while (ind