Spaces:
Runtime error
Runtime error
| import streamlit as st | |
| from transformer_lens import HookedTransformer, utils | |
| from io import StringIO | |
| import sys | |
| import torch | |
| from functools import partial | |
| import plotly.offline as pyo | |
| import plotly.graph_objs as go | |
| import numpy as np | |
| import plotly.express as px | |
| import circuitsvis as cv | |
| # Little bit of front end for model selector | |
| # Radio buttons | |
| model_name = st.sidebar.radio("Model (only use patching for\nsmall (<4L) models due to memory limits)", [ | |
| "gelu-1l", | |
| "gelu-2l", | |
| "gelu-3l", | |
| "gelu-4l", | |
| "attn-only-1l", | |
| "attn-only-2l", | |
| "attn-only-3l", | |
| "attn-only-4l", | |
| "solu-1l", | |
| "solu-2l", | |
| "solu-3l", | |
| "solu-4l", | |
| "solu-6l", | |
| "solu-8l", | |
| "solu-10l", | |
| "solu-12l", | |
| "gpt2-small", | |
| "gpt2-medium", | |
| #"gpt2-large", | |
| #"gpt2-xl", | |
| ], index=1) | |
| # Backend code | |
| model = HookedTransformer.from_pretrained(model_name) | |
| def predict_next_token(prompt): | |
| logits = model(prompt)[0,-1] | |
| answer_index = logits.argmax() | |
| answer = model.tokenizer.decode(answer_index) | |
| answer = f"<b>|{answer}|</b> (answer by {model.cfg.model_name})" | |
| return answer | |
| def test_prompt(prompt, answer): | |
| output = StringIO() | |
| sys.stdout = output | |
| utils.test_prompt(prompt, answer, model) | |
| output = output.getvalue() | |
| return output | |
| def compute_residual_stream_patch(clean_prompt=None, answer=None, corrupt_prompt=None, corrupt_answer=None, layers=None): | |
| model.reset_hooks() | |
| clean_answer_index = model.tokenizer.encode(answer)[0] | |
| corrupt_answer_index = model.tokenizer.encode(corrupt_answer)[0] | |
| clean_tokens = model.to_str_tokens(clean_prompt) | |
| _, corrupt_cache = model.run_with_cache(corrupt_prompt) | |
| # Patching function | |
| def patch_residual_stream(activations, hook, layer="blocks.6.hook_resid_post", pos=5): | |
| activations[:, pos, :] = corrupt_cache[layer][:, pos, :] | |
| return activations | |
| # Compute logit diffs | |
| n_layers = len(layers) | |
| n_pos = len(clean_tokens) | |
| patching_effect = torch.zeros(n_layers, n_pos) | |
| for l, layer in enumerate(layers): | |
| for pos in range(n_pos): | |
| fwd_hooks = [(layer, partial(patch_residual_stream, layer=layer, pos=pos))] | |
| prediction_logits = model.run_with_hooks(clean_prompt, fwd_hooks=fwd_hooks)[0, -1] | |
| patching_effect[l, pos] = prediction_logits[clean_answer_index] - prediction_logits[corrupt_answer_index] | |
| return patching_effect | |
| def compute_attn_patch(clean_prompt=None, answer=None, corrupt_prompt=None, corrupt_answer=None): | |
| use_attn_result_prev = model.cfg.use_attn_result | |
| model.cfg.use_attn_result = True | |
| clean_answer_index = model.tokenizer.encode(answer)[0] | |
| corrupt_answer_index = model.tokenizer.encode(corrupt_answer)[0] | |
| clean_tokens = model.to_str_tokens(clean_prompt) | |
| _, corrupt_cache = model.run_with_cache(corrupt_prompt) | |
| # Patching function | |
| def patch_head_result(activations, hook, head=None, pos=None): | |
| activations[:, pos, head, :] = corrupt_cache[hook.name][:, pos, head, :] | |
| return activations | |
| n_layers = model.cfg.n_layers | |
| n_heads = model.cfg.n_heads | |
| n_pos = len(clean_tokens) | |
| patching_effect = torch.zeros(n_layers*n_heads, n_pos) | |
| for layer in range(n_layers): | |
| for head in range(n_heads): | |
| for pos in range(n_pos): | |
| fwd_hooks = [(f"blocks.{layer}.attn.hook_result", partial(patch_head_result, head=head, pos=pos))] | |
| prediction_logits = model.run_with_hooks(clean_prompt, fwd_hooks=fwd_hooks)[0, -1] | |
| patching_effect[n_heads*layer+head, pos] = prediction_logits[clean_answer_index] - prediction_logits[corrupt_answer_index] | |
| model.cfg.use_attn_result = use_attn_result_prev | |
| return patching_effect | |
| def imshow(tensor, xlabel="X", ylabel="Y", zlabel=None, xticks=None, yticks=None, c_midpoint=0.0, c_scale="RdBu", **kwargs): | |
| tensor = utils.to_numpy(tensor) | |
| xticks = [str(x) for x in xticks] | |
| yticks = [str(y) for y in yticks] | |
| labels = {"x": xlabel, "y": ylabel} | |
| if zlabel is not None: | |
| labels["color"] = zlabel | |
| fig = px.imshow(tensor, x=xticks, y=yticks, labels=labels, color_continuous_midpoint=c_midpoint, | |
| color_continuous_scale=c_scale, **kwargs) | |
| return fig | |
| def plot_residual_stream_patch(clean_prompt=None, answer=None, corrupt_prompt=None, corrupt_answer=None): | |
| layers = ["blocks.0.hook_resid_pre", *[f"blocks.{i}.hook_resid_post" for i in range(model.cfg.n_layers)]] | |
| clean_tokens = model.to_str_tokens(clean_prompt) | |
| token_labels = [f"(pos {i:2}) {t}" for i, t in enumerate(clean_tokens)] | |
| patching_effect = compute_residual_stream_patch(clean_prompt=clean_prompt, answer=answer, corrupt_prompt=corrupt_prompt, corrupt_answer=corrupt_answer, layers=layers) | |
| fig = imshow(patching_effect, xticks=token_labels, yticks=layers, xlabel="Position", ylabel="Layer", | |
| zlabel="Logit Difference", title="Patching residual stream at specific layer and position") | |
| return fig | |
| def plot_attn_patch(clean_prompt=None, answer=None, corrupt_prompt=None, corrupt_answer=None): | |
| clean_tokens = model.to_str_tokens(clean_prompt) | |
| n_layers = model.cfg.n_layers | |
| n_heads = model.cfg.n_heads | |
| layerhead_labels = [f"{l}.{h}" for l in range(n_layers) for h in range(n_heads)] | |
| token_labels = [f"(pos {i:2}) {t}" for i, t in enumerate(clean_tokens)] | |
| patching_effect = compute_attn_patch(clean_prompt=clean_prompt, answer=answer, corrupt_prompt=corrupt_prompt, corrupt_answer=corrupt_answer) | |
| return imshow(patching_effect, xticks=token_labels, yticks=layerhead_labels, xlabel="Position", ylabel="Layer.Head", | |
| zlabel="Logit Difference", title=f"Patching attention outputs for specific layer, head, and position", width=600, height=300+200*n_layers) | |
| # Frontend code | |
| st.title("Simple Trafo Mech Int") | |
| st.subheader("Transformer Mechanistic Interpretability") | |
| st.markdown("Powered by [TransformerLens](https://github.com/neelnanda-io/TransformerLens/)") | |
| st.markdown("For _what_ these plots are, and _why_, see this [tutorial](https://docs.google.com/document/d/1e6cs8d9QNretWvOLsv_KaMp6kSPWpJEW0GWc0nwjqxo/).") | |
| # Predict next token | |
| st.header("Predict the next token") | |
| st.markdown("Just a simple test UI, enter a prompt and the model will predict the next token") | |
| prompt_simple = st.text_input("Prompt:", "Today, the weather is", key="prompt_simple") | |
| if "prompt_simple_output" not in st.session_state: | |
| st.session_state.prompt_simple_output = None | |
| if st.button("Run model", key="key_button_prompt_simple"): | |
| res = predict_next_token(prompt_simple) | |
| st.session_state.prompt_simple_output = res | |
| if st.session_state.prompt_simple_output: | |
| st.markdown(st.session_state.prompt_simple_output, unsafe_allow_html=True) | |
| # Test prompt | |
| st.header("Verbose test prompt") | |
| st.markdown("Enter a prompt and the correct answer, the model will run the prompt and print the results") | |
| prompt = st.text_input("Prompt:", "The most popular programming language is", key="prompt") | |
| answer = st.text_input("Answer:", " Java", key="answer") | |
| if "test_prompt_output" not in st.session_state: | |
| st.session_state.test_prompt_output = None | |
| if st.button("Run model", key="key_button_test_prompt"): | |
| res = test_prompt(prompt, answer) | |
| st.session_state.test_prompt_output = res | |
| if st.session_state.test_prompt_output: | |
| st.code(st.session_state.test_prompt_output) | |
| # Residual stream patching | |
| st.header("Residual stream patching") | |
| st.markdown("Enter a clean prompt, correct answer, corrupt prompt and corrupt answer, the model will compute the patching effect") | |
| default_clean_prompt = "Her name was Alex Hart. Tomorrow at lunch time Alex" | |
| default_clean_answer = "Hart" | |
| default_corrupt_prompt = "Her name was Alex Carroll. Tomorrow at lunch time Alex" | |
| default_corrupt_answer = "Carroll" | |
| clean_prompt = st.text_input("Clean Prompt:", default_clean_prompt) | |
| clean_answer = st.text_input("Correct Answer:", default_clean_answer) | |
| corrupt_prompt = st.text_input("Corrupt Prompt:", default_corrupt_prompt) | |
| corrupt_answer = st.text_input("Corrupt Answer:", default_corrupt_answer) | |
| if "residual_stream_patch_out" not in st.session_state: | |
| st.session_state.residual_stream_patch_out = None | |
| if st.button("Run model", key="key_button_residual_stream_patch"): | |
| fig = plot_residual_stream_patch(clean_prompt=clean_prompt, answer=clean_answer, corrupt_prompt=corrupt_prompt, corrupt_answer=corrupt_answer) | |
| st.session_state.residual_stream_patch_out = fig | |
| if st.session_state.residual_stream_patch_out: | |
| st.plotly_chart(st.session_state.residual_stream_patch_out) | |
| # Attention head output | |
| st.header("Attention head output patching") | |
| st.markdown("Enter a clean prompt, correct answer, corrupt prompt and corrupt answer, the model will compute the patching effect") | |
| clean_prompt_attn = st.text_input("Clean Prompt:", default_clean_prompt, key="key2_clean_prompt_attn") | |
| clean_answer_attn = st.text_input("Correct Answer:", default_clean_answer, key="key2_clean_answer_attn") | |
| corrupt_prompt_attn = st.text_input("Corrupt Prompt:", default_corrupt_prompt, key="key2_corrupt_prompt_attn") | |
| corrupt_answer_attn = st.text_input("Corrupt Answer:", default_corrupt_answer, key="key2_corrupt_answer_attn") | |
| if "attn_head_patch_out" not in st.session_state: | |
| st.session_state.attn_head_patch_out = None | |
| if st.button("Run model", key="key_button_attn_head_patch"): | |
| fig = plot_attn_patch(clean_prompt=clean_prompt_attn, answer=clean_answer_attn, corrupt_prompt=corrupt_prompt_attn, corrupt_answer=corrupt_answer_attn) | |
| st.session_state.attn_head_patch_out = fig | |
| if st.session_state.attn_head_patch_out: | |
| st.plotly_chart(st.session_state.attn_head_patch_out) | |
| # Attention Head Visualization | |
| st.header("Attention Pattern Visualization") | |
| st.markdown("Powered by [CircuitsVis](https://github.com/alan-cooney/CircuitsVis)") | |
| st.markdown("Enter a prompt, show attention patterns") | |
| default_prompt_attn = "Her name was Alex Hart. Tomorrow at lunch time Alex" | |
| prompt_attn = st.text_input("Prompt:", default_prompt_attn) | |
| if "attn_html" not in st.session_state: | |
| st.session_state.attn_html = None | |
| if st.button("Run model", key="key_button_attention_head"): | |
| _, cache = model.run_with_cache(prompt_attn) | |
| st.session_state.attn_html = [] | |
| for layer in range(model.cfg.n_layers): | |
| html = cv.attention.attention_patterns(tokens=model.to_str_tokens(prompt_attn), | |
| attention=cache[f'blocks.{layer}.attn.hook_pattern'][0]) | |
| st.session_state.attn_html.append(html.show_code()) | |
| if st.session_state.attn_html: | |
| for layer in range(len(st.session_state.attn_html)): | |
| st.write(f"Attention patterns Layer {layer}:") | |
| st.components.v1.html(st.session_state.attn_html[layer], height=500) | |