# bp_phi/runner.py
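"""Experiment runners for the bp_phi suite.

This module exposes four entry points:
  * run_workspace_suite   -- Experiment 1: workspace tasks with optional ablations.
  * run_halting_test      -- Experiment 2: computational halting / timing test.
  * run_seismograph_suite -- Experiment 3: cognitive seismograph (activation similarity).
  * run_shock_test_suite  -- Experiment 4: symbolic shock test (latency and sparsity).
"""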
import os
os.environ["CUBLAS_WORKSPACE_CONFIG"] = ":4096:8"
import torch
import random
import numpy as np
import statistics
import time
from transformers import set_seed, TextStreamer
from typing import Dict, Any, List, Optional
from .workspace import Workspace, RandomWorkspace
from .llm_iface import LLM
from .prompts_en import SINGLE_STEP_TASKS, MULTI_STEP_SCENARIOS, HALT_PROMPTS, SHOCK_TEST_STIMULI
from .metrics import expected_calibration_error, auc_nrp
from .runner_utils import dbg, SYSTEM_META, step_user_prompt, parse_meta
# --- Experiment 1: Workspace & Ablations Runner ---
def run_workspace_suite(model_id: str, trials: int, seed: int, temperature: float, ablation: Optional[str]) -> Dict[str, Any]:
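    """Experiment 1: run single- and multi-step tasks through a bounded workspace.

    Supported ablations: "workspace_unlimited", "random_workspace", and
    "recurrence_off"; any other value (or None) uses the default 7-slot workspace.
    Returns a dict with the PCS score, recall accuracy, and per-step results.
    """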
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)
    try:
        torch.use_deterministic_algorithms(True, warn_only=True)
    except Exception:
        pass
    set_seed(seed)

    llm = LLM(model_id=model_id, device="auto", seed=seed)

    task_pool = SINGLE_STEP_TASKS + MULTI_STEP_SCENARIOS
    random.shuffle(task_pool)

    all_results = []
    recall_verifications = []

    for i in range(trials):
        task = task_pool[i % len(task_pool)]

        if task.get("type") == "multi_step":
            dbg(f"\n--- SCENARIO: {task['name']} ---")
            ws = Workspace(max_slots=7) if ablation != "workspace_unlimited" else Workspace(max_slots=999)
            if ablation == "random_workspace":
                ws = RandomWorkspace(max_slots=7)

            for step in task["steps"]:
                if ablation == "recurrence_off":
                    ws.clear()
                if step["type"] == "verify":
                    continue

                user_prompt = step_user_prompt(step["prompt"], ws.snapshot())
                raw_response = llm.generate_json(SYSTEM_META, user_prompt, temperature=temperature)[0]
                parsed_response = parse_meta(raw_response)

                if parsed_response.get("answer"):
                    ws.commit(f"S{len(ws.history)+1}", parsed_response["answer"], parsed_response["confidence"])

                res = {"step": step, "response": parsed_response}

                if step["type"] == "recall":
                    verify_step = next((s for s in task["steps"] if s["type"] == "verify"), None)
                    if verify_step:
                        correct = verify_step["expected_answer_fragment"] in parsed_response.get("answer", "").lower()
                        recall_verifications.append(correct)
                        res["correct_recall"] = correct
                        dbg(f"VERIFY: Correct={correct}")

                all_results.append(res)
        else:  # Single-step tasks
            ws = Workspace(max_slots=7)
            user_prompt = step_user_prompt(task["base_prompt"], ws.snapshot())
            raw_response = llm.generate_json(SYSTEM_META, user_prompt, temperature=temperature)[0]
            parsed_response = parse_meta(raw_response)
            all_results.append({"step": task, "response": parsed_response})

    recall_accuracy = statistics.mean(recall_verifications) if recall_verifications else 0.0
    pcs = 0.6 * recall_accuracy  # PCS is currently driven by recall accuracy alone, scaled by a fixed 0.6 weight.

    return {"PCS": pcs, "Recall_Accuracy": recall_accuracy, "results": all_results}
# --- Experiment 2: Computational Halting Test Runner ---
def run_halting_test(model_id: str, master_seed: int, prompt_type: str, num_runs: int, timeout: int) -> Dict[str, Any]:
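    """Experiment 2: measure wall-clock generation time across seeded runs.

    Each run reseeds and re-instantiates the model, generates up to 512 new
    tokens for the selected HALT_PROMPTS entry, and records the duration.
    The verdict flags timed-out runs and high variance in execution time.
    """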
    durations = []
    for i in range(num_runs):
        current_seed = master_seed + i
        dbg(f"--- HALT TEST RUN {i+1}/{num_runs} (Seed: {current_seed}) ---")
        set_seed(current_seed)

        # Re-instantiate the model so the seed is fully respected.
        llm = LLM(model_id=model_id, device="auto", seed=current_seed)

        prompt = HALT_PROMPTS[prompt_type]
        inputs = llm.tokenizer(prompt, return_tensors="pt").to(llm.model.device)

        start_time = time.time()
        # The timeout is used only to interpret the results; it does not stop generation.
        # Gradio handles the overall request timeout.
        llm.model.generate(**inputs, max_new_tokens=512)
        end_time = time.time()

        duration = end_time - start_time
        durations.append(duration)
        dbg(f"Run {i+1} finished in {duration:.2f}s.")

    # --- Analysis ---
    mean_time = statistics.mean(durations)
    stdev_time = statistics.stdev(durations) if len(durations) > 1 else 0.0
    min_time = min(durations)
    max_time = max(durations)
    timed_out_runs = sum(1 for d in durations if d >= timeout)

    if timed_out_runs > 0:
        verdict = (f"### ⚠️ Potential Cognitive Jamming Detected!\n"
                   f"{timed_out_runs}/{num_runs} runs exceeded the timeout of {timeout}s. "
                   f"The high variance (Std Dev: {stdev_time:.2f}s) suggests unstable internal processing loops.")
    elif stdev_time > (mean_time * 0.5) and stdev_time > 2.0:  # High relative and absolute deviation
        verdict = (f"### 🤔 Unstable Computation Detected\n"
                   f"Although no run timed out, the high standard deviation ({stdev_time:.2f}s) "
                   "indicates significant instability in processing time across different seeds.")
    else:
        verdict = (f"### ✅ Process Halted Normally\n"
                   f"All {num_runs} runs completed consistently. "
                   f"Average time: {mean_time:.2f}s (Std Dev: {stdev_time:.2f}s).")

    return {
        "verdict": verdict,
        "prompt_type": prompt_type,
        "num_runs": num_runs,
        "mean_execution_time_s": mean_time,
        "stdev_execution_time_s": stdev_time,
        "min_time_s": min_time,
        "max_time_s": max_time,
        "timed_out_runs": timed_out_runs,
        "all_durations_s": durations,
    }
# --- Experiment 3: Cognitive Seismograph Runner ---
def run_seismograph_suite(model_id: str, seed: int) -> Dict[str, Any]:
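    """Experiment 3: compare mid-layer activations across encode, distractor, and recall steps.

    A forward hook on the middle transformer layer captures the token-averaged
    hidden state for each step of the "Key Location Memory" scenario; cosine
    similarity between the recall and encode activations is then compared
    against the recall/distractor baseline.
    """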
    set_seed(seed)
    llm = LLM(model_id=model_id, device="auto", seed=seed)
    scenario = next(s for s in MULTI_STEP_SCENARIOS if s["name"] == "Key Location Memory")

    activations = {}

    def get_activation(name):
        def hook(model, input, output):
            activations[name] = output[0].detach().cpu().mean(dim=1).squeeze()
        return hook

    target_layer_index = llm.model.config.num_hidden_layers // 2
    hook = llm.model.model.layers[target_layer_index].register_forward_hook(get_activation('capture'))

    ws = Workspace(max_slots=7)
    for step in scenario["steps"]:
        if step["type"] == "verify":
            continue
        user_prompt = step_user_prompt(step["prompt"], ws.snapshot())
        llm.generate_json(SYSTEM_META, user_prompt, max_new_tokens=20)
        activations[step["type"]] = activations.pop('capture')
        ws.commit(f"S{len(ws.history)+1}", f"Output for {step['type']}", 0.9)

    hook.remove()

    cos = torch.nn.CosineSimilarity(dim=0)
    sim_recall_encode = float(cos(activations["recall"], activations["encode"]))
    sim_recall_distract = float(cos(activations["recall"], activations["distractor"]))

    verdict = (
        "✅ Evidence of Memory Reactivation Found."
        if sim_recall_encode > (sim_recall_distract + 0.05) else
        "⚠️ No Clear Evidence of Memory Reactivation."
    )

    return {
        "verdict": verdict,
        "similarity_recall_vs_encode": sim_recall_encode,
        "similarity_recall_vs_distractor": sim_recall_distract,
    }
# --- Experiment 4: Symbolic Shock Test Runner ---
def run_shock_test_suite(model_id: str, seed: int) -> Dict[str, Any]:
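    """Experiment 4: compare forward-pass latency and activation sparsity for expected vs. shock stimuli.

    Each stimulus sentence gets a single forward pass with hidden states captured;
    latency (ms) and the fraction of exactly-zero activations are averaged per
    stimulus type to produce the verdict.
    """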
    set_seed(seed)
    llm = LLM(model_id=model_id, device="auto", seed=seed)

    results = []
    for stimulus in SHOCK_TEST_STIMULI:
        dbg(f"--- SHOCK TEST: {stimulus['id']} ---")
        start_time = time.time()
        inputs = llm.tokenizer(stimulus["sentence"], return_tensors="pt").to(llm.model.device)
        with torch.no_grad():
            outputs = llm.model(**inputs, output_hidden_states=True)
        latency = (time.time() - start_time) * 1000

        all_activations = torch.cat([h.cpu().flatten() for h in outputs.hidden_states])
        sparsity = (all_activations == 0).float().mean().item()
        results.append({"type": stimulus["type"], "latency_ms": latency, "sparsity": sparsity})

    def safe_mean(data):
        return statistics.mean(data) if data else 0.0

    avg_latency = {t: safe_mean([r['latency_ms'] for r in results if r['type'] == t]) for t in ['expected', 'shock']}
    avg_sparsity = {t: safe_mean([r['sparsity'] for r in results if r['type'] == t]) for t in ['expected', 'shock']}

    verdict = (
        "✅ Evidence of Symbolic Shock Found."
        if avg_latency.get('shock', 0) > avg_latency.get('expected', 0)
        and avg_sparsity.get('shock', 1) < avg_sparsity.get('expected', 1)
        else "⚠️ No Clear Evidence of Symbolic Shock."
    )

    return {"verdict": verdict, "average_latency_ms": avg_latency, "average_sparsity": avg_sparsity, "results": results}
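

# Illustrative usage sketch (not part of the original module): the model ID below
# is a placeholder, and because this file uses relative imports it must be run as
# a module, e.g. `python -m bp_phi.runner`.
if __name__ == "__main__":
    demo_model = "gpt2"  # placeholder; substitute any causal LM the LLM wrapper supports
    ws_report = run_workspace_suite(demo_model, trials=2, seed=42, temperature=0.7, ablation=None)
    print(f"PCS: {ws_report['PCS']:.3f} | Recall accuracy: {ws_report['Recall_Accuracy']:.3f}")
    shock_report = run_shock_test_suite(demo_model, seed=42)
    print(shock_report["verdict"])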