Prisma / scripts /spectral_to_csv.py
y3i12's picture
Initial commit
56e82ec
"""Convert spectral analysis JSON results to CSV tables for analysis."""
import json
import csv
import sys
import os
import re
from pathlib import Path
def classify_layer(name, model_type):
"""Classify a weight matrix by layer index, component type, and phase."""
if model_type == "prisma":
# mirror_blocks.N.component
m = re.match(r'mirror_blocks\.(\d+)\.', name)
if m:
layer_idx = int(m.group(1))
phase = "mirror"
if 'attn' in name:
comp = 'Q' if 'q_proj' in name else 'K' if 'k_proj' in name else 'V' if 'v_proj' in name else 'O' if 'o_proj' in name else 'attn'
elif 'ffn.w3' in name or 'gate_expand' in name:
comp = 'W3'
elif 'ffn.w4' in name or 'gate_compress' in name:
comp = 'W4'
elif 'ffn.w1' in name:
comp = 'W1'
elif 'w2' in name:
comp = 'W2'
else:
comp = 'other'
return layer_idx, comp, phase
m = re.match(r'middle_blocks\.(\d+)\.', name)
if m:
layer_idx = int(m.group(1))
phase = "middle"
if 'attn' in name:
comp = 'Q' if 'q_proj' in name else 'K' if 'k_proj' in name else 'V' if 'v_proj' in name else 'O' if 'o_proj' in name else 'attn'
elif 'gate' in name:
comp = 'W3'
elif 'ffn.w1' in name:
comp = 'W1'
elif 'ffn.w2' in name:
comp = 'W2'
else:
comp = 'other'
return layer_idx, comp, phase
m = re.match(r'(first|last)_block\.', name)
if m:
phase = m.group(1)
if 'attn' in name:
comp = 'Q' if 'q_proj' in name else 'K' if 'k_proj' in name else 'V' if 'v_proj' in name else 'O' if 'o_proj' in name else 'attn'
elif 'ffn.w3' in name or 'gate' in name:
comp = 'W3'
elif 'ffn.w4' in name:
comp = 'W4'
elif 'ffn.w1' in name:
comp = 'W1'
elif 'ffn.w2' in name:
comp = 'W2'
else:
comp = 'other'
return 0, comp, phase
if 'embed' in name:
return -1, 'embed', 'embed'
if 'head' in name or 'lm_head' in name:
return 99, 'head', 'head'
return -1, 'other', 'other'
else: # GPT-2 style
m = re.match(r'transformer\.h\.(\d+)\.', name)
if m:
layer_idx = int(m.group(1))
if 'c_attn' in name:
comp = 'QKV'
elif 'c_proj' in name and 'mlp' not in name:
comp = 'O'
elif 'c_fc' in name:
comp = 'W1'
elif 'mlp.c_proj' in name:
comp = 'W2'
else:
comp = 'other'
return layer_idx, comp, "layer"
if 'wte' in name:
return -1, 'embed', 'embed'
if 'wpe' in name:
return -1, 'pos_embed', 'embed'
return -1, 'other', 'other'
def json_to_csvs(json_path, output_dir, model_type="prisma"):
with open(json_path) as f:
data = json.load(f)
os.makedirs(output_dir, exist_ok=True)
# 1. Full weight matrix summary
rows = []
for name, info in data.items():
if 'activation' in name or name.startswith('_'):
continue
layer_idx, comp, phase = classify_layer(name, model_type)
rows.append({
'name': name,
'layer_idx': layer_idx,
'component': comp,
'phase': phase,
'shape': 'x'.join(str(s) for s in info['shape']),
'effective_rank': round(info['effective_rank'], 2),
'stable_rank': round(info['stable_rank'], 3),
'spectral_norm': round(info['spectral_norm'], 4),
'frobenius_norm': round(info['frobenius_norm'], 4),
'alpha': round(info['alpha'], 4),
'alpha_r2': round(info['alpha_r2'], 4),
'signal_ratio': round(info['signal_ratio'], 4),
'condition_number': round(info['condition_number'], 2),
'mp_bound': round(info['mp_bound'], 4),
'n_above_mp': info['n_above_mp'],
'n_total': info['n_total'],
'sv_1': round(info['top_10_sv'][0], 4) if info['top_10_sv'] else 0,
'sv_2': round(info['top_10_sv'][1], 4) if len(info['top_10_sv']) > 1 else 0,
'sv_10': round(info['top_10_sv'][9], 4) if len(info['top_10_sv']) > 9 else 0,
'sv1_sv2_ratio': round(info['top_10_sv'][0] / info['top_10_sv'][1], 4) if len(info['top_10_sv']) > 1 and info['top_10_sv'][1] > 0 else 0,
})
with open(os.path.join(output_dir, 'weights_full.csv'), 'w', newline='') as f:
w = csv.DictWriter(f, fieldnames=rows[0].keys())
w.writeheader()
w.writerows(sorted(rows, key=lambda r: (r['phase'], r['layer_idx'], r['component'])))
# 2. Layer-level FFN summary (W1 progression = the lens)
ffn_rows = [r for r in rows if r['component'] == 'W1']
with open(os.path.join(output_dir, 'ffn_w1_progression.csv'), 'w', newline='') as f:
w = csv.DictWriter(f, fieldnames=['layer_idx', 'phase', 'effective_rank', 'stable_rank', 'alpha', 'alpha_r2', 'signal_ratio', 'condition_number', 'sv1_sv2_ratio'])
w.writeheader()
for r in sorted(ffn_rows, key=lambda r: (r['phase'], r['layer_idx'])):
w.writerow({k: r[k] for k in w.fieldnames})
# 3. Gate comparison (W3 vs W4)
gate_rows = [r for r in rows if r['component'] in ('W3', 'W4') and r['phase'] == 'mirror']
with open(os.path.join(output_dir, 'gate_comparison.csv'), 'w', newline='') as f:
w = csv.DictWriter(f, fieldnames=['layer_idx', 'component', 'effective_rank', 'stable_rank', 'alpha', 'alpha_r2', 'signal_ratio', 'sv1_sv2_ratio'])
w.writeheader()
for r in sorted(gate_rows, key=lambda r: (r['layer_idx'], r['component'])):
w.writerow({k: r[k] for k in w.fieldnames})
# 4. Attention head comparison (Q, K, V, O per layer)
attn_rows = [r for r in rows if r['component'] in ('Q', 'K', 'V', 'O', 'QKV')]
with open(os.path.join(output_dir, 'attention_progression.csv'), 'w', newline='') as f:
w = csv.DictWriter(f, fieldnames=['layer_idx', 'phase', 'component', 'effective_rank', 'stable_rank', 'alpha', 'signal_ratio', 'condition_number'])
w.writeheader()
for r in sorted(attn_rows, key=lambda r: (r['phase'], r['layer_idx'], r['component'])):
w.writerow({k: r[k] for k in w.fieldnames})
# 5. Summary statistics
alphas = [r['alpha'] for r in rows if r['alpha'] > 0]
eff_ranks = [r['effective_rank'] for r in rows if r['layer_idx'] >= 0]
signal_ratios = [r['signal_ratio'] for r in rows if r['layer_idx'] >= 0]
summary = {
'n_matrices': len(rows),
'mean_alpha': round(sum(alphas) / len(alphas), 4) if alphas else 0,
'min_alpha': round(min(alphas), 4) if alphas else 0,
'max_alpha': round(max(alphas), 4) if alphas else 0,
'mean_effective_rank': round(sum(eff_ranks) / len(eff_ranks), 2) if eff_ranks else 0,
'mean_signal_ratio': round(sum(signal_ratios) / len(signal_ratios), 4) if signal_ratios else 0,
'n_well_trained (alpha<2)': sum(1 for a in alphas if a < 2.0),
'n_total_alpha': len(alphas),
}
with open(os.path.join(output_dir, 'summary.csv'), 'w', newline='') as f:
w = csv.DictWriter(f, fieldnames=summary.keys())
w.writeheader()
w.writerow(summary)
print(f"Wrote CSVs to {output_dir}/")
print(f" weights_full.csv ({len(rows)} matrices)")
print(f" ffn_w1_progression.csv ({len(ffn_rows)} layers)")
print(f" gate_comparison.csv ({len(gate_rows)} entries)")
print(f" attention_progression.csv ({len(attn_rows)} entries)")
print(f" summary.csv")
if __name__ == '__main__':
base = "circuits/scripts/spectral_output/mirrored_300M_mk4_cont"
# Prisma
json_to_csvs(
f"{base}/results.json",
f"{base}/csv_prisma",
model_type="prisma"
)
# GPT-2 medium
if os.path.exists(f"{base}/results_b.json"):
json_to_csvs(
f"{base}/results_b.json",
f"{base}/csv_gpt2",
model_type="gpt2"
)