Luis Oala
first test
d62813f
import math
from abc import ABC, abstractmethod
from itertools import product
from typing import Any, Optional
import attr
import numpy as np
import torch
@attr.s
class AttentionMask(ABC):
query_context_size: int = attr.ib(validator=lambda i, a, x: x >= 1) # type: ignore
key_context_size: int = attr.ib(validator=lambda i, a, x: x >= 1) # type: ignore
block_size: int = attr.ib(validator=lambda i, a, x: x >= 1) # type: ignore
n_head: int = attr.ib(validator=lambda i, a, x: x >= 1) # type: ignore
is_head_specific: bool = attr.ib(default=False)
n_query_pad: int = attr.ib(default=0)
n_key_pad: int = attr.ib(default=0)
def __attrs_post_init__(self) -> None:
if self.query_context_size % self.block_size != 0:
raise ValueError()
if self.key_context_size % self.block_size != 0:
raise ValueError()
if self.n_query_pad >= self.query_context_size:
raise ValueError()
if self.n_key_pad >= self.key_context_size:
raise ValueError()
self.n_query_block = self.query_context_size // self.block_size
self.n_key_block = self.key_context_size // self.block_size
self.first_pad_query_block_idx = self.n_query_block - int(
math.ceil(self.n_query_pad / self.block_size)
)
self.first_pad_key_block_idx = self.n_key_block - int(
math.ceil(self.n_key_pad / self.block_size)
)
def _make_global_layout(self) -> None:
if not self.is_head_specific:
m = np.ones([self.n_query_block, self.n_key_block], dtype=np.bool)
r = product(*[range(n) for n in m.shape])
for qb, kb in r:
m[qb, kb] = np.any(self.block_layout(None, 0, qb, kb, 0))
else:
m = np.ones([self.n_head, self.n_query_block, self.n_key_block], dtype=np.bool)
r = product(*[range(n) for n in m.shape])
for h, qb, kb in r:
m[h, qb, kb] = np.any(self.block_layout(None, h, qb, kb, 0))
self.global_layout = m
@abstractmethod
def _block_layout(
self, blk_shape: Any, head_idx: int, query_idx: int, key_idx: int, blk_idx: int
) -> np.ndarray:
raise NotImplementedError()
def block_layout(
self, blk_shape: Any, head_idx: int, query_idx: int, key_idx: int, blk_idx: int
) -> np.ndarray:
"""
`query_idx`, `key_idx` are block-level, zero-based indices.
"""
m = np.ones([self.block_size, self.block_size], dtype=np.bool)
if query_idx >= self.first_pad_query_block_idx:
n_pad = min(
self.block_size,
(query_idx + 1) * self.block_size - (self.query_context_size - self.n_query_pad),
)
assert n_pad > 0
m[self.block_size - n_pad :] = False
if key_idx >= self.first_pad_key_block_idx:
n_pad = min(
self.block_size,
(key_idx + 1) * self.block_size - (self.key_context_size - self.n_key_pad),
)
assert n_pad > 0
m[:, self.block_size - n_pad :] = False
return m & self._block_layout(blk_shape, head_idx, query_idx, key_idx, blk_idx)
@attr.s
class DenseAttentionMask(AttentionMask):
def __attrs_post_init__(self) -> None:
super().__attrs_post_init__()
self.global_layout = np.ones([self.n_query_block, self.n_key_block], dtype=np.bool)
n_zero_query_blocks = self.n_query_pad // self.block_size
n_zero_key_blocks = self.n_key_pad // self.block_size
self.global_layout[self.n_query_block - n_zero_query_blocks :] = False
self.global_layout[:, self.n_key_block - n_zero_key_blocks :] = False
def _block_layout(
self, blk_shape: Any, head_idx: int, query_idx: int, key_idx: int, blk_idx: int
) -> np.ndarray:
return np.ones([self.block_size, self.block_size], dtype=np.bool)
@attr.s
class DenseCausalAttentionMask(AttentionMask):
def __attrs_post_init__(self) -> None:
super().__attrs_post_init__()
self.global_layout = np.tril(np.ones([self.n_query_block, self.n_key_block], dtype=np.bool))
n_zero_query_blocks = self.n_query_pad // self.block_size
n_zero_key_blocks = self.n_key_pad // self.block_size
self.global_layout[self.n_query_block - n_zero_query_blocks :] = False
self.global_layout[:, self.n_key_block - n_zero_key_blocks :] = False
def _block_layout(
self, blk_shape: Any, head_idx: int, query_idx: int, key_idx: int, blk_idx: int
) -> np.ndarray:
if query_idx > key_idx:
return np.ones(2 * [self.block_size], dtype=np.bool)
elif query_idx < key_idx:
return np.zeros(2 * [self.block_size], dtype=np.bool)
else:
return np.tril(np.ones(2 * [self.block_size], dtype=np.bool))
@attr.s(eq=False, repr=False)
class AttentionInfo:
n_heads: int = attr.ib()
ctx_blks_q: int = attr.ib()
ctx_blks_k: int = attr.ib()
block_size: int = attr.ib()
pytorch_attn_bias: Optional[torch.Tensor] = attr.ib()
def to_attention_info(d: AttentionMask) -> AttentionInfo:
return AttentionInfo(
n_heads=d.n_head,
ctx_blks_q=d.n_query_block,
ctx_blks_k=d.n_key_block,
block_size=d.block_size,
pytorch_attn_bias=None,
)
def make_full_layout(d: AttentionMask) -> np.ndarray:
"""
Returns the `context_size x context_size` layout matrix described by `d`. If the layout is dependent on the index of
the attention head, a `attention_head x context_size x context_size` layout matrix is returned instead.
"""
if not d.is_head_specific:
u = np.reshape(d.global_layout, [d.n_query_block, d.n_key_block, 1, 1])
r = product(range(d.n_query_block), range(d.n_key_block))
v = np.array([d.block_layout(None, 0, i, j, 0) for i, j in r])
v = np.reshape(v, [d.n_query_block, d.n_key_block, d.block_size, d.block_size])
w = u * v
w = np.transpose(w, [0, 2, 1, 3])
w = np.reshape(w, [d.query_context_size, d.key_context_size])
return w
else:
if len(d.global_layout.shape) == 2:
u = np.reshape(d.global_layout, [1, d.n_query_block, d.n_key_block, 1, 1])
u = np.tile(u, [d.n_head, 1, 1, 1, 1])
elif len(d.global_layout.shape) == 3:
u = np.reshape(d.global_layout, [d.n_head, d.n_query_block, d.n_key_block, 1, 1])
else:
raise RuntimeError()
s = product(range(d.n_head), range(d.n_query_block), range(d.n_key_block))
v = np.array([d.block_layout(None, i, j, k, 0) for i, j, k in s])
v = np.reshape(v, [d.n_head, d.n_query_block, d.n_key_block, d.block_size, d.block_size])
w = u * v
w = np.transpose(w, [0, 1, 3, 2, 4])
w = np.reshape(w, [d.n_head, d.query_context_size, d.key_context_size])
return w