#!/usr/bin/env python3
"""Pipeline 1: CPU-Lean LoRA Training for Codette Adapters
Train one adapter at a time on CPU using bf16 weights + LoRA.
Designed for machines with 16-32 GB RAM; anything that doesn't fit in physical RAM spills to the Windows page file.
Memory: ~18 GB peak (bf16 model + LoRA + activations)
Speed: ~30-90s per step on Intel Core Ultra (Lunar Lake)
Time: ~3-9 hours per adapter (depending on dataset size + epochs)
Usage:
python train_cpu_lean.py newton
python train_cpu_lean.py empathy --epochs 2
python train_cpu_lean.py quantum --rank 16 --seq-len 512
python train_cpu_lean.py --list # Show available adapters
python train_cpu_lean.py newton --resume # Resume from checkpoint
The script auto-converts the trained adapter to GGUF format for inference.
Runs at BELOW_NORMAL priority so your computer stays responsive.
"""
import os, sys, time, json, gc, argparse, math
from pathlib import Path
from datetime import datetime, timedelta
# ── Environment bootstrap ───────────────────────────────────────
_site = r"J:\Lib\site-packages"
if _site not in sys.path:
sys.path.insert(0, _site)
os.environ["PATH"] = (
r"J:\Lib\site-packages\Library\bin" + os.pathsep + os.environ.get("PATH", "")
)
os.environ["HF_HOME"] = r"J:\hf_cache" # Keep model cache on J: (lots of space)
os.environ["TRANSFORMERS_CACHE"] = r"J:\hf_cache"
try:
sys.stdout.reconfigure(encoding='utf-8', errors='replace')
except Exception:
pass
# ── Set background priority ────────────────────────────────────
def set_low_priority():
"""Set process to BELOW_NORMAL priority so it doesn't lag the system."""
try:
import ctypes
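        # 0x00004000 is BELOW_NORMAL_PRIORITY_CLASS from the Win32 API; on non-Windows
        # systems ctypes.windll is missing, so the except below turns this into a no-op.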
BELOW_NORMAL = 0x00004000
handle = ctypes.windll.kernel32.GetCurrentProcess()
ctypes.windll.kernel32.SetPriorityClass(handle, BELOW_NORMAL)
print(" Process priority: BELOW_NORMAL (background-friendly)")
except Exception:
pass
# ── Memory monitoring ──────────────────────────────────────────
def get_memory_gb():
"""Return (used_gb, total_gb, page_used_gb, page_total_gb)."""
try:
import ctypes
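        # Mirrors the Win32 MEMORYSTATUSEX struct consumed by GlobalMemoryStatusEx;
        # byte counts are converted to decimal GB below.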
class MEMSTAT(ctypes.Structure):
_fields_ = [
('dwLength', ctypes.c_ulong), ('dwMemoryLoad', ctypes.c_ulong),
('ullTotalPhys', ctypes.c_ulonglong), ('ullAvailPhys', ctypes.c_ulonglong),
('ullTotalPageFile', ctypes.c_ulonglong), ('ullAvailPageFile', ctypes.c_ulonglong),
('ullTotalVirtual', ctypes.c_ulonglong), ('ullAvailVirtual', ctypes.c_ulonglong),
('ullAvailExtendedVirtual', ctypes.c_ulonglong),
]
m = MEMSTAT(dwLength=ctypes.sizeof(MEMSTAT))
ctypes.windll.kernel32.GlobalMemoryStatusEx(ctypes.byref(m))
used = (m.ullTotalPhys - m.ullAvailPhys) / 1e9
total = m.ullTotalPhys / 1e9
page_used = (m.ullTotalPageFile - m.ullAvailPageFile) / 1e9
page_total = m.ullTotalPageFile / 1e9
return used, total, page_used, page_total
except Exception:
return 0, 0, 0, 0
def print_memory(label=""):
used, total, pu, pt = get_memory_gb()
pct = (used / total * 100) if total > 0 else 0
page_pct = (pu / pt * 100) if pt > 0 else 0
print(f" [{label}] RAM: {used:.1f}/{total:.1f} GB ({pct:.0f}%) | "
f"Page: {pu:.1f}/{pt:.1f} GB ({page_pct:.0f}%)")
# ── Configuration ──────────────────────────────────────────────
PROJECT_ROOT = Path(r"J:\codette-training-lab")
DATASET_DIR = PROJECT_ROOT / "datasets"
ADAPTER_OUT = PROJECT_ROOT / "adapters"
CKPT_DIR = PROJECT_ROOT / "training" / "checkpoints"
GGUF_CONVERTER = Path(r"J:\TheAI\llama.cpp\convert_lora_to_gguf.py")
MODEL_ID = "meta-llama/Llama-3.1-8B-Instruct"
ADAPTER_CONFIG = {
"newton": {"dataset": "newton_reasoning.jsonl", "examples": 3000},
"davinci": {"dataset": "davinci_reasoning.jsonl", "examples": 2500},
"empathy": {"dataset": "empathy_reasoning.jsonl", "examples": 2500},
"philosophy": {"dataset": "philosophy_reasoning.jsonl", "examples": 2000},
"quantum": {"dataset": "quantum_reasoning.jsonl", "examples": 2000},
"consciousness": {"dataset": "consciousness_reasoning.jsonl", "examples": 3000},
"multi_perspective": {"dataset": "multi_perspective_reasoning.jsonl", "examples": 2500},
"systems_architecture": {"dataset": "systems_architecture_reasoning.jsonl", "examples": 2000},
}
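# Each dataset line is a chat-format JSON record, e.g. (illustrative):
# {"messages": [{"role": "system", "content": "..."},
#               {"role": "user", "content": "..."},
#               {"role": "assistant", "content": "..."}]}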
# ── Dataset loading ────────────────────────────────────────────
def load_dataset_jsonl(adapter_name, max_examples=None):
"""Load chat-format JSONL dataset for an adapter."""
cfg = ADAPTER_CONFIG[adapter_name]
path = DATASET_DIR / cfg["dataset"]
if not path.exists():
raise FileNotFoundError(
f"Dataset not found: {path}\n"
f"Run the dataset engine first: python dataset_engine/generate.py {adapter_name}"
)
data = []
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line:
data.append(json.loads(line))
if max_examples and len(data) > max_examples:
data = data[:max_examples]
print(f" Dataset: {path.name} ({len(data)} examples)")
return data
def format_chat_to_text(messages, tokenizer):
"""Convert chat messages to a single training string using the model's chat template."""
try:
text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=False)
except Exception:
# Fallback: manual formatting
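        # These hard-coded markers mirror the Llama 3.x chat format
        # (<|start_header_id|>...<|eot_id|>), used only when apply_chat_template fails.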
parts = []
for msg in messages:
role = msg["role"]
content = msg["content"]
if role == "system":
parts.append(f"<|begin_of_text|><|start_header_id|>system<|end_header_id|>\n\n{content}<|eot_id|>")
elif role == "user":
parts.append(f"<|start_header_id|>user<|end_header_id|>\n\n{content}<|eot_id|>")
elif role == "assistant":
parts.append(f"<|start_header_id|>assistant<|end_header_id|>\n\n{content}<|eot_id|>")
text = "".join(parts)
return text
# ── Training loop ──────────────────────────────────────────────
def train_adapter(
adapter_name,
epochs=3,
rank=8,
alpha=16,
lr=2e-4,
batch_size=1,
grad_accum=16,
max_seq_len=256,
save_steps=100,
resume=False,
max_examples=None,
):
"""Train a single LoRA adapter on CPU."""
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from peft import LoraConfig, get_peft_model, TaskType
set_low_priority()
print_memory("before model load")
# ── Load tokenizer ──────────────────────────────────────
print(f"\n Loading tokenizer: {MODEL_ID}")
tokenizer = AutoTokenizer.from_pretrained(
MODEL_ID,
use_fast=True,
trust_remote_code=True,
)
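    # Llama 3.1's tokenizer ships without a dedicated pad token, so EOS is reused for
    # padding (pad positions are excluded from the loss during tokenization below).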
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
tokenizer.pad_token_id = tokenizer.eos_token_id
# ── Load model in bf16 ──────────────────────────────────
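    # Rough memory math: 8B parameters x 2 bytes (bf16) ≈ 16 GB of weights, which is
    # where the ~18 GB peak in the module docstring comes from once LoRA state and
    # activations are added; low_cpu_mem_usage keeps the loading peak near that figure.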
print(f" Loading model in bf16 (this takes a few minutes with page file)...")
print(f" If this is the first run, the model will be downloaded (~16 GB).")
print(f" Model cache: {os.environ.get('HF_HOME', '~/.cache/huggingface')}")
load_start = time.time()
model = AutoModelForCausalLM.from_pretrained(
MODEL_ID,
torch_dtype=torch.bfloat16,
low_cpu_mem_usage=True, # Load layer-by-layer (lower peak RAM)
device_map="cpu",
)
model.config.use_cache = False # Required for gradient checkpointing
print(f" Model loaded in {time.time() - load_start:.0f}s")
print_memory("after model load")
# ── Enable gradient checkpointing ───────────────────────
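    # Trade compute for memory: activations are recomputed during the backward pass
    # instead of being stored, lowering peak RAM at the cost of extra forward work.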
model.gradient_checkpointing_enable()
print(" Gradient checkpointing: ON (saves ~40% activation memory)")
# ── Configure LoRA ──────────────────────────────────────
lora_config = LoraConfig(
task_type=TaskType.CAUSAL_LM,
r=rank,
lora_alpha=alpha,
lora_dropout=0.05,
target_modules=["q_proj", "v_proj"], # Fewer targets = less memory
bias="none",
)
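    # Back-of-envelope for the defaults (rank 8, q_proj/v_proj, Llama 3.1 8B): each layer
    # adds roughly r*(4096+4096) + r*(4096+1024) ≈ 106K LoRA params, so 32 layers give
    # ≈ 3.4M trainable parameters (~0.04% of the base model).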
model = get_peft_model(model, lora_config)
trainable, total = model.get_nb_trainable_parameters()
print(f" LoRA: rank={rank}, alpha={alpha}, targets=q_proj,v_proj")
print(f" Trainable: {trainable:,} / {total:,} ({100*trainable/total:.4f}%)")
print_memory("after LoRA")
# ── Load dataset ────────────────────────────────────────
data = load_dataset_jsonl(adapter_name, max_examples=max_examples)
# ── Tokenize dataset ────────────────────────────────────
print(f" Tokenizing {len(data)} examples (max_seq_len={max_seq_len})...")
tokenized = []
skipped = 0
for item in data:
messages = item["messages"]
text = format_chat_to_text(messages, tokenizer)
tokens = tokenizer(
text,
truncation=True,
max_length=max_seq_len,
padding="max_length",
return_tensors="pt",
)
# Only keep examples that have meaningful content
real_tokens = (tokens["attention_mask"].sum().item())
if real_tokens < 10:
skipped += 1
continue
        input_ids = tokens["input_ids"].squeeze(0)
        attention_mask = tokens["attention_mask"].squeeze(0)
        labels = input_ids.clone()
        # Mask padding positions so the pad/EOS filler doesn't contribute to the loss.
        labels[attention_mask == 0] = -100
        tokenized.append({
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "labels": labels,
        })
if skipped:
print(f" Skipped {skipped} examples (too short)")
print(f" Training on {len(tokenized)} examples")
# ── Checkpoint handling ─────────────────────────────────
ckpt_path = CKPT_DIR / adapter_name
ckpt_path.mkdir(parents=True, exist_ok=True)
start_step = 0
start_epoch = 0
    if resume:
        # Pick the highest-numbered checkpoint. Sort numerically: a plain lexical sort
        # would rank e.g. "step_900" after "step_1000". Only the adapter weights are
        # restored; the optimizer state starts fresh.
        checkpoints = sorted(ckpt_path.glob("step_*"), key=lambda p: int(p.name.split("_")[1]))
        latest = checkpoints[-1] if checkpoints else None
        if latest:
            print(f" Resuming from checkpoint: {latest.name}")
            model.load_adapter(str(latest), adapter_name="default")
            start_step = int(latest.name.split("_")[1])
            steps_per_epoch = max(1, len(tokenized) // grad_accum)
            start_epoch = start_step // steps_per_epoch
            print(f" Resuming at step {start_step}, epoch ~{start_epoch}")
# ── Optimizer ───────────────────────────────────────────
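    # Only the (few million) LoRA parameters carry requires_grad, so AdamW's two
    # moment buffers stay in the tens of MB instead of scaling with the 8B base model.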
optimizer = torch.optim.AdamW(
[p for p in model.parameters() if p.requires_grad],
lr=lr,
weight_decay=0.01,
)
# ── Training loop ───────────────────────────────────────
total_steps = (len(tokenized) * epochs) // grad_accum
print(f"\n{'='*60}")
print(f" TRAINING: {adapter_name}")
print(f" Epochs: {epochs} | Steps: {total_steps}")
print(f" Batch: {batch_size} x {grad_accum} accum = {batch_size * grad_accum} effective")
print(f" Seq len: {max_seq_len} | LR: {lr}")
est_time = total_steps * 60 # rough estimate: 60s/step
print(f" Est. time: {timedelta(seconds=est_time)}")
print(f"{'='*60}\n")
model.train()
global_step = start_step
running_loss = 0.0
step_times = []
best_loss = float("inf")
for epoch in range(start_epoch, epochs):
print(f" --- Epoch {epoch+1}/{epochs} ---")
# Shuffle training data each epoch
import random
random.shuffle(tokenized)
accum_loss = 0.0
accum_count = 0
        step_start = time.time()  # timer for one optimizer step (grad_accum micro-batches)
        for i, batch in enumerate(tokenized):
input_ids = batch["input_ids"].unsqueeze(0) # [1, seq_len]
attention_mask = batch["attention_mask"].unsqueeze(0)
labels = batch["labels"].unsqueeze(0)
# Forward pass
outputs = model(
input_ids=input_ids,
attention_mask=attention_mask,
labels=labels,
)
loss = outputs.loss / grad_accum
loss.backward()
accum_loss += outputs.loss.item()
accum_count += 1
# Gradient accumulation step
if accum_count >= grad_accum:
# Gradient clipping
torch.nn.utils.clip_grad_norm_(
[p for p in model.parameters() if p.requires_grad],
max_norm=1.0,
)
optimizer.step()
optimizer.zero_grad()
global_step += 1
avg_loss = accum_loss / accum_count
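                # Exponential moving average (decay 0.9) of the per-step loss; used for
                # logging and for tracking the "best" checkpoint below.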
running_loss = 0.9 * running_loss + 0.1 * avg_loss if running_loss > 0 else avg_loss
                step_time = time.time() - step_start  # covers the whole accumulation window
                step_times.append(step_time)
                step_start = time.time()  # restart the timer for the next optimizer step
# Logging
if global_step % 5 == 0 or global_step <= 3:
avg_step = sum(step_times[-20:]) / len(step_times[-20:])
remaining = (total_steps - global_step) * avg_step
used, total_ram, _, _ = get_memory_gb()
print(
f" step {global_step:>5}/{total_steps} | "
f"loss={avg_loss:.4f} (avg={running_loss:.4f}) | "
f"{avg_step:.1f}s/step | "
f"RAM={used:.1f}/{total_ram:.1f}GB | "
f"ETA={timedelta(seconds=int(remaining))}"
)
# Save checkpoint
if global_step % save_steps == 0:
save_path = ckpt_path / f"step_{global_step}"
model.save_pretrained(str(save_path))
print(f" >> Checkpoint saved: {save_path.name}")
# Track best
if running_loss < best_loss:
best_loss = running_loss
best_path = ckpt_path / "best"
model.save_pretrained(str(best_path))
accum_loss = 0.0
accum_count = 0
# Periodic memory cleanup
if global_step % 50 == 0:
gc.collect()
# Clean up per-example tensors
del outputs, loss
if global_step % 10 == 0:
gc.collect()
# End of epoch
print(f" Epoch {epoch+1} complete | Running loss: {running_loss:.4f}")
gc.collect()
# ── Save final adapter ──────────────────────────────────
print(f"\n{'='*60}")
print(f" TRAINING COMPLETE")
print(f"{'='*60}")
final_path = ADAPTER_OUT / f"{adapter_name}-lora-cpu"
model.save_pretrained(str(final_path))
tokenizer.save_pretrained(str(final_path))
print(f" Adapter saved: {final_path}")
print(f" Final loss: {running_loss:.4f}")
if step_times:
avg_step = sum(step_times) / len(step_times)
total_time = sum(step_times)
print(f" Avg step time: {avg_step:.1f}s")
print(f" Total training time: {timedelta(seconds=int(total_time))}")
print_memory("final")
# ── Convert to GGUF ─────────────────────────────────────
convert_to_gguf(adapter_name, final_path)
return final_path
def convert_to_gguf(adapter_name, adapter_path):
"""Convert safetensors LoRA adapter to GGUF format for llama.cpp inference."""
if not GGUF_CONVERTER.exists():
print(f"\n GGUF converter not found at: {GGUF_CONVERTER}")
print(f" To convert manually later:")
print(f" python {GGUF_CONVERTER} --base {MODEL_ID} {adapter_path}")
return
gguf_out = ADAPTER_OUT / f"{adapter_name}-lora-f16.gguf"
print(f"\n Converting to GGUF: {gguf_out.name}...")
import subprocess
try:
result = subprocess.run(
[
sys.executable,
str(GGUF_CONVERTER),
"--base", MODEL_ID,
str(adapter_path),
"--outfile", str(gguf_out),
],
capture_output=True,
text=True,
timeout=600,
)
if result.returncode == 0:
size_mb = gguf_out.stat().st_size / 1e6
print(f" GGUF saved: {gguf_out} ({size_mb:.1f} MB)")
print(f" Ready for inference with codette_orchestrator.py!")
else:
print(f" GGUF conversion failed: {result.stderr[:500]}")
except Exception as e:
print(f" GGUF conversion error: {e}")
# ── CLI ────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(
description="CPU-Lean LoRA Trainer for Codette",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
python train_cpu_lean.py newton # Train newton adapter
python train_cpu_lean.py empathy --epochs 2 # Train empathy, 2 epochs
python train_cpu_lean.py --list # List available adapters
python train_cpu_lean.py quantum --resume # Resume from checkpoint
Memory: ~18 GB peak. With 16 GB RAM + page file, expect some disk swapping.
Speed: ~30-90s per training step on modern Intel CPU.
""",
)
parser.add_argument("adapter", nargs="?", help="Adapter name to train")
parser.add_argument("--list", action="store_true", help="List available adapters")
parser.add_argument("--epochs", type=int, default=3, help="Training epochs (default: 3)")
parser.add_argument("--rank", type=int, default=8, help="LoRA rank (default: 8)")
parser.add_argument("--alpha", type=int, default=16, help="LoRA alpha (default: 16)")
parser.add_argument("--lr", type=float, default=2e-4, help="Learning rate (default: 2e-4)")
parser.add_argument("--seq-len", type=int, default=256, help="Max sequence length (default: 256)")
parser.add_argument("--grad-accum", type=int, default=16, help="Gradient accumulation steps (default: 16)")
parser.add_argument("--save-steps", type=int, default=100, help="Save checkpoint every N steps (default: 100)")
parser.add_argument("--resume", action="store_true", help="Resume from latest checkpoint")
parser.add_argument("--max-examples", type=int, default=None, help="Limit dataset size (for testing)")
args = parser.parse_args()
print("=" * 60)
print(" CODETTE CPU-LEAN TRAINER (Pipeline 1)")
print("=" * 60)
print_memory("startup")
if args.list or not args.adapter:
print("\nAvailable adapters:")
for name, cfg in ADAPTER_CONFIG.items():
ds_path = DATASET_DIR / cfg["dataset"]
status = f"{cfg['examples']} examples" if ds_path.exists() else "DATASET MISSING"
gguf = ADAPTER_OUT / f"{name}-lora-f16.gguf"
trained = " [TRAINED]" if gguf.exists() else ""
print(f" {name:24s} {status}{trained}")
if not args.adapter:
print("\nUsage: python train_cpu_lean.py <adapter_name>")
return
if args.adapter not in ADAPTER_CONFIG:
print(f"\nUnknown adapter: {args.adapter}")
print(f"Available: {', '.join(ADAPTER_CONFIG.keys())}")
sys.exit(1)
try:
train_adapter(
adapter_name=args.adapter,
epochs=args.epochs,
rank=args.rank,
alpha=args.alpha,
lr=args.lr,
max_seq_len=args.seq_len,
grad_accum=args.grad_accum,
save_steps=args.save_steps,
resume=args.resume,
max_examples=args.max_examples,
)
except KeyboardInterrupt:
print("\n\n Training interrupted by user.")
print(" Use --resume to continue from last checkpoint.")
except Exception as e:
print(f"\n Training failed: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":
main()