|
import torch |
|
import torch.nn as nn |
|
import torch.nn.functional as F |
|
from typing import Optional, Tuple |
|
import math |
|
from transformers import PretrainedConfig, PreTrainedModel |
|
|
|
class MultiHeadAttention(nn.Module): |
|
def __init__(self, d_model: int, num_heads: int, dropout: float = 0.1): |
|
super().__init__() |
|
assert d_model % num_heads == 0 |
|
|
|
self.d_model = d_model |
|
self.num_heads = num_heads |
|
self.head_dim = d_model // num_heads |
|
|
|
self.q_proj = nn.Linear(d_model, d_model) |
|
self.k_proj = nn.Linear(d_model, d_model) |
|
self.v_proj = nn.Linear(d_model, d_model) |
|
self.o_proj = nn.Linear(d_model, d_model) |
|
|
|
self.dropout = nn.Dropout(dropout) |
|
|
|
def forward(self, x: torch.Tensor, mask: Optional[torch.Tensor] = None) -> torch.Tensor: |
|
batch_size, seq_len, d_model = x.shape |
|
|
|
|
|
q = self.q_proj(x).reshape(batch_size, seq_len, self.num_heads, self.head_dim) |
|
k = self.k_proj(x).reshape(batch_size, seq_len, self.num_heads, self.head_dim) |
|
v = self.v_proj(x).reshape(batch_size, seq_len, self.num_heads, self.head_dim) |
|
|
|
|
|
q = q.transpose(1, 2) |
|
k = k.transpose(1, 2) |
|
v = v.transpose(1, 2) |
|
|
|
|
|
scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.head_dim) |
|
|
|
if mask is not None: |
|
scores = scores.masked_fill(mask == 0, float('-inf')) |
|
|
|
attn_weights = F.softmax(scores, dim=-1) |
|
attn_weights = self.dropout(attn_weights) |
|
|
|
|
|
out = torch.matmul(attn_weights, v) |
|
out = out.transpose(1, 2) |
|
out = out.reshape(batch_size, seq_len, d_model) |
|
|
|
return self.o_proj(out) |
|
|
|
class PreludeBlock(nn.Module): |
|
def __init__(self, vocab_size: int, d_model: int, num_heads: int, dropout: float = 0.1): |
|
super().__init__() |
|
self.token_embedding = nn.Embedding(vocab_size, d_model) |
|
self.pos_encoding = nn.Parameter(torch.zeros(1, 1024, d_model)) |
|
self.attention = MultiHeadAttention(d_model, num_heads, dropout) |
|
self.norm1 = nn.LayerNorm(d_model) |
|
self.norm2 = nn.LayerNorm(d_model) |
|
self.feed_forward = nn.Sequential( |
|
nn.Linear(d_model, 4 * d_model), |
|
nn.GELU(), |
|
nn.Linear(4 * d_model, d_model), |
|
nn.Dropout(dropout) |
|
) |
|
|
|
def forward(self, x: torch.Tensor, mask: Optional[torch.Tensor] = None) -> torch.Tensor: |
|
seq_len = x.size(1) |
|
|
|
x = self.token_embedding(x) + self.pos_encoding[:, :seq_len, :] |
|
|
|
|
|
attended = self.attention(self.norm1(x), mask) |
|
x = x + attended |
|
|
|
|
|
x = x + self.feed_forward(self.norm2(x)) |
|
return x |
|
|
|
class RecurrentBlock(nn.Module): |
|
def __init__(self, d_model: int, num_heads: int, dropout: float = 0.1): |
|
super().__init__() |
|
self.attention = MultiHeadAttention(d_model, num_heads, dropout) |
|
self.norm1 = nn.LayerNorm(d_model) |
|
self.norm2 = nn.LayerNorm(d_model) |
|
self.feed_forward = nn.Sequential( |
|
nn.Linear(d_model, 4 * d_model), |
|
nn.GELU(), |
|
nn.Linear(4 * d_model, d_model), |
|
nn.Dropout(dropout) |
|
) |
|
|
|
|
|
self.state_proj = nn.Linear(d_model, d_model) |
|
|
|
def forward(self, x: torch.Tensor, recurrent_state: torch.Tensor, |
|
mask: Optional[torch.Tensor] = None) -> Tuple[torch.Tensor, torch.Tensor]: |
|
|
|
recurrent_state = self.state_proj(recurrent_state) |
|
|
|
|
|
x = x + recurrent_state |
|
|
|
|
|
attended = self.attention(self.norm1(x), mask) |
|
x = x + attended |
|
|
|
|
|
x = x + self.feed_forward(self.norm2(x)) |
|
|
|
return x, x |
|
|
|
class CodaBlock(nn.Module): |
|
def __init__(self, d_model: int, vocab_size: int): |
|
super().__init__() |
|
self.norm = nn.LayerNorm(d_model) |
|
self.output_proj = nn.Linear(d_model, vocab_size) |
|
|
|
def forward(self, x: torch.Tensor) -> torch.Tensor: |
|
x = self.norm(x) |
|
return self.output_proj(x) |
|
|
|
class LatentRecurrentDepthLM(nn.Module): |
|
def __init__(self, vocab_size: int, d_model: int, num_heads: int, dropout: float = 0.1): |
|
super().__init__() |
|
self.prelude = PreludeBlock(vocab_size, d_model, num_heads, dropout) |
|
self.recurrent = RecurrentBlock(d_model, num_heads, dropout) |
|
self.coda = CodaBlock(d_model, vocab_size) |
|
|
|
def forward(self, x: torch.Tensor, num_iterations: int, |
|
mask: Optional[torch.Tensor] = None) -> torch.Tensor: |
|
|
|
hidden = self.prelude(x, mask) |
|
|
|
|
|
recurrent_state = torch.zeros_like(hidden) |
|
|
|
|
|
for _ in range(num_iterations): |
|
hidden, recurrent_state = self.recurrent(hidden, recurrent_state, mask) |
|
|
|
|
|
return self.coda(hidden) |
|
|
|
|
|
|
|
|
|
class LatentRecurrentDepthConfig(PretrainedConfig): |
|
model_type = "latent_recurrent_depth" |
|
|
|
def __init__(self, vocab_size=50257, d_model=768, num_heads=12, dropout=0.1, **kwargs): |
|
super().__init__(**kwargs) |
|
self.vocab_size = vocab_size |
|
self.d_model = d_model |
|
self.num_heads = num_heads |
|
self.dropout = dropout |
|
|
|
|
|
|
|
class LatentRecurrentDepthModel(PreTrainedModel): |
|
config_class = LatentRecurrentDepthConfig |
|
base_model_prefix = "latent_recurrent_depth" |
|
|
|
def __init__(self, config: LatentRecurrentDepthConfig): |
|
super().__init__(config) |
|
self.latent_model = LatentRecurrentDepthLM(config.vocab_size, config.d_model, config.num_heads, config.dropout) |
|
self.init_weights() |
|
|
|
def forward(self, input_ids: torch.Tensor, num_iterations: int, mask: Optional[torch.Tensor] = None) -> torch.Tensor: |
|
return self.latent_model(input_ids, num_iterations, mask) |
|
|
|
def generate( |
|
self, |
|
input_ids: torch.Tensor, |
|
max_length: int = 20, |
|
num_iterations: int = 3, |
|
temperature: float = 1.0, |
|
top_k: Optional[int] = 50, |
|
) -> torch.Tensor: |
|
""" |
|
Generate a sequence of tokens given input_ids. |
|
|
|
Args: |
|
input_ids: torch.Tensor of shape (batch, seq_length) containing the prompt. |
|
max_length: The number of tokens to generate. |
|
num_iterations: The number of recurrent iterations to use in each forward pass. |
|
temperature: Temperature for scaling logits. |
|
top_k: If set, only sample from the top k tokens. |
|
|
|
Returns: |
|
generated: torch.Tensor containing the generated sequence. |
|
""" |
|
generated = input_ids.clone() |
|
self.eval() |
|
with torch.no_grad(): |
|
for _ in range(max_length): |
|
|
|
logits = self.forward(generated, num_iterations=num_iterations) |
|
|
|
next_token_logits = logits[:, -1, :] / temperature |
|
if top_k is not None: |
|
|
|
top_k_logits, top_k_indices = torch.topk(next_token_logits, top_k) |
|
probabilities = F.softmax(top_k_logits, dim=-1) |
|
next_token = top_k_indices.gather(-1, torch.multinomial(probabilities, num_samples=1)) |
|
else: |
|
probabilities = F.softmax(next_token_logits, dim=-1) |
|
next_token = torch.multinomial(probabilities, num_samples=1) |
|
generated = torch.cat([generated, next_token], dim=1) |
|
|
|
if next_token.item() == self.config.eos_token_id: |
|
break |
|
return generated |
|
|