import torch
import torch.nn as nn
from einops import rearrange, repeat
from torch import einsum, nn
from einops_exts import rearrange_many

# from .modules import GatedCrossAttentionBlock
from .utils import getattr_recursive, setattr_recursive


def exists(val):
    """Return True when ``val`` is not None."""
    return val is not None


def FeedForward(
    dim,
    mult=4,
    use_ft_layernorm=False,
    enable_init_network_params=False,
    initializer_range=0.02,
):
    """Build a LayerNorm -> Linear -> GELU -> Linear feed-forward stack.

    Args:
        dim: input and output feature size.
        mult: the hidden size is ``int(dim * mult)``.
        use_ft_layernorm: together with ``enable_init_network_params``,
            enables the explicit parameter initialization below.
        enable_init_network_params: see ``use_ft_layernorm``.
        initializer_range: standard deviation used by the explicit
            initialization.

    Returns:
        nn.Sequential: the assembled network (both linears are bias-free).
    """
    inner_dim = int(dim * mult)
    net = nn.Sequential(
        nn.LayerNorm(dim),
        nn.Linear(dim, inner_dim, bias=False),
        nn.GELU(),
        nn.Linear(inner_dim, dim, bias=False),
    )

    # The explicit initialization only runs when both use_ft_layernorm and
    # enable_init_network_params are set.
    if use_ft_layernorm and enable_init_network_params:
        # NOTE(review): the LayerNorm weight is drawn from
        # N(0, initializer_range) here, whereas
        # MaskedCrossAttention._init_weights fills LayerNorm weights with
        # 1.0 -- confirm this difference is intended.
        net[0].weight.data.normal_(mean=0.0, std=initializer_range)
        net[0].bias.data.zero_()
        net[1].weight.data.normal_(mean=0.0, std=initializer_range)
        net[3].weight.data.normal_(mean=0.0, std=initializer_range)
    return net


# gated cross attention
class MaskedCrossAttention(nn.Module):
    """Cross-attention from text features (queries) to media features (keys/values)."""

    def __init__(
        self,
        *,
        dim,
        dim_visual,
        dim_head=64,
        heads=8,
        only_attend_immediate_media=True,
        use_ft_layernorm=False,
        use_ft_flash_attention=False,
        enable_init_network_params=False,
        initializer_range=0.02,
    ):
        """
        Args:
            dim: feature size of the text input and of the output.
            dim_visual: feature size of the media input.
            dim_head: per-head feature size.
            heads: number of attention heads.
            only_attend_immediate_media: when True, text positions with no
                preceding media token get their attention weights zeroed.
            use_ft_layernorm: accepted for interface compatibility; not
                read by this class.
            use_ft_flash_attention: accepted for interface compatibility;
                see the note on ``self.use_ft_flash_attention`` below.
            enable_init_network_params: when True, run ``_init_weights``
                over all submodules.
            initializer_range: standard deviation for ``_init_weights``.
        """
        super().__init__()
        self.scale = dim_head**-0.5
        self.heads = heads
        # NOTE(review): the constructor argument is ignored and this flag is
        # always False -- confirm that is intended.
        self.use_ft_flash_attention = False
        self.initializer_range = initializer_range
        inner_dim = dim_head * heads

        self.norm = nn.LayerNorm(dim)

        self.to_q = nn.Linear(dim, inner_dim, bias=False)
        self.to_kv = nn.Linear(dim_visual, inner_dim * 2, bias=False)
        self.to_out = nn.Linear(inner_dim, dim, bias=False)

        # whether for text to only attend to immediate preceding image, or
        # all previous images
        self.only_attend_immediate_media = only_attend_immediate_media

        if enable_init_network_params:
            self.apply(self._init_weights)

    def _init_weights(self, module):
        """Initialize Linear and LayerNorm submodules (used via ``self.apply``)."""
        if isinstance(module, nn.Linear):
            # Slightly different from the TF version which uses
            # truncated_normal for initialization
            # cf https://github.com/pytorch/pytorch/pull/5617
            module.weight.data.normal_(mean=0.0, std=self.initializer_range)
            if module.bias is not None:
                module.bias.data.zero_()
        elif isinstance(module, nn.LayerNorm):
            module.bias.data.zero_()
            module.weight.data.fill_(1.0)

    def forward(
        self,
        x,
        media,
        media_locations=None,
        use_cached_media=False,
        image_mask=None,
    ):
        """
        Args:
            x (torch.Tensor): text features
                shape (B, T_txt, D_txt)
            media (torch.Tensor): image features
                shape (B, T_img, n, D_img) where n is the dim of the latents
            media_locations: boolean mask identifying the media tokens in x
                shape (B, T_txt). Required when ``use_cached_media`` is False.
            use_cached_media: bool
                If true, treat all of x as if they occur after the last media
                registered in media_locations. T_txt does not need to exactly
                equal media_locations.shape[1] in this case
            image_mask: optional mask over media; positions where it is
                False are excluded from the softmax. It is expanded along
                its last dim to match the flattened media length.
                NOTE(review): presumably shape (B, T_img) -- confirm.

        Returns:
            torch.Tensor: attended features of shape (B, T_txt, dim).

        Raises:
            ValueError: if ``media_locations`` is None while
                ``use_cached_media`` is False.
        """
        if not use_cached_media:
            if media_locations is None:
                # Previously this surfaced as an AttributeError on `.shape`.
                raise ValueError(
                    "media_locations must be provided when use_cached_media"
                    " is False"
                )
            assert media_locations.shape[1] == x.shape[1], (
                f"media_location.shape is {media_locations.shape} but x.shape is"
                f" {x.shape}"
            )

        T_txt = x.shape[1]
        h = self.heads

        x = self.norm(x.contiguous())
        q = self.to_q(x)
        media = rearrange(media, "b t n d -> b (t n) d")

        k, v = self.to_kv(media).chunk(2, dim=-1)

        # NOTE: the time-based text-to-media mask is not applied to the
        # attention logits in this implementation; only `image_mask` is. The
        # media positions are used solely to find text that has no preceding
        # media, whose attention weights are zeroed after the softmax.
        text_without_media_mask = None
        if exists(media_locations) and self.only_attend_immediate_media:
            if use_cached_media:
                # text time is set to the last cached media location
                text_time = repeat(
                    torch.count_nonzero(media_locations, dim=1),
                    "b -> b i",
                    i=T_txt,
                )
            else:
                # at each boolean of True, increment the time counter
                # (relative to media time)
                text_time = media_locations.cumsum(dim=-1)
            text_without_media_mask = rearrange(
                text_time == 0, "b i -> b 1 i 1"
            )

        q, k, v = rearrange_many((q, k, v), "b n (h d) -> b h n d", h=h)

        q = q * self.scale

        sim = einsum("... i d, ... j d -> ... i j", q, k)

        if exists(image_mask):
            image_mask = image_mask.unsqueeze(1).unsqueeze(1).bool()
            # Expand the mask so it covers every key position.
            image_mask = image_mask.repeat_interleave(
                sim.shape[3] // image_mask.shape[3], dim=-1
            )
            sim = sim.masked_fill(~image_mask, -torch.finfo(sim.dtype).max)

        # Subtract the row-wise max before the softmax for numerical stability.
        sim = sim - sim.amax(dim=-1, keepdim=True).detach()
        attn = sim.softmax(dim=-1)

        if text_without_media_mask is not None:
            # any text without a preceding media needs to have attention
            # zeroed out
            attn = attn.masked_fill(text_without_media_mask, 0.0)

        out = einsum("... i j, ... j d -> ... i d", attn, v)
        out = rearrange(out, "b h n d -> b n (h d)")
        return self.to_out(out)


class GatedCrossAttentionBlock(nn.Module):
    """Masked cross-attention and feed-forward, each behind a tanh gate.

    Both gates are zero-initialized parameters, so the block is an identity
    mapping on ``x`` right after construction.
    """

    def __init__(
        self,
        *,
        dim,
        dim_visual,
        dim_head=64,
        heads=12,
        ff_mult=1,
        only_attend_immediate_media=True,
        use_ft_layernorm=False,
        use_ft_flash_attention=False,
        enable_init_network_params=False,
        initializer_range=0.02,
        gradient_checkpointing=False,
    ):
        """
        Args:
            dim: feature size of the text stream.
            dim_visual: feature size of the media stream.
            dim_head, heads, only_attend_immediate_media, use_ft_layernorm,
            use_ft_flash_attention, enable_init_network_params,
            initializer_range: forwarded to ``MaskedCrossAttention``.
            ff_mult: hidden-size multiplier forwarded to ``FeedForward``.
            gradient_checkpointing: stored on the instance; not read inside
                this class.
        """
        super().__init__()
        self.attn = MaskedCrossAttention(
            dim=dim,
            dim_visual=dim_visual,
            dim_head=dim_head,
            heads=heads,
            only_attend_immediate_media=only_attend_immediate_media,
            use_ft_flash_attention=use_ft_flash_attention,
            use_ft_layernorm=use_ft_layernorm,
            enable_init_network_params=enable_init_network_params,
            initializer_range=initializer_range,
        )
        self.attn_gate = nn.Parameter(torch.zeros(dim))
        self.ff = FeedForward(dim, mult=ff_mult)
        self.ff_gate = nn.Parameter(torch.zeros(dim))
        self.gradient_checkpointing = gradient_checkpointing

    def forward(
        self,
        x,
        media,
        media_locations=None,
        use_cached_media=False,
        image_mask=None,
    ):
        """Apply gated cross-attention and gated feed-forward residually.

        Args:
            x: text features; see ``MaskedCrossAttention.forward``.
            media: media features; see ``MaskedCrossAttention.forward``.
            media_locations: mask of media tokens. Samples whose mask has no
                positive entry get both residual branches multiplied by 0,
                so they pass through unchanged. When None, no per-sample
                gating is applied.
            use_cached_media: forwarded to the attention module.
            image_mask: forwarded to the attention module.

        Returns:
            torch.Tensor: updated text features, same shape as ``x``.
        """
        if exists(media_locations):
            # 1 for samples that contain at least one media token, else 0.
            # Cast to x.dtype (rather than a fixed bfloat16) so that
            # half-precision activations are not promoted to float32.
            has_media = torch.sum(media_locations, dim=-1) > 0
            flag = has_media.to(x.dtype).unsqueeze(1).unsqueeze(1)
        else:
            flag = 1.0

        attended = self.attn(
            x,
            media,
            media_locations=media_locations,
            use_cached_media=use_cached_media,
            image_mask=image_mask,
        )
        x = flag * attended * self.attn_gate.tanh() + x
        x = flag * self.ff(x) * self.ff_gate.tanh() + x

        return x


class FlamingoLayer(nn.Module):
    """
    FlamingoLayer is a wrapper around the GatedCrossAttentionBlock and DecoderLayer.
    """

    def __init__(
        self, gated_cross_attn_layer, decoder_layer, gradient_checkpointing=False
    ):
        """
        Args:
            gated_cross_attn_layer: cross-attention block run before the
                decoder layer, or None to skip cross-attention.
            decoder_layer: the wrapped decoder layer.
            gradient_checkpointing: stored as ``_use_gradient_checkpointing``
                on both wrapped layers.
        """
        super().__init__()
        self.gated_cross_attn_layer = gated_cross_attn_layer
        self.decoder_layer = decoder_layer
        # Conditioning state, filled in by the condition_* methods. All of it
        # is initialized here so forward() never hits a missing attribute.
        self.vis_x = None
        self.image_mask = None
        self.media_locations = None
        self.use_cached_media = False

        if self.gated_cross_attn_layer is not None:
            self.gated_cross_attn_layer._use_gradient_checkpointing = (
                gradient_checkpointing
            )
        self.decoder_layer._use_gradient_checkpointing = gradient_checkpointing

    def is_conditioned(self) -> bool:
        """Check whether the layer is conditioned."""
        return self.vis_x is not None and self.media_locations is not None

    # Used this great idea from this implementation of Flamingo
    # (https://github.com/dhansmair/flamingo-mini/)
    def condition_vis_x(self, vis_x):
        """Store ``(vis_x, image_mask)`` from a 2-tuple, or clear both on None."""
        if vis_x is not None:
            self.vis_x, self.image_mask = vis_x
        else:
            self.vis_x, self.image_mask = None, None

    def condition_media_locations(self, media_locations):
        """Store the media-token mask used by the cross-attention block."""
        self.media_locations = media_locations

    def condition_use_cached_media(self, use_cached_media):
        """Store the flag passed to cross-attention as ``use_cached_media``."""
        self.use_cached_media = use_cached_media

    def forward(
        self,
        lang_x,
        attention_mask=None,
        **decoder_layer_kwargs,
    ):
        """Run the optional cross-attention block, then the decoder layer.

        Raises:
            ValueError: if a cross-attention block is present but ``vis_x``
                or ``media_locations`` has not been conditioned.
        """
        # Cross attention
        if self.gated_cross_attn_layer is not None:
            if self.vis_x is None:
                raise ValueError("vis_x must be conditioned before forward pass")

            if self.media_locations is None:
                raise ValueError(
                    "media_locations must be conditioned before forward pass"
                )

            lang_x = self.gated_cross_attn_layer(
                lang_x,
                self.vis_x,
                media_locations=self.media_locations,
                use_cached_media=self.use_cached_media,
                image_mask=self.image_mask,
            )

        # Normal decoder layer
        lang_x = self.decoder_layer(
            lang_x, attention_mask=attention_mask, **decoder_layer_kwargs
        )
        return lang_x


class FlamingoLMMixin(nn.Module):
    """
    Mixin to add cross-attention layers to a language model.
    """

    def set_decoder_layers_attr_name(self, decoder_layers_attr_name):
        """Record the attribute path under which the decoder layers live."""
        self.decoder_layers_attr_name = decoder_layers_attr_name

    def _get_decoder_layers(self):
        """Fetch the decoder layers via ``getattr_recursive``."""
        return getattr_recursive(self, self.decoder_layers_attr_name)

    def _set_decoder_layers(self, value):
        """Replace the decoder layers via ``setattr_recursive``."""
        setattr_recursive(self, self.decoder_layers_attr_name, value)

    def init_flamingo(
        self,
        media_token_id,
        lang_hidden_size,
        vis_hidden_size,
        cross_attn_every_n_layers,
        *,
        use_ft_layernorm=False,
        use_ft_flash_attention=False,
        enable_init_network_params=False,
        initializer_range=0.02,
        gradient_checkpointing=False,
    ):
        """
        Initialize Flamingo by adding a new gated cross attn to the decoder.
        Store the media token id for computing the media locations.

        A GatedCrossAttentionBlock is created for every decoder layer whose
        1-based index is a multiple of ``cross_attn_every_n_layers``; the
        remaining layers get None.
        """
        self.old_decoder_blocks = self._get_decoder_layers()
        self.gated_cross_attn_layers = nn.ModuleList(
            [
                (
                    GatedCrossAttentionBlock(
                        dim=lang_hidden_size,
                        dim_visual=vis_hidden_size,
                        use_ft_layernorm=use_ft_layernorm,
                        use_ft_flash_attention=use_ft_flash_attention,
                        enable_init_network_params=enable_init_network_params,
                        initializer_range=initializer_range,
                        gradient_checkpointing=gradient_checkpointing,
                    )
                    if (layer_idx + 1) % cross_attn_every_n_layers == 0
                    else None
                )
                for layer_idx, _ in enumerate(self._get_decoder_layers())
            ]
        )
        self.init_flamingo_layers(gradient_checkpointing)
        self.media_token_id = media_token_id
        self.initialized_flamingo = True
        self._use_cached_vision_x = False

    def init_flamingo_layers(self, gradient_checkpointing):
        """
        Re initializes the FlamingoLayers.
        Propagates any changes made to self.gated_cross_attn_layers or
        self.old_decoder_blocks
        """
        self._set_decoder_layers(
            nn.ModuleList(
                [
                    FlamingoLayer(
                        gated_cross_attn_layer, decoder_layer, gradient_checkpointing
                    )
                    for gated_cross_attn_layer, decoder_layer in zip(
                        self.gated_cross_attn_layers, self.old_decoder_blocks
                    )
                ]
            )
        )

    def forward(self, input_ids, attention_mask, **kwargs):
        """Condition the Flamingo layers on the media locations before forward()

        Raises:
            ValueError: if ``init_flamingo`` has not been called.
        """
        # getattr with a default: the flag does not exist at all until
        # init_flamingo() runs, and a bare attribute read would raise
        # AttributeError instead of the intended ValueError.
        if not getattr(self, "initialized_flamingo", False):
            raise ValueError(
                "Flamingo layers are not initialized. Please call `init_flamingo`"
                " first."
            )

        media_locations = input_ids == self.media_token_id

        # make all of the seq focus on the first fake image to avoid nan
        # if there are media already cached and we're generating and there
        # are no media tokens in the input, we'll assume that ALL input
        # tokens should attend to the last previous media that is cached.
        # this is especially important for HF generate() compatibility,
        # since generate() calls forward() repeatedly one token at a time
        # (with no media tokens). without this check, the model would not
        # attend to any images when generating (after the first token)
        use_cached_media_locations = (
            self._use_cached_vision_x
            and self.is_conditioned()
            and not media_locations.any()
        )

        for layer in self._get_decoder_layers():
            if not use_cached_media_locations:
                layer.condition_media_locations(media_locations)
            layer.condition_use_cached_media(use_cached_media_locations)

        # package arguments for the other parent's forward. since we don't
        # know the order of the arguments, make them all kwargs
        kwargs["input_ids"] = input_ids
        kwargs["attention_mask"] = attention_mask
        return super().forward(**kwargs)  # Call the other parent's forward method

    def is_conditioned(self) -> bool:
        """Check whether all decoder layers are already conditioned."""
        return all(layer.is_conditioned() for layer in self._get_decoder_layers())

    def clear_conditioned_layers(self):
        """Reset the conditioning state on every decoder layer to None."""
        for layer in self._get_decoder_layers():
            layer.condition_vis_x(None)
            layer.condition_media_locations(None)
            layer.condition_use_cached_media(None)