| | """Convert spectral analysis JSON results to CSV tables for analysis."""
|
| | import json
|
| | import csv
|
| | import sys
|
| | import os
|
| | import re
|
| | from pathlib import Path
|
| |
|
| |
|
# Substring rules are evaluated top to bottom; the first rule with any needle
# present in the parameter name decides the component label.
_ATTN_PROJ_RULES = (
    (('q_proj',), 'Q'),
    (('k_proj',), 'K'),
    (('v_proj',), 'V'),
    (('o_proj',), 'O'),
)
_MIRROR_FFN_RULES = (
    (('ffn.w3', 'gate_expand'), 'W3'),
    (('ffn.w4', 'gate_compress'), 'W4'),
    (('ffn.w1',), 'W1'),
    (('w2',), 'W2'),
)
_MIDDLE_FFN_RULES = (
    (('gate',), 'W3'),
    (('ffn.w1',), 'W1'),
    (('ffn.w2',), 'W2'),
)
_EDGE_FFN_RULES = (
    (('ffn.w3', 'gate'), 'W3'),
    (('ffn.w4',), 'W4'),
    (('ffn.w1',), 'W1'),
    (('ffn.w2',), 'W2'),
)
# Indexed block stacks of the "prisma" layout: (name prefix pattern, phase, FFN rules).
_PRISMA_STACKS = (
    (r'mirror_blocks\.(\d+)\.', "mirror", _MIRROR_FFN_RULES),
    (r'middle_blocks\.(\d+)\.', "middle", _MIDDLE_FFN_RULES),
)


def _label_for(name, rules, fallback):
    """Return the label of the first rule with a needle contained in *name*."""
    for needles, label in rules:
        if any(needle in name for needle in needles):
            return label
    return fallback


def _block_component(name, ffn_rules):
    """Pick the component label for a parameter that lives inside a block."""
    # Anything mentioning 'attn' is an attention weight; it never falls
    # through to the FFN rules, even when no projection name is recognised.
    if 'attn' in name:
        return _label_for(name, _ATTN_PROJ_RULES, 'attn')
    return _label_for(name, ffn_rules, 'other')


def _classify_gpt2(name):
    """Classify a parameter name that follows the ``transformer.h.N.`` layout."""
    hit = re.match(r'transformer\.h\.(\d+)\.', name)
    if hit is None:
        # Not inside a numbered block: only the two embedding tables are named.
        if 'wte' in name:
            return -1, 'embed', 'embed'
        if 'wpe' in name:
            return -1, 'pos_embed', 'embed'
        return -1, 'other', 'other'

    inside_mlp = 'mlp' in name
    if 'c_attn' in name:
        component = 'QKV'
    elif 'c_proj' in name and not inside_mlp:
        component = 'O'
    elif 'c_fc' in name:
        component = 'W1'
    elif 'mlp.c_proj' in name:
        component = 'W2'
    else:
        component = 'other'
    return int(hit.group(1)), component, "layer"


def classify_layer(name, model_type):
    """Map a weight-matrix name to ``(layer_idx, component, phase)``.

    Args:
        name: Parameter name; classification uses prefix regexes and
            substring checks on this string only.
        model_type: ``"prisma"`` selects the mirror/middle/first/last block
            naming scheme; any other value selects the ``transformer.h.N.``
            scheme.

    Returns:
        A 3-tuple ``(layer_idx, component, phase)``:

        * ``layer_idx`` is the block number taken from the name, ``0`` for
          ``first_block``/``last_block``, ``99`` for the head, and ``-1`` for
          embeddings and unrecognised names.
        * ``component`` is one of ``Q``, ``K``, ``V``, ``O``, ``QKV``,
          ``attn``, ``W1``..``W4``, ``embed``, ``pos_embed``, ``head`` or
          ``other``.
        * ``phase`` is ``mirror``, ``middle``, ``first``, ``last``, ``layer``,
          ``embed``, ``head`` or ``other``.
    """
    if model_type != "prisma":
        return _classify_gpt2(name)

    for pattern, phase, ffn_rules in _PRISMA_STACKS:
        hit = re.match(pattern, name)
        if hit is not None:
            return int(hit.group(1)), _block_component(name, ffn_rules), phase

    edge = re.match(r'(first|last)_block\.', name)
    if edge is not None:
        # The single first/last blocks carry no index in their name; report 0.
        return 0, _block_component(name, _EDGE_FFN_RULES), edge.group(1)

    if 'embed' in name:
        return -1, 'embed', 'embed'
    if 'head' in name or 'lm_head' in name:
        return 99, 'head', 'head'
    return -1, 'other', 'other'
|
| |
|
| |
|
# Column order of weights_full.csv. Kept explicit (rather than derived from the
# first row) so the file can still be written when there are no rows at all.
_WEIGHT_FIELDS = [
    'name', 'layer_idx', 'component', 'phase', 'shape',
    'effective_rank', 'stable_rank', 'spectral_norm', 'frobenius_norm',
    'alpha', 'alpha_r2', 'signal_ratio', 'condition_number', 'mp_bound',
    'n_above_mp', 'n_total', 'sv_1', 'sv_2', 'sv_10', 'sv1_sv2_ratio',
]


def _weight_row(name, info, layer_idx, comp, phase):
    """Flatten one per-matrix JSON entry into a rounded CSV row dict."""
    svs = info['top_10_sv']
    # Guard against a missing or zero second singular value.
    sv_ratio = round(svs[0] / svs[1], 4) if len(svs) > 1 and svs[1] > 0 else 0
    return {
        'name': name,
        'layer_idx': layer_idx,
        'component': comp,
        'phase': phase,
        'shape': 'x'.join(str(s) for s in info['shape']),
        'effective_rank': round(info['effective_rank'], 2),
        'stable_rank': round(info['stable_rank'], 3),
        'spectral_norm': round(info['spectral_norm'], 4),
        'frobenius_norm': round(info['frobenius_norm'], 4),
        'alpha': round(info['alpha'], 4),
        'alpha_r2': round(info['alpha_r2'], 4),
        'signal_ratio': round(info['signal_ratio'], 4),
        'condition_number': round(info['condition_number'], 2),
        'mp_bound': round(info['mp_bound'], 4),
        'n_above_mp': info['n_above_mp'],
        'n_total': info['n_total'],
        'sv_1': round(svs[0], 4) if svs else 0,
        'sv_2': round(svs[1], 4) if len(svs) > 1 else 0,
        'sv_10': round(svs[9], 4) if len(svs) > 9 else 0,
        'sv1_sv2_ratio': sv_ratio,
    }


def _write_csv(path, fieldnames, rows):
    """Write a header plus *rows* to *path*, keeping only the *fieldnames* columns."""
    with open(path, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for row in rows:
            writer.writerow({key: row[key] for key in fieldnames})


def json_to_csvs(json_path, output_dir, model_type="prisma"):
    """Convert a spectral-analysis JSON file into a set of CSV tables.

    Reads the JSON mapping ``name -> metrics`` at *json_path*, skips entries
    whose key contains ``'activation'`` or starts with ``'_'``, and writes
    five files into *output_dir* (created if missing):

    * ``weights_full.csv`` - every remaining matrix with all metrics
    * ``ffn_w1_progression.csv`` - rows whose component is ``W1``
    * ``gate_comparison.csv`` - ``W3``/``W4`` rows in the ``mirror`` phase
    * ``attention_progression.csv`` - ``Q``/``K``/``V``/``O``/``QKV`` rows
    * ``summary.csv`` - a single row of aggregate statistics

    If no matrix survives the filter, each table is written with its header
    only and the summary contains zeros.

    Args:
        json_path: Path of the JSON results file.
        output_dir: Directory that receives the CSV files.
        model_type: Naming scheme forwarded to ``classify_layer``.

    Raises:
        OSError: If the JSON file cannot be read or a CSV cannot be written.
        KeyError: If a retained entry lacks one of the expected metric keys.
    """
    # JSON text is UTF-8 by specification; do not depend on the locale default.
    with open(json_path, encoding='utf-8') as f:
        data = json.load(f)

    os.makedirs(output_dir, exist_ok=True)

    rows = []
    for name, info in data.items():
        if 'activation' in name or name.startswith('_'):
            continue
        layer_idx, comp, phase = classify_layer(name, model_type)
        rows.append(_weight_row(name, info, layer_idx, comp, phase))

    # Full table
    _write_csv(
        os.path.join(output_dir, 'weights_full.csv'),
        _WEIGHT_FIELDS,
        sorted(rows, key=lambda r: (r['phase'], r['layer_idx'], r['component'])),
    )

    # FFN W1 progression across layers
    ffn_rows = [r for r in rows if r['component'] == 'W1']
    _write_csv(
        os.path.join(output_dir, 'ffn_w1_progression.csv'),
        ['layer_idx', 'phase', 'effective_rank', 'stable_rank', 'alpha', 'alpha_r2', 'signal_ratio', 'condition_number', 'sv1_sv2_ratio'],
        sorted(ffn_rows, key=lambda r: (r['phase'], r['layer_idx'])),
    )

    # W3 vs W4 inside the mirror blocks
    gate_rows = [r for r in rows if r['component'] in ('W3', 'W4') and r['phase'] == 'mirror']
    _write_csv(
        os.path.join(output_dir, 'gate_comparison.csv'),
        ['layer_idx', 'component', 'effective_rank', 'stable_rank', 'alpha', 'alpha_r2', 'signal_ratio', 'sv1_sv2_ratio'],
        sorted(gate_rows, key=lambda r: (r['layer_idx'], r['component'])),
    )

    # Attention projections
    attn_rows = [r for r in rows if r['component'] in ('Q', 'K', 'V', 'O', 'QKV')]
    _write_csv(
        os.path.join(output_dir, 'attention_progression.csv'),
        ['layer_idx', 'phase', 'component', 'effective_rank', 'stable_rank', 'alpha', 'signal_ratio', 'condition_number'],
        sorted(attn_rows, key=lambda r: (r['phase'], r['layer_idx'], r['component'])),
    )

    # Aggregates. Alpha statistics use every row with alpha > 0 (a non-positive
    # alpha presumably marks a failed fit - TODO confirm); rank and signal
    # means use only rows with layer_idx >= 0, which drops the embedding and
    # unclassified rows that classify_layer tags with -1.
    alphas = [r['alpha'] for r in rows if r['alpha'] > 0]
    eff_ranks = [r['effective_rank'] for r in rows if r['layer_idx'] >= 0]
    signal_ratios = [r['signal_ratio'] for r in rows if r['layer_idx'] >= 0]

    summary = {
        'n_matrices': len(rows),
        'mean_alpha': round(sum(alphas) / len(alphas), 4) if alphas else 0,
        'min_alpha': round(min(alphas), 4) if alphas else 0,
        'max_alpha': round(max(alphas), 4) if alphas else 0,
        'mean_effective_rank': round(sum(eff_ranks) / len(eff_ranks), 2) if eff_ranks else 0,
        'mean_signal_ratio': round(sum(signal_ratios) / len(signal_ratios), 4) if signal_ratios else 0,
        'n_well_trained (alpha<2)': sum(1 for a in alphas if a < 2.0),
        'n_total_alpha': len(alphas),
    }
    _write_csv(os.path.join(output_dir, 'summary.csv'), list(summary), [summary])

    print(f"Wrote CSVs to {output_dir}/")
    print(f" weights_full.csv ({len(rows)} matrices)")
    print(f" ffn_w1_progression.csv ({len(ffn_rows)} layers)")
    print(f" gate_comparison.csv ({len(gate_rows)} entries)")
    print(f" attention_progression.csv ({len(attn_rows)} entries)")
    print(f" summary.csv")
|
| |
|
| |
|
if __name__ == '__main__':
    # Results directory: taken from the first command-line argument when one
    # is given, otherwise the default run directory below.
    default_base = "circuits/scripts/spectral_output/mirrored_300M_mk4_cont"
    base = sys.argv[1] if len(sys.argv) > 1 else default_base

    # Primary results file, classified with the "prisma" naming scheme.
    json_to_csvs(
        f"{base}/results.json",
        f"{base}/csv_prisma",
        model_type="prisma"
    )

    # Optional second results file, classified with the "gpt2" naming scheme;
    # converted only when it exists.
    if os.path.exists(f"{base}/results_b.json"):
        json_to_csvs(
            f"{base}/results_b.json",
            f"{base}/csv_gpt2",
            model_type="gpt2"
        )
|
| |
|