Spaces:
Runtime error
Runtime error
import torch | |
import torch.nn as nn | |
import math | |
# --- Submodule: FusedQKVAttention --- | |
class FusedQKVAttention(nn.Module): | |
def __init__(self, d_model, num_heads): | |
super().__init__() | |
self.d_model = d_model | |
self.num_heads = num_heads | |
self.head_dim = d_model // num_heads | |
# Fused QKV projection | |
self.qkv_proj = nn.Linear(d_model, 3 * d_model) | |
self.wo = nn.Linear(d_model, d_model) | |
# Initialize weights for better training stability | |
nn.init.xavier_uniform_(self.qkv_proj.weight) | |
nn.init.xavier_uniform_(self.wo.weight) | |
nn.init.zeros_(self.qkv_proj.bias) | |
nn.init.zeros_(self.wo.bias) | |
def forward(self, x, attention_mask=None): | |
batch_size, seq_len, _ = x.shape | |
# Fused projection and reshape | |
qkv = self.qkv_proj(x).reshape(batch_size, seq_len, 3, self.num_heads, self.head_dim) | |
qkv = qkv.permute(2, 0, 3, 1, 4) # [3, batch, heads, seq_len, head_dim] | |
q, k, v = qkv[0], qkv[1], qkv[2] | |
# Compute attention with memory efficiency | |
attention_scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.head_dim) | |
if attention_mask is not None: | |
attention_mask = attention_mask.unsqueeze(1).unsqueeze(2) | |
attention_scores = attention_scores.masked_fill(attention_mask == 0, float('-inf')) | |
attention_weights = torch.softmax(attention_scores, dim=-1) | |
# Apply attention and reshape | |
context = torch.matmul(attention_weights, v) | |
context = context.transpose(1, 2).reshape(batch_size, seq_len, self.d_model) | |
return self.wo(context) | |
# --- Submodule: EnhancedFeedForward --- | |
class EnhancedFeedForward(nn.Module): | |
def __init__(self, d_model, ff_dim, dropout=0.1): | |
super().__init__() | |
self.linear1 = nn.Linear(d_model, ff_dim) | |
self.dropout1 = nn.Dropout(dropout) | |
self.linear2 = nn.Linear(ff_dim, d_model) | |
self.dropout2 = nn.Dropout(dropout) | |
self.activation = nn.GELU() | |
# Initialize weights for better training | |
nn.init.xavier_uniform_(self.linear1.weight) | |
nn.init.xavier_uniform_(self.linear2.weight) | |
nn.init.zeros_(self.linear1.bias) | |
nn.init.zeros_(self.linear2.bias) | |
def forward(self, x): | |
return self.dropout2(self.linear2(self.dropout1(self.activation(self.linear1(x))))) | |
# --- Submodule: EnhancedTransformerBlock --- | |
class EnhancedTransformerBlock(nn.Module): | |
def __init__(self, d_model, num_heads, ff_dim, dropout=0.1): | |
super().__init__() | |
self.attention = FusedQKVAttention(d_model, num_heads) | |
self.norm1 = nn.LayerNorm(d_model, eps=1e-6) | |
self.dropout1 = nn.Dropout(dropout) | |
self.feed_forward = EnhancedFeedForward(d_model, ff_dim, dropout) | |
self.norm2 = nn.LayerNorm(d_model, eps=1e-6) | |
self.dropout2 = nn.Dropout(dropout) | |
def forward(self, x, attention_mask=None): | |
# Pre-norm architecture | |
attn_input = self.norm1(x) | |
attn_output = self.attention(attn_input, attention_mask) | |
x = x + self.dropout1(attn_output) | |
ff_input = self.norm2(x) | |
ff_output = self.feed_forward(ff_input) | |
x = x + self.dropout2(ff_output) | |
return x | |
# --- Main Model Class: Snowflake4CausalLM --- | |
class Snowflake4CausalLM(nn.Module): | |
def __init__(self, vocab_size, max_seq_length, d_model, num_heads, num_layers, ff_dim, dropout=0.1): | |
super().__init__() | |
self.embedding = nn.Embedding(vocab_size, d_model) | |
# Initialize positional encodings without in-place modification | |
self.pos_encoding = nn.Parameter(torch.zeros(1, max_seq_length, d_model)) | |
position = torch.arange(max_seq_length).unsqueeze(1).float() | |
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)) | |
pos_enc = torch.zeros(1, max_seq_length, d_model) | |
pos_enc[0, :, 0::2] = torch.sin(position * div_term) | |
pos_enc[0, :, 1::2] = torch.cos(position * div_term) | |
self.pos_encoding.data = pos_enc.data | |
self.layers = nn.ModuleList([ | |
EnhancedTransformerBlock(d_model, num_heads, ff_dim, dropout) | |
for _ in range(num_layers) | |
]) | |
self.final_norm = nn.LayerNorm(d_model, eps=1e-6) | |
self.dropout = nn.Dropout(dropout) | |
self.fc_out = nn.Linear(d_model, vocab_size) | |
# Tie embedding and output weights for memory efficiency and better generalization | |
self.fc_out.weight = self.embedding.weight | |
# Initialize embedding weights | |
nn.init.normal_(self.embedding.weight, mean=0, std=0.02) | |
def forward(self, input_ids, attention_mask=None): | |
seq_length = input_ids.size(1) | |
x = self.embedding(input_ids) + self.pos_encoding[:, :seq_length, :] | |
x = self.dropout(x) | |
for layer in self.layers: | |
x = layer(x, attention_mask) | |
x = self.final_norm(x) | |
return self.fc_out(x) |