| """ | |
| Zarr storage layer for efficient tensor serialization. | |
| Stores instrumentation data to disk using Zarr with Blosc compression: | |
| - Attention tensors: chunked by (layer, head) for fast slice access | |
| - Residual norms, logits: standard chunking | |
| - Metadata: JSON files | |
| Storage structure: | |
| /tmp/runs/{run_id}/ | |
| βββ tensors/ | |
| β βββ attention.zarr/ | |
| β βββ residuals.zarr/ | |
| β βββ logits.zarr/ | |
| βββ metadata.json | |
| βββ telemetry.jsonl | |
| """ | |
# Optional imports for zarr storage (not available on ARM64)
try:
    import zarr
    import numcodecs
    ZARR_AVAILABLE = True
except ImportError:
    zarr = None
    numcodecs = None
    ZARR_AVAILABLE = False

import numpy as np
import torch
import json
import os
import shutil
from typing import Dict, Any, Optional, List
from pathlib import Path
from datetime import datetime
import logging

from .instrumentation import InstrumentationData, TokenMetadata, LayerMetadata

logger = logging.getLogger(__name__)
class ZarrStorage:
    """
    Manages Zarr storage for instrumentation data.

    Features:
    - Blosc compression (>3x compression ratio)
    - Chunking optimized for visualization access patterns
    - Lazy loading support
    - Export to zip bundles for study reproducibility
    """

    def __init__(self, run_id: str, base_dir: str = "/tmp/runs"):
        self.run_id = run_id
        self.base_dir = Path(base_dir)
        self.run_dir = self.base_dir / run_id
        self.tensor_dir = self.run_dir / "tensors"

        # Create directories
        self.tensor_dir.mkdir(parents=True, exist_ok=True)

        # Blosc compressor for efficient compression (if zarr available)
        if ZARR_AVAILABLE:
            self.compressor = numcodecs.Blosc(
                cname='zstd',                    # zstd algorithm (good compression + speed)
                clevel=5,                        # Compression level (1-9, 5 is balanced)
                shuffle=numcodecs.Blosc.SHUFFLE  # Byte shuffle for better compression
            )
        else:
            self.compressor = None
            logger.warning("Zarr not available - tensor storage will be disabled")
    def save_instrumentation_data(self, data: InstrumentationData) -> Dict[str, Any]:
        """
        Save complete instrumentation data to Zarr + JSON.

        Args:
            data: InstrumentationData from ModelInstrumentor

        Returns:
            Dictionary with file paths and sizes
        """
        if not ZARR_AVAILABLE:
            logger.warning("Zarr not available - skipping instrumentation data save")
            return {'run_id': self.run_id, 'error': 'zarr_not_available'}

        logger.info(f"Saving instrumentation data for run {self.run_id}...")

        result = {
            'run_id': self.run_id,
            'paths': {},
            'sizes_mb': {}
        }

        # 1. Save attention tensors (largest data)
        if data.attention_tensors is not None:
            attn_path = self._save_attention_tensors(data.attention_tensors)
            result['paths']['attention'] = str(attn_path)
            result['sizes_mb']['attention'] = self._get_dir_size_mb(attn_path)

        # 2. Save metadata (JSON)
        metadata_path = self._save_metadata(data)
        result['paths']['metadata'] = str(metadata_path)
        result['sizes_mb']['metadata'] = self._get_file_size_mb(metadata_path)

        # 3. Save token data (JSON)
        tokens_path = self._save_token_data(data.tokens)
        result['paths']['tokens'] = str(tokens_path)
        result['sizes_mb']['tokens'] = self._get_file_size_mb(tokens_path)

        # 4. Save layer metadata (JSON)
        layer_meta_path = self._save_layer_metadata(data.layer_metadata)
        result['paths']['layer_metadata'] = str(layer_meta_path)
        result['sizes_mb']['layer_metadata'] = self._get_file_size_mb(layer_meta_path)

        # Summary
        total_size = sum(result['sizes_mb'].values())
        result['total_size_mb'] = total_size
        logger.info(f"✓ Saved {total_size:.2f} MB to {self.run_dir}")

        return result
    def _save_attention_tensors(self, attention_tensor: torch.Tensor) -> Path:
        """
        Save attention tensors with optimal chunking.

        Input shape: [num_tokens, num_layers, num_heads, seq_len, seq_len]
        Chunking: (1, 1, 1, seq_len, seq_len) - one chunk per layer/head

        This allows fast loading of individual head attention without
        loading the entire tensor.
        """
        path = self.tensor_dir / "attention.zarr"

        # Convert to numpy (Zarr doesn't support torch tensors directly)
        attn_np = attention_tensor.cpu().numpy()

        # Determine chunk shape
        num_tokens, num_layers, num_heads, seq_len, _ = attn_np.shape
        chunk_shape = (1, 1, 1, seq_len, seq_len)  # One chunk per layer/head

        # Save with compression
        z = zarr.open(
            str(path),
            mode='w',
            shape=attn_np.shape,
            chunks=chunk_shape,
            dtype=attn_np.dtype,
            compressor=self.compressor
        )
        z[:] = attn_np

        logger.info(f"  Attention: shape={attn_np.shape}, chunks={chunk_shape}")
        return path
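
    # Why this chunk layout pays off (illustrative sketch, assuming a saved run
    # already exists on disk; the path placeholder below is hypothetical): with
    # chunks of (1, 1, 1, seq_len, seq_len), reading one head's attention matrix
    # decompresses a single chunk instead of the full
    # [num_tokens, num_layers, num_heads, seq_len, seq_len] array.
    #
    #   z = zarr.open("/tmp/runs/<run_id>/tensors/attention.zarr", mode="r")
    #   head_attn = z[0, 5, 3, :, :]  # token 0, layer 5, head 3 -> one chunk read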
    def _save_metadata(self, data: InstrumentationData) -> Path:
        """Save run metadata as JSON"""
        path = self.run_dir / "metadata.json"

        metadata = {
            'run_id': data.run_id,
            'seed': data.seed,
            'model_name': data.model_name,
            'timestamp': data.timestamp,
            'timestamp_iso': datetime.fromtimestamp(data.timestamp).isoformat(),
            'prompt': data.prompt,
            'max_tokens': data.max_tokens,
            'temperature': data.temperature,
            'top_k': data.top_k,
            'top_p': data.top_p,
            'total_time_ms': data.total_time_ms,
            'num_layers': data.num_layers,
            'num_heads': data.num_heads,
            'seq_length': data.seq_length,
            'num_generated_tokens': len(data.tokens)
        }

        with open(path, 'w') as f:
            json.dump(metadata, f, indent=2)

        return path
    def _save_token_data(self, tokens: List[TokenMetadata]) -> Path:
        """Save token metadata as JSON"""
        path = self.run_dir / "tokens.json"

        tokens_data = [
            {
                'token_id': t.token_id,
                'text': t.text,
                'position': t.position,
                'logprob': t.logprob,
                'entropy': t.entropy,
                'top_k_tokens': t.top_k_tokens,
                'byte_length': t.byte_length,
                'timestamp_ms': t.timestamp_ms
            }
            for t in tokens
        ]

        with open(path, 'w') as f:
            json.dump(tokens_data, f, indent=2)

        return path
    def _save_layer_metadata(self, layer_metadata: List[List[LayerMetadata]]) -> Path:
        """Save layer-level metadata as JSON"""
        path = self.run_dir / "layer_metadata.json"

        # Convert to serializable format
        layer_data = [
            [
                {
                    'layer_idx': lm.layer_idx,
                    'residual_norm': lm.residual_norm,
                    'time_ms': lm.time_ms,
                    'attention_output_norm': lm.attention_output_norm,
                    'ffn_output_norm': lm.ffn_output_norm
                }
                for lm in token_layers
            ]
            for token_layers in layer_metadata
        ]

        with open(path, 'w') as f:
            json.dump(layer_data, f, indent=2)

        return path
    def load_attention_slice(self, layer_idx: int, head_idx: int, token_idx: int = 0) -> np.ndarray:
        """
        Load a single attention head's matrix for a specific token.

        Args:
            layer_idx: Layer index (0-31 for Code Llama)
            head_idx: Head index (0-31 for Code Llama)
            token_idx: Token generation step (default 0 = first token)

        Returns:
            Attention matrix [seq_len, seq_len]
        """
        if not ZARR_AVAILABLE:
            raise RuntimeError("Zarr not available - cannot load attention data")

        path = self.tensor_dir / "attention.zarr"
        if not path.exists():
            raise FileNotFoundError(f"Attention data not found at {path}")

        # Open in read mode
        z = zarr.open(str(path), mode='r')

        # Load specific slice
        # Shape: [num_tokens, num_layers, num_heads, seq_len, seq_len]
        attention_matrix = z[token_idx, layer_idx, head_idx, :, :]

        return attention_matrix
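
    # Typical call (a sketch, assuming instrumentation data was already saved
    # for this run; the indices are arbitrary examples):
    #
    #   attn = storage.load_attention_slice(layer_idx=10, head_idx=4, token_idx=0)
    #   # attn.shape == (seq_len, seq_len)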
    def load_metadata(self) -> Dict[str, Any]:
        """Load run metadata"""
        path = self.run_dir / "metadata.json"
        with open(path, 'r') as f:
            return json.load(f)

    def load_tokens(self) -> List[Dict[str, Any]]:
        """Load token metadata"""
        path = self.run_dir / "tokens.json"
        with open(path, 'r') as f:
            return json.load(f)
    def export_bundle(self, output_path: Optional[Path] = None) -> Path:
        """
        Create a zip bundle of the entire run directory for export.

        Args:
            output_path: Optional custom output path (default: {base_dir}/run_{run_id}.zip)

        Returns:
            Path to created zip file
        """
        if output_path is None:
            output_path = self.base_dir / f"run_{self.run_id}.zip"

        logger.info(f"Creating export bundle: {output_path}")

        # Create zip archive
        shutil.make_archive(
            str(output_path.with_suffix('')),  # Strip .zip; make_archive adds it
            'zip',
            self.run_dir
        )

        bundle_size_mb = self._get_file_size_mb(output_path)
        logger.info(f"✓ Created bundle: {bundle_size_mb:.2f} MB")

        return output_path
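
    # Restoring a bundle elsewhere is the inverse operation (illustrative
    # sketch; the destination directory below is hypothetical):
    #
    #   shutil.unpack_archive("/tmp/runs/run_<run_id>.zip", "/tmp/restored_run", "zip")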
    def cleanup(self):
        """Delete run directory and all contents"""
        if self.run_dir.exists():
            shutil.rmtree(self.run_dir)
            logger.info(f"Cleaned up run directory: {self.run_dir}")

    def _get_dir_size_mb(self, path: Path) -> float:
        """Get total size of directory in MB"""
        total_size = sum(
            f.stat().st_size for f in path.rglob('*') if f.is_file()
        )
        return total_size / (1024 * 1024)

    def _get_file_size_mb(self, path: Path) -> float:
        """Get file size in MB"""
        return path.stat().st_size / (1024 * 1024)
def generate_run_id() -> str:
    """
    Generate unique Run ID.

    Format: R{YYYY-MM-DD}-{HHMM}-{hash}
    Example: R2025-11-01-1430-a7f3
    """
    now = datetime.now()
    date_str = now.strftime("%Y-%m-%d")
    time_str = now.strftime("%H%M")

    # Short hash from timestamp microseconds (always 4 hex digits, so the ID
    # matches the documented format even for small microsecond values)
    hash_str = f"{now.microsecond:05x}"[-4:]

    return f"R{date_str}-{time_str}-{hash_str}"
def create_telemetry_log(run_id: str, base_dir: str = "/tmp/runs") -> Path:
    """
    Create telemetry JSONL file for logging events.

    Returns path to telemetry file.
    """
    run_dir = Path(base_dir) / run_id
    run_dir.mkdir(parents=True, exist_ok=True)

    telemetry_path = run_dir / "telemetry.jsonl"

    # Initialize with run.start event
    with open(telemetry_path, 'w') as f:
        f.write(json.dumps({
            'event': 'run.start',
            'run_id': run_id,
            'timestamp': datetime.now().timestamp()
        }) + '\n')

    return telemetry_path
def log_telemetry_event(run_id: str, event: str, data: Dict[str, Any],
                        base_dir: str = "/tmp/runs"):
    """
    Append telemetry event to JSONL log.

    Args:
        run_id: Run identifier
        event: Event name (e.g., 'token.emit', 'ablation.run')
        data: Event-specific data
        base_dir: Base directory for runs
    """
    telemetry_path = Path(base_dir) / run_id / "telemetry.jsonl"

    event_data = {
        'event': event,
        'timestamp': datetime.now().timestamp(),
        **data
    }

    with open(telemetry_path, 'a') as f:
        f.write(json.dumps(event_data) + '\n')
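
# Reading the telemetry log back is plain JSONL parsing (illustrative sketch,
# assuming the run directory exists):
#
#   path = Path("/tmp/runs") / run_id / "telemetry.jsonl"
#   events = [json.loads(line) for line in path.read_text().splitlines() if line]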
# Example usage
if __name__ == "__main__":
    print("Storage module loaded successfully")

    # Example: Create a mock run
    run_id = generate_run_id()
    print(f"Generated Run ID: {run_id}")

    storage = ZarrStorage(run_id)
    print(f"Storage directory: {storage.run_dir}")