import chainer
import chainer.functions as F
import chainer.links as L
import numpy as np
# dot product based attention
class AttDot(chainer.Chain):
    """Compute attention based on dot product.

    Args:
        eprojs (int | None): Dimension of input vectors from encoder.
        dunits (int | None): Dimension of input vectors for decoder.
        att_dim (int): Dimension of input vectors for attention.

    """

    def __init__(self, eprojs, dunits, att_dim):
        super(AttDot, self).__init__()
        with self.init_scope():
            self.mlp_enc = L.Linear(eprojs, att_dim)
            self.mlp_dec = L.Linear(dunits, att_dim)

        self.dunits = dunits
        self.eprojs = eprojs
        self.att_dim = att_dim
        self.h_length = None
        self.enc_h = None
        self.pre_compute_enc_h = None

    def reset(self):
        """Reset states."""
        self.h_length = None
        self.enc_h = None
        self.pre_compute_enc_h = None

    def __call__(self, enc_hs, dec_z, att_prev, scaling=2.0):
        """Compute AttDot forward layer.

        Args:
            enc_hs (chainer.Variable | N-dimensional array):
                Input variable from encoder.
            dec_z (chainer.Variable | N-dimensional array): Input variable of decoder.
            att_prev (chainer.Variable | None): Previous attention weight
                (not used by this attention type).
            scaling (float): Scaling weight to make attention sharp.

        Returns:
            chainer.Variable: Weighted sum over frames.
            chainer.Variable: Attention weight.

        """
        batch = len(enc_hs)
        # pre-compute all h outside the decoder loop
        if self.pre_compute_enc_h is None:
            self.enc_h = F.pad_sequence(enc_hs)  # utt x frame x hdim
            self.h_length = self.enc_h.shape[1]
            # utt x frame x att_dim
            self.pre_compute_enc_h = F.tanh(self.mlp_enc(self.enc_h, n_batch_axes=2))

        if dec_z is None:
            dec_z = chainer.Variable(
                self.xp.zeros((batch, self.dunits), dtype=np.float32)
            )
        else:
            dec_z = dec_z.reshape(batch, self.dunits)

        # <phi (h_t), psi (s)> for all t
        u = F.broadcast_to(
            F.expand_dims(F.tanh(self.mlp_dec(dec_z)), 1), self.pre_compute_enc_h.shape
        )
        e = F.sum(self.pre_compute_enc_h * u, axis=2)  # utt x frame
        # Masking the padded area with a large negative value (to force its
        # probability to zero) was tried but simply degraded performance,
        # so that implementation was dropped.
        # Apply a scaling factor to make the attention sharper.
        w = F.softmax(scaling * e)
        # weighted sum over frames
        # utt x hdim
        c = F.sum(
            self.enc_h * F.broadcast_to(F.expand_dims(w, 2), self.enc_h.shape), axis=1
        )
        return c, w
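# Illustrative usage sketch (not part of the original module; names and
# dimensions below are assumptions): inside a decoder loop, AttDot is driven
# with `enc_hs`, a list of (frames, eprojs) float32 arrays, and `dec_z`, the
# previous decoder state of shape (batch, dunits).
#
#     att = AttDot(eprojs=320, dunits=300, att_dim=320)
#     att.reset()                      # call once per utterance batch
#     c, w = att(enc_hs, None, None)   # first step: a zero decoder state is used
#     c, w = att(enc_hs, dec_z, w)     # later steps; att_prev is ignored here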
# location based attention
class AttLoc(chainer.Chain):
    """Compute location-based attention.

    Args:
        eprojs (int | None): Dimension of input vectors from encoder.
        dunits (int | None): Dimension of input vectors for decoder.
        att_dim (int): Dimension of input vectors for attention.
        aconv_chans (int): Number of channels of output arrays from convolutional layer.
        aconv_filts (int): Size of filters of convolutional layer.

    """

    def __init__(self, eprojs, dunits, att_dim, aconv_chans, aconv_filts):
        super(AttLoc, self).__init__()
        with self.init_scope():
            self.mlp_enc = L.Linear(eprojs, att_dim)
            self.mlp_dec = L.Linear(dunits, att_dim, nobias=True)
            self.mlp_att = L.Linear(aconv_chans, att_dim, nobias=True)
            self.loc_conv = L.Convolution2D(
                1, aconv_chans, ksize=(1, 2 * aconv_filts + 1), pad=(0, aconv_filts)
            )
            self.gvec = L.Linear(att_dim, 1)

        self.dunits = dunits
        self.eprojs = eprojs
        self.att_dim = att_dim
        self.h_length = None
        self.enc_h = None
        self.pre_compute_enc_h = None
        self.aconv_chans = aconv_chans

    def reset(self):
        """Reset states."""
        self.h_length = None
        self.enc_h = None
        self.pre_compute_enc_h = None

    def __call__(self, enc_hs, dec_z, att_prev, scaling=2.0):
        """Compute AttLoc forward layer.

        Args:
            enc_hs (chainer.Variable | N-dimensional array):
                Input variable from encoders.
            dec_z (chainer.Variable | N-dimensional array): Input variable of decoder.
            att_prev (chainer.Variable | None): Attention weight.
            scaling (float): Scaling weight to make attention sharp.

        Returns:
            chainer.Variable: Weighted sum over frames.
            chainer.Variable: Attention weight.

        """
        batch = len(enc_hs)
        # pre-compute all h outside the decoder loop
        if self.pre_compute_enc_h is None:
            self.enc_h = F.pad_sequence(enc_hs)  # utt x frame x hdim
            self.h_length = self.enc_h.shape[1]
            # utt x frame x att_dim
            self.pre_compute_enc_h = self.mlp_enc(self.enc_h, n_batch_axes=2)

        if dec_z is None:
            dec_z = chainer.Variable(
                self.xp.zeros((batch, self.dunits), dtype=np.float32)
            )
        else:
            dec_z = dec_z.reshape(batch, self.dunits)

        # initialize attention weight with uniform dist.
        if att_prev is None:
            att_prev = [
                self.xp.full(hh.shape[0], 1.0 / hh.shape[0], dtype=np.float32)
                for hh in enc_hs
            ]
            att_prev = [chainer.Variable(att) for att in att_prev]
            att_prev = F.pad_sequence(att_prev)

        # att_prev: utt x frame -> utt x 1 x 1 x frame
        # -> utt x att_conv_chans x 1 x frame
        att_conv = self.loc_conv(att_prev.reshape(batch, 1, 1, self.h_length))
        # att_conv: utt x att_conv_chans x 1 x frame -> utt x frame x att_conv_chans
        att_conv = F.swapaxes(F.squeeze(att_conv, axis=2), 1, 2)
        # att_conv: utt x frame x att_conv_chans -> utt x frame x att_dim
        att_conv = self.mlp_att(att_conv, n_batch_axes=2)

        # dec_z_tiled: utt x frame x att_dim
        dec_z_tiled = F.broadcast_to(
            F.expand_dims(self.mlp_dec(dec_z), 1), self.pre_compute_enc_h.shape
        )

        # dot with gvec
        # utt x frame x att_dim -> utt x frame
        # TODO(watanabe) use batch_matmul
        e = F.squeeze(
            self.gvec(
                F.tanh(att_conv + self.pre_compute_enc_h + dec_z_tiled), n_batch_axes=2
            ),
            axis=2,
        )
        # Masking the padded area with a large negative value (to force its
        # probability to zero) was tried but simply degraded performance,
        # so that implementation was dropped.
        # Apply a scaling factor to make the attention sharper.
        w = F.softmax(scaling * e)
        # weighted sum over frames
        # utt x hdim
        c = F.sum(
            self.enc_h * F.broadcast_to(F.expand_dims(w, 2), self.enc_h.shape), axis=1
        )
        return c, w
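# Illustrative usage sketch (not part of the original module; shapes are
# assumptions): AttLoc feeds the previous attention weight back through a
# 1-D convolution, so the returned `w` must be threaded across decoder steps.
#
#     att = AttLoc(eprojs=320, dunits=300, att_dim=320,
#                  aconv_chans=10, aconv_filts=100)
#     att.reset()
#     w = None
#     for dec_z in decoder_states:      # hypothetical iterable of (batch, dunits)
#         c, w = att(enc_hs, dec_z, w)  # w is initialized uniformly when None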
class NoAtt(chainer.Chain):
    """Compute non-attention layer.

    This layer is a dummy attention layer to be compatible with other
    attention-based models.

    """

    def __init__(self):
        super(NoAtt, self).__init__()
        self.h_length = None
        self.enc_h = None
        self.pre_compute_enc_h = None
        self.c = None

    def reset(self):
        """Reset states."""
        self.h_length = None
        self.enc_h = None
        self.pre_compute_enc_h = None
        self.c = None

    def __call__(self, enc_hs, dec_z, att_prev):
        """Compute NoAtt forward layer.

        Args:
            enc_hs (chainer.Variable | N-dimensional array):
                Input variable from encoders.
            dec_z: Dummy (unused).
            att_prev (chainer.Variable | None): Attention weight.

        Returns:
            chainer.Variable: Sum over frames.
            chainer.Variable: Attention weight.

        """
        # pre-compute all h outside the decoder loop
        if self.pre_compute_enc_h is None:
            self.enc_h = F.pad_sequence(enc_hs)  # utt x frame x hdim
            self.h_length = self.enc_h.shape[1]

        # initialize attention weight with uniform dist.
        # the uniform weighted sum is computed once and cached in self.c
        if att_prev is None:
            att_prev = [
                self.xp.full(hh.shape[0], 1.0 / hh.shape[0], dtype=np.float32)
                for hh in enc_hs
            ]
            att_prev = [chainer.Variable(att) for att in att_prev]
            att_prev = F.pad_sequence(att_prev)
            self.c = F.sum(
                self.enc_h
                * F.broadcast_to(F.expand_dims(att_prev, 2), self.enc_h.shape),
                axis=1,
            )

        return self.c, att_prev
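# Illustrative usage sketch (not part of the original module): NoAtt ignores
# the decoder state and returns a uniform average of the encoder frames,
# computed on the first call and cached afterwards, so decoders can keep the
# same calling convention without real attention.
#
#     att = NoAtt()
#     att.reset()
#     c, w = att(enc_hs, None, None)    # c: (batch, eprojs), w: uniform weights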
def att_for(args):
    """Returns an attention layer given the program arguments.

    Args:
        args (Namespace): The arguments.

    Returns:
        chainer.Chain: The corresponding attention module.

    """
    if args.atype == "dot":
        att = AttDot(args.eprojs, args.dunits, args.adim)
    elif args.atype == "location":
        att = AttLoc(
            args.eprojs, args.dunits, args.adim, args.aconv_chans, args.aconv_filts
        )
    elif args.atype == "noatt":
        att = NoAtt()
    else:
        raise NotImplementedError(
            "chainer supports only noatt, dot, and location attention."
        )
    return att
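if __name__ == "__main__":
    # Minimal smoke-test sketch, not part of the original module: the Namespace
    # fields and tensor sizes below are assumptions chosen only to exercise
    # att_for and one forward step of the selected attention on random data.
    from argparse import Namespace

    args = Namespace(
        atype="location", eprojs=4, dunits=3, adim=5, aconv_chans=2, aconv_filts=1
    )
    att = att_for(args)
    att.reset()
    # two utterances with different numbers of frames
    enc_hs = [
        np.random.randn(6, args.eprojs).astype(np.float32),
        np.random.randn(4, args.eprojs).astype(np.float32),
    ]
    dec_z = np.zeros((len(enc_hs), args.dunits), dtype=np.float32)
    c, w = att(enc_hs, dec_z, None)
    print(c.shape, w.shape)  # expected: (2, eprojs) and (2, max_frames)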