| """ |
| Test the trained filename anomaly detector. |
| Usage: python test_model.py |
| """ |
| import json, math, os |
|
|
| |
| MODEL_PATH = 'model.json' |
| assert os.path.exists(MODEL_PATH), f"No model found at {MODEL_PATH}!" |
| with open(MODEL_PATH) as f: |
| payload = json.load(f) |
|
|
| hp = payload['hyperparams'] |
| n_embd = hp['n_embd'] |
| n_head = hp['n_head'] |
| n_layer = hp['n_layer'] |
| block_size = hp['block_size'] |
| head_dim = hp['head_dim'] |
| uchars = payload['vocab'] |
| vocab_size = payload['vocab_size'] |
| weights = payload['weights'] |
| BOS = vocab_size - 1 |
| stoi = {ch: i for i, ch in enumerate(uchars)} |
|
|
| print(f"Model loaded: {n_embd}d, {n_head}h, {n_layer}L, vocab={vocab_size}") |
|
|
| |
def linear(x, w):
    """Matrix-vector product w @ x, where w is a list of rows."""
    out = []
    for row in w:
        out.append(sum(r * v for r, v in zip(row, x)))
    return out
|
|
def rmsnorm(x):
    """Scale x to unit root-mean-square (eps=1e-5, no learned gain)."""
    mean_sq = sum(v * v for v in x) / len(x)
    inv_rms = (mean_sq + 1e-5) ** -0.5
    return [v * inv_rms for v in x]
|
|
def softmax_floats(logits):
    """Numerically stable softmax over a plain list of floats."""
    peak = max(logits)
    exp_vals = []
    for v in logits:
        exp_vals.append(math.exp(v - peak))
    total = sum(exp_vals)
    return [e / total for e in exp_vals]
|
|
def gpt_forward(token_id, pos_id, keys, values):
    """Run one token through the transformer; return logits over the vocab.

    keys/values are per-layer KV caches (list of per-position vectors). This
    call APPENDS this position's key/value for every layer, so the caller
    must feed tokens in position order with the same cache objects.
    """
    tok_emb = weights['wte'][token_id]
    pos_emb = weights['wpe'][pos_id]
    x = [t + p for t, p in zip(tok_emb, pos_emb)]
    x = rmsnorm(x)  # NOTE(review): extra norm right after embedding sum — presumably matches training
    for li in range(n_layer):
        # --- attention sub-block (pre-norm, residual add) ---
        x_res = x
        x = rmsnorm(x)
        q = linear(x, weights[f'layer{li}.attn_wq'])
        k = linear(x, weights[f'layer{li}.attn_wk'])
        v = linear(x, weights[f'layer{li}.attn_wv'])
        keys[li].append(k)    # grow the cache; attention below sees all past positions
        values[li].append(v)
        x_attn = []
        for h in range(n_head):
            hs = h * head_dim  # offset of this head's slice in the concatenated vectors
            q_h = q[hs:hs+head_dim]
            k_h = [ki[hs:hs+head_dim] for ki in keys[li]]
            v_h = [vi[hs:hs+head_dim] for vi in values[li]]
            # scaled dot-product scores against every cached position
            attn = [sum(q_h[j]*k_h[t][j] for j in range(head_dim)) / head_dim**0.5 for t in range(len(k_h))]
            aw = softmax_floats(attn)
            # attention-weighted sum of cached values for this head
            head_out = [sum(aw[t]*v_h[t][j] for t in range(len(v_h))) for j in range(head_dim)]
            x_attn.extend(head_out)  # concatenate heads back together
        x = linear(x_attn, weights[f'layer{li}.attn_wo'])
        x = [a + b for a, b in zip(x, x_res)]  # residual connection
        # --- MLP sub-block (pre-norm, ReLU, residual add) ---
        x_res = x
        x = rmsnorm(x)
        x = linear(x, weights[f'layer{li}.mlp_fc1'])
        x = [max(0, xi) for xi in x]  # ReLU
        x = linear(x, weights[f'layer{li}.mlp_fc2'])
        x = [a + b for a, b in zip(x, x_res)]  # residual connection
    return linear(x, weights['lm_head'])
|
|
def score_filename(name):
    """Return negative log-likelihood (lower = more normal)."""
    # Wrap the name in BOS markers; silently drop characters the vocab lacks.
    ids = [BOS] + [stoi[c] for c in name if c in stoi] + [BOS]
    # Clamp to the context window (a shorter list is unchanged by the slice).
    ids = ids[:block_size + 1]
    k_cache = [[] for _ in range(n_layer)]
    v_cache = [[] for _ in range(n_layer)]
    nll = 0.0
    for pos, (cur, nxt) in enumerate(zip(ids, ids[1:])):
        probs = softmax_floats(gpt_forward(cur, pos, k_cache, v_cache))
        p = probs[nxt]
        # Guard against log(0): a zero-probability target gets a huge penalty.
        nll += -math.log(p) if p > 0 else 1e6
    return nll
|
|
| |
# Filenames that appear to follow the expected convention
# (acr_<type>_<campaign>_<locale>_v<NN>.<ext>); should score low NLL.
normal_filenames = [
    "acr_banner_spring25_enUS_v01.png",
    "acr_email_bf24_enGB_v02.jpg",
    "acr_video_demo_enUS_v01.mp4",
    "acr_logo_primary_enUS_v03.svg",
    "acr_report_fy24q4_enUS_v01.pdf",
]

# Filenames breaking the convention (wrong casing, spaces, odd extensions);
# should score high NLL.
anomalous_filenames = [
    "DELETE_THIS_NOW.exe",
    "..hidden_config.bat",
    "photo_2024_vacation_IMG_3847.HEIC",
    "meeting notes final FINAL v2 (1).docx",
    "acr banner spring enUS v01.png",
]


def _report(title, filenames):
    """Print a section header and the NLL score of each filename."""
    print(f"\n--- {title} ---")
    for fn in filenames:
        nll = score_filename(fn)
        print(f" NLL {nll:7.2f} | {fn}")


_report("Normal filenames (should have LOW NLL)", normal_filenames)
_report("Anomalous filenames (should have HIGH NLL)", anomalous_filenames)
|
|