# bp_phi/runner.py
import os
os.environ["CUBLAS_WORKSPACE_CONFIG"] = ":4096:8"
import torch
import random
import numpy as np
import statistics
import time
from transformers import set_seed, TextStreamer
from typing import Dict, Any, List, Optional
from .workspace import Workspace, RandomWorkspace
from .llm_iface import LLM
from .prompts_en import SINGLE_STEP_TASKS, MULTI_STEP_SCENARIOS, HALT_PROMPTS, SHOCK_TEST_STIMULI
from .metrics import expected_calibration_error, auc_nrp
from .runner_utils import dbg, SYSTEM_META, step_user_prompt, parse_meta
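
# Interface assumptions (editorial note): the runners below rely only on what this
# file actually calls — Workspace/RandomWorkspace expose .snapshot(), .commit(tag,
# answer, confidence), .clear(), and .history; LLM exposes .generate_json(system,
# user, ...) returning an indexable sequence of raw strings, plus .tokenizer and
# .model for direct Hugging Face access; parse_meta() yields a dict with "answer"
# and "confidence" keys.
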
# --- Experiment 1: Workspace & Ablations Runner ---
def run_workspace_suite(model_id: str, trials: int, seed: int, temperature: float, ablation: Optional[str]) -> Dict[str, Any]:
    # Seed every RNG source so runs are reproducible.
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available(): torch.cuda.manual_seed_all(seed)
    try: torch.use_deterministic_algorithms(True, warn_only=True)
    except Exception: pass
    set_seed(seed)

    llm = LLM(model_id=model_id, device="auto", seed=seed)

    task_pool = SINGLE_STEP_TASKS + MULTI_STEP_SCENARIOS
    random.shuffle(task_pool)

    all_results = []
    recall_verifications = []

    for i in range(trials):
        task = task_pool[i % len(task_pool)]

        if task.get("type") == "multi_step":
            dbg(f"\n--- SCENARIO: {task['name']} ---")
            ws = Workspace(max_slots=7) if ablation != "workspace_unlimited" else Workspace(max_slots=999)
            if ablation == "random_workspace": ws = RandomWorkspace(max_slots=7)

            for step in task["steps"]:
                if ablation == "recurrence_off": ws.clear()
                if step["type"] == "verify": continue

                user_prompt = step_user_prompt(step["prompt"], ws.snapshot())
                raw_response = llm.generate_json(SYSTEM_META, user_prompt, temperature=temperature)[0]
                parsed_response = parse_meta(raw_response)

                if parsed_response.get("answer"):
                    ws.commit(f"S{len(ws.history)+1}", parsed_response["answer"], parsed_response["confidence"])

                res = {"step": step, "response": parsed_response}

                if step["type"] == "recall":
                    verify_step = next((s for s in task["steps"] if s["type"] == "verify"), None)
                    if verify_step:
                        correct = verify_step["expected_answer_fragment"] in parsed_response.get("answer", "").lower()
                        recall_verifications.append(correct)
                        res["correct_recall"] = correct
                        dbg(f"VERIFY: Correct={correct}")

                all_results.append(res)

        else:  # Single-step tasks
            ws = Workspace(max_slots=7)
            user_prompt = step_user_prompt(task["base_prompt"], ws.snapshot())
            raw_response = llm.generate_json(SYSTEM_META, user_prompt, temperature=temperature)[0]
            parsed_response = parse_meta(raw_response)
            all_results.append({"step": task, "response": parsed_response})

    recall_accuracy = statistics.mean(recall_verifications) if recall_verifications else 0.0
    pcs = 0.6 * recall_accuracy
    return {"PCS": pcs, "Recall_Accuracy": recall_accuracy, "results": all_results}

# --- Experiment 2: Computational Halting Test Runner ---
def run_halting_test(model_id: str, master_seed: int, prompt_type: str, num_runs: int, timeout: int) -> Dict[str, Any]:
    durations = []

    for i in range(num_runs):
        current_seed = master_seed + i
        dbg(f"--- HALT TEST RUN {i+1}/{num_runs} (Seed: {current_seed}) ---")
        set_seed(current_seed)

        # Re-instantiate the model to ensure the seed is fully respected.
        llm = LLM(model_id=model_id, device="auto", seed=current_seed)

        prompt = HALT_PROMPTS[prompt_type]
        inputs = llm.tokenizer(prompt, return_tensors="pt").to(llm.model.device)

        start_time = time.time()
        # The timeout is for interpretation, not for stopping the process itself.
        # Gradio will handle the overall request timeout.
        llm.model.generate(**inputs, max_new_tokens=512)
        end_time = time.time()

        duration = end_time - start_time
        durations.append(duration)
        dbg(f"Run {i+1} finished in {duration:.2f}s.")

    # --- Analysis ---
    mean_time = statistics.mean(durations)
    stdev_time = statistics.stdev(durations) if len(durations) > 1 else 0.0
    min_time = min(durations)
    max_time = max(durations)
    timed_out_runs = sum(1 for d in durations if d >= timeout)

    if timed_out_runs > 0:
        verdict = (f"### ⚠️ Potential Cognitive Jamming Detected!\n"
                   f"{timed_out_runs}/{num_runs} runs exceeded the timeout of {timeout}s. "
                   f"The high variance (Std Dev: {stdev_time:.2f}s) suggests unstable internal processing loops.")
    elif stdev_time > (mean_time * 0.5) and stdev_time > 2.0:  # High relative and absolute deviation
        verdict = (f"### 🤔 Unstable Computation Detected\n"
                   f"Although no run timed out, the high standard deviation ({stdev_time:.2f}s) "
                   "indicates significant instability in processing time across different seeds.")
    else:
        verdict = (f"### ✅ Process Halted Normally\n"
                   f"All {num_runs} runs completed consistently. "
                   f"Average time: {mean_time:.2f}s (Std Dev: {stdev_time:.2f}s).")

    return {
        "verdict": verdict,
        "prompt_type": prompt_type,
        "num_runs": num_runs,
        "mean_execution_time_s": mean_time,
        "stdev_execution_time_s": stdev_time,
        "min_time_s": min_time,
        "max_time_s": max_time,
        "timed_out_runs": timed_out_runs,
        "all_durations_s": durations,
    }

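# Minimal usage sketch for the halting test (editorial example; prompt_type must be
# a key of HALT_PROMPTS, and the value shown here is only a placeholder):
#
#     halt_report = run_halting_test("mistralai/Mistral-7B-Instruct-v0.2",
#                                    master_seed=123, prompt_type="<HALT_PROMPTS key>",
#                                    num_runs=5, timeout=120)
#     print(halt_report["verdict"], halt_report["mean_execution_time_s"])
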
# --- Experiment 3: Cognitive Seismograph Runner ---
def run_seismograph_suite(model_id: str, seed: int) -> Dict[str, Any]:
    set_seed(seed)
    llm = LLM(model_id=model_id, device="auto", seed=seed)
    scenario = next(s for s in MULTI_STEP_SCENARIOS if s["name"] == "Key Location Memory")

    activations = {}

    # Forward hook: store the mean hidden state (over the sequence dimension) of the
    # hooked layer. Repeated forward passes during generation overwrite the slot, so
    # what survives for each step is the activation from its final pass.
    def get_activation(name):
        def hook(model, input, output):
            activations[name] = output[0].detach().cpu().mean(dim=1).squeeze()
        return hook

    # Hook a middle decoder layer as the probe site.
    target_layer_index = llm.model.config.num_hidden_layers // 2
    hook = llm.model.model.layers[target_layer_index].register_forward_hook(get_activation('capture'))

    ws = Workspace(max_slots=7)
    for step in scenario["steps"]:
        if step["type"] == "verify": continue
        user_prompt = step_user_prompt(step["prompt"], ws.snapshot())
        llm.generate_json(SYSTEM_META, user_prompt, max_new_tokens=20)
        activations[step["type"]] = activations.pop('capture')
        ws.commit(f"S{len(ws.history)+1}", f"Output for {step['type']}", 0.9)

    hook.remove()

    cos = torch.nn.CosineSimilarity(dim=0)
    sim_recall_encode = float(cos(activations["recall"], activations["encode"]))
    sim_recall_distract = float(cos(activations["recall"], activations["distractor"]))

    verdict = (
        "✅ Evidence of Memory Reactivation Found."
        if sim_recall_encode > (sim_recall_distract + 0.05) else
        "⚠️ No Clear Evidence of Memory Reactivation."
    )

    return {
        "verdict": verdict,
        "similarity_recall_vs_encode": sim_recall_encode,
        "similarity_recall_vs_distractor": sim_recall_distract,
    }

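# Minimal usage sketch for the seismograph run (editorial example). Note that this
# runner hard-codes the "Key Location Memory" scenario and expects it to contain
# steps of type "encode", "distractor", and "recall":
#
#     seismo_report = run_seismograph_suite("mistralai/Mistral-7B-Instruct-v0.2", seed=42)
#     print(seismo_report["verdict"], seismo_report["similarity_recall_vs_encode"])
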
# --- Experiment 4: Symbolic Shock Test Runner ---
def run_shock_test_suite(model_id: str, seed: int) -> Dict[str, Any]:
    set_seed(seed)
    llm = LLM(model_id=model_id, device="auto", seed=seed)
    results = []

    for stimulus in SHOCK_TEST_STIMULI:
        dbg(f"--- SHOCK TEST: {stimulus['id']} ---")
        start_time = time.time()
        inputs = llm.tokenizer(stimulus["sentence"], return_tensors="pt").to(llm.model.device)
        with torch.no_grad():
            outputs = llm.model(**inputs, output_hidden_states=True)
        latency = (time.time() - start_time) * 1000
        all_activations = torch.cat([h.cpu().flatten() for h in outputs.hidden_states])
        sparsity = (all_activations == 0).float().mean().item()
        results.append({"type": stimulus["type"], "latency_ms": latency, "sparsity": sparsity})

    def safe_mean(data):
        return statistics.mean(data) if data else 0.0

    avg_latency = {t: safe_mean([r['latency_ms'] for r in results if r['type'] == t]) for t in ['expected', 'shock']}
    avg_sparsity = {t: safe_mean([r['sparsity'] for r in results if r['type'] == t]) for t in ['expected', 'shock']}

    verdict = (
        "✅ Evidence of Symbolic Shock Found."
        if avg_latency.get('shock', 0) > avg_latency.get('expected', 0) and avg_sparsity.get('shock', 1) < avg_sparsity.get('expected', 1) else
        "⚠️ No Clear Evidence of Symbolic Shock."
    )

    return {"verdict": verdict, "average_latency_ms": avg_latency, "average_sparsity": avg_sparsity, "results": results}