cerulianx commited on
Commit
e902303
1 Parent(s): e2f9adc

Upload encoder.py

Browse files
Files changed (1) hide show
  1. encoder.py +93 -0
encoder.py ADDED
@@ -0,0 +1,93 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import attr
2
+ import numpy as np
3
+
4
+ import torch
5
+ import torch.nn as nn
6
+ import torch.nn.functional as F
7
+
8
+ from collections import OrderedDict
9
+ from functools import partial
10
+ from dall_e.utils import Conv2d
11
+
12
+ @attr.s(eq=False, repr=False)
13
+ class EncoderBlock(nn.Module):
14
+ n_in: int = attr.ib(validator=lambda i, a, x: x >= 1)
15
+ n_out: int = attr.ib(validator=lambda i, a, x: x >= 1 and x % 4 ==0)
16
+ n_layers: int = attr.ib(validator=lambda i, a, x: x >= 1)
17
+
18
+ device: torch.device = attr.ib(default=None)
19
+ requires_grad: bool = attr.ib(default=False)
20
+
21
+ def __attrs_post_init__(self) -> None:
22
+ super().__init__()
23
+ self.n_hid = self.n_out // 4
24
+ self.post_gain = 1 / (self.n_layers ** 2)
25
+
26
+ make_conv = partial(Conv2d, device=self.device, requires_grad=self.requires_grad)
27
+ self.id_path = make_conv(self.n_in, self.n_out, 1) if self.n_in != self.n_out else nn.Identity()
28
+ self.res_path = nn.Sequential(OrderedDict([
29
+ ('relu_1', nn.ReLU()),
30
+ ('conv_1', make_conv(self.n_in, self.n_hid, 3)),
31
+ ('relu_2', nn.ReLU()),
32
+ ('conv_2', make_conv(self.n_hid, self.n_hid, 3)),
33
+ ('relu_3', nn.ReLU()),
34
+ ('conv_3', make_conv(self.n_hid, self.n_hid, 3)),
35
+ ('relu_4', nn.ReLU()),
36
+ ('conv_4', make_conv(self.n_hid, self.n_out, 1)),]))
37
+
38
+ def forward(self, x: torch.Tensor) -> torch.Tensor:
39
+ return self.id_path(x) + self.post_gain * self.res_path(x)
40
+
41
+ @attr.s(eq=False, repr=False)
42
+ class Encoder(nn.Module):
43
+ group_count: int = 4
44
+ n_hid: int = attr.ib(default=256, validator=lambda i, a, x: x >= 64)
45
+ n_blk_per_group: int = attr.ib(default=2, validator=lambda i, a, x: x >= 1)
46
+ input_channels: int = attr.ib(default=3, validator=lambda i, a, x: x >= 1)
47
+ vocab_size: int = attr.ib(default=8192, validator=lambda i, a, x: x >= 512)
48
+
49
+ device: torch.device = attr.ib(default=torch.device('cpu'))
50
+ requires_grad: bool = attr.ib(default=False)
51
+ use_mixed_precision: bool = attr.ib(default=True)
52
+
53
+ def __attrs_post_init__(self) -> None:
54
+ super().__init__()
55
+
56
+ blk_range = range(self.n_blk_per_group)
57
+ n_layers = self.group_count * self.n_blk_per_group
58
+ make_conv = partial(Conv2d, device=self.device, requires_grad=self.requires_grad)
59
+ make_blk = partial(EncoderBlock, n_layers=n_layers, device=self.device,
60
+ requires_grad=self.requires_grad)
61
+
62
+ self.blocks = nn.Sequential(OrderedDict([
63
+ ('input', make_conv(self.input_channels, 1 * self.n_hid, 7)),
64
+ ('group_1', nn.Sequential(OrderedDict([
65
+ *[(f'block_{i + 1}', make_blk(1 * self.n_hid, 1 * self.n_hid)) for i in blk_range],
66
+ ('pool', nn.MaxPool2d(kernel_size=2)),
67
+ ]))),
68
+ ('group_2', nn.Sequential(OrderedDict([
69
+ *[(f'block_{i + 1}', make_blk(1 * self.n_hid if i == 0 else 2 * self.n_hid, 2 * self.n_hid)) for i in blk_range],
70
+ ('pool', nn.MaxPool2d(kernel_size=2)),
71
+ ]))),
72
+ ('group_3', nn.Sequential(OrderedDict([
73
+ *[(f'block_{i + 1}', make_blk(2 * self.n_hid if i == 0 else 4 * self.n_hid, 4 * self.n_hid)) for i in blk_range],
74
+ ('pool', nn.MaxPool2d(kernel_size=2)),
75
+ ]))),
76
+ ('group_4', nn.Sequential(OrderedDict([
77
+ *[(f'block_{i + 1}', make_blk(4 * self.n_hid if i == 0 else 8 * self.n_hid, 8 * self.n_hid)) for i in blk_range],
78
+ ]))),
79
+ ('output', nn.Sequential(OrderedDict([
80
+ ('relu', nn.ReLU()),
81
+ ('conv', make_conv(8 * self.n_hid, self.vocab_size, 1, use_float16=False)),
82
+ ]))),
83
+ ]))
84
+
85
+ def forward(self, x: torch.Tensor) -> torch.Tensor:
86
+ if len(x.shape) != 4:
87
+ raise ValueError(f'input shape {x.shape} is not 4d')
88
+ if x.shape[1] != self.input_channels:
89
+ raise ValueError(f'input has {x.shape[1]} channels but model built for {self.input_channels}')
90
+ if x.dtype != torch.float32:
91
+ raise ValueError('input must have dtype torch.float32')
92
+
93
+ return self.blocks(x)