infimm-hd / flamingo_lm.py
liuhaogeng
first commit
b0b3b00
import torch
import torch.nn as nn
from einops import rearrange, repeat
from torch import einsum, nn
from einops_exts import rearrange_many
# from .modules import GatedCrossAttentionBlock
from .utils import getattr_recursive, setattr_recursive
def exists(val):
    """Return ``True`` when ``val`` is anything other than ``None``."""
    if val is None:
        return False
    return True
def FeedForward(
    dim,
    mult=4,
    use_ft_layernorm=False,
    enable_init_network_params=False,
    initializer_range=0.02,
):
    """Build a LayerNorm -> Linear -> GELU -> Linear feed-forward stack.

    Args:
        dim: input and output feature size.
        mult: the hidden width is ``int(dim * mult)``.
        use_ft_layernorm: must be truthy together with
            ``enable_init_network_params`` for the custom initialization
            below to run.
        enable_init_network_params: see ``use_ft_layernorm``.
        initializer_range: standard deviation of the normal initialization.

    Returns:
        An ``nn.Sequential`` with four entries (indices 0..3): the LayerNorm,
        the bias-free input projection, the GELU and the bias-free output
        projection.
    """
    hidden_dim = int(dim * mult)

    # Layers are created in the same order they appear in the Sequential so
    # the default random initialization consumes the RNG in a stable order.
    layer_norm = nn.LayerNorm(dim)
    proj_in = nn.Linear(dim, hidden_dim, bias=False)
    activation = nn.GELU()
    proj_out = nn.Linear(hidden_dim, dim, bias=False)

    # Custom initialization only runs when BOTH flags are enabled.
    if use_ft_layernorm and enable_init_network_params:
        # NOTE(review): the LayerNorm weight is drawn from N(0, std) here,
        # not filled with ones - confirm this is intended.
        layer_norm.weight.data.normal_(mean=0.0, std=initializer_range)
        layer_norm.bias.data.zero_()
        proj_in.weight.data.normal_(mean=0.0, std=initializer_range)
        proj_out.weight.data.normal_(mean=0.0, std=initializer_range)

    return nn.Sequential(layer_norm, proj_in, activation, proj_out)
# gated cross attention
class MaskedCrossAttention(nn.Module):
    """Multi-head cross-attention with text tokens as queries and media as keys/values.

    The text input is layer-normalized and projected to queries; the media
    latents are flattened over their (image, latent) axes and projected to
    keys and values. Keys can be masked with ``image_mask``, and text
    positions that have not yet seen any media token get their attention
    weights zeroed when ``only_attend_immediate_media`` is set.
    """

    def __init__(
        self,
        *,
        dim,
        dim_visual,
        dim_head=64,
        heads=8,
        only_attend_immediate_media=True,
        use_ft_layernorm=False,
        use_ft_flash_attention=False,
        enable_init_network_params=False,
        initializer_range=0.02,
    ):
        """
        Args:
            dim: feature size of the text stream (query input / output size).
            dim_visual: feature size of the media latents (key/value input).
            dim_head: per-head feature size.
            heads: number of attention heads.
            only_attend_immediate_media: when True, text positions with no
                preceding media token have their attention zeroed out.
            use_ft_layernorm: accepted for signature compatibility; unused here.
            use_ft_flash_attention: accepted for signature compatibility; this
                module has no flash-attention path, so the stored flag is
                always False.
            enable_init_network_params: when True, ``_init_weights`` is
                applied to this module and all of its submodules.
            initializer_range: std of the normal init used by ``_init_weights``.
        """
        super().__init__()
        self.scale = dim_head**-0.5
        self.heads = heads
        # No flash-attention implementation exists in this class, so the
        # constructor argument is deliberately ignored.
        self.use_ft_flash_attention = False
        self.initializer_range = initializer_range
        inner_dim = dim_head * heads

        self.norm = nn.LayerNorm(dim)
        self.to_q = nn.Linear(dim, inner_dim, bias=False)
        self.to_kv = nn.Linear(dim_visual, inner_dim * 2, bias=False)
        self.to_out = nn.Linear(inner_dim, dim, bias=False)

        # whether for text to only attend to immediate preceding image, or all previous images
        self.only_attend_immediate_media = only_attend_immediate_media

        if enable_init_network_params:
            self.apply(self._init_weights)

    def _init_weights(self, module):
        """Initialize Linear weights from N(0, initializer_range) and reset LayerNorms."""
        if isinstance(module, nn.Linear):
            # Slightly different from the TF version which uses truncated_normal for initialization
            # cf https://github.com/pytorch/pytorch/pull/5617
            module.weight.data.normal_(mean=0.0, std=self.initializer_range)
            if module.bias is not None:
                module.bias.data.zero_()
        elif isinstance(module, nn.LayerNorm):
            module.bias.data.zero_()
            module.weight.data.fill_(1.0)

    def forward(self, x, media, media_locations=None, use_cached_media=False, image_mask=None):
        """
        Args:
            x (torch.Tensor): text features
                shape (B, T_txt, D_txt)
            media (torch.Tensor): image features
                shape (B, T_img, n, D_img) where n is the dim of the latents
            media_locations: boolean mask identifying the media tokens in x
                shape (B, T_txt). May be None, in which case no text position
                has its attention zeroed.
            use_cached_media: bool
                If true, treat all of x as if they occur after the last media
                registered in media_locations. T_txt does not need to exactly
                equal media_locations.shape[1] in this case
            image_mask: optional mask over keys; truthy entries are kept.
                Its last axis is repeated to cover all T_img * n keys.
                Assumes shape (B, M) with M dividing T_img * n - confirm
                against the caller.

        Returns:
            torch.Tensor of shape (B, T_txt, D_txt).
        """
        has_locations = media_locations is not None
        if has_locations and not use_cached_media:
            assert media_locations.shape[1] == x.shape[1], (
                f"media_location.shape is {media_locations.shape} but x.shape is"
                f" {x.shape}"
            )
        if media.dim() != 4:
            raise ValueError(
                f"media must have shape (B, T_img, n, D_img), got {tuple(media.shape)}"
            )

        heads = self.heads
        batch, num_text_tokens = x.shape[0], x.shape[1]

        x = self.norm(x.contiguous())
        q = self.to_q(x)

        # (B, T_img, n, D_img) -> (B, T_img * n, D_img)
        media = media.reshape(media.shape[0], media.shape[1] * media.shape[2], media.shape[3])
        k, v = self.to_kv(media).chunk(2, dim=-1)

        # Mask of text positions that have not seen any media token yet.
        # Only needed when attending to the immediately preceding media.
        text_without_media_mask = None
        if has_locations and self.only_attend_immediate_media:
            if use_cached_media:
                # every text position shares the total number of cached media tokens
                media_seen = torch.count_nonzero(media_locations, dim=1)
                text_without_media_mask = (media_seen == 0)[:, None, None, None]
            else:
                # running count of media tokens seen up to each text position
                media_seen = media_locations.cumsum(dim=-1)
                text_without_media_mask = (media_seen == 0)[:, None, :, None]

        # (B, T, heads * d) -> (B, heads, T, d)
        q, k, v = (
            t.reshape(t.shape[0], t.shape[1], heads, -1).transpose(1, 2)
            for t in (q, k, v)
        )

        q = q * self.scale
        sim = einsum("... i d, ... j d -> ... i j", q, k)

        if image_mask is not None:
            key_mask = image_mask.unsqueeze(1).unsqueeze(1).bool()
            # stretch the mask so each entry covers its share of the keys
            key_mask = key_mask.repeat_interleave(
                sim.shape[3] // key_mask.shape[3], dim=-1
            )
            sim = sim.masked_fill(~key_mask, -torch.finfo(sim.dtype).max)

        # subtract the row max before softmax for numerical stability
        sim = sim - sim.amax(dim=-1, keepdim=True).detach()
        attn = sim.softmax(dim=-1)

        if text_without_media_mask is not None:
            # any text without a preceding media needs to have attention zeroed out
            attn = attn.masked_fill(text_without_media_mask, 0.0)

        out = einsum("... i j, ... j d -> ... i d", attn, v)
        # (B, heads, T_txt, d) -> (B, T_txt, heads * d)
        out = out.transpose(1, 2).reshape(batch, num_text_tokens, -1)
        return self.to_out(out)
class GatedCrossAttentionBlock(nn.Module):
    """Cross-attention followed by a feed-forward, each behind a tanh gate.

    Both gates are parameters initialized to zero, so ``tanh(gate)`` is zero
    at construction and the block starts out as the identity on ``x``.
    On top of the learned gates, a per-sample 0/1 flag disables both branches
    for samples whose ``media_locations`` contain no media token.
    """

    def __init__(
        self,
        *,
        dim,
        dim_visual,
        dim_head=64,
        heads=12,
        ff_mult=1,
        only_attend_immediate_media=True,
        use_ft_layernorm=False,
        use_ft_flash_attention=False,
        enable_init_network_params=False,
        initializer_range=0.02,
        gradient_checkpointing=False,
    ):
        """
        Args:
            dim: feature size of the text stream.
            dim_visual: feature size of the media latents.
            dim_head: per-head feature size of the attention.
            heads: number of attention heads.
            ff_mult: hidden-width multiplier of the feed-forward.
            only_attend_immediate_media, use_ft_layernorm,
            use_ft_flash_attention, enable_init_network_params,
            initializer_range: forwarded to ``MaskedCrossAttention``.
                The feed-forward is built with its default settings.
            gradient_checkpointing: stored on the instance; not read by
                this class.
        """
        super().__init__()
        self.attn = MaskedCrossAttention(
            dim=dim,
            dim_visual=dim_visual,
            dim_head=dim_head,
            heads=heads,
            only_attend_immediate_media=only_attend_immediate_media,
            use_ft_flash_attention=use_ft_flash_attention,
            use_ft_layernorm=use_ft_layernorm,
            enable_init_network_params=enable_init_network_params,
            initializer_range=initializer_range,
        )
        self.attn_gate = nn.Parameter(torch.zeros(dim))

        self.ff = FeedForward(dim, mult=ff_mult)
        self.ff_gate = nn.Parameter(torch.zeros(dim))

        self.gradient_checkpointing = gradient_checkpointing

    def forward(
        self,
        x,
        media,
        media_locations=None,
        use_cached_media=False,
        image_mask=None,
    ):
        """Apply gated cross-attention and gated feed-forward residually to ``x``.

        Args:
            x: text features; the result has the same shape.
            media: media features passed through to the attention.
            media_locations: mask of media tokens; summed over its last axis
                to decide, per sample, whether any media is present. Despite
                the ``None`` default it is required here (it is summed
                unconditionally).
            use_cached_media: passed through to the attention.
            image_mask: passed through to the attention.

        Returns:
            ``x`` plus the gated attention and feed-forward updates.
        """
        # 1 for samples with at least one media token, 0 otherwise.
        # Cast to x.dtype (not a fixed dtype) so that multiplying by the flag
        # never promotes the activations to a wider type.
        has_media = (torch.sum(media_locations, dim=-1) > 0).to(x.dtype)
        has_media = has_media.unsqueeze(1).unsqueeze(1)

        attended = self.attn(
            x,
            media,
            media_locations=media_locations,
            use_cached_media=use_cached_media,
            image_mask=image_mask,
        )
        x = has_media * attended * self.attn_gate.tanh() + x
        x = has_media * self.ff(x) * self.ff_gate.tanh() + x
        return x
class FlamingoLayer(nn.Module):
    """
    FlamingoLayer is a wrapper around the GatedCrossAttentionBlock and DecoderLayer.

    The visual features, image mask, media locations and cached-media flag are
    not passed to ``forward``; they are stored on the layer beforehand through
    the ``condition_*`` methods and read back during the forward pass.
    """

    def __init__(
        self, gated_cross_attn_layer, decoder_layer, gradient_checkpointing=False
    ):
        """
        Args:
            gated_cross_attn_layer: cross-attention block to run before the
                decoder layer, or None to run the decoder layer alone.
            decoder_layer: the wrapped language-model decoder layer.
            gradient_checkpointing: written to ``_use_gradient_checkpointing``
                on both wrapped layers.
        """
        super().__init__()
        self.gated_cross_attn_layer = gated_cross_attn_layer
        self.decoder_layer = decoder_layer
        # Conditioning state. All four are initialized here so forward() can
        # read them even if a particular condition_* method was never called.
        self.vis_x = None
        self.image_mask = None
        self.media_locations = None
        self.use_cached_media = False
        if self.gated_cross_attn_layer is not None:
            self.gated_cross_attn_layer._use_gradient_checkpointing = (
                gradient_checkpointing
            )
        self.decoder_layer._use_gradient_checkpointing = gradient_checkpointing

    def is_conditioned(self) -> bool:
        """Check whether the layer is conditioned."""
        return self.vis_x is not None and self.media_locations is not None

    # Used this great idea from this implementation of Flamingo (https://github.com/dhansmair/flamingo-mini/)
    def condition_vis_x(self, vis_x):
        """Store the visual conditioning.

        Args:
            vis_x: a ``(vis_x, image_mask)`` pair, or None to clear both.
        """
        if vis_x is not None:
            self.vis_x, self.image_mask = vis_x
        else:
            self.vis_x, self.image_mask = None, None

    def condition_media_locations(self, media_locations):
        """Store the media-token mask used by the cross-attention."""
        self.media_locations = media_locations

    def condition_use_cached_media(self, use_cached_media):
        """Store the flag forwarded to the cross-attention as ``use_cached_media``."""
        self.use_cached_media = use_cached_media

    def forward(
        self,
        lang_x,
        attention_mask=None,
        **decoder_layer_kwargs,
    ):
        """Run the optional gated cross-attention, then the decoder layer.

        Args:
            lang_x: language hidden states.
            attention_mask: forwarded to the decoder layer.
            **decoder_layer_kwargs: forwarded to the decoder layer.

        Returns:
            Whatever the wrapped decoder layer returns.

        Raises:
            ValueError: if a cross-attention layer is present but ``vis_x`` or
                ``media_locations`` has not been conditioned.
        """
        # Cross attention
        if self.gated_cross_attn_layer is not None:
            if self.vis_x is None:
                raise ValueError("vis_x must be conditioned before forward pass")

            if self.media_locations is None:
                raise ValueError(
                    "media_locations must be conditioned before forward pass"
                )

            lang_x = self.gated_cross_attn_layer(
                lang_x,
                self.vis_x,
                media_locations=self.media_locations,
                use_cached_media=self.use_cached_media,
                image_mask=self.image_mask,
            )

        # Normal decoder layer
        lang_x = self.decoder_layer(
            lang_x, attention_mask=attention_mask, **decoder_layer_kwargs
        )
        return lang_x
class FlamingoLMMixin(nn.Module):
    """
    Mixin to add cross-attention layers to a language model.

    It is meant to be combined with a language-model class: ``forward`` here
    conditions the decoder layers and then delegates to the next ``forward``
    in the method resolution order.
    """

    def set_decoder_layers_attr_name(self, decoder_layers_attr_name):
        """Record the attribute path under which the decoder layers live."""
        self.decoder_layers_attr_name = decoder_layers_attr_name

    def _get_decoder_layers(self):
        """Return the decoder layers found at ``decoder_layers_attr_name``."""
        return getattr_recursive(self, self.decoder_layers_attr_name)

    def _set_decoder_layers(self, value):
        """Replace the decoder layers found at ``decoder_layers_attr_name``."""
        setattr_recursive(self, self.decoder_layers_attr_name, value)

    def init_flamingo(
        self,
        media_token_id,
        lang_hidden_size,
        vis_hidden_size,
        cross_attn_every_n_layers,
        *,
        use_ft_layernorm=False,
        use_ft_flash_attention=False,
        enable_init_network_params=False,
        initializer_range=0.02,
        gradient_checkpointing=False,
    ):
        """
        Initialize Flamingo by adding a new gated cross attn to the decoder. Store the media token id for computing the media locations.

        Args:
            media_token_id: token id that marks a media position in ``input_ids``.
            lang_hidden_size: feature size of the language model.
            vis_hidden_size: feature size of the visual features.
            cross_attn_every_n_layers: a cross-attention block is attached to
                every decoder layer whose 1-based index is a multiple of this
                value; the other layers get None.
            use_ft_layernorm, use_ft_flash_attention,
            enable_init_network_params, initializer_range,
            gradient_checkpointing: forwarded to each
                ``GatedCrossAttentionBlock``.
        """
        self.old_decoder_blocks = self._get_decoder_layers()
        self.gated_cross_attn_layers = nn.ModuleList(
            [
                (
                    GatedCrossAttentionBlock(
                        dim=lang_hidden_size,
                        dim_visual=vis_hidden_size,
                        use_ft_layernorm=use_ft_layernorm,
                        use_ft_flash_attention=use_ft_flash_attention,
                        enable_init_network_params=enable_init_network_params,
                        initializer_range=initializer_range,
                        gradient_checkpointing=gradient_checkpointing,
                    )
                    if (layer_idx + 1) % cross_attn_every_n_layers == 0
                    else None
                )
                for layer_idx, _ in enumerate(self._get_decoder_layers())
            ]
        )
        self.init_flamingo_layers(gradient_checkpointing)
        self.media_token_id = media_token_id
        self.initialized_flamingo = True
        self._use_cached_vision_x = False

    def init_flamingo_layers(self, gradient_checkpointing):
        """
        Re initializes the FlamingoLayers.
        Propagates any changes made to self.gated_cross_attn_layers or self.old_decoder_blocks
        """
        self._set_decoder_layers(
            nn.ModuleList(
                [
                    FlamingoLayer(
                        gated_cross_attn_layer, decoder_layer, gradient_checkpointing
                    )
                    for gated_cross_attn_layer, decoder_layer in zip(
                        self.gated_cross_attn_layers, self.old_decoder_blocks
                    )
                ]
            )
        )

    def forward(self, input_ids, attention_mask, **kwargs):
        """Condition the Flamingo layers on the media locations before forward()

        Raises:
            ValueError: if ``init_flamingo`` has not been called.
        """
        # getattr with a default: before init_flamingo() the attribute does not
        # exist at all, and a plain attribute read would raise AttributeError
        # instead of the intended ValueError below.
        if not getattr(self, "initialized_flamingo", False):
            raise ValueError(
                "Flamingo layers are not initialized. Please call `init_flamingo`"
                " first."
            )

        media_locations = input_ids == self.media_token_id

        # make all of the seq focus on the first fake image to avoid nan
        # if there are media already cached and we're generating and there are no media tokens in the input,
        # we'll assume that ALL input tokens should attend to the last previous media that is cached.
        # this is especially important for HF generate() compatibility, since generate() calls forward()
        # repeatedly one token at a time (with no media tokens).
        # without this check, the model would not attend to any images when generating (after the first token)
        use_cached_media_locations = (
            self._use_cached_vision_x
            and self.is_conditioned()
            and not media_locations.any()
        )

        for layer in self._get_decoder_layers():
            # when reusing cached media, keep the previously stored locations
            if not use_cached_media_locations:
                layer.condition_media_locations(media_locations)
            layer.condition_use_cached_media(use_cached_media_locations)

        # package arguments for the other parent's forward. since we don't know the order of the arguments,
        # make them all kwargs
        kwargs["input_ids"] = input_ids
        kwargs["attention_mask"] = attention_mask
        return super().forward(**kwargs)  # Call the other parent's forward method

    def is_conditioned(self) -> bool:
        """Check whether all decoder layers are already conditioned."""
        return all(layer.is_conditioned() for layer in self._get_decoder_layers())

    def clear_conditioned_layers(self):
        """Reset the visual features, media locations and cached-media flag on every layer."""
        for layer in self._get_decoder_layers():
            layer.condition_vis_x(None)
            layer.condition_media_locations(None)
            layer.condition_use_cached_media(None)