latent-recurrent-depth-lm / modeling_latent_recurrent_depth.py
codewithdark's picture
Update modeling_latent_recurrent_depth.py
23d54f5 verified
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Optional, Tuple
import math
from transformers import PretrainedConfig, PreTrainedModel
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product self-attention.

    Queries, keys and values are all projected from the same input tensor,
    split into ``num_heads`` heads of size ``d_model // num_heads``, attended
    independently, then merged and passed through an output projection.

    Args:
        d_model: Size of the last dimension of the input and output.
        num_heads: Number of attention heads; must divide ``d_model``.
        dropout: Dropout probability applied to the attention weights.

    Raises:
        ValueError: If ``d_model`` is not divisible by ``num_heads``.
    """

    def __init__(self, d_model: int, num_heads: int, dropout: float = 0.1):
        super().__init__()
        # Explicit raise instead of `assert`, which is stripped under `python -O`.
        if d_model % num_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by num_heads ({num_heads})"
            )
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        self.q_proj = nn.Linear(d_model, d_model)
        self.k_proj = nn.Linear(d_model, d_model)
        self.v_proj = nn.Linear(d_model, d_model)
        self.o_proj = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor, mask: Optional[torch.Tensor] = None) -> torch.Tensor:
        """Apply self-attention to ``x``.

        Args:
            x: Tensor of shape ``(batch_size, seq_len, d_model)``.
            mask: Optional tensor that must broadcast against the score tensor
                of shape ``(batch_size, num_heads, seq_len, seq_len)``.
                Positions where ``mask == 0`` are excluded from attention.

        Returns:
            Tensor of shape ``(batch_size, seq_len, d_model)``.
        """
        batch_size, seq_len, d_model = x.shape

        # Project, split the feature dimension into heads, and move the head
        # axis forward: (batch_size, num_heads, seq_len, head_dim).
        head_shape = (batch_size, seq_len, self.num_heads, self.head_dim)
        q = self.q_proj(x).reshape(head_shape).transpose(1, 2)
        k = self.k_proj(x).reshape(head_shape).transpose(1, 2)
        v = self.v_proj(x).reshape(head_shape).transpose(1, 2)

        # Scaled dot-product scores: (batch_size, num_heads, seq_len, seq_len).
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.head_dim)

        if mask is not None:
            # Use the most negative finite value rather than -inf: a query row
            # whose keys are all masked would otherwise softmax to NaN. For
            # rows with at least one visible key the masked weights still
            # underflow to exactly zero.
            scores = scores.masked_fill(mask == 0, torch.finfo(scores.dtype).min)

        attn_weights = F.softmax(scores, dim=-1)
        attn_weights = self.dropout(attn_weights)

        # Weighted sum of values, then merge the heads back into d_model.
        out = torch.matmul(attn_weights, v)  # (batch_size, num_heads, seq_len, head_dim)
        out = out.transpose(1, 2)  # (batch_size, seq_len, num_heads, head_dim)
        out = out.reshape(batch_size, seq_len, d_model)
        return self.o_proj(out)
class PreludeBlock(nn.Module):
    """Embed token ids and apply one pre-norm attention + feed-forward layer.

    Args:
        vocab_size: Number of rows in the token embedding table.
        d_model: Embedding / hidden size.
        num_heads: Number of attention heads.
        dropout: Dropout probability used in attention and after the
            feed-forward projection.
        max_seq_len: Number of positions in the learned positional table.
            Defaults to 1024, the previously hard-coded length.

    Raises:
        ValueError: If ``max_seq_len`` is not positive.
    """

    def __init__(self, vocab_size: int, d_model: int, num_heads: int,
                 dropout: float = 0.1, max_seq_len: int = 1024):
        super().__init__()
        if max_seq_len <= 0:
            raise ValueError(f"max_seq_len must be positive, got {max_seq_len}")
        self.token_embedding = nn.Embedding(vocab_size, d_model)
        # Learned positional table, initialised to zeros.
        self.pos_encoding = nn.Parameter(torch.zeros(1, max_seq_len, d_model))
        self.attention = MultiHeadAttention(d_model, num_heads, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.feed_forward = nn.Sequential(
            nn.Linear(d_model, 4 * d_model),
            nn.GELU(),
            nn.Linear(4 * d_model, d_model),
            nn.Dropout(dropout),
        )

    def forward(self, x: torch.Tensor, mask: Optional[torch.Tensor] = None) -> torch.Tensor:
        """Embed ``x`` and run the prelude layer.

        Args:
            x: Integer token ids; the second dimension is the sequence length.
            mask: Optional attention mask forwarded unchanged to the
                attention module.

        Returns:
            Hidden states with ``d_model`` features per position.

        Raises:
            ValueError: If the sequence is longer than the positional table.
        """
        seq_len = x.size(1)
        # Read the limit from the parameter itself so it stays correct even
        # if the table was replaced after construction.
        max_positions = self.pos_encoding.size(1)
        if seq_len > max_positions:
            # Without this check the addition below fails with an opaque
            # broadcasting error.
            raise ValueError(
                f"Sequence length {seq_len} exceeds the maximum supported "
                f"length of {max_positions} positions"
            )

        # Embed tokens and add the positional encoding for the first seq_len slots.
        x = self.token_embedding(x) + self.pos_encoding[:, :seq_len, :]

        # Pre-norm self-attention with a residual connection.
        x = x + self.attention(self.norm1(x), mask)

        # Pre-norm feed-forward with a residual connection.
        x = x + self.feed_forward(self.norm2(x))
        return x
class RecurrentBlock(nn.Module):
    """Pre-norm attention + feed-forward layer that also consumes a carried state.

    On each call the incoming state is linearly projected and added to the
    input before the attention and feed-forward sub-layers run. The layer's
    output doubles as the next state.
    """

    def __init__(self, d_model: int, num_heads: int, dropout: float = 0.1):
        super().__init__()
        ffn_width = 4 * d_model
        self.attention = MultiHeadAttention(d_model, num_heads, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.feed_forward = nn.Sequential(
            nn.Linear(d_model, ffn_width),
            nn.GELU(),
            nn.Linear(ffn_width, d_model),
            nn.Dropout(dropout),
        )
        # Linear map applied to the carried state before it is mixed in.
        self.state_proj = nn.Linear(d_model, d_model)

    def forward(self, x: torch.Tensor, recurrent_state: torch.Tensor,
                mask: Optional[torch.Tensor] = None) -> Tuple[torch.Tensor, torch.Tensor]:
        """Run one recurrent step.

        Args:
            x: Current hidden states.
            recurrent_state: State carried over from the previous step; it is
                added to ``x`` after projection, so the shapes must be
                compatible.
            mask: Optional attention mask forwarded to the attention module.

        Returns:
            A pair ``(output, new_state)``; both entries are the same tensor.
        """
        # Project the carried state and fold it into the input.
        projected_state = self.state_proj(recurrent_state)
        mixed = x + projected_state

        # Attention sub-layer with residual connection.
        after_attention = mixed + self.attention(self.norm1(mixed), mask)

        # Feed-forward sub-layer with residual connection.
        output = after_attention + self.feed_forward(self.norm2(after_attention))

        # The output is reused as the state for the next step.
        return output, output
class CodaBlock(nn.Module):
    """Output head: layer normalisation followed by a projection to the vocabulary."""

    def __init__(self, d_model: int, vocab_size: int):
        super().__init__()
        self.norm = nn.LayerNorm(d_model)
        self.output_proj = nn.Linear(d_model, vocab_size)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Normalise ``x`` and map its last dimension to ``vocab_size`` logits."""
        normalised = self.norm(x)
        logits = self.output_proj(normalised)
        return logits
class LatentRecurrentDepthLM(nn.Module):
    """Language model built from a prelude, a weight-shared recurrent block and a coda.

    The prelude embeds the tokens, the single recurrent block is applied
    ``num_iterations`` times to the hidden states, and the coda turns the
    final hidden states into vocabulary logits.
    """

    def __init__(self, vocab_size: int, d_model: int, num_heads: int, dropout: float = 0.1):
        super().__init__()
        self.prelude = PreludeBlock(vocab_size, d_model, num_heads, dropout)
        self.recurrent = RecurrentBlock(d_model, num_heads, dropout)
        self.coda = CodaBlock(d_model, vocab_size)

    def forward(self, x: torch.Tensor, num_iterations: int,
                mask: Optional[torch.Tensor] = None) -> torch.Tensor:
        """Compute logits for the token ids ``x``.

        Args:
            x: Token ids passed to the prelude.
            num_iterations: How many times the recurrent block is applied.
            mask: Optional attention mask passed to the prelude and to every
                recurrent step.

        Returns:
            The coda's output for the final hidden states.
        """
        # Embed the tokens and run the first layer.
        latent = self.prelude(x, mask)

        # The carried state starts as all zeros with the hidden states' shape.
        state = torch.zeros_like(latent)

        # Re-apply the same block; each step returns the next hidden/state pair.
        step = 0
        while step < num_iterations:
            latent, state = self.recurrent(latent, state, mask)
            step += 1

        # Project the final hidden states to logits.
        logits = self.coda(latent)
        return logits
# Configuration for the Latent Recurrent Depth Model
class LatentRecurrentDepthConfig(PretrainedConfig):
    """Hyperparameter container for the latent recurrent depth model.

    Args:
        vocab_size: Vocabulary size handed to the model.
        d_model: Hidden size handed to the model.
        num_heads: Number of attention heads handed to the model.
        dropout: Dropout probability handed to the model.
        **kwargs: Forwarded unchanged to ``PretrainedConfig.__init__``.
    """

    model_type = "latent_recurrent_depth"

    def __init__(self, vocab_size=50257, d_model=768, num_heads=12, dropout=0.1, **kwargs):
        # Let the base class consume its own keyword arguments first, then
        # record the model-specific hyperparameters on the instance.
        super().__init__(**kwargs)
        hyperparameters = {
            "vocab_size": vocab_size,
            "d_model": d_model,
            "num_heads": num_heads,
            "dropout": dropout,
        }
        for attribute, value in hyperparameters.items():
            setattr(self, attribute, value)
# Hugging Face-Compatible Model Wrapper
class LatentRecurrentDepthModel(PreTrainedModel):
    """Hugging Face-compatible wrapper around ``LatentRecurrentDepthLM``.

    The wrapped network is built from the config's ``vocab_size``, ``d_model``,
    ``num_heads`` and ``dropout`` fields and stored as ``latent_model``.
    """

    config_class = LatentRecurrentDepthConfig
    base_model_prefix = "latent_recurrent_depth"

    def __init__(self, config: LatentRecurrentDepthConfig):
        super().__init__(config)
        self.latent_model = LatentRecurrentDepthLM(
            config.vocab_size, config.d_model, config.num_heads, config.dropout
        )
        self.init_weights()

    def forward(self, input_ids: torch.Tensor, num_iterations: int,
                mask: Optional[torch.Tensor] = None) -> torch.Tensor:
        """Return the wrapped model's logits for ``input_ids``.

        Args:
            input_ids: Token ids of shape (batch, seq_length).
            num_iterations: Number of recurrent iterations to run.
            mask: Optional attention mask forwarded to the wrapped model.
        """
        return self.latent_model(input_ids, num_iterations, mask)

    def generate(
        self,
        input_ids: torch.Tensor,
        max_length: int = 20,
        num_iterations: int = 3,
        temperature: float = 1.0,
        top_k: Optional[int] = 50,
    ) -> torch.Tensor:
        """
        Generate a sequence of tokens given input_ids.

        Sampling stops early once every sequence in the batch has produced the
        config's EOS token. Sequences that finish before the others are padded
        with the EOS token so the result stays rectangular.

        Args:
            input_ids: torch.Tensor of shape (batch, seq_length) containing the prompt.
            max_length: The maximum number of new tokens to generate.
            num_iterations: The number of recurrent iterations to use in each forward pass.
            temperature: Temperature for scaling logits; must be positive.
            top_k: If set to a positive value, only sample from the top k tokens
                (clamped to the vocabulary size). ``None`` or a non-positive
                value samples from the full distribution.
        Returns:
            generated: torch.Tensor containing the prompt followed by the generated tokens.
        Raises:
            ValueError: If ``temperature`` is not positive.
        """
        if temperature <= 0:
            raise ValueError(f"temperature must be positive, got {temperature}")

        # Only an integer EOS id is acted upon; anything else (None, a list of
        # ids) disables early stopping instead of raising.
        eos_token_id = getattr(self.config, "eos_token_id", None)
        if not isinstance(eos_token_id, int):
            eos_token_id = None

        generated = input_ids.clone()
        # One flag per batch row, set once that row has emitted EOS.
        finished = torch.zeros(
            generated.size(0), dtype=torch.bool, device=generated.device
        )

        # Remember the current mode so sampling in the middle of training does
        # not silently leave dropout disabled afterwards.
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                for _ in range(max_length):
                    # NOTE(review): no attention mask is passed here, so every
                    # position attends to the whole sequence — confirm this
                    # matches how the model was trained.
                    logits = self.forward(generated, num_iterations=num_iterations)

                    # Use only the logits for the last token in the sequence.
                    next_token_logits = logits[:, -1, :] / temperature

                    if top_k is not None and top_k > 0:
                        # Top-k filtering; clamp k so torch.topk cannot be
                        # asked for more entries than the vocabulary holds.
                        k = min(top_k, next_token_logits.size(-1))
                        top_k_logits, top_k_indices = torch.topk(next_token_logits, k)
                        probabilities = F.softmax(top_k_logits, dim=-1)
                        choice = torch.multinomial(probabilities, num_samples=1)
                        next_token = top_k_indices.gather(-1, choice)
                    else:
                        probabilities = F.softmax(next_token_logits, dim=-1)
                        next_token = torch.multinomial(probabilities, num_samples=1)

                    if eos_token_id is not None:
                        # Rows that already ended keep emitting EOS as padding.
                        next_token = next_token.masked_fill(
                            finished.unsqueeze(1), eos_token_id
                        )
                        finished = finished | (next_token.squeeze(1) == eos_token_id)

                    generated = torch.cat([generated, next_token], dim=1)

                    # Stop once every sequence in the batch has produced EOS.
                    if eos_token_id is not None and bool(finished.all()):
                        break
        finally:
            self.train(was_training)
        return generated