# CALText-TextRecognizer / CALTextModel.py
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.layers import Dense, Flatten, Conv2D
from tensorflow.keras import Model
from matplotlib import pyplot as plt
from PIL import Image,ImageFont, ImageDraw
from skimage.transform import rescale, resize
import numpy as np
import re
import math
import copy
import random
import time
import data
import cv2
rng = np.random.RandomState(int(time.time()))
#### training setup parameters ####
lambda_val=1e-4
gamma_val=1
num_classes=130
## Utility functions used to initialize variables
def norm_weight(fan_in, fan_out):
W_bound = np.sqrt(6.0 / (fan_in + fan_out))
return np.asarray(rng.uniform(low=-W_bound, high=W_bound, size=(fan_in, fan_out)), dtype=np.float32)
def conv_norm_weight(nin, nout, kernel_size):
filter_shape = (kernel_size[0], kernel_size[1], nin, nout)
fan_in = kernel_size[0] * kernel_size[1] * nin
fan_out = kernel_size[0] * kernel_size[1] * nout
W_bound = np.sqrt(6. / (fan_in + fan_out))
W = np.asarray(rng.uniform(low=-W_bound, high=W_bound, size=filter_shape), dtype=np.float32)
return W.astype('float32')
def ortho_weight(ndim):
W = np.random.randn(ndim, ndim)
u, s, v = np.linalg.svd(W)
return u.astype('float32')
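# Illustrative note (not in the original): norm_weight and conv_norm_weight implement
# Glorot/Xavier-style uniform initialization. For example, a 256x512 dense matrix is
# drawn from U(-b, b) with b = sqrt(6 / (256 + 512)) ~= 0.088, while ortho_weight
# returns an orthogonal square matrix obtained from the SVD of a random Gaussian matrix.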
#####
class DenseEncoder(layers.Layer):
def __init__(self, blocks, # number of dense blocks
level, # number of levels in each block
growth_rate, # growth rate in DenseNet paper: k
istraining,
dropout_rate=0.2, # drop rate of the dropout layer
dense_channels=0, # filter numbers of transition layer's input
transition=0.5, # compression rate of the transition layer
input_conv_filters=48, # filter numbers of conv2d before dense blocks
input_conv_stride=(2,2), # stride of conv2d before dense blocks
input_conv_kernel=(7,7), **kwargs): # kernel size of conv2d before dense blocks
super(DenseEncoder, self).__init__( **kwargs)
self.blocks = blocks
self.growth_rate = growth_rate
self.training = istraining
self.dense_channels = dense_channels
self.level = level
self.dropout_rate = dropout_rate
self.transition = transition
self.input_conv_kernel = input_conv_kernel
self.input_conv_stride = input_conv_stride
self.input_conv_filters = input_conv_filters
self.limit = self.bound(1, self.input_conv_filters, self.input_conv_kernel)
self.conv1=tf.keras.layers.Conv2D(filters=self.input_conv_filters, kernel_size=self.input_conv_kernel ,strides=self.input_conv_stride, padding='same', data_format='channels_last', use_bias=False , kernel_initializer=tf.random_uniform_initializer(-self.limit, self.limit))
self.batch_norm=tf.keras.layers.BatchNormalization(trainable=self.training, momentum=0.9, scale=True, gamma_initializer=tf.random_uniform_initializer(-1.0/math.sqrt(self.input_conv_filters),1.0/math.sqrt(self.input_conv_filters)), epsilon=0.0001)
self.relu=tf.keras.layers.ReLU()
self.maxpool=tf.keras.layers.MaxPool2D(pool_size=(2, 2), strides=(2, 2), padding='same')
self.dropout=tf.keras.layers.Dropout(rate=self.dropout_rate)
self.avgpool=tf.keras.layers.AveragePooling2D(pool_size=(2, 2), strides=(2, 2), padding='same')
self.conv=[]
self.conv2=[]
self.batchnorm=[]
self.batchnorm2=[]
self.dense_channels += self.input_conv_filters
for i in range(self.blocks):
for j in range(self.level):
limit = self.bound(self.dense_channels, 4 * self.growth_rate, [1,1])
self.conv.append(tf.keras.layers.Conv2D(filters=4 * self.growth_rate, kernel_size=(1,1) ,strides=(1,1), padding='valid', data_format='channels_last', use_bias=False , kernel_initializer=tf.random_uniform_initializer(-limit, limit)))
self.batchnorm.append(tf.keras.layers.BatchNormalization(trainable=self.training, momentum=0.9, scale=True,gamma_initializer=tf.random_uniform_initializer(-1.0/math.sqrt(4 * self.growth_rate),1.0/math.sqrt(4 * self.growth_rate)), epsilon=0.0001))
limit = self.bound(4 * self.growth_rate, self.growth_rate, [3,3])
self.conv.append(tf.keras.layers.Conv2D(filters=self.growth_rate, kernel_size=(3,3) ,strides=(1,1), padding='same', data_format='channels_last', use_bias=False , kernel_initializer=tf.random_uniform_initializer(-limit, limit)))
self.batchnorm.append(tf.keras.layers.BatchNormalization(trainable=self.training, momentum=0.9, scale=True,gamma_initializer=tf.random_uniform_initializer(-1.0/math.sqrt(self.growth_rate),1.0/math.sqrt(self.growth_rate)), epsilon=0.0001))
self.dense_channels += self.growth_rate
if i < self.blocks - 1:
compressed_channels = int(self.dense_channels * self.transition)
#### the transition conv maps the pre-compression channels to compressed_channels,
#### so its init bound is computed before dense_channels is overwritten ####
limit = self.bound(self.dense_channels, compressed_channels, [1,1])
#### new dense channels for the next dense block ####
self.dense_channels = compressed_channels
self.conv2.append(tf.keras.layers.Conv2D(compressed_channels, kernel_size=(1,1), strides=(1,1), padding='valid', data_format='channels_last', use_bias=False, activation=None, kernel_initializer=tf.random_uniform_initializer(-limit, limit)))
self.batchnorm2.append(tf.keras.layers.BatchNormalization(trainable=self.training, momentum=0.9, scale=True, gamma_initializer=tf.random_uniform_initializer(-1.0/math.sqrt(self.dense_channels),1.0/math.sqrt(self.dense_channels)), epsilon=0.0001))
def bound(self, nin, nout, kernel):
fin = nin * kernel[0] * kernel[1]
fout = nout * kernel[0] * kernel[1]
return np.sqrt(6. / (fin + fout))
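# Channel bookkeeping (illustrative, for the defaults used by CALText_Model below:
# blocks=3, level=16, growth_rate=24, input_conv_filters=48, transition=0.5):
#   after block 1: 48 + 16*24 = 432 -> transition -> 216
#   after block 2: 216 + 16*24 = 600 -> transition -> 300
#   after block 3: 300 + 16*24 = 684 (no transition after the last block)
# which is why 684 is used as the annotation/context dimension further down.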
def dense_net(self, input_x, mask_x):
#### before flowing into dense blocks ####
input_x=tf.expand_dims(input=input_x, axis=3)
x = input_x
#limit = self.bound(1, self.input_conv_filters, self.input_conv_kernel)
x =self.conv1(x)
mask_x = mask_x[:, 0::2, 0::2]
x =self.batch_norm(x)
x =self.relu(x)
x=self.maxpool(x)
input_pre = x
mask_x = mask_x[:, 0::2, 0::2]
dense_out = x
cind=0
bind=0
cind2=0
bind2=0
#### flowing into dense blocks and transition_layer ####
for i in range(self.blocks):
for j in range(self.level):
#### [1, 1] convolution part for bottleneck ####
x =self.conv[cind](x)
cind += 1
x =self.batchnorm[bind](x)
bind += 1
x =self.relu(x)
x =self.dropout(x,training=self.training)
#### [3, 3] convolution part for the regular convolution operation ####
x =self.conv[cind](x)
cind += 1
x =self.batchnorm[bind](x)
bind += 1
x =self.relu(x)
x =self.dropout(x,training=self.training)
dense_out = tf.concat([dense_out, x], axis=3)
x = dense_out
#### calculate the filter number of dense block's output ####
if i < self.blocks - 1:
#### new dense channels for new dense block ####
x =self.conv2[cind2](x)
cind2 += 1
x =self.batchnorm2[bind2](x)
bind2 += 1
x =self.relu(x)
x =self.dropout(x,training=self.training)
x=self.avgpool(x)
dense_out = x
mask_x = mask_x[:, 0::2, 0::2]
return dense_out, mask_x
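# Note on spatial sizes: conv1 (stride 2), the max-pool, and the two transition
# average-pools each halve height and width, so the encoder output is 1/16 of the
# input resolution; mask_x is subsampled in lockstep so it always matches dense_out.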
'''
ContextualAttention implements a coverage-based contextual attention mechanism: at each
decoding step it scores every spatial position of the encoder output against the current
decoder state and the accumulated past attention, and returns a context vector.
'''
class ContextualAttention(layers.Layer):
def __init__(self, channels, # output of DenseEncoder | [batch, h, w, channels]
dim_decoder, dim_attend, **kwargs): # decoder hidden state:$h_{t-1}$ | [batch, dec_dim]
super(ContextualAttention, self).__init__( **kwargs)
self.channels = channels
self.coverage_kernel = [11,11] # kernel size of $Q$
self.coverage_filters = dim_attend # filter numbers of $Q$ | 512
self.dim_decoder = dim_decoder # 256
self.dim_attend = dim_attend # unified dim of three parts calculating $e_ti$ i.e.
# $Q*beta_t$, $U_a * a_i$, $W_a x h_{t-1}$ | 512
self.U_f = tf.Variable(norm_weight(self.coverage_filters, self.dim_attend), name='U_f') # $U_f x f_i$ | [cov_filters, dim_attend]
self.U_f_b = tf.Variable(np.zeros((self.dim_attend,)).astype('float32'), name='U_f_b') # $U_f x f_i + U_f_b$ | [dim_attend, ]
self.U_a = tf.Variable(norm_weight(self.channels,self.dim_attend), name='U_a') # $U_a x a_i$ | [annotation_channels, dim_attend]
self.U_a_b = tf.Variable(np.zeros((self.dim_attend,)).astype('float32'), name='U_a_b') # $U_a x a_i + U_a_b$ | [dim_attend, ]
self.W_a = tf.Variable(norm_weight(self.dim_decoder,self.dim_attend), name='W_a') # $W_a x h_{t_1}$ | [dec_dim, dim_attend]
self.W_a_b = tf.Variable(np.zeros((self.dim_attend,)).astype('float32'), name='W_a_b') # $W_a x h_{t-1} + W_a_b$ | [dim_attend, ]
self.V_a = tf.Variable(norm_weight(self.dim_attend, 1), name='V_a') # $V_a x tanh(A + B + C)$ | [dim_attend, 1]
self.V_a_b = tf.Variable(np.zeros((1,)).astype('float32'), name='V_a_b') # $V_a x tanh(A + B + C) + V_a_b$ | [1, ]
self.alpha_past_filter = tf.Variable(conv_norm_weight(1, self.dim_attend, self.coverage_kernel), name='alpha_past_filter')
def get_context(self, annotation4ctx, h_t_1, alpha_past4ctx, a_mask):
#### calculate $U_f x f_i$ ####
alpha_past_4d = alpha_past4ctx[:, :, :, None]
Ft = tf.nn.conv2d(alpha_past_4d, filters=self.alpha_past_filter, strides=[1, 1, 1, 1], padding='SAME')
coverage_vector = tf.tensordot(Ft, self.U_f, axes=1) #+ self.U_f_b # [batch, h, w, dim_attend]
#### calculate $U_a x a_i$ ####
dense_encoder_vector = tf.tensordot(annotation4ctx, self.U_a, axes=1) #+ self.U_a_b # [batch, h, w, dim_attend]
#### calculate $W_a x h_{t - 1}$ ####
speller_vector = tf.tensordot(h_t_1, self.W_a, axes=1) #+ self.W_a_b # [batch, dim_attend]
speller_vector = speller_vector[:, None, None, :] # [batch, None, None, dim_attend]
tanh_vector = tf.tanh(coverage_vector + dense_encoder_vector + speller_vector + self.U_f_b) # [batch, h, w, dim_attend]
e_ti = tf.tensordot(tanh_vector, self.V_a, axes=1) + self.V_a_b # [batch, h, w, 1]
alpha = tf.exp(e_ti)
alpha = tf.squeeze(alpha, axis=3)
if a_mask is not None:
alpha = alpha * a_mask
alpha = alpha / tf.reduce_sum(alpha, axis=[1, 2], keepdims=True) # normalized weights | [batch, h, w]
alpha_past4ctx += alpha # accumulated weights matrix | [batch, h, w]
context = tf.reduce_sum(annotation4ctx * alpha[:, :, :, None], axis=[1, 2]) # context vector | [batch, feature_channels]
return context, alpha, alpha_past4ctx
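# Shape walkthrough for get_context (illustrative): with annotation4ctx of shape
# [batch, h, w, channels], h_t_1 of shape [batch, dim_decoder] and alpha_past4ctx of
# shape [batch, h, w], the attention score is
#   $e_ti = V_a^T tanh(W_a x h_{t-1} + U_a x a_i + U_f x f_i + U_f_b) + V_a_b$
# where $f_i$ is the coverage feature from convolving the accumulated attention map.
# alpha is the softmax of e_ti over (h, w), masked by a_mask when given, and the
# returned context is the alpha-weighted sum of annotations, shape [batch, channels].
# Only U_f_b is applied inside the tanh; U_a_b and W_a_b are created but unused here.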
'''
Decoder implements a two-layer GRU decoder which decodes an encoded input image into a
sequence of characters using the attention mechanism.
'''
class Decoder(layers.Layer):
def __init__(self, hidden_dim, word_dim, contextual_attention, context_dim, **kwargs):
super(Decoder, self).__init__( **kwargs)
self.contextual_attention = contextual_attention # inner-instance of contextual_attention to provide context
self.context_dim = context_dim # context dim: 684
self.hidden_dim = hidden_dim # dim of hidden state 256
self.word_dim = word_dim # dim of embedding word 256
##GRU 1 weights initialization starts here
self.W_yz_yr = tf.Variable(np.concatenate(
[norm_weight(self.word_dim, self.hidden_dim), norm_weight(self.word_dim, self.hidden_dim)], axis=1), name='W_yz_yr') # [dim_word, 2 * dim_decoder]
self.b_yz_yr = tf.Variable(np.zeros((2 * self.hidden_dim, )).astype('float32'), name='b_yz_yr')
self.U_hz_hr = tf.Variable(np.concatenate(
[ortho_weight(self.hidden_dim),ortho_weight(self.hidden_dim)], axis=1), name='U_hz_hr') # [dim_hidden, 2 * dim_hidden]
self.W_yh = tf.Variable(norm_weight(self.word_dim,
self.hidden_dim), name='W_yh')
self.b_yh = tf.Variable(np.zeros((self.hidden_dim, )).astype('float32'), name='b_yh') # [dim_decoder, ]
self.U_rh = tf.Variable(ortho_weight(self.hidden_dim), name='U_rh') # [dim_hidden, dim_hidden]
##GRU 2 weights initialization starts here
self.U_hz_hr_nl = tf.Variable(np.concatenate(
[ortho_weight(self.hidden_dim), ortho_weight(self.hidden_dim)], axis=1), name='U_hz_hr_nl') # [dim_hidden, 2 * dim_hidden] non_linear
self.b_hz_hr_nl = tf.Variable(np.zeros((2 * self.hidden_dim, )).astype('float32'), name='b_hz_hr_nl') # [2 * dim_hidden, ]
self.W_c_z_r = tf.Variable(norm_weight(self.context_dim,
2 * self.hidden_dim), name='W_c_z_r')
self.U_rh_nl = tf.Variable(ortho_weight(self.hidden_dim), name='U_rh_nl')
self.b_rh_nl = tf.Variable(np.zeros((self.hidden_dim, )).astype('float32'), name='b_rh_nl')
self.W_c_h_nl = tf.Variable(norm_weight(self.context_dim, self.hidden_dim), name='W_c_h_nl')
def get_ht_ctx(self, emb_y, target_hidden_state_0, annotations, a_m, y_m):
res = tf.scan(self.one_time_step, elems=(emb_y, y_m),
initializer=(target_hidden_state_0,
tf.zeros([tf.shape(annotations)[0], self.context_dim]),
tf.zeros([tf.shape(annotations)[0], tf.shape(annotations)[1], tf.shape(annotations)[2]]),
tf.zeros([tf.shape(annotations)[0], tf.shape(annotations)[1], tf.shape(annotations)[2]]),
annotations, a_m))
return res
def one_time_step(self, tuple_h0_ctx_alpha_alpha_past_annotation, tuple_emb_mask):
target_hidden_state_0 = tuple_h0_ctx_alpha_alpha_past_annotation[0]
alpha_past_one = tuple_h0_ctx_alpha_alpha_past_annotation[3]
annotation_one = tuple_h0_ctx_alpha_alpha_past_annotation[4]
a_mask = tuple_h0_ctx_alpha_alpha_past_annotation[5]
emb_y, y_mask = tuple_emb_mask
#GRU 1 starts here
emb_y_z_r_vector = tf.tensordot(emb_y, self.W_yz_yr, axes=1) + \
self.b_yz_yr # [batch, 2 * dim_decoder]
hidden_z_r_vector = tf.tensordot(target_hidden_state_0,
self.U_hz_hr, axes=1) # [batch, 2 * dim_decoder]
pre_z_r_vector = tf.sigmoid(emb_y_z_r_vector + \
hidden_z_r_vector) # [batch, 2 * dim_decoder]
r1 = pre_z_r_vector[:, :self.hidden_dim] # [batch, dim_decoder]
z1 = pre_z_r_vector[:, self.hidden_dim:] # [batch, dim_decoder]
emb_y_h_vector = tf.tensordot(emb_y, self.W_yh, axes=1) + \
self.b_yh # [batch, dim_decoder]
hidden_r_h_vector = tf.tensordot(target_hidden_state_0,
self.U_rh, axes=1) # [batch, dim_decoder]
hidden_r_h_vector *= r1
pre_h_proposal = tf.tanh(hidden_r_h_vector + emb_y_h_vector)
pre_h = z1 * target_hidden_state_0 + (1. - z1) * pre_h_proposal
if y_mask is not None:
pre_h = y_mask[:, None] * pre_h + (1. - y_mask)[:, None] * target_hidden_state_0
context, alpha, alpha_past_one = self.contextual_attention.get_context(annotation_one, pre_h, alpha_past_one, a_mask) # [batch, dim_ctx]
#GRU 2 starts here
emb_y_z_r_nl_vector = tf.tensordot(pre_h, self.U_hz_hr_nl, axes=1) + self.b_hz_hr_nl
context_z_r_vector = tf.tensordot(context, self.W_c_z_r, axes=1)
z_r_vector = tf.sigmoid(emb_y_z_r_nl_vector + context_z_r_vector)
r2 = z_r_vector[:, :self.hidden_dim]
z2 = z_r_vector[:, self.hidden_dim:]
emb_y_h_nl_vector = tf.tensordot(pre_h, self.U_rh_nl, axes=1)
emb_y_h_nl_vector *= r2
emb_y_h_nl_vector=emb_y_h_nl_vector+ self.b_rh_nl # bias added after point wise multiplication with r2
context_h_vector = tf.tensordot(context, self.W_c_h_nl, axes=1)
h_proposal = tf.tanh(emb_y_h_nl_vector + context_h_vector)
h = z2 * pre_h + (1. - z2) * h_proposal
if y_mask is not None:
h = y_mask[:, None] * h + (1. - y_mask)[:, None] * pre_h
return h, context, alpha, alpha_past_one, annotation_one, a_mask
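# Flow of one_time_step (summary): GRU 1 turns the previous hidden state and the
# previous character embedding into a proposal pre_h; contextual attention then uses
# pre_h to compute the context vector and update alpha_past; GRU 2 combines pre_h with
# the context to produce the new hidden state h. get_ht_ctx scans this step over all
# timesteps, carrying (h, context, alpha, alpha_past, annotations, mask).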
'''
CALText is the main class. It combines the three classes defined above:
1) DenseEncoder (encoder)
2) ContextualAttention (contextual attention mechanism)
3) Decoder (two-layer GRU decoder)
CALText implements get_cost, which computes the training cost, and get_word, the single
decoding step used by the module-level get_sample function during decoding.
'''
class CALText(layers.Layer):
def __init__(self, dense_encoder, contextual_attention, decoder, hidden_dim, word_dim, context_dim, target_dim, istraining,**kwargs):
super(CALText, self).__init__( **kwargs)
#self.batch_size = batch_size
self.hidden_dim = hidden_dim
self.word_dim = word_dim
self.context_dim = context_dim
self.target_dim = target_dim
self.embed_matrix = tf.Variable(norm_weight(self.target_dim, self.word_dim), name='embed')
self.dense_encoder = dense_encoder
self.contextual_attention = contextual_attention
self.decoder = decoder
self.Wa2h = tf.Variable(norm_weight(self.context_dim, self.hidden_dim), name='Wa2h')
self.ba2h = tf.Variable(np.zeros((self.hidden_dim,)).astype('float32'), name='ba2h')
self.Wc = tf.Variable(norm_weight(self.context_dim, self.word_dim), name='Wc')
self.bc = tf.Variable(np.zeros((self.word_dim,)).astype('float32'), name='bc')
self.Wh = tf.Variable(norm_weight(self.hidden_dim, self.word_dim), name='Wh')
self.bh = tf.Variable(np.zeros((self.word_dim,)).astype('float32'), name='bh')
self.Wy = tf.Variable(norm_weight(self.word_dim, self.word_dim), name='Wy')
self.by = tf.Variable(np.zeros((self.word_dim,)).astype('float32'), name='by')
self.Wo = tf.Variable(norm_weight(self.word_dim//2, self.target_dim), name='Wo')
self.bo = tf.Variable(np.zeros((self.target_dim,)).astype('float32'), name='bo')
self.training = istraining
self.dropout=tf.keras.layers.Dropout(rate=0.2)
def get_cost(self, cost_annotation, cost_y, a_m, y_m,alpha_reg):
#### step 1: prepare embeddings of the label sequences ####
timesteps = tf.shape(cost_y)[0]
batch_size = tf.shape(cost_y)[1]
emb_y = tf.nn.embedding_lookup(self.embed_matrix, tf.reshape(cost_y, [-1]))
emb_y = tf.reshape(emb_y, [timesteps, batch_size, self.word_dim])
emb_pad = tf.fill((1, batch_size, self.word_dim), 0.0)
emb_shift = tf.concat([emb_pad ,tf.strided_slice(emb_y, [0, 0, 0], [-1, batch_size, self.word_dim], [1, 1, 1])], axis=0)
new_emb_y = emb_shift
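# new_emb_y is the gold label sequence shifted right by one step, with an all-zero
# vector as the start symbol, so the decoder is trained with teacher forcing:
# the embedding of y_{t-1} is fed when predicting y_t.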
#### step: 2 calculation of h_0 ####
anno_mean = tf.reduce_sum(cost_annotation * a_m[:, :, :, None], axis=[1, 2]) / tf.reduce_sum(a_m, axis=[1, 2])[:, None]
h_0 = tf.tensordot(anno_mean, self.Wa2h, axes=1) + self.ba2h # [batch, hidden_dim]
h_0 = tf.tanh(h_0)
#### step: 3 calculation of h_t and c_t at all time steps ####
ret = self.decoder.get_ht_ctx(new_emb_y, h_0, cost_annotation, a_m, y_m)
h_t = ret[0] # h_t of all timesteps [timesteps, batch, hidden_dim]
c_t = ret[1] # c_t of all timesteps [timesteps, batch, context_dim]
alpha=ret[2] # alpha of all timesteps [timesteps, batch, h, w]
#### step: 4 calculation of cost using h_t, c_t and y_t_1 ####
y_t_1 = new_emb_y # shifted y | [1:] = [:-1]
logit_gru = tf.tensordot(h_t, self.Wh, axes=1) #+ self.bh
logit_ctx = tf.tensordot(c_t, self.Wc, axes=1) #+ self.bc
logit_pre = tf.tensordot(y_t_1, self.Wy, axes=1) #+ self.by
logit = logit_pre + logit_ctx + logit_gru + self.bh
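# maxout over pairs: the word_dim-sized logit is reshaped into word_dim//2 pairs and
# max-reduced, so Wo maps from word_dim//2 (= 128) to target_dim (= num_classes).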
shape = tf.shape(logit)
logit = tf.reshape(logit, [shape[0], -1, shape[2]//2, 2])
logit = tf.reduce_max(logit, axis=3)
logit =self.dropout(logit,training=self.training)
#logit = tf.layers.dropout(inputs=logit, rate=0.2, training=self.training)
logit = tf.tensordot(logit, self.Wo, axes=1) + self.bo
logit_shape = tf.shape(logit)
logit = tf.reshape(logit, [-1,logit_shape[2]])
cost = tf.nn.softmax_cross_entropy_with_logits(logits=logit, labels=tf.one_hot(tf.reshape(cost_y, [-1]),depth=self.target_dim))
#### mask out padded positions, sum over timesteps, average over the batch ####
cost = tf.multiply(cost, tf.reshape(y_m, [-1]))
cost = tf.reshape(cost, [shape[0], shape[1]])
cost = tf.reduce_sum(cost, axis=0)
cost = tf.reduce_mean(cost)
#### alpha L1 regularization ####
alpha_sum=tf.reduce_mean(tf.reduce_sum(tf.reduce_sum(tf.abs(alpha), axis=[2, 3]),axis=0))
cost = tf.cond(tf.cast(alpha_reg > 0, tf.bool), lambda: cost + (alpha_reg * alpha_sum), lambda: cost)
return cost
def get_word(self, sample_y, sample_h_pre, alpha_past_pre, sample_annotation,training_mode):
emb = tf.cond(pred=sample_y[0] < 0,
true_fn=lambda: tf.fill((1, self.word_dim), 0.0),
false_fn=lambda: tf.nn.embedding_lookup(params=self.embed_matrix, ids=sample_y)
)
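# sample_y < 0 marks the start of decoding (see next_w = -1 in get_sample), in which
# case a zero embedding is used instead of an embedding lookup.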
#ret = self.decoder.one_time_step((h_pre, None, None, alpha_past_pre, annotation, None), (emb, None))
emb_y_z_r_vector = tf.tensordot(emb, self.decoder.W_yz_yr, axes=1) + \
self.decoder.b_yz_yr # [batch, 2 * dim_decoder]
hidden_z_r_vector = tf.tensordot(sample_h_pre,
self.decoder.U_hz_hr, axes=1) # [batch, 2 * dim_decoder]
pre_z_r_vector = tf.sigmoid(emb_y_z_r_vector + \
hidden_z_r_vector) # [batch, 2 * dim_decoder]
r1 = pre_z_r_vector[:, :self.decoder.hidden_dim] # [batch, dim_decoder]
z1 = pre_z_r_vector[:, self.decoder.hidden_dim:] # [batch, dim_decoder]
emb_y_h_vector = tf.tensordot(emb, self.decoder.W_yh, axes=1) + \
self.decoder.b_yh # [batch, dim_decoder]
hidden_r_h_vector = tf.tensordot(sample_h_pre,
self.decoder.U_rh, axes=1) # [batch, dim_decoder]
hidden_r_h_vector *= r1
pre_h_proposal = tf.tanh(hidden_r_h_vector + emb_y_h_vector)
pre_h = z1 * sample_h_pre + (1. - z1) * pre_h_proposal
context, alphacc, alpha_past = self.decoder.contextual_attention.get_context(sample_annotation, pre_h, alpha_past_pre, None) # [batch, dim_ctx]
emb_y_z_r_nl_vector = tf.tensordot(pre_h, self.decoder.U_hz_hr_nl, axes=1) + self.decoder.b_hz_hr_nl
context_z_r_vector = tf.tensordot(context, self.decoder.W_c_z_r, axes=1)
z_r_vector = tf.sigmoid(emb_y_z_r_nl_vector + context_z_r_vector)
r2 = z_r_vector[:, :self.decoder.hidden_dim]
z2 = z_r_vector[:, self.decoder.hidden_dim:]
emb_y_h_nl_vector = tf.tensordot(pre_h, self.decoder.U_rh_nl, axes=1) + self.decoder.b_rh_nl
emb_y_h_nl_vector *= r2
context_h_vector = tf.tensordot(context, self.decoder.W_c_h_nl, axes=1)
h_proposal = tf.tanh(emb_y_h_nl_vector + context_h_vector)
h = z2 * pre_h + (1. - z2) * h_proposal
h_t = h
c_t = context
alpha_past_t = alpha_past
y_t_1 = emb
logit_gru = tf.tensordot(h_t, self.Wh, axes=1) #+ self.bh
logit_ctx = tf.tensordot(c_t, self.Wc, axes=1) #+ self.bc
logit_pre = tf.tensordot(y_t_1, self.Wy, axes=1) #+ self.by
logit = logit_pre + logit_ctx + logit_gru + self.bh # batch x word_dim
shape = tf.shape(input=logit)
logit = tf.reshape(logit, [-1, shape[1]//2, 2])
logit = tf.reduce_max(input_tensor=logit, axis=2)
logit = self.dropout(logit,training=training_mode)
logit = tf.tensordot(logit, self.Wo, axes=1) + self.bo
next_probs = tf.nn.softmax(logits=logit)
next_word = tf.reduce_max(input_tensor=tf.random.categorical(logits=tf.math.log(next_probs), num_samples=1), axis=1) # tf.random.categorical expects log-probabilities
return next_probs, next_word, h_t, alpha_past_t, alphacc
class CALText_Model(tf.keras.Model): # subclass of tf.keras.Model
def __init__(self,training): # all variables and sub-layers are defined here
super(CALText_Model, self).__init__()
self.dense_blocks=3
self.levels_count=16
self.growth=24
#### decoder setup parameters ####
self.hidden_dim=256
self.word_dim=256
self.dim_attend=512
self.dense_encoder = DenseEncoder(blocks=self.dense_blocks,level=self.levels_count, growth_rate=self.growth, istraining=training)
self.contextual_attention = ContextualAttention(684, self.hidden_dim, self.dim_attend)
self.decoder = Decoder(self.hidden_dim, self.word_dim, self.contextual_attention, 684) ##annotation.shape.as_list()[3]=684
self.caltext = CALText(self.dense_encoder, self.contextual_attention, self.decoder, self.hidden_dim, self.word_dim, 684 ,num_classes ,istraining=training)
def call(self, x, x_mask, y=None, y_mask=None, training=True): # forward pass: encode the image and, if labels are given, also compute the cost
annotation, anno_mask = self.dense_encoder.dense_net(x, x_mask)
if y is None:
return annotation
else:
cost = self.caltext.get_cost(annotation, y, anno_mask, y_mask, gamma_val)
return cost,annotation
def get_hidden_state_0(self, anno):
hidden_state_0 = tf.tanh(tf.tensordot(tf.reduce_mean(input_tensor=anno, axis=[1, 2]), self.caltext.Wa2h, axes=1) + self.caltext.ba2h) # [batch, hidden_dim]
return hidden_state_0
########## apply L2 regularization (weight decay, lambda_val * sum(w^2)) to all non-conv weights ##########
def get_loss(loss,model):
#print(model.trainable_weights)
for layer in model.trainable_weights:
arr=(layer.name).split("/")
if not arr[len(arr)-1].startswith('conv2d'):
loss += lambda_val * tf.reduce_sum(input_tensor=tf.pow(layer, 2))
return loss
###################
@tf.function(experimental_relax_shapes=True)
def get_word(next_w, next_state, next_alpha_past, ctx, model,training):
return model.caltext.get_word(next_w, next_state, next_alpha_past, ctx,training)
def get_sample(ctx0, h_0, k , maxlen, stochastic, training, model):
sample = []
sample_score = []
sample_att=[]
live_k = 1
dead_k = 0
hyp_samples = [[]] * 1
hyp_scores = np.zeros(live_k).astype('float32')
hyp_states = []
next_alpha_past = np.zeros((ctx0.shape[0], ctx0.shape[1], ctx0.shape[2])).astype('float32')
emb_0 = np.zeros((ctx0.shape[0], 256))
next_w = -1 * np.ones((1,)).astype('int64')
next_state = h_0
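# Beam-search state (when stochastic is False): hyp_samples / hyp_scores / hyp_states /
# hyp_alpha_past hold the live partial hypotheses, live_k counts active beams, dead_k
# counts finished ones, and a predicted index of 0 is treated as <eol>. Scores are
# accumulated negative log-probabilities, so lower is better (predict() takes the
# argmin after length normalization).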
#tf.autograph.experimental.set_loop_options(shape_invariants=[(next_alpha_past, tf.TensorShape([None]))])
for ii in range(maxlen):
ctx = np.tile(ctx0, [live_k, 1, 1, 1])
next_p, next_w, next_state, next_alpha_past,contexVec = get_word(next_w, next_state, next_alpha_past, ctx, model,training)
sample_att.append(contexVec[0,:,:])
if stochastic:
nw = next_w[0]
sample.append(nw)
sample_score.append(next_p[0, nw]) # keep the per-step probability of the sampled symbol
if nw == 0:
break
else:
cand_scores = hyp_scores[:, None] - np.log(next_p)
cand_flat = cand_scores.flatten()
ranks_flat = cand_flat.argsort()[:(k-dead_k)]
voc_size = next_p.shape[1]
assert voc_size==num_classes
trans_indices = ranks_flat // voc_size
word_indices = ranks_flat % voc_size
costs = cand_flat[ranks_flat]
new_hyp_samples = []
new_hyp_scores = np.zeros(k-dead_k).astype('float32')
new_hyp_states = []
new_hyp_alpha_past = []
for idx, [ti, wi] in enumerate(zip(trans_indices, word_indices)):
new_hyp_samples.append(hyp_samples[ti]+[wi])
new_hyp_scores[idx] = copy.copy(costs[idx])
new_hyp_states.append(copy.copy(next_state[ti]))
new_hyp_alpha_past.append(copy.copy(next_alpha_past[ti]))
new_live_k = 0
hyp_samples = []
hyp_scores = []
hyp_states = []
hyp_alpha_past = []
for idx in range(len(new_hyp_samples)):
if new_hyp_samples[idx][-1] == 0: # <eol>
sample.append(new_hyp_samples[idx])
sample_score.append(new_hyp_scores[idx])
dead_k += 1
else:
new_live_k += 1
hyp_samples.append(new_hyp_samples[idx])
hyp_scores.append(new_hyp_scores[idx])
hyp_states.append(new_hyp_states[idx])
hyp_alpha_past.append(new_hyp_alpha_past[idx])
hyp_scores = np.array(hyp_scores)
live_k = new_live_k
if new_live_k < 1:
break
if dead_k >= k:
break
next_w = np.array([w1[-1] for w1 in hyp_samples])
next_state = np.array(hyp_states)
next_alpha_past = np.array(hyp_alpha_past)
if not stochastic:
# dump every remaining one
if live_k > 0:
for idx in range(live_k):
sample.append(hyp_samples[idx])
sample_score.append(hyp_scores[idx])
return sample, sample_score,sample_att
#######################Predict
@tf.function(experimental_relax_shapes=True)
def execute_model(xx,xx_mask,CALTEXT):
anno = CALTEXT(xx,xx_mask, training=False)
hidden_state_0 = CALTEXT.get_hidden_state_0(anno)
return anno,hidden_state_0
def predict(CALTEXT, images, x_mask):
# training=False is only needed if there are layers with different
# behavior during training versus inference (e.g. Dropout).
batch_loss=0
img_ind=1
for img_ind in range(len(images)):
xx = images[img_ind][tf.newaxis, ... ]
xx_mask = x_mask[img_ind][tf.newaxis, ... ]
anno,hidden_state_0=execute_model(xx,xx_mask,CALTEXT)
sample, score,hypalpha=get_sample(anno, hidden_state_0,10, 130, False, False, CALTEXT)
score = score / np.array([len(s) for s in sample])
ss = sample[score.argmin()]
img_ind=img_ind+1
ind=0
num=int(len(ss)/2)
#### output string
ind=0
outstr=u''
frames = []
font = ImageFont.truetype("Jameel Noori Nastaleeq.ttf",60)
worddicts_r=data.load_dict_picklefile("vocabulary.pkl")
while (ind<len(ss)-1):
k=(len(ss)-2)-ind
outstr=outstr+worddicts_r[int(ss[k])]
textimg = Image.new('RGB', (1400,100),(255,255,255))
drawtext = ImageDraw.Draw(textimg)
drawtext.text((20, 20), outstr ,(0,0,0),font=font)
fig,axes=plt.subplots(2,1)
axes[0].imshow(textimg)
axes[0].axis('off')
axes[1].axis('off')
axes[1].imshow(xx[0,:,:],cmap='gray')
visualization=resize(hypalpha[k], (100,800),anti_aliasing=True)
axes[1].imshow(255-(255 * visualization), alpha=0.2)
plt.axis('off')
plt.savefig('res.png')
plt.close(fig) # release the figure so repeated iterations do not accumulate open figures
frames.append(Image.fromarray(cv2.cvtColor(cv2.imread('res.png'), cv2.COLOR_BGR2RGB), 'RGB')) # cv2 loads BGR; convert to RGB for PIL
ind=ind+1
frame_one = frames[0]
frame_one.save("vis.gif", format="GIF", append_images=frames,save_all=True, duration=300, loop=0)
gif_image="vis.gif"
return outstr,gif_image
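# Minimal usage sketch (not part of the original file). Assumes grayscale page images
# scaled to [0, 1]; the input height/width below are arbitrary and only illustrative.
if __name__ == "__main__":
    model = CALText_Model(training=False)
    dummy_x = tf.zeros((1, 128, 512), dtype=tf.float32)      # [batch, height, width]
    dummy_x_mask = tf.ones((1, 128, 512), dtype=tf.float32)  # 1 = valid pixel, 0 = padding
    annotation = model(dummy_x, dummy_x_mask, training=False)   # encoder output, [1, 8, 32, 684]
    hidden_state_0 = model.get_hidden_state_0(annotation)       # initial decoder state, [1, 256]
    print(annotation.shape, hidden_state_0.shape)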