| import os |
| from datetime import datetime |
|
|
| |
| os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" |
| os.environ["CUDA_VISIBLE_DEVICES"] = "2" |
|
|
| import math |
| import random |
| import re |
|
|
| import numpy as np |
| import torch |
| import torch.nn as nn |
| import torch.optim as optim |
| from collections import Counter |
| from sklearn.metrics import accuracy_score, precision_recall_fscore_support |
| from sklearn.model_selection import train_test_split |
| from torch.utils.data import DataLoader, Dataset |
| from datasets import load_dataset |
|
|
| """ |
| Homework 1 (Part I) – Transformer-based sentiment analysis on the IMDB dataset. |
| |
| This script implements: |
| - Data loading and preprocessing for the IMDB movie review dataset |
| - A Transformer-based text classification model |
| - Training and evaluation loops for binary sentiment analysis |
| - Saving of the trained model together with vocabulary and configuration |
| |
| The code is organized into clearly separated sections: |
| 1) Data preparation and tokenization |
| 2) Transformer components (building blocks) |
| 3) Full Transformer classifier |
| 4) Training and evaluation logic |
| 5) Execution example using a train/validation split of IMDB |
| |
| Model Analysis and Improvement: |
| 1. After evaluation, delve into analyzing your model's behavior to identify |
| areas for improvement and fine-tuning. |
2. Analyze classification errors: Examine specific review examples
   where the model performs poorly and try to understand the reasons
   behind these errors. Are there issues with handling rare words or
   idiomatic expressions?
| 3. Explore the impact of model size: Experiment with different Transformer |
| model sizes (e.g., small, medium, large) to understand how model |
| complexity affects performance. |
| """ |
|
|
| |
| |
| |
|
|
def tokenize(text):
    """
    Normalize a raw review string and split it into word tokens.

    The text is lowercased, HTML line breaks (``<br />``) are replaced by
    spaces, every character that is neither alphanumeric nor whitespace is
    dropped, and the result is split on whitespace.

    Args:
        text (str): Raw review text.

    Returns:
        List[str]: Normalized word tokens.
    """
    cleaned = re.sub(r"<br />", " ", text.lower())
    cleaned = re.sub(r"[^a-zA-Z0-9\s]", "", cleaned)
    return cleaned.split()
|
|
class IMDBDataset(Dataset):
    """
    Torch Dataset wrapping pre-encoded IMDB reviews.

    Each item is a pair of:
        - a fixed-length sequence of token IDs (long tensor)
        - a sentiment label (0 = negative, 1 = positive)
    """

    def __init__(self, sequences, labels):
        # Materialize everything up front as long tensors; items are
        # served by simple indexing afterwards.
        self.sequences = torch.tensor(sequences, dtype=torch.long)
        self.labels = torch.tensor(labels, dtype=torch.long)

    def __len__(self):
        # One label per example.
        return len(self.labels)

    def __getitem__(self, idx):
        # Return the (sequence, label) pair for a single example.
        return self.sequences[idx], self.labels[idx]
|
|
def build_vocab(texts, max_vocab_size=10000):
    """
    Build a word-to-index vocabulary from a collection of texts.

    Token frequencies are counted over the supplied texts (pass the
    training set only, to avoid information leakage). Two special tokens
    are always present:
        - "<PAD>" at index 0
        - "<UNK>" at index 1
    The remaining (max_vocab_size - 2) slots go to the most frequent
    tokens, in descending frequency order.

    Args:
        texts (Iterable[str]): Training texts.
        max_vocab_size (int): Maximum size of the vocabulary.

    Returns:
        Dict[str, int]: Mapping from token string to integer index.
    """
    freq = Counter()
    for doc in texts:
        freq.update(tokenize(doc))

    vocab = {"<PAD>": 0, "<UNK>": 1}
    # Indices continue at 2, immediately after the special tokens.
    for rank, (word, _count) in enumerate(freq.most_common(max_vocab_size - 2), start=2):
        vocab[word] = rank
    return vocab
|
|
def preprocess_data(texts, vocab, max_len=128):
    """
    Convert raw texts into fixed-length sequences of token IDs.

    Each text is tokenized, mapped to vocabulary indices (out-of-vocabulary
    tokens become "<UNK>"), then truncated to `max_len` or right-padded
    with "<PAD>" up to `max_len`.

    Args:
        texts (Iterable[str]): Input texts (reviews).
        vocab (Dict[str, int]): Token-to-index mapping.
        max_len (int): Target sequence length in tokens.

    Returns:
        np.ndarray: Integer array of shape (num_examples, max_len).
    """
    pad_id = vocab["<PAD>"]
    unk_id = vocab["<UNK>"]

    encoded = []
    for text in texts:
        ids = [vocab.get(tok, unk_id) for tok in tokenize(text)]
        # Truncate, then pad with as many <PAD> IDs as are still missing.
        ids = ids[:max_len] + [pad_id] * max(0, max_len - len(ids))
        encoded.append(ids)
    return np.array(encoded)
|
|
| |
| |
| |
|
|
class PositionalEncoding(nn.Module):
    """
    Sinusoidal positional encoding module.

    Implements the deterministic positional encoding from the original
    Transformer paper ("Attention is All You Need"), which is added to
    token embeddings to inject information about token positions.
    Works for both even and odd `d_model`.
    """
    def __init__(self, d_model, max_len=5000):
        """
        Args:
            d_model (int): Embedding dimension.
            max_len (int): Maximum sequence length to precompute.
        """
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        # Per-frequency scale for even dimension indices: 10000^(-2i/d_model).
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))

        pe[:, 0::2] = torch.sin(position * div_term)
        # Slice the cosine half so odd d_model works too: odd-index columns
        # number d_model // 2, while div_term has ceil(d_model / 2) entries.
        pe[:, 1::2] = torch.cos(position * div_term)[:, : d_model // 2]

        # Buffer (not a parameter): moves with the module across devices
        # but receives no gradient updates.
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """
        Add positional encodings to input embeddings.

        Args:
            x (Tensor): Input tensor of shape [batch_size, seq_len, d_model].

        Returns:
            Tensor: Positionally encoded representations with same shape as x.
        """
        return x + self.pe[:, :x.size(1)]
|
|
class MultiHeadAttention(nn.Module):
    """
    Multi-head self-attention.

    Queries, keys, and values are all derived from the same input sequence;
    each head attends in its own d_k-dimensional subspace and the per-head
    results are concatenated and projected back to d_model.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads

        # Separate projections for query/key/value plus the output merge.
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

    def _split_heads(self, proj, batch_size, seq_len):
        # Reshape [B, S, D] -> [B, H, S, d_k] so attention runs per head.
        return proj.view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)

    def forward(self, x, mask=None):
        """
        Apply multi-head self-attention to the input sequence.

        Args:
            x (Tensor): Input tensor of shape [batch_size, seq_len, d_model].
            mask (Tensor, optional): Attention mask broadcastable to
                [batch_size, 1, 1, seq_len]; positions equal to 0 are
                masked out.

        Returns:
            Tensor: Output tensor of shape [batch_size, seq_len, d_model].
        """
        batch_size, seq_len, _ = x.shape

        queries = self._split_heads(self.W_q(x), batch_size, seq_len)
        keys = self._split_heads(self.W_k(x), batch_size, seq_len)
        values = self._split_heads(self.W_v(x), batch_size, seq_len)

        # Scaled dot-product scores; masked slots get a large negative
        # value so softmax assigns them (near-)zero weight.
        scores = torch.matmul(queries, keys.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)

        weights = torch.softmax(scores, dim=-1)

        # Re-merge heads: [B, H, S, d_k] -> [B, S, D], then project out.
        merged = torch.matmul(weights, values).transpose(1, 2).contiguous()
        merged = merged.view(batch_size, seq_len, self.d_model)
        return self.W_o(merged)
|
|
class TransformerEncoderBlock(nn.Module):
    """
    One Transformer encoder block (post-norm variant):
        - multi-head self-attention, residual add, LayerNorm
        - position-wise feed-forward network, residual add, LayerNorm
    Dropout is applied to each sublayer's output before the residual add.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Linear(d_ff, d_model),
        )
        self.layernorm1 = nn.LayerNorm(d_model)
        self.layernorm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """
        Forward pass through one encoder block.

        Args:
            x (Tensor): Input tensor of shape [batch_size, seq_len, d_model].
            mask (Tensor, optional): Attention mask (see MultiHeadAttention).

        Returns:
            Tensor: Output tensor of shape [batch_size, seq_len, d_model].
        """
        # Attention sublayer with residual connection and normalization.
        x = self.layernorm1(x + self.dropout(self.mha(x, mask)))
        # Feed-forward sublayer with residual connection and normalization.
        return self.layernorm2(x + self.dropout(self.ffn(x)))
|
|
| |
| |
| |
|
|
class TransformerClassifier(nn.Module):
    """
    Transformer encoder classifier for IMDB sentiment analysis.

    Pipeline:
        token embedding -> sinusoidal positional encoding -> dropout ->
        N encoder blocks -> mean pooling over the sequence -> linear head.
    """

    def __init__(self, vocab_size, d_model, num_heads, num_layers, d_ff, max_len, num_classes=2, dropout=0.1):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len)

        blocks = [
            TransformerEncoderBlock(d_model, num_heads, d_ff, dropout)
            for _ in range(num_layers)
        ]
        self.encoder_layers = nn.ModuleList(blocks)

        self.dropout = nn.Dropout(dropout)
        self.classifier = nn.Linear(d_model, num_classes)

    def forward(self, x, mask=None):
        """
        Forward pass for the classifier.

        Args:
            x (Tensor): Token IDs of shape [batch_size, seq_len].
            mask (Tensor, optional): Attention mask passed through to every
                encoder block (not used by this script's training code).

        Returns:
            Tensor: Logits of shape [batch_size, num_classes].
        """
        hidden = self.embedding(x)
        hidden = self.dropout(self.pos_encoding(hidden))

        for block in self.encoder_layers:
            hidden = block(hidden, mask)

        # Mean-pool across the sequence dimension (includes <PAD> positions).
        pooled = hidden.mean(dim=1)
        return self.classifier(pooled)
|
|
| |
| |
| |
|
|
def train_model(model, train_loader, val_loader, epochs, lr, device):
    """
    Train the Transformer classifier and print per-epoch validation metrics.

    Args:
        model (nn.Module): TransformerClassifier instance.
        train_loader (DataLoader): Batches of (sequence, label) for training.
        val_loader (DataLoader): Batches for validation.
        epochs (int): Number of full passes through the training set.
        lr (float): Initial learning rate for the Adam optimizer.
        device (torch.device): Device on which to run training.

    Uses:
        - CrossEntropyLoss for the binary sentiment labels.
        - Adam with a StepLR scheduler (lr halved every 2 epochs).
    """
    model.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=2, gamma=0.5)

    for epoch in range(epochs):
        model.train()
        total_loss = 0.0
        for seq_batch, label_batch in train_loader:
            seq_batch = seq_batch.to(device)
            label_batch = label_batch.to(device)

            # Standard step: clear grads, forward, loss, backward, update.
            optimizer.zero_grad()
            logits = model(seq_batch)
            batch_loss = criterion(logits, label_batch)
            batch_loss.backward()
            optimizer.step()

            total_loss += batch_loss.item()

        # Decay the learning rate once per epoch, then report validation.
        scheduler.step()
        val_metrics = evaluate_model(model, val_loader, device)
        val_acc = val_metrics["accuracy"]
        val_p = val_metrics["precision"]
        val_r = val_metrics["recall"]
        val_f1 = val_metrics["f1"]
        print(
            f"Epoch {epoch+1}/{epochs} | "
            f"Loss: {total_loss/len(train_loader):.4f} | "
            f"Val Acc: {val_acc:.4f} | "
            f"Val P: {val_p:.4f} | Val R: {val_r:.4f} | Val F1: {val_f1:.4f}"
        )
|
|
def evaluate_model(model, loader, device):
    """
    Evaluate the classifier on a dataset and compute standard metrics.

    Args:
        model (nn.Module): Trained (or partially trained) classifier.
        loader (DataLoader): DataLoader for validation or test data.
        device (torch.device): Device on which to perform evaluation.

    Returns:
        Dict[str, float]: Accuracy, precision, recall, and F1 (binary average).
    """
    model.eval()
    predictions = []
    targets = []

    # No gradients needed during evaluation.
    with torch.no_grad():
        for seq_batch, label_batch in loader:
            logits = model(seq_batch.to(device))
            predicted = logits.argmax(dim=1)
            predictions.extend(predicted.cpu().numpy())
            targets.extend(label_batch.to(device).cpu().numpy())

    acc = accuracy_score(targets, predictions)
    precision, recall, f1, _ = precision_recall_fscore_support(targets, predictions, average='binary')
    return {"accuracy": acc, "precision": precision, "recall": recall, "f1": f1}
|
|
def count_trainable_parameters(model):
    """
    Count the trainable parameters of a model.

    Args:
        model (nn.Module): Model whose parameters should be counted.

    Returns:
        int: Total element count over all parameters with requires_grad set.
    """
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total
|
|
def write_experiment_report_md(
    report_path,
    results,
    best_result,
    device,
    train_size,
    val_size,
):
    """
    Write a Markdown report summarizing model-size experiment results.

    The report contains a run-configuration header, an overall comparison
    table, a "best model" section (selected by validation F1), and one
    detail section per trained model size.

    Args:
        report_path (str): Output Markdown file path.
        results (List[Dict]): Per-model experiment outputs.
        best_result (Dict): Best-performing entry from `results`.
        device (torch.device): Device used during training.
        train_size (int): Number of training samples.
        val_size (int): Number of validation samples.
    """
    # Header: run metadata and shared hyperparameters.
    lines = [
        "# IMDB Transformer Model-Size Experiment Report",
        "",
        f"- Generated at: `{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}`",
        f"- Device: `{device}`",
        f"- Training samples: `{train_size}`",
        f"- Validation samples: `{val_size}`",
        f"- Max vocab size: `{MAX_VOCAB}`",
        f"- Max sequence length: `{MAX_LEN}`",
        f"- Batch size: `{BATCH_SIZE}`",
        f"- Epochs: `{EPOCHS}`",
        f"- Learning rate: `{LR}`",
        "",
        "## Overall Comparison",
        "",
        "| Model Size | Trainable Params | Accuracy | Precision | Recall | F1 | Checkpoint |",
        "|---|---:|---:|---:|---:|---:|---|",
    ]

    # One comparison-table row per trained model.
    for item in results:
        metrics = item["metrics"]
        lines.append(
            f"| {item['size']} | {item['params']:,} | "
            f"{metrics['accuracy']:.4f} | {metrics['precision']:.4f} | "
            f"{metrics['recall']:.4f} | {metrics['f1']:.4f} | "
            f"`{item['checkpoint_path']}` |"
        )

    # Best-model summary (selected by validation F1).
    lines.extend([
        "",
        "## Best Model",
        "",
        f"- Best size by validation F1: `{best_result['size']}`",
        f"- Checkpoint: `{best_result['checkpoint_path']}`",
        f"- Trainable parameters: `{best_result['params']:,}`",
        "- Metrics:",
        f"  - Accuracy: `{best_result['metrics']['accuracy']:.4f}`",
        f"  - Precision: `{best_result['metrics']['precision']:.4f}`",
        f"  - Recall: `{best_result['metrics']['recall']:.4f}`",
        f"  - F1: `{best_result['metrics']['f1']:.4f}`",
        "",
        "## Per-Model Details",
        "",
    ])

    # Detail section for each model size.
    for item in results:
        cfg = item["config"]
        metrics = item["metrics"]
        lines.extend([
            f"### {item['size'].capitalize()} model",
            "",
            "- Architecture:",
            f"  - `d_model`: `{cfg['d_model']}`",
            f"  - `num_heads`: `{cfg['num_heads']}`",
            f"  - `num_layers`: `{cfg['num_layers']}`",
            f"  - `d_ff`: `{cfg['d_ff']}`",
            f"- Trainable params: `{item['params']:,}`",
            f"- Checkpoint: `{item['checkpoint_path']}`",
            "- Validation metrics:",
            f"  - Accuracy: `{metrics['accuracy']:.4f}`",
            f"  - Precision: `{metrics['precision']:.4f}`",
            f"  - Recall: `{metrics['recall']:.4f}`",
            f"  - F1: `{metrics['f1']:.4f}`",
            "",
        ])

    with open(report_path, "w", encoding="utf-8") as f:
        f.write("\n".join(lines))
|
|
| |
| |
| |
|
|
| |
| |
| |
| |
def load_imdb_texts(split: str = "train"):
    """
    Load IMDB review texts and labels via `datasets.load_dataset`.

    Args:
        split (str): Dataset split, e.g. "train" or "test".

    Returns:
        Tuple[List[str], List[int]]: Review texts and sentiment labels,
        where labels are 0 (negative) and 1 (positive).
    """
    dataset = load_dataset("imdb", split=split)
    return dataset["text"], dataset["label"]
|
|
| |
| |
| |
| |
| |
# Maximum vocabulary size, including the <PAD> and <UNK> special tokens.
MAX_VOCAB = 5000


# Maximum review length in tokens; longer reviews are truncated,
# shorter ones padded with <PAD>.
MAX_LEN = 64


# Mini-batch size shared by the training and validation loaders.
BATCH_SIZE = 32

# Number of full passes over the training set per model.
EPOCHS = 5

# Initial Adam learning rate (StepLR halves it every 2 epochs).
LR = 0.001


# Architecture presets for the model-size experiment; in each preset
# d_model is divisible by num_heads, as MultiHeadAttention requires.
MODEL_SIZES = {
    "small": {"d_model": 64, "num_heads": 4, "num_layers": 1, "d_ff": 128},
    "medium": {"d_model": 128, "num_heads": 8, "num_layers": 2, "d_ff": 256},
    "large": {"d_model": 256, "num_heads": 8, "num_layers": 4, "d_ff": 512},
}


# Output directory (created if missing), plus paths for the best-model
# summary checkpoint and the Markdown experiment report.
SAVE_DIR = os.path.join(".", "saved_model")
os.makedirs(SAVE_DIR, exist_ok=True)
MODEL_PATH = os.path.join(SAVE_DIR, "transformer_imdb.pt")
REPORT_PATH = os.path.join(SAVE_DIR, "transformer_imdb_experiment_report.md")
|
|
def main():
    """
    Train a Transformer-based sentiment classifier on IMDB and save the model,
    vocabulary, and configuration to disk.

    Workflow:
        1. Load the IMDB "train" split and carve out a stratified 80/20
           train/validation split.
        2. Build the vocabulary from the training subset only.
        3. Train one model per entry in MODEL_SIZES, saving each checkpoint.
        4. Print a comparison table, save a summary file for the best model
           (by validation F1), and write a Markdown report.
    """

    # Full IMDB training split: texts plus 0 (negative) / 1 (positive) labels.
    all_train_texts, all_train_labels = load_imdb_texts(split="train")

    # Stratified split keeps the positive/negative balance in both subsets.
    train_texts, val_texts, train_labels, val_labels = train_test_split(
        all_train_texts,
        all_train_labels,
        test_size=0.2,
        random_state=42,
        stratify=all_train_labels,
    )

    # Vocabulary from training texts only — avoids validation leakage.
    vocab = build_vocab(train_texts, MAX_VOCAB)

    # Map texts to fixed-length (MAX_LEN) sequences of token IDs.
    train_sequences = preprocess_data(train_texts, vocab, MAX_LEN)
    val_sequences = preprocess_data(val_texts, vocab, MAX_LEN)

    train_dataset = IMDBDataset(train_sequences, train_labels)
    val_dataset = IMDBDataset(val_sequences, val_labels)

    # Shuffle only the training loader; validation order does not matter.
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    results = []

    # Train and evaluate one model per size preset (small/medium/large).
    for size_name, size_cfg in MODEL_SIZES.items():
        print("\n" + "=" * 72)
        print(f"Training {size_name.upper()} model with config: {size_cfg}")
        print("=" * 72)

        model = TransformerClassifier(
            len(vocab),
            size_cfg["d_model"],
            size_cfg["num_heads"],
            size_cfg["num_layers"],
            size_cfg["d_ff"],
            MAX_LEN,
        )
        param_count = count_trainable_parameters(model)
        print(f"Trainable parameters ({size_name}): {param_count:,}")

        train_model(model, train_loader, val_loader, EPOCHS, LR, device)
        val_metrics = evaluate_model(model, val_loader, device)
        size_model_path = os.path.join(SAVE_DIR, f"transformer_imdb_{size_name}.pt")
        results.append(
            {
                "size": size_name,
                "params": param_count,
                "config": size_cfg,
                "metrics": val_metrics,
                "checkpoint_path": size_model_path,
            }
        )

        # Persist weights together with the vocabulary and full config so
        # the checkpoint is self-contained for later inference/analysis.
        torch.save(
            {
                "model_state_dict": model.state_dict(),
                "vocab": vocab,
                "config": {
                    "max_vocab": MAX_VOCAB,
                    "max_len": MAX_LEN,
                    "batch_size": BATCH_SIZE,
                    "epochs": EPOCHS,
                    "lr": LR,
                    "size_name": size_name,
                    **size_cfg,
                },
                "val_metrics": val_metrics,
            },
            size_model_path,
        )
        print(f"Saved {size_name} model to {size_model_path}")

    # Plain-text comparison table over all trained sizes.
    print("\n" + "#" * 72)
    print("Model Size Impact Summary (Validation Set)")
    print("#" * 72)
    print(f"{'Size':<10} {'Params':>12} {'Acc':>8} {'Precision':>10} {'Recall':>8} {'F1':>8}")
    for item in results:
        m = item["metrics"]
        print(
            f"{item['size']:<10} "
            f"{item['params']:>12,} "
            f"{m['accuracy']:>8.4f} "
            f"{m['precision']:>10.4f} "
            f"{m['recall']:>8.4f} "
            f"{m['f1']:>8.4f}"
        )

    # Pick the best size by validation F1 and save a small summary file that
    # points at the winning checkpoint (weights stay in the per-size files).
    best_result = max(results, key=lambda x: x["metrics"]["f1"])
    best_model_path = os.path.join(SAVE_DIR, f"transformer_imdb_{best_result['size']}.pt")
    torch.save(
        {
            "best_size": best_result["size"],
            "best_model_path": best_model_path,
            "all_results": results,
        },
        MODEL_PATH,
    )
    print(f"\nBest model by Val F1: {best_result['size']} -> {best_model_path}")
    print(f"Experiment summary saved to {MODEL_PATH}")

    # Human-readable Markdown report containing the same numbers.
    write_experiment_report_md(
        REPORT_PATH,
        results,
        best_result,
        device,
        train_size=len(train_texts),
        val_size=len(val_texts),
    )
    print(f"Markdown report saved to {REPORT_PATH}")
|
|
|
|
# Script entry point: run the full experiment when executed directly.
if __name__ == "__main__":
    main()