| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | import torch |
| | import torch.nn.functional as F |
| | import numpy as np |
| | from collections import defaultdict |
| |
|
# Run on GPU when available; every tokenized batch and the model itself are
# moved onto this device.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
| |
|
| |
|
class InternalAnalyzer:
    """Probe a transformer encoder's internal representations.

    Wraps a model and its tokenizer and offers layer-wise spectral,
    local-dimensionality, cross-layer divergence and token-influence
    analyses over small batches of text.
    """

    def __init__(self, model, tokenizer, max_len=512):
        # Model is moved to the global DEVICE and frozen in eval mode.
        self.model = model.to(DEVICE).eval()
        self.tokenizer = tokenizer
        # Max token length used for padding/truncation in every tokenizer call.
        self.max_len = max_len
| |
|
| | |
| | |
| | |
| |
|
| | @torch.no_grad() |
| | def extract_layers(self, texts): |
| | """Get per-layer mean-pooled representations for each input.""" |
| | if isinstance(texts, str): |
| | texts = [texts] |
| |
|
| | inputs = self.tokenizer( |
| | texts, max_length=self.max_len, padding="max_length", |
| | truncation=True, return_tensors="pt").to(DEVICE) |
| |
|
| | outputs = self.model( |
| | input_ids=inputs["input_ids"], |
| | attention_mask=inputs["attention_mask"], |
| | output_hidden_states=True) |
| |
|
| | mask = inputs["attention_mask"].unsqueeze(-1).float() |
| | n_tokens = inputs["attention_mask"].sum(-1) |
| |
|
| | |
| | layer_pooled = [] |
| | for h in outputs.hidden_states: |
| | pooled = (h * mask).sum(1) / mask.sum(1).clamp(min=1) |
| | layer_pooled.append(pooled.cpu()) |
| |
|
| | return { |
| | "texts": texts, |
| | "layer_pooled": layer_pooled, |
| | "layer_raw": outputs.hidden_states, |
| | "final_embedding": outputs.last_hidden_state.cpu(), |
| | "attention_mask": inputs["attention_mask"].cpu(), |
| | "n_tokens": n_tokens.cpu(), |
| | } |
| |
|
| | |
| | |
| | |
| |
|
| | def spectral_trajectory(self, data): |
| | """ |
| | Eigenvalue spectrum at each layer for each input. |
| | Shows how the representation's internal structure evolves. |
| | """ |
| | results = [] |
| | n_layers = len(data["layer_pooled"]) |
| | B = data["layer_pooled"][0].shape[0] |
| |
|
| | for b in range(B): |
| | trajectory = [] |
| | for layer_idx in range(n_layers): |
| | |
| | |
| | h = data["layer_raw"][layer_idx][b].cpu().float() |
| | mask = data["attention_mask"][b] |
| | n_real = mask.sum().int().item() |
| | h = h[:n_real] |
| |
|
| | if n_real < 2: |
| | trajectory.append({"spectrum": [], "eff_dim": 0, "entropy": 0}) |
| | continue |
| |
|
| | |
| | h_centered = h - h.mean(0, keepdim=True) |
| | try: |
| | S = torch.linalg.svdvals(h_centered) |
| | except Exception: |
| | trajectory.append({"spectrum": [], "eff_dim": 0, "entropy": 0}) |
| | continue |
| |
|
| | |
| | S_norm = S / (S.sum() + 1e-12) |
| |
|
| | |
| | eff_dim = (S.sum() ** 2) / (S.pow(2).sum() + 1e-12) |
| |
|
| | |
| | S_pos = S_norm[S_norm > 1e-12] |
| | entropy = -(S_pos * S_pos.log()).sum() |
| |
|
| | trajectory.append({ |
| | "spectrum": S[:20].tolist(), |
| | "eff_dim": eff_dim.item(), |
| | "entropy": entropy.item(), |
| | "top1_ratio": (S[0] / (S.sum() + 1e-12)).item(), |
| | }) |
| |
|
| | results.append({ |
| | "text": data["texts"][b], |
| | "trajectory": trajectory, |
| | }) |
| |
|
| | return results |
| |
|
| | |
| | |
| | |
| |
|
| | def effective_dimensionality(self, data, k_neighbors=50): |
| | """ |
| | Local effective dimensionality around each embedding. |
| | High = rich understanding. Low = surface-level placement. |
| | """ |
| | embeddings = data["final_embedding"].float() |
| | B = embeddings.shape[0] |
| |
|
| | if B < k_neighbors + 1: |
| | k_neighbors = max(B - 1, 2) |
| |
|
| | |
| | sim = embeddings @ embeddings.T |
| | results = [] |
| |
|
| | for b in range(B): |
| | |
| | sims = sim[b].clone() |
| | sims[b] = -1 |
| | _, topk_idx = sims.topk(k_neighbors) |
| | neighbors = embeddings[topk_idx] |
| |
|
| | |
| | centered = neighbors - neighbors.mean(0, keepdim=True) |
| | try: |
| | S = torch.linalg.svdvals(centered) |
| | except Exception: |
| | results.append({"eff_dim": 0, "local_variance": 0}) |
| | continue |
| |
|
| | |
| | eff_dim = (S.sum() ** 2) / (S.pow(2).sum() + 1e-12) |
| |
|
| | |
| | S_norm = S / (S.sum() + 1e-12) |
| | decay_rate = (S_norm[:5].sum() / S_norm.sum()).item() |
| |
|
| | results.append({ |
| | "text": data["texts"][b], |
| | "eff_dim": eff_dim.item(), |
| | "decay_rate": decay_rate, |
| | "local_spread": centered.norm(dim=-1).mean().item(), |
| | }) |
| |
|
| | return results |
| |
|
| | |
| | |
| | |
| |
|
| | def cross_layer_divergence(self, data): |
| | """ |
| | How much does the representation change between layers? |
| | High change = computation happening. Low change = pass-through. |
| | """ |
| | results = [] |
| | n_layers = len(data["layer_pooled"]) |
| | B = data["layer_pooled"][0].shape[0] |
| |
|
| | for b in range(B): |
| | profile = [] |
| | for i in range(n_layers - 1): |
| | h_curr = data["layer_pooled"][i][b].float() |
| | h_next = data["layer_pooled"][i + 1][b].float() |
| |
|
| | |
| | cos = F.cosine_similarity(h_curr.unsqueeze(0), |
| | h_next.unsqueeze(0)).item() |
| | |
| | l2 = (h_next - h_curr).norm().item() |
| |
|
| | |
| | h_curr_n = F.normalize(h_curr, dim=0) |
| | h_next_n = F.normalize(h_next, dim=0) |
| | angle = torch.acos(torch.clamp( |
| | (h_curr_n * h_next_n).sum(), -1, 1)).item() |
| |
|
| | profile.append({ |
| | "layer": f"{i}β{i+1}", |
| | "cosine": cos, |
| | "l2_shift": l2, |
| | "angle_rad": angle, |
| | }) |
| |
|
| | |
| | total_path = sum(p["l2_shift"] for p in profile) |
| | |
| | max_shift_layer = max(range(len(profile)), |
| | key=lambda i: profile[i]["l2_shift"]) |
| |
|
| | results.append({ |
| | "text": data["texts"][b], |
| | "profile": profile, |
| | "total_path": total_path, |
| | "max_shift_layer": max_shift_layer, |
| | "input_output_cos": F.cosine_similarity( |
| | data["layer_pooled"][0][b].unsqueeze(0).float(), |
| | data["layer_pooled"][-1][b].unsqueeze(0).float() |
| | ).item(), |
| | }) |
| |
|
| | return results |
| |
|
| | |
| | |
| | |
| |
|
| | def token_influence(self, texts): |
| | """ |
| | Which tokens influence the output most? |
| | Uses gradient of output norm w.r.t. input embeddings. |
| | """ |
| | if isinstance(texts, str): |
| | texts = [texts] |
| |
|
| | results = [] |
| | for text in texts: |
| | inputs = self.tokenizer( |
| | [text], max_length=self.max_len, padding="max_length", |
| | truncation=True, return_tensors="pt").to(DEVICE) |
| |
|
| | |
| | input_ids = inputs["input_ids"] |
| | attention_mask = inputs["attention_mask"] |
| | n_real = attention_mask.sum().item() |
| |
|
| | |
| | emb = self.model.token_emb(input_ids) + \ |
| | self.model.pos_emb(torch.arange(input_ids.shape[1], |
| | device=DEVICE).unsqueeze(0)) |
| | emb = self.model.emb_drop(self.model.emb_norm(emb)) |
| | emb.retain_grad() |
| |
|
| | |
| | kpm = ~attention_mask.bool() |
| | x = emb |
| | for layer in self.model.encoder.layers: |
| | x = layer(x, src_key_padding_mask=kpm) |
| |
|
| | |
| | mask = attention_mask.unsqueeze(-1).float() |
| | pooled = (x * mask).sum(1) / mask.sum(1).clamp(min=1) |
| | output = F.normalize(self.model.output_proj(pooled), dim=-1) |
| |
|
| | |
| | output.sum().backward() |
| | grad = emb.grad[0].cpu() |
| |
|
| | |
| | influence = grad.norm(dim=-1)[:int(n_real)] |
| | influence = influence / (influence.sum() + 1e-12) |
| |
|
| | |
| | token_ids = input_ids[0][:int(n_real)].cpu().tolist() |
| | tokens = self.tokenizer.convert_ids_to_tokens(token_ids) |
| |
|
| | results.append({ |
| | "text": text, |
| | "tokens": tokens, |
| | "influence": influence.tolist(), |
| | "top_tokens": sorted(zip(tokens, influence.tolist()), |
| | key=lambda x: -x[1])[:10], |
| | "concentration": (influence.max() / influence.mean()).item(), |
| | }) |
| |
|
| | self.model.zero_grad() |
| |
|
| | return results |
| |
|
| | |
| | |
| | |
| |
|
| | def analyze(self, texts): |
| | """Run all analyses on a set of texts.""" |
| | if isinstance(texts, str): |
| | texts = [texts] |
| |
|
| | print(f" Analyzing {len(texts)} inputs...") |
| |
|
| | data = self.extract_layers(texts) |
| | spectral = self.spectral_trajectory(data) |
| | eff_dim = self.effective_dimensionality(data) |
| | divergence = self.cross_layer_divergence(data) |
| | influence = self.token_influence(texts) |
| |
|
| | report = {} |
| | for i, text in enumerate(texts): |
| | report[text] = { |
| | "embedding": data["final_embedding"][i], |
| | "n_tokens": data["n_tokens"][i].item(), |
| | "spectral": spectral[i], |
| | "eff_dim": eff_dim[i] if i < len(eff_dim) else {}, |
| | "divergence": divergence[i], |
| | "influence": influence[i], |
| | } |
| |
|
| | return report |
| |
|
| | |
| | |
| | |
| |
|
    def print_report(self, report):
        """Print the full analysis report as aligned text tables.

        Sections: a summary table, per-layer effective dimension, per-layer
        spectral entropy, per-transition L2 shift, and top influential
        tokens. Expects the dict produced by analyze().

        NOTE(review): next(iter(report.values())) below raises StopIteration
        on an empty report -- confirm callers never pass one.
        """
        print(f"\n{'='*70}")
        print("INTERNAL ANALYSIS REPORT")
        print(f"{'='*70}")

        # Summary table header.
        print(f"\n {'Text':<25} {'Tokens':>6} {'EffDim':>7} {'Path':>7} "
              f"{'MaxShift':>9} {'InOutCos':>8} {'Concentrate':>11}")
        print(f" {'-'*75}")

        # One summary row per analyzed text.
        for text, r in report.items():
            label = text[:24]
            ed = r["eff_dim"].get("eff_dim", 0)
            tp = r["divergence"]["total_path"]
            ms = r["divergence"]["max_shift_layer"]
            ioc = r["divergence"]["input_output_cos"]
            conc = r["influence"]["concentration"]
            print(f" {label:<25} {r['n_tokens']:>6} {ed:>7.1f} {tp:>7.2f} "
                  f" layer {ms:>2} {ioc:>7.3f} {conc:>10.1f}")

        # Per-layer effective dimension, one column per layer.
        print(f"\n SPECTRAL TRAJECTORY (effective dim per layer):")
        print(f" {'Text':<25}", end="")
        # Layer count taken from the first entry; assumed uniform across texts.
        n_layers = len(next(iter(report.values()))["spectral"]["trajectory"])
        for i in range(n_layers):
            print(f" L{i:>2}", end="")
        print()
        print(f" {'-'*75}")

        for text, r in report.items():
            label = text[:24]
            print(f" {label:<25}", end="")
            for step in r["spectral"]["trajectory"]:
                ed = step.get("eff_dim", 0)
                print(f" {ed:>4.0f}", end="")
            print()

        # Per-layer spectral entropy, same column layout as above.
        print(f"\n SPECTRAL ENTROPY (information content per layer):")
        print(f" {'Text':<25}", end="")
        for i in range(n_layers):
            print(f" L{i:>2}", end="")
        print()
        print(f" {'-'*75}")

        for text, r in report.items():
            label = text[:24]
            print(f" {label:<25}", end="")
            for step in r["spectral"]["trajectory"]:
                ent = step.get("entropy", 0)
                print(f" {ent:>4.1f}", end="")
            print()

        # L2 shift per layer transition (n_layers - 1 columns).
        print(f"\n COMPUTATION PROFILE (L2 shift between layers):")
        print(f" {'Text':<25}", end="")
        for i in range(n_layers - 1):
            print(f" {i}β{i+1:>2}", end="")
        print()
        print(f" {'-'*75}")

        for text, r in report.items():
            label = text[:24]
            print(f" {label:<25}", end="")
            for step in r["divergence"]["profile"]:
                print(f" {step['l2_shift']:>4.1f}", end="")
            print()

        # Top-5 most influential tokens per text.
        print(f"\n TOKEN INFLUENCE (top contributing tokens):")
        for text, r in report.items():
            top = r["influence"]["top_tokens"][:5]
            tok_str = " ".join(f"{t}={v:.3f}" for t, v in top)
            print(f" {text[:40]:<42} {tok_str}")
|
| | def compare(self, report, text_a, text_b): |
| | """Compare internal representations of two specific inputs.""" |
| | a = report[text_a] |
| | b = report[text_b] |
| |
|
| | cos = F.cosine_similarity( |
| | a["embedding"].unsqueeze(0), |
| | b["embedding"].unsqueeze(0)).item() |
| |
|
| | print(f"\n{'='*70}") |
| | print(f"COMPARISON: '{text_a}' vs '{text_b}'") |
| | print(f"{'='*70}") |
| | print(f" Output cosine: {cos:.4f}") |
| | print(f" Tokens: {a['n_tokens']} vs {b['n_tokens']}") |
| |
|
| | |
| | ed_a = a["eff_dim"].get("eff_dim", 0) |
| | ed_b = b["eff_dim"].get("eff_dim", 0) |
| | print(f" Effective dim: {ed_a:.1f} vs {ed_b:.1f} (Ξ={abs(ed_a-ed_b):.1f})") |
| |
|
| | |
| | pa = a["divergence"]["total_path"] |
| | pb = b["divergence"]["total_path"] |
| | print(f" Total path: {pa:.2f} vs {pb:.2f} (Ξ={abs(pa-pb):.2f})") |
| |
|
| | |
| | print(f"\n Effective dim trajectory:") |
| | print(f" {'Layer':<8} {'A':>8} {'B':>8} {'Ξ':>8}") |
| | traj_a = a["spectral"]["trajectory"] |
| | traj_b = b["spectral"]["trajectory"] |
| | for i in range(len(traj_a)): |
| | ea = traj_a[i].get("eff_dim", 0) |
| | eb = traj_b[i].get("eff_dim", 0) |
| | print(f" L{i:<6} {ea:>8.1f} {eb:>8.1f} {abs(ea-eb):>8.1f}") |
| |
|
| | |
| | print(f"\n Computation profile (L2 shift):") |
| | print(f" {'Transition':<10} {'A':>8} {'B':>8} {'Ξ':>8}") |
| | for i in range(len(a["divergence"]["profile"])): |
| | sa = a["divergence"]["profile"][i]["l2_shift"] |
| | sb = b["divergence"]["profile"][i]["l2_shift"] |
| | label = a["divergence"]["profile"][i]["layer"] |
| | print(f" {label:<10} {sa:>8.2f} {sb:>8.2f} {abs(sa-sb):>8.2f}") |
| |
|
| | |
| | print(f"\n Top tokens:") |
| | print(f" A: {' '.join(f'{t}={v:.3f}' for t,v in a['influence']['top_tokens'][:5])}") |
| | print(f" B: {' '.join(f'{t}={v:.3f}' for t,v in b['influence']['top_tokens'][:5])}") |
| |
|
| |
|
| | |
| | |
| | |
| |
|
if __name__ == "__main__":
    from transformers import AutoModel, AutoTokenizer

    # Load the pretrained encoder and its tokenizer from the hub.
    REPO_ID = "AbstractPhil/geolip-captionbert-8192"
    print("Loading model...")
    base_model = AutoModel.from_pretrained(REPO_ID, trust_remote_code=True)
    tok = AutoTokenizer.from_pretrained(REPO_ID)

    analyzer = InternalAnalyzer(base_model, tok)

    # Probe set: simple concrete words, abstract/technical words, and full
    # captions, to contrast how the model processes each.
    probe_texts = [
        "girl",
        "woman",
        "dog",
        "sunset",
        "painting",
        "subtraction",
        "multiplication",
        "prophetic",
        "differential",
        "adjacency",
        "a girl sitting near a window",
        "a dog playing on the beach",
        "the differential equation of motion",
    ]

    analysis = analyzer.analyze(probe_texts)
    analyzer.print_report(analysis)

    # Pairwise deep dives: near-synonyms, unrelated words, full sentences.
    analyzer.compare(analysis, "girl", "woman")
    analyzer.compare(analysis, "girl", "subtraction")
    analyzer.compare(analysis, "a girl sitting near a window",
                     "the differential equation of motion")

    print(f"\n{'='*70}")
    print("DONE")
    print(f"{'='*70}")