# coding=utf-8
# Copyright 2022 The IDEA Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import torch
import torch.nn as nn

from detrex.layers import FFN, BaseTransformerLayer, MultiheadAttention, TransformerLayerSequence


class DetrTransformerEncoder(TransformerLayerSequence):
    """Stack of self-attention transformer layers used as the DETR encoder.

    Each layer is a ``BaseTransformerLayer`` whose operation order is
    ``("self_attn", "norm", "ffn", "norm")``.

    Args:
        embed_dim: Feature dimension given to the attention, FFN and
            LayerNorm modules.
        num_heads: Number of attention heads.
        attn_dropout: Dropout value passed to ``MultiheadAttention`` as
            ``attn_drop``.
        feedforward_dim: Hidden dimension passed to the ``FFN``.
        ffn_dropout: Dropout value passed to the ``FFN`` as ``ffn_drop``.
        num_layers: Number of stacked layers.
        post_norm: If True, a final ``nn.LayerNorm`` is applied to the output
            of the last layer.
        batch_first: Forwarded unchanged to ``MultiheadAttention``.
    """

    def __init__(
        self,
        embed_dim: int = 256,
        num_heads: int = 8,
        attn_dropout: float = 0.1,
        feedforward_dim: int = 2048,
        ffn_dropout: float = 0.1,
        num_layers: int = 6,
        post_norm: bool = True,
        batch_first: bool = False,
    ):
        super().__init__(
            transformer_layers=BaseTransformerLayer(
                attn=MultiheadAttention(
                    embed_dim=embed_dim,
                    num_heads=num_heads,
                    attn_drop=attn_dropout,
                    batch_first=batch_first,
                ),
                ffn=FFN(
                    embed_dim=embed_dim,
                    feedforward_dim=feedforward_dim,
                    ffn_drop=ffn_dropout,
                ),
                norm=nn.LayerNorm(
                    normalized_shape=embed_dim,
                ),
                operation_order=("self_attn", "norm", "ffn", "norm"),
            ),
            num_layers=num_layers,
        )
        # Both attributes are read back from the first constructed layer.
        self.embed_dim = self.layers[0].embed_dim
        self.pre_norm = self.layers[0].pre_norm

        self.post_norm_layer = nn.LayerNorm(self.embed_dim) if post_norm else None

    def forward(
        self,
        query,
        key,
        value,
        query_pos=None,
        key_pos=None,
        attn_masks=None,
        query_key_padding_mask=None,
        key_padding_mask=None,
        **kwargs,
    ):
        """Run ``query`` through every layer, then the optional final norm.

        All arguments other than ``query`` are forwarded unchanged to each
        layer; ``query`` is replaced by the previous layer's output.

        Returns:
            The output of the last layer, passed through ``post_norm_layer``
            when one is configured.
        """
        for layer in self.layers:
            query = layer(
                query,
                key,
                value,
                query_pos=query_pos,
                key_pos=key_pos,
                attn_masks=attn_masks,
                query_key_padding_mask=query_key_padding_mask,
                key_padding_mask=key_padding_mask,
                **kwargs,
            )

        if self.post_norm_layer is not None:
            query = self.post_norm_layer(query)
        return query


class DetrTransformerDecoder(TransformerLayerSequence):
    """Stack of self-/cross-attention transformer layers used as the DETR decoder.

    Each layer is a ``BaseTransformerLayer`` whose operation order is
    ``("self_attn", "norm", "cross_attn", "norm", "ffn", "norm")``.

    Args:
        embed_dim: Feature dimension given to the attention, FFN and
            LayerNorm modules.
        num_heads: Number of attention heads.
        attn_dropout: Dropout value passed to ``MultiheadAttention`` as
            ``attn_drop``.
        feedforward_dim: Hidden dimension passed to the ``FFN``.
        ffn_dropout: Dropout value passed to the ``FFN`` as ``ffn_drop``.
        num_layers: Number of stacked layers.
        post_norm: If True, an ``nn.LayerNorm`` is applied to every returned
            output.
        return_intermediate: If True, ``forward`` returns the outputs of all
            layers stacked along a new leading axis; otherwise only the last
            layer's output (still with a leading axis of size 1).
        batch_first: Forwarded unchanged to ``MultiheadAttention``.
    """

    def __init__(
        self,
        embed_dim: int = 256,
        num_heads: int = 8,
        attn_dropout: float = 0.1,
        feedforward_dim: int = 2048,
        ffn_dropout: float = 0.1,
        num_layers: int = 6,
        post_norm: bool = True,
        return_intermediate: bool = True,
        batch_first: bool = False,
    ):
        super().__init__(
            transformer_layers=BaseTransformerLayer(
                attn=MultiheadAttention(
                    embed_dim=embed_dim,
                    num_heads=num_heads,
                    attn_drop=attn_dropout,
                    batch_first=batch_first,
                ),
                ffn=FFN(
                    embed_dim=embed_dim,
                    feedforward_dim=feedforward_dim,
                    ffn_drop=ffn_dropout,
                ),
                norm=nn.LayerNorm(
                    normalized_shape=embed_dim,
                ),
                operation_order=("self_attn", "norm", "cross_attn", "norm", "ffn", "norm"),
            ),
            num_layers=num_layers,
        )
        self.return_intermediate = return_intermediate
        self.embed_dim = self.layers[0].embed_dim

        self.post_norm_layer = nn.LayerNorm(self.embed_dim) if post_norm else None

    def _apply_post_norm(self, query):
        """Return ``query`` through ``post_norm_layer`` if set, else unchanged."""
        if self.post_norm_layer is None:
            return query
        return self.post_norm_layer(query)

    def forward(
        self,
        query,
        key,
        value,
        query_pos=None,
        key_pos=None,
        attn_masks=None,
        query_key_padding_mask=None,
        key_padding_mask=None,
        **kwargs,
    ):
        """Run ``query`` through every decoder layer.

        All arguments other than ``query`` are forwarded unchanged to each
        layer; ``query`` is replaced by the previous layer's output.

        Returns:
            A tensor with one extra leading axis compared to a single layer's
            output. With ``return_intermediate`` the leading axis indexes the
            layers (``torch.stack`` of every layer's, optionally normalised,
            output); otherwise it has size 1 and holds only the last layer's
            output.
        """
        intermediate = []
        for layer in self.layers:
            query = layer(
                query,
                key,
                value,
                query_pos=query_pos,
                key_pos=key_pos,
                attn_masks=attn_masks,
                query_key_padding_mask=query_key_padding_mask,
                key_padding_mask=key_padding_mask,
                **kwargs,
            )
            if self.return_intermediate:
                intermediate.append(self._apply_post_norm(query))

        if self.return_intermediate:
            return torch.stack(intermediate)

        # Always add the leading axis, regardless of ``post_norm``. Previously it
        # was only added when a post-norm layer existed, so the output rank
        # depended on an unrelated option and broke callers that index or
        # transpose by position (e.g. ``DetrTransformer.forward``).
        return self._apply_post_norm(query).unsqueeze(0)


class DetrTransformer(nn.Module):
    """Encoder-decoder transformer wrapper for DETR.

    Args:
        encoder: Encoder module; must expose ``embed_dim`` and accept the
            keyword arguments used in :meth:`forward`.
        decoder: Decoder module; accepts the keyword arguments used in
            :meth:`forward`.
    """

    def __init__(self, encoder=None, decoder=None):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.embed_dim = self.encoder.embed_dim

        self.init_weights()

    def init_weights(self):
        """Apply Xavier-uniform initialisation to every parameter with more than one dim."""
        for p in self.parameters():
            if p.dim() > 1:
                nn.init.xavier_uniform_(p)

    def forward(self, x, mask, query_embed, pos_embed):
        """Encode the feature map and decode the object queries.

        Args:
            x: Feature map of shape ``[bs, c, h, w]``.
            mask: Padding mask of shape ``[bs, h, w]``.
            query_embed: Query embeddings of shape ``[num_query, dim]``.
            pos_embed: Positional embeddings, reshaped the same way as ``x``.

        Returns:
            A tuple ``(decoder_output, memory)`` where ``decoder_output`` is the
            decoder result with axes 1 and 2 swapped, and ``memory`` is the
            encoder output reshaped back to ``[bs, c, h, w]``.
        """
        bs, c, h, w = x.shape
        # ``reshape`` instead of ``view`` so that inputs whose memory layout is
        # not view-compatible are copied rather than raising.
        x = x.reshape(bs, c, -1).permute(2, 0, 1)  # [bs, c, h, w] -> [h*w, bs, c]
        pos_embed = pos_embed.reshape(bs, c, -1).permute(2, 0, 1)
        query_embed = query_embed.unsqueeze(1).repeat(
            1, bs, 1
        )  # [num_query, dim] -> [num_query, bs, dim]
        mask = mask.reshape(bs, -1)  # [bs, h, w] -> [bs, h*w]

        memory = self.encoder(
            query=x,
            key=None,
            value=None,
            query_pos=pos_embed,
            query_key_padding_mask=mask,
        )

        # The decoder starts from all-zero content queries; the learned query
        # embeddings are supplied as the query positional encoding.
        target = torch.zeros_like(query_embed)
        decoder_output = self.decoder(
            query=target,
            key=memory,
            value=memory,
            key_pos=pos_embed,
            query_pos=query_embed,
            key_padding_mask=mask,
        )

        # Swap axes 1 and 2 of the decoder output (presumably
        # [layers, num_query, bs, dim] -> [layers, bs, num_query, dim] with the
        # default sequence-first layout; confirm against the decoder in use).
        decoder_output = decoder_output.transpose(1, 2)
        memory = memory.permute(1, 2, 0).reshape(bs, c, h, w)
        return decoder_output, memory