# encoding: utf-8
"""Class Declaration of Transformer's Encoder."""

import logging

import chainer
import numpy as np
from chainer import links as L

from espnet.nets.chainer_backend.transformer.embedding import PositionalEncoding
from espnet.nets.chainer_backend.transformer.encoder_layer import EncoderLayer
from espnet.nets.chainer_backend.transformer.layer_norm import LayerNorm
from espnet.nets.chainer_backend.transformer.mask import make_history_mask
from espnet.nets.chainer_backend.transformer.subsampling import Conv2dSubsampling
from espnet.nets.chainer_backend.transformer.subsampling import LinearSampling


class Encoder(chainer.Chain):
    """Encoder.

    Args:
        idim (int): Dimension of the inputs.
        attention_dim (int): Dimension of the attention
            (input/output dimension of each encoder layer).
        attention_heads (int): Number of attention heads.
        linear_units (int): Number of hidden units of the position-wise
            feed-forward layer.
        num_blocks (int): Number of encoder layers.
        dropout_rate (float): Dropout rate of the input layer.
        positional_dropout_rate (float): Dropout rate of the positional
            encoding (used with the `embed` input layer).
        attention_dropout_rate (float): Dropout rate passed to each encoder layer.
        input_layer (str): Input layer type. Must be `conv2d`, `linear`, or `embed`.
        pos_enc_class: Positional encoding class (default: PositionalEncoding).
        initialW (chainer.initializer.Initializer, optional): Initializer for the weights.
        initial_bias (chainer.initializer.Initializer, optional): Initializer for the biases.
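
    Examples:
        A minimal construction sketch (the input dimension 83 and the other
        values are illustrative only, not mandated by this module):

        >>> encoder = Encoder(idim=83, attention_dim=256, attention_heads=4,
        ...                   linear_units=2048, num_blocks=6,
        ...                   input_layer="conv2d")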

    """

    def __init__(
        self,
        idim,
        attention_dim=256,
        attention_heads=4,
        linear_units=2048,
        num_blocks=6,
        dropout_rate=0.1,
        positional_dropout_rate=0.1,
        attention_dropout_rate=0.0,
        input_layer="conv2d",
        pos_enc_class=PositionalEncoding,
        initialW=None,
        initial_bias=None,
    ):
        """Initialize Encoder.

        Args:
            idim (int): Input dimension.
            args (Namespace): Training config.
            initialW (int, optional):  Initializer to initialize the weight.
            initial_bias (bool, optional): Initializer to initialize the bias.

        """
        super(Encoder, self).__init__()
        # Default to uniform initializers; instances (not the class itself) are
        # needed so that Chainer can call them to fill the parameter arrays.
        initialW = chainer.initializers.Uniform() if initialW is None else initialW
        initial_bias = (
            chainer.initializers.Uniform() if initial_bias is None else initial_bias
        )
        self.do_history_mask = False
        with self.init_scope():
            self.conv_subsampling_factor = 1
            channels = 64  # Based on the paper
            if input_layer == "conv2d":
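                # The two stride-2 convolutions halve the feature axis twice, so the
                # flattened feature size fed to the output projection becomes
                # ceil(ceil(idim / 2) / 2) * channels.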
                idim = int(np.ceil(np.ceil(idim / 2) / 2)) * channels
                self.input_layer = Conv2dSubsampling(
                    channels,
                    idim,
                    attention_dim,
                    dropout=dropout_rate,
                    initialW=initialW,
                    initial_bias=initial_bias,
                )
                self.conv_subsampling_factor = 4
            elif input_layer == "linear":
                self.input_layer = LinearSampling(
                    idim, attention_dim, initialW=initialW, initial_bias=initial_bias
                )
            elif input_layer == "embed":
                self.input_layer = chainer.Sequential(
                    L.EmbedID(idim, attention_dim, ignore_label=-1),
                    pos_enc_class(attention_dim, positional_dropout_rate),
                )
                self.do_history_mask = True
            else:
                raise ValueError("unknown input_layer: " + input_layer)
            self.norm = LayerNorm(attention_dim)
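        # The encoder blocks are registered with add_link so that their parameters
        # belong to this chain even though they are created outside init_scope.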
        for i in range(num_blocks):
            name = "encoders." + str(i)
            layer = EncoderLayer(
                attention_dim,
                d_units=linear_units,
                h=attention_heads,
                dropout=attention_dropout_rate,
                initialW=initialW,
                initial_bias=initial_bias,
            )
            self.add_link(name, layer)
        self.n_layers = num_blocks

    def forward(self, e, ilens):
        """Compute Encoder layer.

        Args:
            e (chainer.Variable): Batch of padded character ids (B, Tmax) or
                padded acoustic features (B, Tmax, idim).
            ilens (chainer.Variable): Batch of lengths of each input sequence. (B,)

        Returns:
            chainer.Variable: Encoder output. (B, Tmax', attention_dim)
            numpy.ndarray: Padding mask; entries are 1 for valid frames and
                -1 for padding. (B, Tmax')
            chainer.Variable: Batch of lengths of the encoder outputs. (B,)
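
        Examples:
            An illustrative sketch (shapes and values are hypothetical):

            >>> xs = np.zeros((2, 100, 83), dtype=np.float32)
            >>> ilens = np.array([100, 80])
            >>> hs, mask, hlens = encoder.forward(xs, ilens)
            >>> # with input_layer="conv2d" the time axis is subsampled by 4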

        """
        if isinstance(self.input_layer, Conv2dSubsampling):
            e, ilens = self.input_layer(e, ilens)
        else:
            e = self.input_layer(e)
        batch, length, dims = e.shape
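        # Build a padding mask: frames beyond each sequence length are marked -1.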
        x_mask = np.ones([batch, length])
        for j in range(batch):
            x_mask[j, ilens[j] :] = -1
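        # Pairwise attention mask: True only where both the query and the key
        # positions are valid (non-padded).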
        xx_mask = (x_mask[:, None, :] >= 0) * (x_mask[:, :, None] >= 0)
        xx_mask = self.xp.array(xx_mask)
        if self.do_history_mask:
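            # For the `embed` input layer, additionally restrict each position to
            # attend only to itself and earlier positions (history/causal mask).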
            history_mask = make_history_mask(self.xp, x_mask)
            xx_mask *= history_mask
        logging.debug("encoders size: " + str(e.shape))
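        # Flatten to (B * T, dims); the batch size is passed to each encoder layer
        # together with the pairwise mask.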
        e = e.reshape(-1, dims)
        for i in range(self.n_layers):
            e = self["encoders." + str(i)](e, xx_mask, batch)
        return self.norm(e).reshape(batch, length, -1), x_mask, ilens