# Source: jit-lora / src/ane_lora_trainer.py
# Page header residue: "Ex0bit's picture" (uploader avatar caption)
# Commit message: Upload complete JIT LoRA research: paper, source code, tests, and figures
# Commit: 208eb59
"""
ane_lora_trainer.py — LoRA training engine using Apple Neural Engine.
Manages per-layer LoRA adapters (A & B matrices), compiles ANE kernels once,
and runs forward/backward passes on ANE hardware. Training loop:
1. Forward: base model inference via MLX, with LoRA additions via ANE
2. Loss: cross-entropy computed on CPU
3. Backward: LoRA gradients computed on ANE
4. Update: Adam optimizer on CPU (LoRA params only — tiny, instant)
The adapter weights live as numpy arrays in shared memory. MLX inference
reads them (zero-copy via mlx.array), ANE training writes updated values.
"""
import json
import logging
import math
import time
from pathlib import Path
from typing import Optional
import numpy as np
from ane_bridge_py import ANEBridge
from ane_mil_lora import LoRAKernelSet
from neural_config import NeuralConfig
# Module-level logger; every diagnostic in this file is emitted on the
# "ane_lora_trainer" channel.
log = logging.getLogger("ane_lora_trainer")
class LoRAAdapter:
    """Low-rank A/B factor pairs for one projection target, one pair per layer.

    Attributes:
        n_layers: number of layers (length of ``A`` and ``B``).
        dim: model hidden dimension.
        rank: LoRA rank.
        A: list of float32 arrays shaped ``[rank, dim]``, drawn from a
            standard normal and scaled by ``1 / sqrt(dim)``.
        B: list of float32 arrays shaped ``[dim, rank]``, all zeros, so the
            initial LoRA product ``B @ A`` is zero.
    """

    def __init__(self, n_layers: int, dim: int, rank: int):
        self.n_layers = n_layers
        self.dim = dim
        self.rank = rank

        init_std = 1.0 / math.sqrt(dim)

        # Draw the A matrices first, one layer at a time, from the global
        # NumPy random state.
        self.A = []
        for _ in range(n_layers):
            noise = np.random.randn(rank, dim).astype(np.float32)
            self.A.append(noise * init_std)

        self.B = []
        for _ in range(n_layers):
            self.B.append(np.zeros((dim, rank), dtype=np.float32))

    def param_count(self) -> int:
        """Return the number of trainable scalars across all layers."""
        per_layer = 2 * self.dim * self.rank  # one A plus one B
        return self.n_layers * per_layer

    def memory_bytes(self) -> int:
        """Return the adapter weight footprint in bytes (4 bytes per fp32)."""
        bytes_per_param = 4
        return self.param_count() * bytes_per_param
class AdamState:
    """Adam optimizer state for LoRA parameters.

    Holds the hyper-parameters, a single step counter ``t`` shared by every
    layer, and first/second moment buffers (``m_*`` / ``v_*``) shaped like the
    corresponding ``A`` / ``B`` matrices of the adapter passed at construction.
    """

    # The adapter annotation is quoted so it is not evaluated when the class
    # body is executed.
    def __init__(self, adapter: "LoRAAdapter", lr: float = 1e-5,
                 beta1: float = 0.9, beta2: float = 0.999,
                 eps: float = 1e-8, weight_decay: float = 0.0):
        """Create zeroed moment buffers matching ``adapter``.

        Args:
            adapter: adapter whose ``A``/``B`` lists define the buffer shapes
            lr: learning rate
            beta1: decay rate of the first-moment estimate
            beta2: decay rate of the second-moment estimate
            eps: constant added to the denominator of the update
            weight_decay: decoupled weight-decay coefficient (0 disables it)
        """
        self.lr = lr
        self.beta1 = beta1
        self.beta2 = beta2
        self.eps = eps
        self.weight_decay = weight_decay
        self.t = 0  # Step counter

        n = adapter.n_layers
        # First moment (m) and second moment (v) for each parameter
        self.m_A = [np.zeros_like(adapter.A[i]) for i in range(n)]
        self.v_A = [np.zeros_like(adapter.A[i]) for i in range(n)]
        self.m_B = [np.zeros_like(adapter.B[i]) for i in range(n)]
        self.v_B = [np.zeros_like(adapter.B[i]) for i in range(n)]

    def step(self, adapter: "LoRAAdapter",
             grads_A: list[np.ndarray], grads_B: list[np.ndarray],
             grad_clip: float = 1.0):
        """One Adam update step for all layers.

        A gradient whose norm is NaN or infinite is skipped with a warning:
        folding it into the moment buffers would make every later update for
        that matrix non-finite as well.

        Args:
            adapter: LoRA adapter to update in-place
            grads_A: list of dA gradients per layer
            grads_B: list of dB gradients per layer
            grad_clip: max gradient norm (per-parameter, i.e. per matrix)
        """
        self.t += 1
        bc1 = 1 - self.beta1 ** self.t  # Bias correction
        bc2 = 1 - self.beta2 ** self.t

        groups = (
            ("A", adapter.A, grads_A, self.m_A, self.v_A),
            ("B", adapter.B, grads_B, self.m_B, self.v_B),
        )
        for i in range(adapter.n_layers):
            for label, param, grad, m, v in groups:
                g = grad[i]

                gnorm = np.linalg.norm(g)
                if not np.isfinite(gnorm):
                    # Same channel as the module-level logger of this file.
                    logging.getLogger("ane_lora_trainer").warning(
                        "Skipping Adam update for %s[%d]: non-finite gradient",
                        label, i)
                    continue

                # Gradient clipping (per-parameter norm)
                if gnorm > grad_clip:
                    g = g * (grad_clip / gnorm)

                # Weight decay (decoupled, AdamW-style)
                if self.weight_decay > 0:
                    param[i] -= self.lr * self.weight_decay * param[i]

                # Adam moments
                m[i] = self.beta1 * m[i] + (1 - self.beta1) * g
                v[i] = self.beta2 * v[i] + (1 - self.beta2) * g * g

                # Bias-corrected update, applied in place so the weight
                # arrays keep their identity.
                m_hat = m[i] / bc1
                v_hat = v[i] / bc2
                param[i] -= self.lr * m_hat / (np.sqrt(v_hat) + self.eps)
class ANELoRATrainer:
    """Main training engine orchestrating ANE kernels + optimizer.

    Owns one ``LoRAAdapter`` and one ``AdamState`` per configured LoRA
    target, a single ``LoRAKernelSet`` that is reused for every layer, and
    running training statistics.

    Usage:
        trainer = ANELoRATrainer(config)
        trainer.initialize(n_layers=32, dim=3584)

        # Per-turn training
        for activations, logits, target_ids in training_data:
            loss = trainer.train_step(activations, logits, target_ids)

        # Save adapter
        trainer.save_adapter("/path/to/adapter/")
    """

    # The config annotation is quoted so it is not evaluated when the class
    # body is executed.
    def __init__(self, config: "NeuralConfig"):
        """Store the configuration; no ANE work happens until initialize()."""
        self.config = config
        self.ane: Optional[ANEBridge] = None
        self.kernels: Optional[LoRAKernelSet] = None
        self.initialized = False

        # Model geometry, filled in by initialize(). Defined up front so
        # save_adapter() cannot fail on a missing attribute.
        self.n_layers = 0
        self.dim = 0

        # Per-target adapters: {target_name: LoRAAdapter}
        self.adapters: dict[str, LoRAAdapter] = {}
        self.optimizers: dict[str, AdamState] = {}

        # Training stats
        self.total_steps = 0
        self.total_cycles = 0
        self.last_loss = float('inf')
        self.loss_history: list[float] = []
        self.adapter_version = 0

    def initialize(self, n_layers: int, dim: int):
        """Initialize ANE bridge, compile kernels, create adapters.

        Calling this again releases the previously compiled kernels and
        replaces every adapter and optimizer with a fresh one.

        Args:
            n_layers: number of transformer layers
            dim: model hidden dimension
        """
        rank = self.config.lora_rank
        seq = self.config.ane_seq_len
        scaling = self.config.lora_scaling
        log.info(f"Initializing ANE LoRA trainer: {n_layers} layers, "
                 f"dim={dim}, rank={rank}, seq={seq}, scaling={scaling:.2f}")

        # Re-initialization: free the old kernel set instead of leaking it,
        # and stay "not initialized" until the new setup has fully succeeded.
        self.initialized = False
        if self.kernels is not None:
            self.kernels.free()
            self.kernels = None

        # Init ANE bridge
        self.ane = ANEBridge()
        log.info(f"ANE bridge initialized (compile budget: "
                 f"{self.ane.compile_budget_remaining})")

        # Compile LoRA kernels (4 kernels total, reused across all layers)
        self.kernels = LoRAKernelSet(self.ane, dim, rank, seq, scaling)
        log.info(f"LoRA kernels compiled (4 kernels, "
                 f"compile count: {self.ane.compile_count})")

        # Create per-target adapters. Drop leftovers from an earlier
        # initialize() so no adapter with outdated geometry survives.
        self.adapters.clear()
        self.optimizers.clear()
        for target in self.config.lora_targets:
            adapter = LoRAAdapter(n_layers, dim, rank)
            self.adapters[target] = adapter
            self.optimizers[target] = AdamState(
                adapter,
                lr=self.config.learning_rate,
                beta1=self.config.adam_beta1,
                beta2=self.config.adam_beta2,
                eps=self.config.adam_eps,
                weight_decay=self.config.weight_decay,
            )

        total_params = sum(a.param_count() for a in self.adapters.values())
        total_mb = sum(a.memory_bytes() for a in self.adapters.values()) / 1e6
        log.info(f"Adapters initialized: {len(self.adapters)} targets, "
                 f"{total_params:,} params ({total_mb:.1f} MB)")

        self.n_layers = n_layers
        self.dim = dim
        self.initialized = True

    def get_adapter_weights(self, target: str, layer: int) -> tuple[np.ndarray, np.ndarray]:
        """Get LoRA A and B matrices for a specific target and layer.

        Used by MLX inference to add LoRA contribution. The arrays are the
        live weights, not copies.

        Returns:
            (A [rank, dim], B [dim, rank])
        """
        adapter = self.adapters[target]
        return adapter.A[layer], adapter.B[layer]

    def compute_lora_forward(self, target: str, layer: int,
                             x: np.ndarray) -> np.ndarray:
        """Compute LoRA forward pass for one target in one layer on ANE.

        Args:
            target: "q_proj" or "v_proj"
            layer: transformer layer index
            x: [1, dim, 1, seq] fp32 activation

        Returns:
            [1, dim, 1, seq] fp32 LoRA output (to be added to base output)
        """
        adapter = self.adapters[target]
        return self.kernels.forward(x, adapter.A[layer], adapter.B[layer])

    def compute_lora_backward(self, target: str, layer: int,
                              grad_out: np.ndarray,
                              x: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
        """Compute LoRA gradients for one target in one layer on ANE.

        Args:
            target: "q_proj" or "v_proj"
            layer: transformer layer index
            grad_out: [1, dim, 1, seq] fp32 upstream gradient
            x: [1, dim, 1, seq] fp32 saved activation

        Returns:
            (dA [rank, dim], dB [dim, rank])
        """
        adapter = self.adapters[target]
        return self.kernels.backward(
            grad_out, x, adapter.A[layer], adapter.B[layer])

    def train_step(self, layer_activations: list[np.ndarray],
                   logits: np.ndarray, target_ids: np.ndarray) -> float:
        """One complete training step: forward + loss + backward + update.

        This is the simplified version that computes LoRA gradients
        using a "shortcut" approach: we approximate the gradient by
        computing dL/d(lora_output) for each layer independently,
        treating the base model's gradient flow as given.

        For the full training loop with proper gradient propagation,
        the neural_daemon integrates with MLX's autograd.

        Args:
            layer_activations: list of [1, dim, 1, seq] per layer
                (saved during MLX forward pass)
            logits: [vocab, seq] fp32 model output logits
            target_ids: [seq] int target token IDs

        Returns:
            float: cross-entropy loss value

        Raises:
            RuntimeError: if initialize() has not been called.
        """
        if not self.initialized:
            raise RuntimeError("Trainer not initialized")

        # 1. Compute loss and gradient of logits
        loss, dlogits = self._cross_entropy_backward(logits, target_ids)

        # 2. Approximate upstream gradient per layer. It depends only on the
        #    layer index and the current step count, never on the target, so
        #    it is built once and shared by all targets.
        layer_grads = []
        if self.adapters:
            layer_grads = [
                self._approximate_layer_gradient(
                    layer_idx, dlogits, layer_activations)
                for layer_idx in range(self.n_layers)
            ]

        # 3. Compute LoRA gradients for each target and layer on ANE
        all_grads: dict[str, tuple[list[np.ndarray], list[np.ndarray]]] = {}
        for target in self.adapters:
            grads_A = []
            grads_B = []
            for layer_idx, grad_out in enumerate(layer_grads):
                dA, dB = self.compute_lora_backward(
                    target, layer_idx, grad_out, layer_activations[layer_idx])
                grads_A.append(dA)
                grads_B.append(dB)
            all_grads[target] = (grads_A, grads_B)

        # 4. Adam update for each target (only after every gradient exists)
        for target, (grads_A, grads_B) in all_grads.items():
            self.optimizers[target].step(
                self.adapters[target], grads_A, grads_B,
                grad_clip=self.config.gradient_clip)

        self.total_steps += 1
        self.last_loss = loss
        self.loss_history.append(loss)
        return loss

    def train_micro_step_direct(self, target: str, layer: int,
                                x: np.ndarray,
                                grad_out: np.ndarray) -> tuple[float, float]:
        """Direct micro-training step for a single layer/target.

        Called by the neural daemon when MLX provides per-layer gradients.
        This is the primary training interface.

        NOTE(review): the optimizer step counter ``t`` is shared by all
        layers of a target, so bias correction advances once per call here
        rather than once per update of a given layer.

        Args:
            target: "q_proj" or "v_proj"
            layer: layer index
            x: [1, dim, 1, seq] fp32 activation
            grad_out: [1, dim, 1, seq] fp32 gradient from MLX

        Returns:
            (grad_norm_A, grad_norm_B) for monitoring (norms before clipping)

        Raises:
            RuntimeError: if initialize() has not been called.
        """
        if not self.initialized:
            raise RuntimeError("Trainer not initialized")

        # Compute gradients on ANE
        dA, dB = self.compute_lora_backward(target, layer, grad_out, x)

        # Update just this layer
        adapter = self.adapters[target]
        optimizer = self.optimizers[target]
        optimizer.t += 1
        bc1 = 1 - optimizer.beta1 ** optimizer.t
        bc2 = 1 - optimizer.beta2 ** optimizer.t

        grad_norm_A = float(np.linalg.norm(dA))
        grad_norm_B = float(np.linalg.norm(dB))

        self._adam_update_layer(optimizer, "A", adapter.A, optimizer.m_A,
                                optimizer.v_A, layer, dA, bc1, bc2)
        self._adam_update_layer(optimizer, "B", adapter.B, optimizer.m_B,
                                optimizer.v_B, layer, dB, bc1, bc2)
        return grad_norm_A, grad_norm_B

    def _adam_update_layer(self, optimizer, label: str, param_list: list,
                           m_list: list, v_list: list, layer: int,
                           grad: np.ndarray, bc1: float, bc2: float) -> None:
        """Apply one clipped Adam update to ``param_list[layer]`` in place."""
        gnorm = np.linalg.norm(grad)
        if not np.isfinite(gnorm):
            # A NaN/Inf gradient would make the moments, and every later
            # update of this matrix, non-finite. Skip it instead.
            log.warning(f"Skipping update for {label}[{layer}]: "
                        f"non-finite gradient")
            return

        g = grad
        if gnorm > self.config.gradient_clip:
            g = g * (self.config.gradient_clip / gnorm)

        # Decoupled weight decay, read from the live config
        if self.config.weight_decay > 0:
            param_list[layer] -= (optimizer.lr * self.config.weight_decay
                                  * param_list[layer])

        m_list[layer] = optimizer.beta1 * m_list[layer] + (1 - optimizer.beta1) * g
        v_list[layer] = optimizer.beta2 * v_list[layer] + (1 - optimizer.beta2) * g * g
        m_hat = m_list[layer] / bc1
        v_hat = v_list[layer] / bc2
        param_list[layer] -= optimizer.lr * m_hat / (np.sqrt(v_hat) + optimizer.eps)

    def run_training_cycle(self, layer_activations: list[np.ndarray],
                           logits: np.ndarray, target_ids: np.ndarray,
                           steps: int = 0) -> dict:
        """Run a full micro-training cycle (multiple steps on same data).

        Args:
            layer_activations: per-layer activations from forward pass
            logits: model output logits
            target_ids: target token IDs
            steps: number of steps (0 = use config default)

        Returns:
            dict with training metrics

        Raises:
            ValueError: if the resolved number of steps is not positive.
        """
        steps = steps or self.config.steps_per_cycle
        if steps <= 0:
            # Reject before touching any counter; an empty cycle has no
            # initial/final loss to report.
            raise ValueError(f"steps must be positive, got {steps}")

        start = time.perf_counter()
        losses = [self.train_step(layer_activations, logits, target_ids)
                  for _ in range(steps)]
        elapsed = time.perf_counter() - start

        self.total_cycles += 1
        # The weights changed during this cycle: bump the version first so an
        # auto-saved snapshot records the version of the weights it contains.
        self.adapter_version += 1

        # Auto-save
        if (self.config.auto_save_interval > 0 and
                self.total_cycles % self.config.auto_save_interval == 0):
            self.save_adapter()

        return {
            "cycle": self.total_cycles,
            "steps": steps,
            "initial_loss": losses[0],
            "final_loss": losses[-1],
            "mean_loss": float(np.mean(losses)),
            "elapsed_sec": elapsed,
            "steps_per_sec": steps / elapsed if elapsed > 0 else 0,
            "adapter_version": self.adapter_version,
        }

    @staticmethod
    def _cross_entropy_backward(logits: np.ndarray,
                                target_ids: np.ndarray) -> tuple[float, np.ndarray]:
        """Compute cross-entropy loss and gradient w.r.t. logits.

        Args:
            logits: [vocab, seq] fp32
            target_ids: [seq] int

        Returns:
            (loss, dlogits [vocab, seq])
        """
        vocab, seq_len = logits.shape
        positions = np.arange(seq_len)

        # Stable softmax over the vocab axis
        logits_shifted = logits - logits.max(axis=0, keepdims=True)
        exp_logits = np.exp(logits_shifted)
        probs = exp_logits / exp_logits.sum(axis=0, keepdims=True)

        # Loss: -log(prob of correct token); epsilon guards log(0)
        target_probs = probs[target_ids, positions]
        loss = -np.log(target_probs + 1e-10).mean()

        # Gradient: probs - one_hot(target)
        dlogits = probs.copy()
        dlogits[target_ids, positions] -= 1.0
        dlogits /= seq_len  # Mean reduction
        return float(loss), dlogits

    def _approximate_layer_gradient(self, layer_idx: int,
                                    dlogits: np.ndarray,
                                    activations: list[np.ndarray]) -> np.ndarray:
        """Approximate per-layer gradient for standalone training.

        Uses the layer's activation as a gradient proxy, scaled by layer depth
        and a lightweight signal from the loss gradient. This avoids the
        prohibitively expensive random projection from vocab-size space.
        In the full daemon, MLX computes exact gradients.

        The result is deterministic for a given (layer_idx, total_steps)
        pair because the noise generator is seeded with their sum.
        """
        seq = self.config.ane_seq_len
        dim = self.dim

        # Scale factor: layers closer to output get more gradient
        depth_scale = (layer_idx + 1) / self.n_layers

        # Use the layer activation itself as gradient proxy,
        # scaled by loss gradient magnitude (cheap approximation)
        activation = activations[layer_idx]  # [1, dim, 1, seq]
        grad_magnitude = np.sqrt((dlogits ** 2).mean()) * depth_scale

        # Add small perturbation based on layer index for gradient diversity
        rng = np.random.RandomState(layer_idx + self.total_steps)
        noise = rng.randn(1, dim, 1, seq).astype(np.float32) * 0.01
        grad = (activation * grad_magnitude + noise).astype(np.float32)
        return grad.reshape(1, dim, 1, seq)

    def save_adapter(self, path: str = ""):
        """Save all adapter weights to disk.

        Writes ``<path>/<target>/A_NNN.npy`` and ``B_NNN.npy`` per layer plus
        ``<path>/adapter_meta.json``.

        Args:
            path: destination directory ("" = ``config.adapter_dir``)
        """
        path = path or self.config.adapter_dir
        root = Path(path)
        root.mkdir(parents=True, exist_ok=True)

        for target, adapter in self.adapters.items():
            target_dir = root / target
            target_dir.mkdir(exist_ok=True)
            for i in range(adapter.n_layers):
                np.save(str(target_dir / f"A_{i:03d}.npy"), adapter.A[i])
                np.save(str(target_dir / f"B_{i:03d}.npy"), adapter.B[i])

        # Save metadata
        meta = {
            "n_layers": self.n_layers,
            "dim": self.dim,
            "rank": self.config.lora_rank,
            "targets": list(self.adapters.keys()),
            "total_steps": self.total_steps,
            "total_cycles": self.total_cycles,
            "last_loss": self.last_loss,
            "adapter_version": self.adapter_version,
            "timestamp": time.time(),
        }
        with open(root / "adapter_meta.json", "w", encoding="utf-8") as f:
            json.dump(meta, f, indent=2)
        log.info(f"Adapter saved to {path} (v{self.adapter_version}, "
                 f"{self.total_steps} steps, loss={self.last_loss:.4f})")

    def load_adapter(self, path: str = ""):
        """Load adapter weights from disk.

        Targets that are not configured, layers whose files are missing, and
        layers whose stored shapes differ from the current adapter are
        skipped with a warning; the current weights stay in place for them.
        Optimizer moments are not modified.

        Args:
            path: source directory ("" = ``config.adapter_dir``)

        Returns:
            True if metadata was found and processed, False otherwise.
        """
        path = path or self.config.adapter_dir
        root = Path(path)
        meta_path = root / "adapter_meta.json"
        if not meta_path.exists():
            log.warning(f"No adapter found at {path}")
            return False

        with open(meta_path, encoding="utf-8") as f:
            meta = json.load(f)

        for target in meta["targets"]:
            if target not in self.adapters:
                log.warning(f"Adapter target {target} not in current config")
                continue
            adapter = self.adapters[target]
            target_dir = root / target
            for i in range(min(meta["n_layers"], adapter.n_layers)):
                a_path = target_dir / f"A_{i:03d}.npy"
                b_path = target_dir / f"B_{i:03d}.npy"
                if not (a_path.exists() and b_path.exists()):
                    log.warning(f"Missing weights for {target} layer {i} "
                                f"in {path}; keeping current values")
                    continue
                loaded_A = np.load(str(a_path))
                loaded_B = np.load(str(b_path))
                # A snapshot saved with another dim/rank must not replace
                # arrays that the optimizer moments were sized for.
                if (loaded_A.shape != adapter.A[i].shape or
                        loaded_B.shape != adapter.B[i].shape):
                    log.warning(
                        f"Shape mismatch for {target} layer {i}: "
                        f"A {loaded_A.shape} vs {adapter.A[i].shape}, "
                        f"B {loaded_B.shape} vs {adapter.B[i].shape}; "
                        f"keeping current values")
                    continue
                adapter.A[i] = loaded_A.astype(np.float32, copy=False)
                adapter.B[i] = loaded_B.astype(np.float32, copy=False)

        self.total_steps = meta.get("total_steps", 0)
        self.total_cycles = meta.get("total_cycles", 0)
        self.last_loss = meta.get("last_loss", float('inf'))
        self.adapter_version = meta.get("adapter_version", 0)
        log.info(f"Adapter loaded from {path} (v{self.adapter_version}, "
                 f"{self.total_steps} steps)")
        return True

    def reset_adapter(self):
        """Reset all adapters to initial values (fresh start).

        Re-draws every A, zeroes every B, clears the optimizer moments and
        step counters, resets the training statistics, and bumps
        ``adapter_version``.
        """
        for target, adapter in self.adapters.items():
            scale = 1.0 / math.sqrt(adapter.dim)
            for i in range(adapter.n_layers):
                adapter.A[i] = np.random.randn(
                    adapter.rank, adapter.dim).astype(np.float32) * scale
                adapter.B[i] = np.zeros(
                    (adapter.dim, adapter.rank), dtype=np.float32)

            # Reset optimizer state
            optimizer = self.optimizers[target]
            optimizer.t = 0
            for i in range(adapter.n_layers):
                optimizer.m_A[i].fill(0)
                optimizer.v_A[i].fill(0)
                optimizer.m_B[i].fill(0)
                optimizer.v_B[i].fill(0)

        self.total_steps = 0
        self.total_cycles = 0
        self.last_loss = float('inf')
        self.loss_history.clear()
        self.adapter_version += 1
        log.info("Adapter reset to initial values")

    def update_learning_rate(self, lr: float):
        """Update learning rate for all optimizers and the config."""
        for opt in self.optimizers.values():
            opt.lr = lr
        self.config.learning_rate = lr

    def stats(self) -> dict:
        """Return training statistics.

        ANE compile counters are included once a bridge exists, and
        ``recent_avg_loss`` (mean of the last 10 losses) once any loss has
        been recorded.
        """
        total_params = sum(a.param_count() for a in self.adapters.values())
        total_mb = sum(a.memory_bytes() for a in self.adapters.values()) / 1e6
        result = {
            "initialized": self.initialized,
            "total_params": total_params,
            "adapter_memory_mb": round(total_mb, 1),
            "targets": list(self.adapters.keys()),
            "total_steps": self.total_steps,
            "total_cycles": self.total_cycles,
            "last_loss": self.last_loss,
            "adapter_version": self.adapter_version,
        }
        if self.ane:
            result["ane_compile_count"] = self.ane.compile_count
            result["ane_compile_budget"] = self.ane.compile_budget_remaining
        if self.loss_history:
            recent = self.loss_history[-10:]
            result["recent_avg_loss"] = round(float(np.mean(recent)), 4)
        return result

    def cleanup(self):
        """Free ANE resources.

        Releases the compiled kernel set (if any) and marks the trainer as
        not initialized. Adapters and statistics are kept.
        """
        if self.kernels:
            self.kernels.free()
            self.kernels = None
        self.initialized = False
        log.info("ANE LoRA trainer cleaned up")
def self_test():
    """Test the training engine with a small model.

    Exercises initialization, the LoRA forward kernel, full training steps,
    the direct micro-step, and a save / reset / load round trip on a
    4-layer, dim-64 configuration. Kernels and the scratch directory are
    released even when a check fails.

    Returns:
        True when every check passed; a failed check raises AssertionError.
    """
    import shutil
    import tempfile

    logging.basicConfig(level=logging.INFO,
                        format="%(name)s: %(message)s")
    print("ANE LoRA Trainer Self-Test")
    print("=" * 50)

    # Private scratch directory: no clash with concurrent runs and no
    # predictable shared path under /tmp.
    test_dir = tempfile.mkdtemp(prefix="jarvis_lora_test_")
    trainer = None
    try:
        config = NeuralConfig()
        config.lora_rank = 16
        config.lora_targets = ["q_proj", "v_proj"]
        config.ane_seq_len = 16
        config.learning_rate = 1e-4  # Higher LR for test
        config.adapter_dir = test_dir
        config.resolve_paths()
        trainer = ANELoRATrainer(config)

        # Test with small dims
        n_layers = 4
        dim = 64
        seq = 16
        vocab = 128
        print(f"\nInitializing: {n_layers} layers, dim={dim}, rank={config.lora_rank}")
        trainer.initialize(n_layers, dim)
        print(f"[OK] Initialized: {trainer.stats()['total_params']:,} params")

        # Test forward pass
        print("\nTesting LoRA forward pass...")
        x = np.random.randn(1, dim, 1, seq).astype(np.float32) * 0.1
        out_q = trainer.compute_lora_forward("q_proj", 0, x)
        out_v = trainer.compute_lora_forward("v_proj", 0, x)
        print(f"[OK] Forward: q_proj max={np.abs(out_q).max():.6f}, "
              f"v_proj max={np.abs(out_v).max():.6f}")

        # Test training step
        print("\nTesting training step...")
        activations = [np.random.randn(1, dim, 1, seq).astype(np.float32) * 0.1
                       for _ in range(n_layers)]
        logits = np.random.randn(vocab, seq).astype(np.float32)
        target_ids = np.random.randint(0, vocab, size=seq)
        loss = trainer.train_step(activations, logits, target_ids)
        print(f"[OK] Training step: loss={loss:.4f}")

        # Test multiple steps (verify loss changes)
        print("\nRunning 5 training steps...")
        losses = [loss]
        for _ in range(4):
            losses.append(trainer.train_step(activations, logits, target_ids))
        print(f"[OK] Losses: {[f'{step_loss:.4f}' for step_loss in losses]}")
        print(f" Steps completed: {trainer.total_steps}")

        # Test direct micro-step
        print("\nTesting direct micro-step...")
        grad_out = np.random.randn(1, dim, 1, seq).astype(np.float32) * 0.01
        gn_a, gn_b = trainer.train_micro_step_direct("q_proj", 0, x, grad_out)
        print(f"[OK] Micro-step: grad_norm_A={gn_a:.6f}, grad_norm_B={gn_b:.6f}")

        # Test save/load
        print("\nTesting save/load...")
        trainer.save_adapter()

        # Snapshot current weights; reset_adapter() replaces the arrays
        A_before, _ = trainer.get_adapter_weights("q_proj", 0)
        A_copy = A_before.copy()

        # Reset and verify weights changed. Explicit raises are used
        # because ``assert`` statements are removed under ``python -O``.
        trainer.reset_adapter()
        A_after, _ = trainer.get_adapter_weights("q_proj", 0)
        if np.allclose(A_copy, A_after):
            raise AssertionError("Reset didn't change weights")

        # Load and verify weights restored
        trainer.load_adapter()
        A_loaded, _ = trainer.get_adapter_weights("q_proj", 0)
        if not np.allclose(A_copy, A_loaded):
            raise AssertionError("Loaded weights don't match saved")
        print("[OK] Save/load round-trip verified")
    finally:
        # Cleanup runs on success and on failure alike
        if trainer is not None:
            trainer.cleanup()
        shutil.rmtree(test_dir, ignore_errors=True)

    print("\n[PASS] All trainer tests passed")
    print(f" Stats: {trainer.stats()}")
    return True
if __name__ == "__main__":
    # Script entry point: run the self-test and report it via the exit code.
    success = self_test()
    # Raise SystemExit directly; the ``exit`` helper is installed by the
    # ``site`` module for interactive sessions and is absent under
    # ``python -S``.
    raise SystemExit(0 if success else 1)