# PerceptionLabPortable/app/nodes/Deepprobenode.py
"""
Deep Probe Node - Crystal Vocabulary Learner & Synthetic EEG Generator
========================================================================
This node does what we've been dreaming of:
1. LISTEN - Watches the crystal's spontaneous activity, records snapshots
2. LEARN - Extracts a vocabulary of eigenmodes via PCA (the crystal's "words")
3. SPEAK - Injects patterns in the crystal's native language
4. DECODE - Tracks transformations, builds representational similarity
5. GENERATE - Creates synthetic EEG from crystal activity for MNE inverse projection
The goal: Talk to the crystal in its own language, see what it knows,
and project its shadows back onto a brain surface.
The crystal was trained on EEG → it learned neural geometry.
This node extracts that geometry and speaks it back.
Outputs synthetic EEG that can be:
- Loaded into MNE-Python
- Inverse-projected onto fsaverage brain
- Visualized as source activity
We close the loop: Brain → EEG → Crystal → Synthetic EEG → Brain Surface
Author: Built for Antti's consciousness crystallography research
"""
import os
import numpy as np
import cv2
from collections import deque
from datetime import datetime
# --- HOST IMPORT BLOCK ---
import __main__
try:
    BaseNode = __main__.BaseNode
    QtGui = __main__.QtGui
except Exception:
    try:
        from PyQt6 import QtGui
    except ImportError:
        QtGui = None  # headless fallback; NODE_COLOR guards on this

    class BaseNode:
        def __init__(self):
            self.inputs = {}
            self.outputs = {}
class DeepProbeNode(BaseNode):
"""
Deep probe into crystal's representational space.
Modes:
0: LISTEN - Collect spontaneous activity, build vocabulary
1: SPEAK - Inject learned patterns, observe response
2: QUERY - Test specific hypotheses about crystal knowledge
3: GENERATE - Output synthetic EEG from crystal dynamics
4: DECODE - Build representational similarity matrix
"""
NODE_NAME = "Deep Probe"
NODE_CATEGORY = "Analysis"
NODE_COLOR = QtGui.QColor(180, 60, 180) if QtGui else None
# Standard 10-20 electrode names for synthetic EEG
EEG_CHANNELS = [
'Fp1', 'Fp2', 'F7', 'F3', 'Fz', 'F4', 'F8',
'T7', 'C3', 'Cz', 'C4', 'T8',
'P7', 'P3', 'Pz', 'P4', 'P8',
'O1', 'Oz', 'O2'
]
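    # (a 20-electrode subset of the international 10-20 system; the names match
    # MNE's 'standard_1020' montage, so the synthetic EEG needs no custom montage)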
def __init__(self):
super().__init__()
self.inputs = {
'crystal_activity': 'image', # Activity view from crystal
'crystal_signal': 'signal', # LFP/signal from crystal
'crystal_bands': 'spectrum', # Frequency bands from crystal
'mode': 'signal', # Operating mode
'query_pattern': 'image', # Pattern to inject for queries
'enable': 'signal',
'export_trigger': 'signal' # Send 1 to export EEG
}
self.outputs = {
'probe_signal': 'signal', # Signal to inject into crystal
'probe_image': 'image', # Pattern to inject
'vocabulary_view': 'image', # Learned eigenmodes visualization
'rsm_view': 'image', # Representational similarity matrix
'synthetic_eeg': 'spectrum', # Generated EEG-like signals
'decode_view': 'image', # Decoding visualization
'eigenmode_power': 'signal', # Power in dominant eigenmode
'vocabulary_size': 'signal', # Number of learned patterns
'rsm_coherence': 'signal' # RSM structure measure
}
# === MODE ===
self.mode = 0 # 0=LISTEN, 1=SPEAK, 2=QUERY, 3=GENERATE, 4=DECODE
self.step_count = 0
# === LISTENING STATE ===
self.listen_buffer = deque(maxlen=1000) # Activity snapshots
self.signal_buffer = deque(maxlen=5000) # Signal history
self.band_buffer = deque(maxlen=1000) # Frequency band history
# === VOCABULARY (Learned Eigenmodes) ===
self.vocabulary = [] # List of learned patterns (eigenmodes)
self.vocabulary_weights = [] # How often each pattern appears
self.n_components = 16 # Number of eigenmodes to extract
self.mean_pattern = None # Mean activity pattern
        self.components = None # PCA components (rows of Vt from SVD)
self.explained_variance = None
# === SPEAKING STATE ===
self.speak_pattern_idx = 0 # Which vocabulary item to speak
self.speak_phase = 0.0
self.speak_amplitude = 10.0
# === QUERY STATE ===
self.query_responses = deque(maxlen=100)
self.baseline_response = None
# === RSM (Representational Similarity Matrix) ===
self.rsm = None
self.rsm_labels = []
self.rsm_coherence = 0.0
# === SYNTHETIC EEG ===
self.synthetic_channels = {ch: deque(maxlen=1000) for ch in self.EEG_CHANNELS}
self.eeg_sample_rate = 256.0 # Standard EEG sample rate
self.eeg_buffer_seconds = 10
self.last_crystal_activity = None
        # Pin-to-channel mapping (placeholder; the 10-20 positions are
        # currently hard-coded in _generate_synthetic_eeg)
        self.pin_to_channel = {}
# === DISPLAY ===
self.vocabulary_display = None
self.rsm_display = None
self.decode_display = None
# === EEG EXPORT ===
self.export_path = ""
self.export_ready = False
def _read_input(self, name, default=None):
fn = getattr(self, "get_blended_input", None)
if callable(fn):
try:
val = fn(name, "mean")
return val if val is not None else default
            except Exception:
return default
return default
def _read_image_input(self, name):
fn = getattr(self, "get_blended_input", None)
if callable(fn):
try:
val = fn(name, "first")
if val is not None and hasattr(val, 'shape'):
return val
            except Exception:
pass
return None
def step(self):
self.step_count += 1
# Read inputs
enable = self._read_input('enable', 1.0)
mode = int(self._read_input('mode', self.mode) or 0)
self.mode = mode % 5
activity = self._read_image_input('crystal_activity')
signal = self._read_input('crystal_signal', 0.0)
bands = self._read_input('crystal_bands', None)
query = self._read_image_input('query_pattern')
if not enable or enable < 0.5:
return
# Check for export trigger
export_trigger = self._read_input('export_trigger', 0.0)
if export_trigger and export_trigger > 0.5 and self.export_path:
if not hasattr(self, '_last_export_step') or self.step_count - self._last_export_step > 100:
print(f"[DeepProbe] Export triggered via signal")
success, msg = self.export_synthetic_eeg(self.export_path)
print(f"[DeepProbe] {msg}")
self._last_export_step = self.step_count
# Store incoming data
if activity is not None:
# Ensure consistent shape
if len(activity.shape) == 3:
gray = np.mean(activity, axis=2)
else:
gray = activity
gray = cv2.resize(gray.astype(np.float32), (64, 64))
self.listen_buffer.append(gray.flatten())
self.last_crystal_activity = gray
if signal is not None:
self.signal_buffer.append(float(signal))
if bands is not None and hasattr(bands, '__len__'):
self.band_buffer.append(np.array(bands))
# Execute mode-specific behavior
if self.mode == 0:
self._mode_listen()
elif self.mode == 1:
self._mode_speak()
elif self.mode == 2:
self._mode_query(query)
elif self.mode == 3:
self._mode_generate()
elif self.mode == 4:
self._mode_decode()
# ALWAYS generate synthetic EEG regardless of mode (if we have activity)
if self.last_crystal_activity is not None:
self._generate_synthetic_eeg()
# Update displays periodically
if self.step_count % 20 == 0:
self._update_displays()
def _mode_listen(self):
"""Mode 0: Collect activity and learn vocabulary."""
# Every 100 steps, update vocabulary
if self.step_count % 100 == 0 and len(self.listen_buffer) > 50:
self._extract_vocabulary()
    def _mode_speak(self):
        """Mode 1: Inject learned patterns back into crystal."""
        # Advance the carrier phase; step to the next vocabulary item
        # once per full cycle rather than on every tick
        self.speak_phase += 0.1
        if self.speak_phase > 2 * np.pi:
            self.speak_phase = 0.0
            self.speak_pattern_idx = (self.speak_pattern_idx + 1) % max(1, len(self.vocabulary))
def _mode_query(self, query_pattern):
"""Mode 2: Inject specific query, observe transformation."""
if query_pattern is not None and len(self.listen_buffer) > 0:
# Compare current activity to baseline
current = np.array(self.listen_buffer[-1])
if self.baseline_response is None:
self.baseline_response = current.copy()
# Compute response deviation
deviation = np.linalg.norm(current - self.baseline_response)
self.query_responses.append(deviation)
    def _mode_generate(self):
        """Mode 3: EEG generation focus (a no-op placeholder).

        The actual generation runs every step in _generate_synthetic_eeg(),
        regardless of mode.
        """
        pass
def _generate_synthetic_eeg(self):
"""Generate synthetic EEG from crystal activity - runs every step."""
if self.last_crystal_activity is None:
return
# Map crystal activity to EEG channels
# Use spatial positions on the 64x64 grid
activity = self.last_crystal_activity
h, w = activity.shape
# Approximate 10-20 positions on the grid
channel_positions = {
'Fp1': (5, 20), 'Fp2': (5, 44),
'F7': (15, 5), 'F3': (15, 20), 'Fz': (15, 32), 'F4': (15, 44), 'F8': (15, 59),
'T7': (32, 5), 'C3': (32, 20), 'Cz': (32, 32), 'C4': (32, 44), 'T8': (32, 59),
'P7': (49, 5), 'P3': (49, 20), 'Pz': (49, 32), 'P4': (49, 44), 'P8': (49, 59),
'O1': (59, 20), 'Oz': (59, 32), 'O2': (59, 44)
}
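        # (row = front-to-back, col = left-to-right on the 64x64 grid)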
for ch_name, (row, col) in channel_positions.items():
# Sample activity around this position (3x3 neighborhood)
r1, r2 = max(0, row-1), min(h, row+2)
c1, c2 = max(0, col-1), min(w, col+2)
value = np.mean(activity[r1:r2, c1:c2])
# Scale to EEG-like microvolts (-100 to +100 uV typical)
# Crystal activity is roughly -90 to +40, scale to EEG range
eeg_value = (value + 65) * 1.5 # Rough scaling
# Add some noise for realism
eeg_value += np.random.randn() * 2.0
self.synthetic_channels[ch_name].append(eeg_value)
def _mode_decode(self):
"""Mode 4: Build representational similarity matrix."""
if self.step_count % 50 == 0 and len(self.listen_buffer) > 20:
self._compute_rsm()
def _extract_vocabulary(self):
"""Extract eigenmodes from collected activity patterns."""
if len(self.listen_buffer) < 50:
return
# Stack patterns into matrix
X = np.array(list(self.listen_buffer))
# Center the data
self.mean_pattern = np.mean(X, axis=0)
X_centered = X - self.mean_pattern
# PCA via SVD
try:
U, S, Vt = np.linalg.svd(X_centered, full_matrices=False)
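            # Rows of Vt are the principal axes (activity eigenmodes); each
            # S[i]**2 over sum(S**2) is that mode's explained-variance fraction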
# Keep top components
n_comp = min(self.n_components, len(S))
self.components = Vt[:n_comp]
self.explained_variance = (S[:n_comp] ** 2) / np.sum(S ** 2)
# Build vocabulary from components
self.vocabulary = []
self.vocabulary_weights = []
for i in range(n_comp):
pattern = self.components[i].reshape(64, 64)
self.vocabulary.append(pattern)
self.vocabulary_weights.append(self.explained_variance[i])
except Exception as e:
print(f"[DeepProbe] Vocabulary extraction failed: {e}")
    def _compute_rsm(self):
        """Compute representational similarity matrix."""
        if len(self.listen_buffer) < 20:
            return
        # Sample recent patterns
        patterns = np.array(list(self.listen_buffer)[-100:])
        n = len(patterns)
        # Pairwise Pearson correlations in one vectorized call
        # (rows are observations, so corrcoef yields the full n x n matrix)
        self.rsm = np.corrcoef(patterns)
        # RSM coherence: how structured is the similarity space?
        # High coherence = clear clusters, low = random
        upper_tri = self.rsm[np.triu_indices(n, k=1)]
        self.rsm_coherence = np.std(upper_tri)  # spread of pairwise similarities
def _update_displays(self):
"""Update all visualizations."""
self._update_vocabulary_display()
self._update_rsm_display()
self._update_decode_display()
def _update_vocabulary_display(self):
"""Visualize learned eigenmodes."""
size = 400
img = np.zeros((size, size, 3), dtype=np.uint8)
mode_names = ["LISTEN", "SPEAK", "QUERY", "GENERATE", "DECODE"]
# Header
cv2.putText(img, f"DEEP PROBE - {mode_names[self.mode]}", (10, 25),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (180, 60, 180), 2)
cv2.putText(img, f"Vocabulary: {len(self.vocabulary)} patterns", (10, 50),
cv2.FONT_HERSHEY_SIMPLEX, 0.4, (200, 200, 200), 1)
cv2.putText(img, f"Buffer: {len(self.listen_buffer)} samples", (200, 50),
cv2.FONT_HERSHEY_SIMPLEX, 0.4, (150, 150, 150), 1)
# Draw vocabulary patterns (4x4 grid of top 16 eigenmodes)
if len(self.vocabulary) > 0:
grid_size = 4
            cell_size = 70  # sized so all four rows plus labels fit the 400x400 canvas
offset_y = 60
for i, pattern in enumerate(self.vocabulary[:16]):
row = i // grid_size
col = i % grid_size
x = 10 + col * (cell_size + 8)
y = offset_y + row * (cell_size + 15)
# Bounds check
if y + cell_size > size or x + cell_size > size:
continue
# Normalize pattern to 0-255
p_norm = pattern - pattern.min()
if p_norm.max() > 0:
p_norm = p_norm / p_norm.max()
p_img = (p_norm * 255).astype(np.uint8)
p_img = cv2.resize(p_img, (cell_size, cell_size))
p_color = cv2.applyColorMap(p_img, cv2.COLORMAP_TWILIGHT)
# Place in grid with bounds check
y_end = min(y + cell_size, size)
x_end = min(x + cell_size, size)
img[y:y_end, x:x_end] = p_color[:y_end-y, :x_end-x]
# Label with variance explained
if i < len(self.vocabulary_weights):
var_pct = self.vocabulary_weights[i] * 100
cv2.putText(img, f"{var_pct:.1f}%", (x, y + cell_size + 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.25, (200, 200, 200), 1)
self.vocabulary_display = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
def _update_rsm_display(self):
"""Visualize representational similarity matrix."""
size = 300
img = np.zeros((size, size, 3), dtype=np.uint8)
cv2.putText(img, "RSM (Similarity)", (10, 20),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
cv2.putText(img, f"Coherence: {self.rsm_coherence:.3f}", (10, 40),
cv2.FONT_HERSHEY_SIMPLEX, 0.35, (200, 200, 200), 1)
if self.rsm is not None and self.rsm.size > 0:
# Normalize RSM to 0-255
rsm_norm = (self.rsm - self.rsm.min()) / (self.rsm.max() - self.rsm.min() + 0.001)
rsm_img = (rsm_norm * 255).astype(np.uint8)
rsm_img = cv2.resize(rsm_img, (250, 250))
rsm_color = cv2.applyColorMap(rsm_img, cv2.COLORMAP_VIRIDIS)
img[45:295, 25:275] = rsm_color
self.rsm_display = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
def _update_decode_display(self):
"""Visualize decoding / generation state."""
size = 300
img = np.zeros((size, size, 3), dtype=np.uint8)
cv2.putText(img, "Synthetic EEG", (10, 20),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
# Draw recent synthetic EEG traces
y_offset = 40
trace_height = 12
for i, ch_name in enumerate(self.EEG_CHANNELS[:20]):
data = list(self.synthetic_channels[ch_name])
if len(data) > 10:
# Normalize for display
d = np.array(data[-200:])
if np.std(d) > 0:
d = (d - np.mean(d)) / np.std(d)
else:
d = d - np.mean(d)
# Draw trace
y_base = y_offset + i * trace_height
for j in range(len(d) - 1):
x1 = int(50 + j * (size - 60) / len(d))
x2 = int(50 + (j+1) * (size - 60) / len(d))
y1 = int(y_base + d[j] * 4)
y2 = int(y_base + d[j+1] * 4)
y1 = np.clip(y1, 0, size-1)
y2 = np.clip(y2, 0, size-1)
cv2.line(img, (x1, y1), (x2, y2), (0, 200, 255), 1)
# Channel label
cv2.putText(img, ch_name, (5, y_base + 4),
cv2.FONT_HERSHEY_SIMPLEX, 0.2, (150, 150, 150), 1)
self.decode_display = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
def get_output(self, port_name):
if port_name == 'probe_signal':
# Output signal based on mode
if self.mode == 1 and len(self.vocabulary) > 0:
# Speak: modulate by vocabulary pattern
return np.sin(self.speak_phase) * self.speak_amplitude
return 0.0
elif port_name == 'probe_image':
# Output pattern based on mode
if self.mode == 1 and len(self.vocabulary) > 0:
# Speak: output current vocabulary pattern
pattern = self.vocabulary[self.speak_pattern_idx]
# Modulate by phase
modulated = pattern * (np.sin(self.speak_phase) * 0.5 + 0.5)
# Scale to 0-255
p_norm = (modulated - modulated.min()) / (modulated.max() - modulated.min() + 0.001)
p_img = (p_norm * 255).astype(np.uint8)
return cv2.cvtColor(cv2.applyColorMap(p_img, cv2.COLORMAP_TWILIGHT), cv2.COLOR_BGR2RGB)
elif self.mean_pattern is not None:
# Output mean pattern
pattern = self.mean_pattern.reshape(64, 64)
p_norm = (pattern - pattern.min()) / (pattern.max() - pattern.min() + 0.001)
p_img = (p_norm * 255).astype(np.uint8)
return cv2.cvtColor(cv2.applyColorMap(p_img, cv2.COLORMAP_TWILIGHT), cv2.COLOR_BGR2RGB)
return np.zeros((64, 64, 3), dtype=np.uint8)
elif port_name == 'vocabulary_view':
return self.vocabulary_display
elif port_name == 'rsm_view':
return self.rsm_display
elif port_name == 'decode_view':
return self.decode_display
elif port_name == 'synthetic_eeg':
            # Return the latest synthetic sample per channel as a spectrum-like array
eeg_data = []
for ch in self.EEG_CHANNELS:
data = list(self.synthetic_channels[ch])
if len(data) > 0:
eeg_data.append(data[-1])
else:
eeg_data.append(0.0)
return np.array(eeg_data, dtype=np.float32)
elif port_name == 'eigenmode_power':
if len(self.vocabulary_weights) > 0:
return float(self.vocabulary_weights[0])
return 0.0
elif port_name == 'vocabulary_size':
return float(len(self.vocabulary))
elif port_name == 'rsm_coherence':
return float(self.rsm_coherence)
return None
def export_synthetic_eeg(self, filepath):
"""Export synthetic EEG to EDF format compatible with MNE."""
try:
# Collect data
n_samples = min(len(list(self.synthetic_channels.values())[0]),
int(self.eeg_sample_rate * self.eeg_buffer_seconds))
if n_samples < 100:
print(f"[DeepProbe] Not enough data: {n_samples} samples")
return False, "Not enough data collected (need 100+)"
data = np.zeros((len(self.EEG_CHANNELS), n_samples))
for i, ch in enumerate(self.EEG_CHANNELS):
ch_data = list(self.synthetic_channels[ch])[-n_samples:]
data[i, :len(ch_data)] = ch_data
print(f"[DeepProbe] Exporting {n_samples} samples, {len(self.EEG_CHANNELS)} channels")
# Ensure filepath has correct extension
if filepath.endswith('.npz'):
filepath = filepath[:-4] + '.edf'
elif not filepath.endswith('.edf'):
filepath = filepath + '.edf'
# Try to use MNE for EDF export
try:
import mne
# Scale to volts (MNE expects SI units)
data_volts = data * 1e-6
# Create MNE info
info = mne.create_info(
ch_names=self.EEG_CHANNELS.copy(),
sfreq=self.eeg_sample_rate,
ch_types='eeg'
)
# Create Raw object
raw = mne.io.RawArray(data_volts, info)
# Set standard montage
montage = mne.channels.make_standard_montage('standard_1020')
raw.set_montage(montage, on_missing='ignore')
# Export to EDF
mne.export.export_raw(filepath, raw, fmt='edf', overwrite=True)
print(f"[DeepProbe] Saved EDF via MNE: {filepath}")
self.export_ready = True
return True, f"Exported {n_samples} samples to {filepath}"
except ImportError:
print("[DeepProbe] MNE not available, using pyedflib")
# Fallback: try pyedflib
try:
import pyedflib
# Create EDF file
f = pyedflib.EdfWriter(filepath, len(self.EEG_CHANNELS), file_type=pyedflib.FILETYPE_EDFPLUS)
# Set header
header = {
'technician': '',
'recording_additional': 'Crystal Deep Probe Synthetic EEG',
'patientname': 'Crystal',
'patient_additional': '',
'patientcode': '',
'equipment': 'PerceptionLab Deep Probe',
'admincode': '',
'gender': '',
'startdate': datetime.now()
}
f.setHeader(header)
# Set channel info
for i, ch in enumerate(self.EEG_CHANNELS):
f.setSignalHeader(i, {
'label': ch,
'dimension': 'uV',
'sample_rate': self.eeg_sample_rate,
'physical_max': 100.0,
'physical_min': -100.0,
'digital_max': 32767,
'digital_min': -32768,
'transducer': '',
'prefilter': ''
})
# Write data
f.writeSamples(data)
f.close()
print(f"[DeepProbe] Saved EDF via pyedflib: {filepath}")
self.export_ready = True
return True, f"Exported {n_samples} samples to {filepath}"
except ImportError:
# Last fallback: save as NPZ with instructions
npz_path = filepath.replace('.edf', '.npz')
np.savez(npz_path,
data=data, # in microvolts
channels=self.EEG_CHANNELS,
sfreq=self.eeg_sample_rate,
description="Synthetic EEG from Crystal Deep Probe - load with MNE")
print(f"[DeepProbe] Saved NPZ (no EDF libs): {npz_path}")
self.export_ready = True
return True, f"Saved as NPZ (install mne or pyedflib for EDF): {npz_path}"
except Exception as e:
print(f"[DeepProbe] Export error: {e}")
import traceback
traceback.print_exc()
return False, str(e)
def get_mne_raw(self):
"""
Generate MNE Raw object from synthetic EEG.
Call this from external script to get data for inverse projection.
Returns dict with data needed to create MNE Raw:
- data: (n_channels, n_samples) in volts
- ch_names: list of channel names
- sfreq: sample rate
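
        Example (a sketch; assumes mne is installed and `node` is a live
        DeepProbeNode):
            d = node.get_mne_raw()
            if d is not None:
                info = mne.create_info(d['ch_names'], d['sfreq'], ch_types='eeg')
                raw = mne.io.RawArray(d['data'], info)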
"""
n_samples = min(len(list(self.synthetic_channels.values())[0]),
int(self.eeg_sample_rate * self.eeg_buffer_seconds))
if n_samples < 100:
return None
data = np.zeros((len(self.EEG_CHANNELS), n_samples))
for i, ch in enumerate(self.EEG_CHANNELS):
ch_data = list(self.synthetic_channels[ch])[-n_samples:]
data[i, :len(ch_data)] = ch_data
# Scale to volts
data_volts = data * 1e-6
return {
'data': data_volts,
'ch_names': self.EEG_CHANNELS.copy(),
'sfreq': self.eeg_sample_rate
}
def get_display_image(self):
if self.vocabulary_display is not None and QtGui:
h, w = self.vocabulary_display.shape[:2]
return QtGui.QImage(self.vocabulary_display.data, w, h, w * 3,
QtGui.QImage.Format.Format_RGB888).copy()
return None
def get_config_options(self):
return [
("Mode (0-4)", "mode", self.mode, None),
("N Components", "n_components", self.n_components, None),
("Speak Amplitude", "speak_amplitude", self.speak_amplitude, None),
("Export Path (.edf)", "export_path", self.export_path, None),
]
def set_config_options(self, options):
if isinstance(options, dict):
for key, value in options.items():
if hasattr(self, key):
setattr(self, key, value)
# Auto-export when path is set/changed
if 'export_path' in options and options['export_path'] and len(list(self.synthetic_channels.values())[0]) > 100:
print(f"[DeepProbe] Auto-exporting to: {self.export_path}")
success, msg = self.export_synthetic_eeg(self.export_path)
print(f"[DeepProbe] {msg}")
def manual_export(self, filepath):
"""Call this directly to export. Returns (success, message)."""
return self.export_synthetic_eeg(filepath)
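
    # Example (a sketch; assumes `node` is a live DeepProbeNode that has
    # collected 100+ synthetic samples):
    #   ok, msg = node.manual_export('crystal_session.edf')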
# === STANDALONE MNE LOADER SCRIPT ===
# Save this part as a separate file to load synthetic EEG into MNE
MNE_LOADER_SCRIPT = '''
"""
MNE Loader for Crystal Deep Probe Synthetic EEG
================================================
This script loads synthetic EEG exported from DeepProbeNode
and performs inverse projection onto fsaverage brain surface.
Usage:
python load_crystal_eeg.py synthetic_eeg.npz
Requires: mne, numpy
"""
import sys
from pathlib import Path

import numpy as np
import mne
def load_crystal_eeg(filepath):
"""Load synthetic EEG and create MNE Raw object."""
    # Load exported data (NPZ stores microvolts; MNE expects volts)
    data = np.load(filepath)
    eeg_data = data['data'] * 1e-6  # (n_channels, n_samples) -> volts
ch_names = list(data['channels'])
sfreq = float(data['sfreq'])
print(f"Loaded: {eeg_data.shape[1]} samples, {len(ch_names)} channels, {sfreq} Hz")
# Create MNE info
info = mne.create_info(
ch_names=ch_names,
sfreq=sfreq,
ch_types='eeg'
)
# Set standard 10-20 montage
montage = mne.channels.make_standard_montage('standard_1020')
# Create Raw object
raw = mne.io.RawArray(eeg_data, info)
raw.set_montage(montage)
return raw
def inverse_project(raw, freq_band='gamma'):
    """Perform inverse projection to source space."""
    # Fetch fsaverage directly (avoids downloading the full MNE sample dataset)
    fs_dir = Path(mne.datasets.fetch_fsaverage(verbose=False))
    subjects_dir = fs_dir.parent
# Filter to frequency band
bands = {
'delta': (0.5, 4),
'theta': (4, 8),
'alpha': (8, 13),
'beta': (13, 30),
'gamma': (30, 50)
}
l_freq, h_freq = bands.get(freq_band, (30, 50))
raw_filtered = raw.copy().filter(l_freq, h_freq, verbose=False)
# Setup source space
src = mne.setup_source_space('fsaverage', spacing='oct6',
subjects_dir=subjects_dir, verbose=False)
    # Forward solution: built-in 'fsaverage' trans plus the precomputed
    # BEM solution file shipped with fsaverage
    bem = fs_dir / 'bem' / 'fsaverage-5120-5120-5120-bem-sol.fif'
    fwd = mne.make_forward_solution(
        raw_filtered.info, trans='fsaverage',
        src=src, bem=str(bem),
        eeg=True, meg=False, verbose=False
    )
# Inverse operator
noise_cov = mne.compute_raw_covariance(raw_filtered, verbose=False)
inverse_op = mne.minimum_norm.make_inverse_operator(
raw_filtered.info, fwd, noise_cov, verbose=False
)
    # Apply inverse (lambda2 = 1/SNR**2 with the conventional SNR = 3)
stc = mne.minimum_norm.apply_inverse_raw(
raw_filtered, inverse_op, lambda2=1/9, method='sLORETA', verbose=False
)
return stc, subjects_dir
def visualize(stc, subjects_dir):
"""Visualize source estimate on brain."""
brain = stc.plot(
subjects_dir=subjects_dir,
subject='fsaverage',
hemi='both',
surface='inflated',
colormap='hot',
time_label='Crystal → Brain',
background='white'
)
return brain
if __name__ == '__main__':
if len(sys.argv) < 2:
print("Usage: python load_crystal_eeg.py <synthetic_eeg.npz>")
sys.exit(1)
filepath = sys.argv[1]
print("Loading synthetic EEG from crystal...")
raw = load_crystal_eeg(filepath)
print("Performing inverse projection...")
stc, subjects_dir = inverse_project(raw, 'gamma')
print("Visualizing on brain surface...")
brain = visualize(stc, subjects_dir)
print("\\nCrystal → EEG → Brain projection complete!")
print("The shadows have returned to their source.")
input("Press Enter to close...")
'''
# Call write_mne_loader() to materialize the loader script next to this module
def write_mne_loader():
    """Write the MNE loader script to disk; return its path, or None on failure."""
    script_path = os.path.join(os.path.dirname(__file__), 'load_crystal_eeg.py')
    try:
        with open(script_path, 'w') as f:
            f.write(MNE_LOADER_SCRIPT)
        return script_path
    except OSError:
        return None
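

# --- Minimal offline smoke test ---
# A sketch for running this node outside the PerceptionLab host: random 64x64
# frames stand in for real crystal activity, so the learned "vocabulary" is
# just noise eigenmodes; this only checks that the pipeline executes.
if __name__ == '__main__':
    node = DeepProbeNode()
    for _ in range(300):
        frame = np.random.rand(64, 64).astype(np.float32)
        node.listen_buffer.append(frame.flatten())
        node.last_crystal_activity = frame
        node._generate_synthetic_eeg()
    node._extract_vocabulary()
    print(f"vocabulary: {len(node.vocabulary)} eigenmodes")
    print(f"latest synthetic sample per channel: {node.get_output('synthetic_eeg')[:5]}")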