| """ |
| KernelX Intelligence Layer — Data Ingestion and Preprocessing (Stage 1) |
| |
| Reads raw state_transitions.jsonl from the bridge's TrajectoryManager, |
| applies feature scaling (symlog for huge counters), drops sparse-zero |
| features, and produces train/val/test splits for World Model and |
| Strategist training. |
| |
| Usage: |
| python -m training.data.preprocess --input data/state_transitions.jsonl |
| """ |
|
|
import argparse
import json
import sys
from pathlib import Path
from typing import List, Dict, Any

import numpy as np
|
|
CONFIG_PATH = Path(__file__).parent / "preprocessing_config.json"
|
|
def load_config() -> dict:
    with open(CONFIG_PATH) as f:
        return json.load(f)


# Loaded once at import time; the constants below are module-level globals.
CONFIG = load_config()
|
|
SYMLOG_FEATURES = CONFIG["symlog_features"]   # raw indices to compress with symlog
ACTIVE_FEATURES = CONFIG["active_features"]   # raw indices kept after scaling
FEATURE_NAMES = CONFIG["feature_names"]       # display names for the active features
# Not referenced below: the drop happens implicitly because extract_active()
# keeps only ACTIVE_FEATURES. Retained for documentation and other consumers.
SPARSE_ZERO = CONFIG["sparse_zero_features"]
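# Illustrative preprocessing_config.json (hypothetical values; only the four
# keys above are assumed here, the real file ships alongside this module):
#
#   {
#       "symlog_features": [4, 5, 8, 9],
#       "active_features": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
#       "feature_names": ["cpu", "prio", "static_prio", "normal_prio",
#                         "exec_ns", "vruntime", "migrations", "cpus_allowed",
#                         "ctx_switches", "wait_us"],
#       "sparse_zero_features": [10, 11, 12]
#   }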
|
|
# Indices into the 10D active feature vector produced by extract_active()
# (assumed to match FEATURE_NAMES order; unused locally, exported for
# downstream code).
IDX_CPU = 0
IDX_PRIO = 1
IDX_STATIC_PRIO = 2
IDX_NORMAL_PRIO = 3
IDX_EXEC_NS = 4
IDX_VRUNTIME = 5
IDX_MIGRATIONS = 6
IDX_CPUS_ALLOWED = 7
IDX_CTX_SWITCHES = 8
IDX_WAIT_US = 9
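# For example, a downstream consumer could read the wait time of a processed
# sample (hypothetical usage) as:
#
#   wait_us = sample["state"][IDX_WAIT_US]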
|
|
def symmetric_log(x: float) -> float:
    """sgn(x) * ln(1 + |x|) — compresses huge values while preserving sign."""
    return float(np.sign(x) * np.log1p(np.abs(x)))
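# Quick sanity check (rounded; ln(1 + 1000) ≈ 6.909):
#   symmetric_log(1000.0)  ->  6.91
#   symmetric_log(-1000.0) -> -6.91
#   symmetric_log(0.0)     ->  0.0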
|
|
|
|
def preprocess_features(raw_features: List[float]) -> List[float]:
    """Transform a raw 24D feature vector: symlog the big counters."""
    f = list(raw_features)  # copy so the caller's list is not mutated
    for idx in SYMLOG_FEATURES:
        f[idx] = symmetric_log(f[idx])
    return f
|
|
|
|
def extract_active(scaled_features: List[float]) -> List[float]:
    """Keep only the active (non-zero, information-carrying) features."""
    return [scaled_features[i] for i in ACTIVE_FEATURES]
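# For example, with ACTIVE_FEATURES == [0, 1, 3] (hypothetical),
# extract_active([a, b, c, d]) returns [a, b, d].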
|
|
def format_state(active_vector: List[float]) -> str:
    """Convert a 10D active feature vector into a compact text string."""
    parts = []
    for name, val in zip(FEATURE_NAMES, active_vector):
        if val == int(val):
            parts.append(f"{name}:{int(val)}")  # integral values: no decimals
        else:
            parts.append(f"{name}:{val:.2f}")   # everything else: 2 decimals
    return " | ".join(parts)
|
|
def preprocess_record(record: Dict[str, Any]) -> Dict[str, Any]:
    """Transform a single raw JSONL record into training-ready format."""
    s_t_scaled = preprocess_features(record["state_t"]["features"])
    s_t1_scaled = preprocess_features(record["state_t_next"]["features"])

    s_t_active = extract_active(s_t_scaled)
    s_t1_active = extract_active(s_t1_scaled)

    return {
        "state": s_t_active,
        "action": record["action"],
        "reward": record["reward"],
        "next_state": s_t1_active,
        "pid": record["state_t"]["pid"],
        "cpu": record["state_t"]["cpu"],
        "timestamp": record["state_t"]["timestamp"],
    }
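# Illustrative effect (hypothetical numbers): a raw exec-time counter of
# 1_500_000 in state_t["features"] becomes symmetric_log(1_500_000) ≈ 14.22
# in the output "state" vector, while features not listed in SYMLOG_FEATURES
# pass through unchanged before the active-feature selection.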
|
|
def audit_dataset(records: List[Dict]) -> None:
    """Print per-feature statistics for the raw dataset."""
    all_features = []
    for r in records:
        all_features.append(r["state_t"]["features"])
        all_features.append(r["state_t_next"]["features"])

    arr = np.array(all_features, dtype=np.float64)

    print(f"\nTotal transitions: {len(records)}")
    print(f"Total feature vectors: {len(all_features)}")
    print(f"\n{'Idx':<5} {'Min':<22} {'Max':<22} {'Mean':<22} {'Std':<22} {'Zeros%':<10}")
    print("-" * 103)
    for i in range(arr.shape[1]):  # 24 raw feature columns
        col = arr[:, i]
        zero_pct = (col == 0).sum() / len(col) * 100
        print(f"{i:<5} {col.min():<22.2f} {col.max():<22.2f} {col.mean():<22.2f} {col.std():<22.2f} {zero_pct:<10.1f}")

    print(f"\nNaN count: {np.isnan(arr).sum()}")
    print(f"Inf count: {np.isinf(arr).sum()}")

    actions = [r["action"] for r in records]
    rewards = [r["reward"] for r in records]
    print(f"\nAction — unique values: {sorted(set(actions))}")
    print(f"Reward — min: {min(rewards)}, max: {max(rewards)}, mean: {np.mean(rewards):.2f}, std: {np.std(rewards):.2f}")
|
|
|
|
def split_chronological(processed: List[Dict], train_ratio=0.8, val_ratio=0.1):
    """Split processed records chronologically (NOT randomly), so validation
    and test transitions always come from later in time than the training
    data and no future information leaks into training."""
    processed.sort(key=lambda x: x["timestamp"])  # sorts the caller's list in place
    n = len(processed)
    train_end = int(n * train_ratio)
    val_end = int(n * (train_ratio + val_ratio))
    return processed[:train_end], processed[train_end:val_end], processed[val_end:]
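# With the default ratios and n = 1000 sorted records, the split is
# train = records[0:800], val = records[800:900], test = records[900:1000].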
|
|
def run_pipeline(input_path: str, output_dir: str, audit: bool = True):
    """Full preprocessing pipeline: audit -> scale -> split -> save."""
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Load raw JSONL transitions, skipping blank lines.
    print(f"Loading raw data from {input_path} ...")
    records = []
    with open(input_path) as f:
        for line in f:
            line = line.strip()
            if line:
                records.append(json.loads(line))

    if not records:
        print("ERROR: No records found in input file.")
        sys.exit(1)

    print(f"Loaded {len(records)} raw transitions.")

    if audit:
        audit_dataset(records)

    print("\nPreprocessing (symlog scaling + active feature extraction) ...")
    processed = [preprocess_record(r) for r in records]

    # Show one example so the transform is easy to eyeball.
    sample = processed[0]
    print("\nSample preprocessed state (10D):")
    print(f"  {format_state(sample['state'])}")
    print("Sample preprocessed next_state:")
    print(f"  {format_state(sample['next_state'])}")

    # Persist the full processed dataset before splitting.
    processed_path = output_dir / "processed_transitions.jsonl"
    with open(processed_path, "w") as f:
        for p in processed:
            f.write(json.dumps(p) + "\n")
    print(f"\nSaved {len(processed)} processed records to {processed_path}")

    # Chronological 80/10/10 split (see split_chronological above).
    train, val, test = split_chronological(processed)
    for split_name, split_data in [("train", train), ("val", val), ("test", test)]:
        split_path = output_dir / f"{split_name}.jsonl"
        with open(split_path, "w") as f:
            for item in split_data:
                f.write(json.dumps(item) + "\n")
        print(f"{split_name}: {len(split_data)} records -> {split_path}")

    print("\nPreprocessing complete.")
    return train, val, test
|
|
|
|
def main():
    parser = argparse.ArgumentParser(description="KernelX data preprocessing pipeline")
    parser.add_argument("--input", required=True, help="Path to raw state_transitions.jsonl")
    parser.add_argument("--output-dir", default=str(Path(__file__).parent), help="Output directory for processed data")
    parser.add_argument("--no-audit", action="store_true", help="Skip the dataset audit step")
    args = parser.parse_args()

    run_pipeline(args.input, args.output_dir, audit=not args.no_audit)
|
|
|
|
if __name__ == "__main__":
    main()
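# Example invocations (paths illustrative):
#   python -m training.data.preprocess --input data/state_transitions.jsonl
#   python -m training.data.preprocess --input data/state_transitions.jsonl \
#       --output-dir data/processed --no-audit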
|
|