panacea-api / src /data /maneuver_classifier.py
DTanzillo's picture
Upload folder using huggingface_hub
a4b5ecb verified
"""Classify detected satellite maneuvers into avoidance vs routine.
Enriches each maneuver with:
- magnitude_class: micro/small/medium/large based on delta-v
- constellation: starlink/oneweb/iridium/other
- is_stationkeeping: regularity-based detection from maneuver history
- likely_avoidance: heuristic combining all signals
These enrichments improve training label quality for PI-TFT fine-tuning
without changing the model's feature space.
"""
import re
import numpy as np
from datetime import datetime
# Delta-v magnitude bins (m/s)
MAGNITUDE_BINS = [
("micro", 0.0, 0.5),
("small", 0.5, 2.0),
("medium", 2.0, 10.0),
("large", 10.0, float("inf")),
]
# Constellation name patterns
CONSTELLATION_PATTERNS = [
("starlink", re.compile(r"STARLINK", re.IGNORECASE)),
("oneweb", re.compile(r"ONEWEB", re.IGNORECASE)),
("iridium", re.compile(r"IRIDIUM", re.IGNORECASE)),
]
# Stationkeeping regularity threshold (coefficient of variation of intervals)
STATIONKEEPING_CV_THRESHOLD = 0.3
MIN_HISTORY_FOR_SK = 3 # Need at least 3 past maneuvers to detect pattern
def classify_magnitude(delta_v_m_s: float) -> str:
"""Bin delta-v into magnitude class."""
dv = abs(delta_v_m_s)
for label, lo, hi in MAGNITUDE_BINS:
if lo <= dv < hi:
return label
return "large"
def detect_constellation(name: str) -> str:
"""Identify constellation from satellite name."""
for constellation, pattern in CONSTELLATION_PATTERNS:
if pattern.search(name):
return constellation
return "other"
def detect_stationkeeping(history: list[dict]) -> bool:
"""Detect stationkeeping from regularity of past maneuver intervals.
If the coefficient of variation (std/mean) of time intervals between
consecutive maneuvers is below threshold, it's likely stationkeeping.
Args:
history: Past maneuver records for this NORAD ID, each with
'detected_at' ISO timestamp.
Returns:
True if maneuver pattern suggests stationkeeping.
"""
if not history or len(history) < MIN_HISTORY_FOR_SK:
return False
# Parse timestamps and sort
timestamps = []
for h in history:
ts_str = h.get("detected_at", "")
if not ts_str:
continue
try:
ts = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
timestamps.append(ts.timestamp())
except (ValueError, TypeError):
continue
if len(timestamps) < MIN_HISTORY_FOR_SK:
return False
timestamps.sort()
intervals = np.diff(timestamps)
if len(intervals) < 2:
return False
mean_interval = np.mean(intervals)
if mean_interval <= 0:
return False
cv = np.std(intervals) / mean_interval
return cv < STATIONKEEPING_CV_THRESHOLD
def classify_maneuver(maneuver: dict, history: list[dict] = None) -> dict:
"""Classify a detected maneuver with enrichment flags.
Args:
maneuver: Maneuver dict from detect_maneuvers() with keys:
norad_id, name, delta_v_m_s, delta_a_m, etc.
history: Past maneuver records for same NORAD ID (optional).
Returns:
Dict with enrichment fields added to the original maneuver.
"""
delta_v = maneuver.get("delta_v_m_s", 0.0)
name = maneuver.get("name", "")
magnitude_class = classify_magnitude(delta_v)
constellation = detect_constellation(name)
is_sk = detect_stationkeeping(history) if history else False
# Likely avoidance heuristic
likely_avoidance = False
if not is_sk and magnitude_class in ("micro", "small") and delta_v < 5.0:
likely_avoidance = True
# Starlink CAMs are typically very small (< 1 m/s)
if constellation == "starlink" and delta_v < 1.0:
likely_avoidance = True
enriched = dict(maneuver)
enriched.update({
"magnitude_class": magnitude_class,
"constellation": constellation,
"is_stationkeeping": is_sk,
"likely_avoidance": likely_avoidance,
"enrichment_version": 1,
# Phase B/C defaults — overwritten later if data is available
"has_cdm": False,
"cdm_pc": None,
"cdm_miss_distance_km": None,
"counterfactual_min_distance_km": None,
"would_have_collided": False,
"counterfactual_closest_norad": None,
})
return enriched