Spaces:
Sleeping
Sleeping
| # Generated by Claude Code — 2026-02-13 | |
| """Orbital density features derived from the CRASH Clock framework. | |
| Computes population-level orbital density metrics for each conjunction event, | |
| based on the altitude distribution of all events in the training set. | |
| The key insight from Thiele et al. (2025) "An Orbital House of Cards": | |
| collision rate scales as n² * A_col * v_r — so a conjunction at a crowded | |
| altitude (550 km Starlink shell) is fundamentally riskier than the same | |
| miss_distance at a sparse altitude (1200 km). | |
| These features are computed from the TRAINING set only and applied to | |
| validation/test sets to prevent data leakage. | |
| """ | |
| import json | |
| import numpy as np | |
| import pandas as pd | |
| from pathlib import Path | |
| # Physical constants | |
| EARTH_RADIUS_KM = 6371.0 | |
| GM_M3_S2 = 3.986004418e14 # Earth gravitational parameter (m³/s²) | |
| # CRASH Clock cross-sections from Thiele et al. Table (10m-5m-10cm) | |
| A_COL_SAT_SAT = 300.0 # m² (satellite-satellite, 10m approach) | |
| A_COL_SAT_DEBRIS = 79.0 # m² (satellite-debris, 5m approach) | |
| # Altitude binning | |
| BIN_WIDTH_KM = 25 # km per altitude bin | |
| ALT_MIN_KM = 150 | |
| ALT_MAX_KM = 2100 | |
| # Feature names that will be added to DataFrames | |
| DENSITY_FEATURES = [ | |
| "shell_density", # events per km³ in altitude bin | |
| "shell_collision_rate", # Γ from CRASH Clock Eq. 2 (per second) | |
| "local_crash_clock_log", # log10(seconds to expected collision in shell) | |
| "altitude_percentile", # CDF position in event altitude distribution | |
| "n_events_in_shell", # raw count of training events at this altitude | |
| "shell_risk_rate", # fraction of high-risk events in this altitude bin | |
| ] | |
| def _orbital_speed_kms(altitude_km: float) -> float: | |
| """Circular orbital speed in km/s at a given altitude.""" | |
| r_m = (EARTH_RADIUS_KM + altitude_km) * 1000.0 | |
| return np.sqrt(GM_M3_S2 / r_m) / 1000.0 # m/s → km/s | |
| def _mean_relative_speed_kms(altitude_km: float) -> float: | |
| """Average relative encounter speed: v_r = (4/3) * v_orbital (Eq. 7).""" | |
| return (4.0 / 3.0) * _orbital_speed_kms(altitude_km) | |
| def _shell_volume_km3(altitude_km: float, width_km: float) -> float: | |
| """Volume of a spherical shell at given altitude with given width.""" | |
| r_inner = EARTH_RADIUS_KM + altitude_km - width_km / 2.0 | |
| r_outer = EARTH_RADIUS_KM + altitude_km + width_km / 2.0 | |
| return (4.0 / 3.0) * np.pi * (r_outer**3 - r_inner**3) | |
class OrbitalDensityComputer:
    """Computes orbital density features from a training DataFrame.

    Fit on training data, then transform any DataFrame (train/val/test)
    to add density-based static features per event.

    The density is computed from event altitudes, NOT from a full TLE
    catalog, so it represents the conjunction density distribution rather
    than the full RSO population. For the Kelvins dataset, this captures
    where conjunction events cluster (which correlates with RSO density).
    """

    def __init__(self, bin_width_km: float = BIN_WIDTH_KM):
        self.bin_width_km = bin_width_km
        # Fixed altitude grid covering [ALT_MIN_KM, ALT_MAX_KM] in bin_width_km steps.
        self.bin_edges = np.arange(ALT_MIN_KM, ALT_MAX_KM + bin_width_km, bin_width_km)
        self.bin_centers = (self.bin_edges[:-1] + self.bin_edges[1:]) / 2.0
        self.n_bins = len(self.bin_centers)
        # Fitted state (populated by fit())
        self.event_counts = None       # events per bin
        self.density_per_bin = None    # events / km³ per bin
        self.collision_rate = None     # Γ per bin (events/s)
        self.crash_clock_log = None    # log10(seconds to collision) per bin
        self.risk_rate_per_bin = None  # fraction high-risk per bin
        self.altitude_cdf = None       # sorted training altitudes (empirical CDF)
        self.is_fitted = False

    def _event_altitude(self, df: pd.DataFrame) -> "tuple[np.ndarray, np.ndarray]":
        """Compute conjunction altitude for each event (last CDM row).

        Uses mean of target and chaser perigee altitudes as the approximate
        conjunction altitude. Falls back to semi-major axis minus Earth radius.

        Returns:
            (altitudes_km, event_ids): aligned arrays, one entry per event_id.
        """
        event_df = df.groupby("event_id").last()
        # Primary: mean of perigee altitudes (where most conjunctions happen).
        # Default of 0 km (clamped to ALT_MIN_KM below) when neither column exists.
        t_alt = np.zeros(len(event_df))
        c_alt = np.zeros(len(event_df))
        if "t_h_per" in event_df.columns:
            t_alt = event_df["t_h_per"].fillna(0).values
        elif "t_j2k_sma" in event_df.columns:
            t_alt = event_df["t_j2k_sma"].fillna(EARTH_RADIUS_KM).values - EARTH_RADIUS_KM
        if "c_h_per" in event_df.columns:
            c_alt = event_df["c_h_per"].fillna(0).values
        elif "c_j2k_sma" in event_df.columns:
            c_alt = event_df["c_j2k_sma"].fillna(EARTH_RADIUS_KM).values - EARTH_RADIUS_KM
        altitudes = (t_alt + c_alt) / 2.0
        # Clamp to the histogram range so every event maps to a valid bin.
        altitudes = np.clip(altitudes, ALT_MIN_KM, ALT_MAX_KM - 1)
        return altitudes, event_df.index.values

    def fit(self, train_df: pd.DataFrame) -> "OrbitalDensityComputer":
        """Fit density distribution from training data.

        Must be called before transform(). Only uses training data
        to prevent information leakage into validation/test sets.
        """
        altitudes, event_ids = self._event_altitude(train_df)
        # Histogram: count events per altitude bin
        self.event_counts, _ = np.histogram(altitudes, bins=self.bin_edges)
        # Density: events per km³ in each shell (floor volume to avoid /0)
        volumes = np.array([
            _shell_volume_km3(c, self.bin_width_km)
            for c in self.bin_centers
        ])
        self.density_per_bin = self.event_counts / np.maximum(volumes, 1e-6)
        # Collision rate per shell: Γ = (1/2) * n² * A_col * v_r * V
        # Using satellite-satellite cross-section as the primary concern
        self.collision_rate = np.zeros(self.n_bins)
        for i, (center, density, volume) in enumerate(
            zip(self.bin_centers, self.density_per_bin, volumes)
        ):
            v_r = _mean_relative_speed_kms(center)  # km/s
            # Convert A_col from m² to km², v_r already in km/s
            a_col_km2 = A_COL_SAT_SAT / 1e6  # m² → km²
            # Γ = 0.5 * n² * A * v_r * V (units: per second)
            gamma = 0.5 * density**2 * a_col_km2 * v_r * volume
            self.collision_rate[i] = gamma
        # CRASH Clock per shell: τ = 1/Γ (in seconds), log10 for feature;
        # clip keeps the feature in a bounded, finite range even for Γ = 0.
        with np.errstate(divide="ignore"):
            tau = 1.0 / np.maximum(self.collision_rate, 1e-30)
        self.crash_clock_log = np.log10(np.clip(tau, 1.0, 1e15))
        # Risk rate per bin: fraction of positive events (risk > -5 is "high")
        risk_per_event = train_df.groupby("event_id")["risk"].last()
        is_high_risk = (risk_per_event > -5).astype(float).values
        self.risk_rate_per_bin = np.zeros(self.n_bins)
        for i in range(self.n_bins):
            mask = (altitudes >= self.bin_edges[i]) & (altitudes < self.bin_edges[i + 1])
            if mask.sum() > 0:
                self.risk_rate_per_bin[i] = is_high_risk[mask].mean()
        # Sorted training altitudes double as the empirical CDF for percentiles
        sorted_alts = np.sort(altitudes)
        self.altitude_cdf = sorted_alts
        self.is_fitted = True
        print(f" OrbitalDensityComputer fitted on {len(event_ids)} events")
        print(f" Altitude range: {altitudes.min():.0f} - {altitudes.max():.0f} km")
        print(f" Peak density bin: {self.bin_centers[np.argmax(self.density_per_bin)]:.0f} km "
              f"({self.event_counts.max()} events)")
        peak_idx = np.argmax(self.collision_rate)
        if self.collision_rate[peak_idx] > 0:
            print(f" Highest collision rate: {self.bin_centers[peak_idx]:.0f} km "
                  f"(tau = {10**self.crash_clock_log[peak_idx]:.0f} s)")
        return self

    def _get_bin_index(self, altitudes: np.ndarray) -> np.ndarray:
        """Map altitudes to histogram bin indices, clipped to valid range."""
        indices = np.digitize(altitudes, self.bin_edges) - 1
        return np.clip(indices, 0, self.n_bins - 1)

    def _altitude_percentile(self, altitudes: np.ndarray) -> np.ndarray:
        """Percentile (0..1) of each altitude in the training distribution."""
        return np.searchsorted(self.altitude_cdf, altitudes) / len(self.altitude_cdf)

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add density features to a CDM DataFrame.

        Features are computed per event_id and broadcast to all CDM rows
        (they're static features — same for every CDM in the sequence).

        Raises:
            RuntimeError: if called before fit().
        """
        if not self.is_fitted:
            raise RuntimeError("Must call fit() before transform()")
        df = df.copy()
        altitudes, event_ids = self._event_altitude(df)
        bin_indices = self._get_bin_index(altitudes)
        # Vectorized once for all events (was a per-event singleton call)
        percentiles = self._altitude_percentile(altitudes)
        # Build event-level features
        event_features = {}
        for i, eid in enumerate(event_ids):
            bi = bin_indices[i]
            event_features[eid] = {
                "shell_density": self.density_per_bin[bi],
                "shell_collision_rate": self.collision_rate[bi],
                "local_crash_clock_log": self.crash_clock_log[bi],
                "altitude_percentile": percentiles[i],
                "n_events_in_shell": float(self.event_counts[bi]),
                "shell_risk_rate": self.risk_rate_per_bin[bi],
            }
        # Map features to all CDM rows via event_id; 0.0 for unseen events
        for col in DENSITY_FEATURES:
            df[col] = df["event_id"].map(
                {eid: feats[col] for eid, feats in event_features.items()}
            ).fillna(0.0)
        return df

    def save(self, path: Path):
        """Save fitted state to JSON for inference.

        Raises:
            RuntimeError: if called before fit().
        """
        if not self.is_fitted:
            raise RuntimeError("Must call fit() before save()")
        state = {
            "bin_width_km": self.bin_width_km,
            "bin_edges": self.bin_edges.tolist(),
            "bin_centers": self.bin_centers.tolist(),
            "event_counts": self.event_counts.tolist(),
            "density_per_bin": self.density_per_bin.tolist(),
            "collision_rate": self.collision_rate.tolist(),
            "crash_clock_log": self.crash_clock_log.tolist(),
            "risk_rate_per_bin": self.risk_rate_per_bin.tolist(),
            "altitude_cdf": self.altitude_cdf.tolist(),
        }
        Path(path).parent.mkdir(parents=True, exist_ok=True)
        with open(path, "w") as f:
            json.dump(state, f, indent=2)

    @classmethod  # BUG FIX: was a bare `def load(cls, ...)` — unusable without the decorator
    def load(cls, path: Path) -> "OrbitalDensityComputer":
        """Load fitted state from JSON (alternate constructor)."""
        with open(path) as f:
            state = json.load(f)
        obj = cls(bin_width_km=state["bin_width_km"])
        obj.bin_edges = np.array(state["bin_edges"])
        obj.bin_centers = np.array(state["bin_centers"])
        obj.n_bins = len(obj.bin_centers)
        obj.event_counts = np.array(state["event_counts"])
        obj.density_per_bin = np.array(state["density_per_bin"])
        obj.collision_rate = np.array(state["collision_rate"])
        obj.crash_clock_log = np.array(state["crash_clock_log"])
        obj.risk_rate_per_bin = np.array(state["risk_rate_per_bin"])
        obj.altitude_cdf = np.array(state["altitude_cdf"])
        obj.is_fitted = True
        return obj