# FAPM_demo / esm / axial_attention.py
# (Hugging Face file-viewer header: wenkai's picture · Upload 31 files ·
#  3f0529e verified · raw · history blame · 8.53 kB)
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import math
import torch
import torch.nn as nn
class RowSelfAttention(nn.Module):
    """Self-attention across the column positions of a 2D input, shared by all rows.

    Inputs are unpacked as ``(num_rows, num_cols, batch_size, embed_dim)``.
    The query/key products are summed over the row axis, so one
    ``(num_heads, batch_size, num_cols, num_cols)`` attention map is produced
    and then applied to the values of every row.
    """

    def __init__(
        self,
        embed_dim,
        num_heads,
        dropout=0.0,
        max_tokens_per_msa: int = 2 ** 16,
    ):
        super().__init__()
        # Plain configuration values.
        self.num_heads = num_heads
        self.dropout = dropout
        self.head_dim = embed_dim // num_heads
        self.scaling = self.head_dim ** -0.5
        self.max_tokens_per_msa = max_tokens_per_msa
        # einsum subscripts of the attention map: head, batch, query col, key col.
        self.attn_shape = "hnij"

        # Learned projections (creation order kept so seeded init is unchanged).
        self.k_proj = nn.Linear(embed_dim, embed_dim)
        self.v_proj = nn.Linear(embed_dim, embed_dim)
        self.q_proj = nn.Linear(embed_dim, embed_dim)
        self.out_proj = nn.Linear(embed_dim, embed_dim)
        self.dropout_module = nn.Dropout(dropout)

    def align_scaling(self, q):
        """Return the query scale, further divided by sqrt(number of rows in ``q``)."""
        return self.scaling / math.sqrt(q.size(0))

    def _batched_forward(
        self,
        x,
        self_attn_mask=None,
        self_attn_padding_mask=None,
    ):
        """Row-chunked variant of ``forward`` used when the input exceeds the token budget.

        Logits are accumulated chunk by chunk over the row axis, normalised
        once, and the resulting probabilities are applied to each row chunk.
        """
        num_rows, num_cols, _, _ = x.size()
        rows_per_chunk = max(1, self.max_tokens_per_msa // num_cols)
        row_starts = range(0, num_rows, rows_per_chunk)
        # The scale is derived from the full input, not from a single chunk.
        scaling = self.align_scaling(x)

        summed_logits = 0
        for lo in row_starts:
            hi = lo + rows_per_chunk
            if self_attn_padding_mask is None:
                chunk_padding = None
            else:
                chunk_padding = self_attn_padding_mask[:, lo:hi]
            summed_logits += self.compute_attention_weights(
                x[lo:hi],
                scaling,
                self_attn_mask=self_attn_mask,
                self_attn_padding_mask=chunk_padding,
            )

        attn_probs = self.dropout_module(summed_logits.softmax(-1))

        updated_chunks = [
            self.compute_attention_update(x[lo : lo + rows_per_chunk], attn_probs)
            for lo in row_starts
        ]
        return torch.cat(updated_chunks, 0), attn_probs

    def compute_attention_weights(
        self,
        x,
        scaling: float,
        self_attn_mask=None,
        self_attn_padding_mask=None,
    ):
        """Compute unnormalised attention logits summed over the rows of ``x``.

        Raises:
            NotImplementedError: if ``self_attn_mask`` is given.
        """
        num_rows, num_cols, batch_size, _ = x.size()
        per_head = (num_rows, num_cols, batch_size, self.num_heads, self.head_dim)
        q = self.q_proj(x).view(*per_head)
        k = self.k_proj(x).view(*per_head)

        q.mul_(scaling)
        if self_attn_padding_mask is not None:
            # Padded positions must contribute nothing, because the einsum
            # below sums over the row axis.
            keep = 1 - self_attn_padding_mask.permute(1, 2, 0)[..., None, None].to(q)
            q.mul_(keep)

        attn_weights = torch.einsum(f"rinhd,rjnhd->{self.attn_shape}", q, k)

        if self_attn_mask is not None:
            raise NotImplementedError

        if self_attn_padding_mask is None:
            return attn_weights

        # Mask size: [B x R x C]; weights size: [H x B x C x C].
        # The first row of the mask decides which key columns are padded.
        padded_keys = self_attn_padding_mask[:, 0][None, :, None, :]
        return attn_weights.masked_fill(padded_keys, -10000)

    def compute_attention_update(
        self,
        x,
        attn_probs,
    ):
        """Apply ``attn_probs`` to the value projection of ``x`` and project the result."""
        num_rows, num_cols, batch_size, embed_dim = x.size()
        values = self.v_proj(x).view(
            num_rows, num_cols, batch_size, self.num_heads, self.head_dim
        )
        mixed = torch.einsum(f"{self.attn_shape},rjnhd->rinhd", attn_probs, values)
        merged = mixed.contiguous().view(num_rows, num_cols, batch_size, embed_dim)
        return self.out_proj(merged)

    def forward(
        self,
        x,
        self_attn_mask=None,
        self_attn_padding_mask=None,
    ):
        """Return ``(output, attn_probs)`` for ``x``.

        The row-chunked path is taken only when gradients are disabled and
        ``num_rows * num_cols`` exceeds ``max_tokens_per_msa``.
        """
        num_rows, num_cols, _, _ = x.size()
        over_budget = num_rows * num_cols > self.max_tokens_per_msa
        if over_budget and not torch.is_grad_enabled():
            return self._batched_forward(x, self_attn_mask, self_attn_padding_mask)

        logits = self.compute_attention_weights(
            x, self.align_scaling(x), self_attn_mask, self_attn_padding_mask
        )
        attn_probs = self.dropout_module(logits.softmax(-1))
        return self.compute_attention_update(x, attn_probs), attn_probs
class ColumnSelfAttention(nn.Module):
    """Compute self-attention over columns of a 2D input.

    Inputs are unpacked as ``(num_rows, num_cols, batch_size, embed_dim)``.
    Within each column, every row position attends to the row positions of
    that same column, giving attention probabilities of shape
    ``(num_heads, num_cols, batch_size, num_rows, num_rows)``.
    """

    def __init__(
        self,
        embed_dim,
        num_heads,
        dropout=0.0,
        max_tokens_per_msa: int = 2 ** 16,
    ):
        super().__init__()

        self.num_heads = num_heads
        self.dropout = dropout
        self.head_dim = embed_dim // num_heads
        self.scaling = self.head_dim ** -0.5
        # Token budget (num_rows * num_cols) above which inference is chunked.
        self.max_tokens_per_msa = max_tokens_per_msa

        self.k_proj = nn.Linear(embed_dim, embed_dim)
        self.v_proj = nn.Linear(embed_dim, embed_dim)
        self.q_proj = nn.Linear(embed_dim, embed_dim)
        self.out_proj = nn.Linear(embed_dim, embed_dim)
        self.dropout_module = nn.Dropout(dropout)

    def _batched_forward(
        self,
        x,
        self_attn_mask=None,
        self_attn_padding_mask=None,
    ):
        """Run attention one group of columns at a time and concatenate the results.

        Columns are independent of each other, so chunking along the column
        axis gives the same result as processing the whole input at once.
        """
        num_rows, num_cols, batch_size, embed_dim = x.size()
        max_cols = max(1, self.max_tokens_per_msa // num_rows)
        outputs = []
        attns = []
        for start in range(0, num_cols, max_cols):
            stop = start + max_cols
            # Call the worker directly rather than ``self(...)``: when
            # num_rows alone exceeds the budget, a one-column chunk is still
            # over budget and re-entering ``forward`` would dispatch back
            # here with an identical slice, recursing without end.
            output, attn = self.compute_attention_update(
                x[:, start:stop],
                self_attn_mask=self_attn_mask,
                self_attn_padding_mask=(
                    self_attn_padding_mask[:, :, start:stop]
                    if self_attn_padding_mask is not None
                    else None
                ),
            )
            outputs.append(output)
            attns.append(attn)
        # Axis 1 is the column axis for both the output and the attention maps.
        output = torch.cat(outputs, 1)
        attns = torch.cat(attns, 1)
        return output, attns

    def compute_attention_update(
        self,
        x,
        self_attn_mask=None,
        self_attn_padding_mask=None,
    ):
        """Compute column attention for ``x`` in a single pass.

        Args:
            x: tensor unpacked as ``(num_rows, num_cols, batch_size, embed_dim)``.
            self_attn_mask: unsupported when there is more than one row.
            self_attn_padding_mask: optional mask that is permuted with
                ``(2, 0, 1)`` to line up with ``(num_cols, batch_size, num_rows)``;
                true entries are excluded as attention keys.

        Returns:
            ``(output, attn_probs)`` where ``output`` has the shape of ``x`` and
            ``attn_probs`` is ``(num_heads, num_cols, batch_size, num_rows, num_rows)``.

        Raises:
            NotImplementedError: if ``self_attn_mask`` is given and
                ``num_rows > 1``.
        """
        num_rows, num_cols, batch_size, embed_dim = x.size()
        if num_rows == 1:
            # if there is only 1 position, this is equivalent and doesn't break with padding
            attn_probs = torch.ones(
                self.num_heads,
                num_cols,
                batch_size,
                num_rows,
                num_rows,
                device=x.device,
                dtype=x.dtype,
            )
            output = self.out_proj(self.v_proj(x))
        else:
            # Fail before doing any projection work.
            if self_attn_mask is not None:
                raise NotImplementedError

            head_shape = (num_rows, num_cols, batch_size, self.num_heads, self.head_dim)
            q = self.q_proj(x).view(*head_shape)
            k = self.k_proj(x).view(*head_shape)
            v = self.v_proj(x).view(*head_shape)
            q *= self.scaling

            # i / j index query / key rows; c, n, h are column, batch, head.
            attn_weights = torch.einsum("icnhd,jcnhd->hcnij", q, k)

            if self_attn_padding_mask is not None:
                # Broadcast the mask over heads and query rows so that padded
                # key rows receive a large negative logit.
                attn_weights = attn_weights.masked_fill(
                    self_attn_padding_mask.permute(2, 0, 1).unsqueeze(0).unsqueeze(3),
                    -10000,
                )

            attn_probs = attn_weights.softmax(-1)
            attn_probs = self.dropout_module(attn_probs)
            context = torch.einsum("hcnij,jcnhd->icnhd", attn_probs, v)
            context = context.contiguous().view(num_rows, num_cols, batch_size, embed_dim)
            output = self.out_proj(context)
        return output, attn_probs

    def forward(
        self,
        x,
        self_attn_mask=None,
        self_attn_padding_mask=None,
    ):
        """Return ``(output, attn_probs)`` for ``x``.

        The column-chunked path is used only when gradients are disabled and
        ``num_rows * num_cols`` exceeds ``max_tokens_per_msa``.
        """
        num_rows, num_cols, batch_size, embed_dim = x.size()
        if (num_rows * num_cols) > self.max_tokens_per_msa and not torch.is_grad_enabled():
            return self._batched_forward(
                x,
                self_attn_mask,
                self_attn_padding_mask,
            )
        return self.compute_attention_update(x, self_attn_mask, self_attn_padding_mask)