testing-model / modeling_gpt.py
AliMuhammad73's picture
Upload model and tokenizer
3880c6e verified
# Model/model.py
import torch
import torch.nn as nn
import inspect
from huggingface_hub import PyTorchModelHubMixin
# Hyperparameters and constants shared by the model definitions below.
BATCH_SIZE = 16          # sequences per batch
BLOCK_SIZE = 1024        # maximum sequence length (context window)
MAX_ITERS = 5            # presumably consumed by a training script outside this file
EVAL_INTERVAL = 500      # presumably consumed by a training script outside this file
LEARNING_RATE = 6e-4     # default learning rate for the optimizer
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
EVAL_ITERS = 200         # presumably consumed by a training script outside this file
N_EMBD = 768             # embedding width
N_HEAD = 12              # attention heads per layer
N_LAYER = 12             # number of transformer blocks
DROPOUT = 0.2            # not referenced by the layers in this file
# Raw string: "\m" is not a valid escape sequence and triggers a
# SyntaxWarning on recent Python versions. The value is unchanged
# (a literal backslash followed by "model_weights_llama").
MODEL_PATH = r"Naive_gpt\model_weights_llama"  # Where to save weights
class CausalSelfAttention(nn.Module):
    """Multi-head causal self-attention using one fused Q/K/V projection.

    Args:
        n_embd: Embedding width. Falls back to the module-level ``N_EMBD``
            when omitted, which keeps ``CausalSelfAttention()`` working.
        n_head: Number of attention heads. Falls back to ``N_HEAD``.
    """

    def __init__(self, n_embd=None, n_head=None):
        super().__init__()
        if n_embd is None:
            n_embd = N_EMBD
        if n_head is None:
            n_head = N_HEAD
        assert n_embd % n_head == 0
        # key, query, value projections for all heads, but in a batch
        self.c_attn = nn.Linear(n_embd, 3 * n_embd)
        # output projection
        self.c_proj = nn.Linear(n_embd, n_embd)
        # Marker read by GPTLanguageModel._init_weights to shrink the init std.
        self.c_proj.NANOGPT_SCALE_INIT = 1
        self.n_head = n_head
        self.n_embd = n_embd

    def forward(self, x):
        """Apply causal attention to ``x`` of shape (B, T, C); returns (B, T, C)."""
        B, T, C = x.size()  # batch size, sequence length, embedding width
        # nh is "number of heads", hs is "head size", and C = nh * hs
        head_size = C // self.n_head
        qkv = self.c_attn(x)
        q, k, v = qkv.split(self.n_embd, dim=2)
        # Move the head axis forward so it acts as a batch dim: (B, nh, T, hs)
        q = q.view(B, T, self.n_head, head_size).transpose(1, 2)
        k = k.view(B, T, self.n_head, head_size).transpose(1, 2)
        v = v.view(B, T, self.n_head, head_size).transpose(1, 2)
        y = nn.functional.scaled_dot_product_attention(
            q, k, v, is_causal=True)  # flash attention
        # re-assemble all head outputs side by side
        y = y.transpose(1, 2).contiguous().view(B, T, C)
        # output projection
        return self.c_proj(y)


# Called "MLP" in Karpathy's code and "feed forward" in Sebastian's.
class FeedFoward(nn.Module):
    """A simple linear layer followed by a non-linearity and a projection back.

    Args:
        n_embd: Embedding width. Falls back to the module-level ``N_EMBD``
            when omitted, which keeps ``FeedFoward()`` working.
    """

    def __init__(self, n_embd=None):
        super().__init__()
        if n_embd is None:
            n_embd = N_EMBD
        self.c_fc = nn.Linear(n_embd, 4 * n_embd)
        self.gelu = nn.GELU(approximate='tanh')
        self.c_proj = nn.Linear(4 * n_embd, n_embd)
        # Marker read by GPTLanguageModel._init_weights to shrink the init std.
        self.c_proj.NANOGPT_SCALE_INIT = 1

    def forward(self, x):
        """Expand to 4x width, apply GELU, and project back to the input width."""
        return self.c_proj(self.gelu(self.c_fc(x)))


class Block(nn.Module):
    """Transformer block: communication followed by computation.

    Args:
        n_embd: Embedding width used by every sub-layer of the block.
        n_head: Number of attention heads.
    """

    def __init__(self, n_embd, n_head):
        super().__init__()
        # Forward the requested sizes instead of silently falling back to the
        # module-level constants, so non-default model sizes build correctly.
        self.sa = CausalSelfAttention(n_embd, n_head)
        self.ffwd = FeedFoward(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        """Pre-norm residual attention followed by pre-norm residual MLP."""
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x
class GPTLanguageModel(nn.Module, PyTorchModelHubMixin):
    """GPT-style decoder-only language model.

    Token and learned position embeddings feed a stack of ``Block`` layers,
    followed by a final LayerNorm and a linear head over the vocabulary. The
    token embedding shares its weight tensor with the output head.

    Args:
        vocab_size: Number of tokens in the vocabulary.
        block_size: Maximum sequence length the model can process.
        n_embd: Embedding width.
        n_head: Attention heads per block.
        n_layer: Number of transformer blocks.
    """

    def __init__(self, vocab_size=20000, block_size=1024, n_embd=768, n_head=12, n_layer=12):
        super().__init__()
        print("This is vocab size:", vocab_size)
        # Kept on the instance so forward/generate/_init_weights honour the
        # constructor arguments rather than the module-level constants.
        # Must be set before self.apply(...) because _init_weights reads n_layer.
        self.block_size = block_size
        self.n_layer = n_layer
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(
            *[Block(n_embd, n_head=n_head) for _ in range(n_layer)]
        )
        self.ln_f = nn.LayerNorm(n_embd)
        self.lm_head = nn.Linear(n_embd, vocab_size)
        # Weight tying: the embedding table reuses the output head's weight.
        self.token_embedding_table.weight = self.lm_head.weight
        self.apply(self._init_weights)
        self.config = {"BLOCK_SIZE": block_size, "N_EMBD": n_embd, "N_HEAD":n_head, "N_LAYER": n_layer}

    def _init_weights(self, module):
        """Initialise Linear/Embedding weights with N(0, 0.02) and zero biases.

        Linear layers flagged with ``NANOGPT_SCALE_INIT`` get their std scaled
        by ``(2 * n_layer) ** -0.5``.
        """
        if isinstance(module, nn.Linear):
            std = 0.02
            if hasattr(module, 'NANOGPT_SCALE_INIT'):
                std *= (2 * self.n_layer) ** -0.5
            torch.nn.init.normal_(module.weight, mean=0.0, std=std)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, idx, targets=None):
        """Compute logits, and the cross-entropy loss when targets are given.

        Args:
            idx: Integer token ids of shape (B, T) with ``T <= block_size``.
            targets: Optional token ids with the same shape as ``idx``.

        Returns:
            ``(logits, loss)`` where ``logits`` has shape (B, T, vocab_size)
            and ``loss`` is ``None`` when ``targets`` is ``None``.
        """
        B, T = idx.shape
        assert T <= self.block_size, f"Cannot forward sequence of length {T}, block size is only {self.block_size}"
        positions = torch.arange(0, T, dtype=torch.long, device=idx.device)
        tok_emb = self.token_embedding_table(idx)
        pos_emb = self.position_embedding_table(positions)
        x = tok_emb + pos_emb
        x = self.blocks(x)
        x = self.ln_f(x)
        logits = self.lm_head(x)
        if targets is None:
            loss = None
        else:
            loss = nn.functional.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1))
        return logits, loss

    def generate(self, idx, max_new_tokens, temperature=1.0):
        """
        Generate tokens using the language model.
        Args:
            idx: Input token indices of shape (B, T)
            max_new_tokens: Number of tokens to generate
            temperature: Controls randomness in generation
                - temperature > 1.0 increases randomness
                - temperature < 1.0 decreases randomness
                - temperature = 0 makes it deterministic (always picks highest probability)
        Returns:
            The input indices with ``max_new_tokens`` generated ids appended
            along dim 1.
        """
        # Sampling needs no gradients; skipping the autograd graph keeps
        # memory flat across the generation loop.
        with torch.no_grad():
            for _ in range(max_new_tokens):
                # Truncate the sequence to the last block_size tokens
                idx_cond = idx[:, -self.block_size:]
                logits, _ = self(idx_cond)
                # Focus only on the last time step
                logits = logits[:, -1, :]
                if temperature == 0.0:
                    # For temperature = 0, simply take the argmax
                    idx_next = torch.argmax(logits, dim=-1, keepdim=True)
                else:
                    # Temperature-scale, convert to probabilities, then sample
                    probs = torch.softmax(logits / temperature, dim=-1)
                    idx_next = torch.multinomial(probs, num_samples=1)
                # Append the new token to the sequence
                idx = torch.cat((idx, idx_next), dim=1)
        return idx

    def save(self, path=MODEL_PATH):
        """Write the model's state dict to ``path``."""
        torch.save(self.state_dict(), path)

    def load(self, path=MODEL_PATH):
        """Load weights from ``path`` into this model.

        Accepts either a checkpoint dict holding the weights under the
        ``"model"`` key or a bare state dict as written by ``save``.
        """
        # NOTE(review): torch.load unpickles the file; only load trusted checkpoints.
        # map_location lets checkpoints saved on GPU load on CPU-only hosts;
        # load_state_dict then copies the values into the existing parameters.
        checkpoint = torch.load(path, map_location="cpu")
        state_dict = checkpoint["model"] if "model" in checkpoint else checkpoint
        # Remove the '_orig_mod.' prefix from parameter names
        cleaned = {
            key.replace('_orig_mod.', ''): value
            for key, value in state_dict.items()
        }
        self.load_state_dict(cleaned)

    def configure_optimizers(self, weight_decay=0.1, learning_rate=LEARNING_RATE, device=DEVICE):
        """Build an AdamW optimizer with weight decay on tensors of rank >= 2.

        Args:
            weight_decay: Decay applied to parameters with two or more dims.
            learning_rate: Optimizer learning rate.
            device: Target device; the fused kernel is requested for CUDA.

        Returns:
            The configured ``torch.optim.AdamW`` instance.
        """
        trainable = [p for _, p in self.named_parameters() if p.requires_grad]
        # Parameters with fewer than 2 dims are exempt from weight decay.
        optim_groups = [
            {"params": [p for p in trainable if p.dim() >= 2], "weight_decay": weight_decay},
            {"params": [p for p in trainable if p.dim() < 2], "weight_decay": 0.0},
        ]
        fused_available = 'fused' in inspect.signature(torch.optim.AdamW).parameters
        # Also matches indexed devices such as "cuda:0" and torch.device objects.
        use_fused = fused_available and str(device).startswith("cuda")
        # Only pass `fused` when this torch version accepts it; otherwise the
        # keyword itself would raise a TypeError.
        extra_args = {"fused": use_fused} if fused_available else {}
        optimizer = torch.optim.AdamW(
            optim_groups, lr=learning_rate, betas=(0.9, 0.95), eps=1e-8, **extra_args
        )
        return optimizer
# Re-assigns the same default checkpoint path defined near the top of the file.
# Raw string: "\m" is not a valid escape sequence; the value is unchanged.
MODEL_PATH = r"Naive_gpt\model_weights_llama"  # Where to save weights