File size: 2,890 Bytes
8124a18 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
from collections import defaultdict
from typing import Dict, Optional
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from . import nethook
class LogitLens:
"""
Applies the LM head at the output of each hidden layer, then analyzes the
resultant token probability distribution.
Only works when hooking outputs of *one* individual generation.
Inspiration: https://www.lesswrong.com/posts/AcKRB8wDpdaN6v6ru/interpreting-gpt-the-logit-lens
Warning: when running multiple times (e.g. generation), will return
outputs _only_ for the last processing step.
"""
def __init__(
self,
model: AutoModelForCausalLM,
tok: AutoTokenizer,
layer_module_tmp: str,
ln_f_module: str,
lm_head_module: str,
disabled: bool = False,
):
self.disabled = disabled
self.model, self.tok = model, tok
self.n_layers = self.model.config.n_layer
self.lm_head, self.ln_f = (
nethook.get_module(model, lm_head_module),
nethook.get_module(model, ln_f_module),
)
self.output: Optional[Dict] = None
self.td: Optional[nethook.TraceDict] = None
self.trace_layers = [
layer_module_tmp.format(layer) for layer in range(self.n_layers)
]
def __enter__(self):
if not self.disabled:
self.td = nethook.TraceDict(
self.model,
self.trace_layers,
retain_input=False,
retain_output=True,
)
self.td.__enter__()
def __exit__(self, *args):
if self.disabled:
return
self.td.__exit__(*args)
self.output = {layer: [] for layer in range(self.n_layers)}
with torch.no_grad():
for layer, (_, t) in enumerate(self.td.items()):
cur_out = t.output[0]
assert (
cur_out.size(0) == 1
), "Make sure you're only running LogitLens on single generations only."
self.output[layer] = torch.softmax(
self.lm_head(self.ln_f(cur_out[:, -1, :])), dim=1
)
return self.output
def pprint(self, k=5):
to_print = defaultdict(list)
for layer, pred in self.output.items():
rets = torch.topk(pred[0], k)
for i in range(k):
to_print[layer].append(
(
self.tok.decode(rets[1][i]),
round(rets[0][i].item() * 1e2) / 1e2,
)
)
print(
"\n".join(
[
f"{layer}: {[(el[0], round(el[1] * 1e2)) for el in to_print[layer]]}"
for layer in range(self.n_layers)
]
)
)
|