| """ |
| AirTrackLM - Data Pipeline |
| ========================== |
| Converts raw ADS-B (lat, lon, alt, timestamp) to model-ready tensors. |
| |
| Pipeline: |
| 1. Load trajectories from `traffic` library or raw CSV |
| 2. Resample to fixed time interval (default 5s) |
| 3. Convert lat/lon/alt to ENU (East-North-Up) using first lat/lon point as origin |
| 4. Compute velocity via 3-point central derivative on ENU positions |
| 5. Derive COG, SOG from x-y ground velocity; ROT from COG; altitude rate from z velocity |
| 6. Binary geohash encoding (40-bit per axis, following LLM4STP approach) |
| 7. Discretize features into bins |
| 8. Compute uncertainty scores |
| 9. Build sliding-window PyTorch Dataset |
| |
| KEY DESIGN CHOICES: |
| - Time: sub-second (fractional) resolution via float64 Unix timestamps + sinusoidal encoding |
| - Geohash: 40-bit binary per axis with PER-TRAJECTORY normalization for max spatial resolution |
| 40 bits → 2^40 ≈ 10^12 levels. For a 500km trajectory range, that's ~0.5μm resolution. |
| Tighter than any geohash resolution level. |
| - COG/SOG: derived from ENU velocity components via 3-point central derivative (not from raw lat/lon) |
| - ENU origin: first (lat, lon) of each trajectory |
| """ |
|
|
| import numpy as np |
| import torch |
| from torch.utils.data import Dataset, DataLoader |
| from typing import Optional, Tuple, List, Dict |
| import pyproj |
| from dataclasses import dataclass, field |
|
|
|
|
| |
| |
| |
|
|
| class ENUConverter: |
| """ |
| Convert WGS84 (lat, lon, alt) to local East-North-Up (ENU). |
| Origin = first point of each trajectory. |
| """ |
| |
| def __init__(self, origin_lat: float, origin_lon: float, origin_alt: float = 0.0): |
| self.origin_lat = origin_lat |
| self.origin_lon = origin_lon |
| self.origin_alt = origin_alt |
| |
| self.ecef = pyproj.Proj(proj='geocent', ellps='WGS84', datum='WGS84') |
| self.lla = pyproj.Proj(proj='latlong', ellps='WGS84', datum='WGS84') |
| self.transformer_to_ecef = pyproj.Transformer.from_proj(self.lla, self.ecef, always_xy=True) |
| self.transformer_to_lla = pyproj.Transformer.from_proj(self.ecef, self.lla, always_xy=True) |
| |
| self.x0, self.y0, self.z0 = self.transformer_to_ecef.transform( |
| origin_lon, origin_lat, origin_alt |
| ) |
| |
| lat_r = np.radians(origin_lat) |
| lon_r = np.radians(origin_lon) |
| self.R = np.array([ |
| [-np.sin(lon_r), np.cos(lon_r), 0 ], |
| [-np.sin(lat_r)*np.cos(lon_r), -np.sin(lat_r)*np.sin(lon_r), np.cos(lat_r)], |
| [ np.cos(lat_r)*np.cos(lon_r), np.cos(lat_r)*np.sin(lon_r), np.sin(lat_r)] |
| ]) |
| |
| def to_enu(self, lats, lons, alts): |
| x, y, z = self.transformer_to_ecef.transform(lons, lats, alts) |
| dx = x - self.x0 |
| dy = y - self.y0 |
| dz = z - self.z0 |
| ecef_delta = np.stack([dx, dy, dz], axis=0) |
| enu = self.R @ ecef_delta |
| return enu[0], enu[1], enu[2] |
| |
| def from_enu(self, east, north, up): |
| enu = np.stack([east, north, up], axis=0) |
| ecef_delta = self.R.T @ enu |
| x = ecef_delta[0] + self.x0 |
| y = ecef_delta[1] + self.y0 |
| z = ecef_delta[2] + self.z0 |
| lons, lats, alts = self.transformer_to_lla.transform(x, y, z) |
| return lats, lons, alts |
|
|
|
|
| |
| |
| |
|
|
| def three_point_derivative(values: np.ndarray, t: np.ndarray) -> np.ndarray: |
| """ |
| 3-point central derivative. Interior: (f(i+1) - f(i-1)) / (t(i+1) - t(i-1)) |
| Endpoints: forward/backward difference. |
| """ |
| N = len(values) |
| deriv = np.zeros(N) |
| if N < 2: |
| return deriv |
| |
| dt_fwd = t[1] - t[0] |
| if dt_fwd > 0: |
| deriv[0] = (values[1] - values[0]) / dt_fwd |
| |
| if N > 2: |
| dt_span = t[2:] - t[:-2] |
| mask = dt_span > 0 |
| val_diff = values[2:] - values[:-2] |
| deriv[1:-1] = np.where(mask, val_diff / np.maximum(dt_span, 1e-10), 0.0) |
| |
| dt_bwd = t[-1] - t[-2] |
| if dt_bwd > 0: |
| deriv[-1] = (values[-1] - values[-2]) / dt_bwd |
| |
| return deriv |
|
|
|
|
| |
| |
| |
|
|
| def derive_features_enu(east, north, up, timestamps): |
| """ |
| Derive COG, SOG, ROT, alt_rate from ENU positions using 3-point central derivatives. |
| |
| COG = atan2(vx_east, vy_north) → bearing from North, clockwise [0, 360) |
| SOG = sqrt(vx² + vy²) converted to knots |
| ROT = d(COG)/dt via 3-point derivative on unwrapped COG |
| alt_rate = vz converted to ft/min |
| """ |
| t = timestamps - timestamps[0] |
| |
| vx = three_point_derivative(east, t) |
| vy = three_point_derivative(north, t) |
| vz = three_point_derivative(up, t) |
| |
| sog_ms = np.sqrt(vx**2 + vy**2) |
| sog_knots = sog_ms * 1.94384 |
| |
| |
| cog_deg = np.degrees(np.arctan2(vx, vy)) % 360 |
| |
| |
| cog_unwrapped = np.unwrap(np.radians(cog_deg)) |
| rot_rad_s = three_point_derivative(cog_unwrapped, t) |
| rot_deg_s = np.degrees(rot_rad_s) |
| |
| |
| alt_rate_ftmin = vz * 196.85 |
| |
| return { |
| 'vx': vx, 'vy': vy, 'vz': vz, |
| 'COG': cog_deg, 'SOG': sog_knots, |
| 'ROT': rot_deg_s, 'alt_rate': alt_rate_ftmin, |
| } |
|
|
|
|
| |
| |
| |
|
|
| def binary_geohash_encode(values, precision=40, v_min=0.0, v_max=1.0): |
| """Successive bisection encoding to binary. Matches LLM4STP num2bits().""" |
| N = len(values) |
| bits = np.zeros((N, precision), dtype=np.int64) |
| _min = np.full(N, v_min) |
| _max = np.full(N, v_max) |
| for p in range(precision): |
| mid = (_min + _max) / 2 |
| mask = values > mid |
| bits[:, p] = mask.astype(np.int64) |
| _min = np.where(mask, mid, _min) |
| _max = np.where(mask, _max, mid) |
| return bits |
|
|
|
|
| def binary_geohash_decode(bits, precision=40, v_min=0.0, v_max=1.0): |
| N = bits.shape[0] |
| _min = np.full(N, v_min) |
| _max = np.full(N, v_max) |
| for p in range(precision): |
| mid = (_min + _max) / 2 |
| mask = bits[:, p].astype(bool) |
| _min = np.where(mask, mid, _min) |
| _max = np.where(mask, _max, mid) |
| return (_min + _max) / 2 |
|
|
|
|
| class GeohashEncoder: |
| """ |
| 3D geohash encoder for ENU coordinates. |
| 40 bits per axis × 3 axes = 120 bits per timestep. |
| |
| Uses PER-TRAJECTORY normalization with a small margin so the full |
| bit range encodes just the spatial extent of each trajectory. |
| For a typical trajectory spanning ~200km, 40 bits gives ~0.2mm resolution. |
| """ |
| |
| def __init__(self, precision=40): |
| self.precision = precision |
| self.e_min = self.e_max = None |
| self.n_min = self.n_max = None |
| self.u_min = self.u_max = None |
| |
| def fit(self, east, north, up, margin=0.05): |
| """Fit normalization bounds with margin.""" |
| for attr, data in [('e', east), ('n', north), ('u', up)]: |
| drange = data.max() - data.min() |
| m = margin * max(drange, 100.0) |
| setattr(self, f'{attr}_min', data.min() - m) |
| setattr(self, f'{attr}_max', data.max() + m) |
| |
| def _normalize(self, values, v_min, v_max): |
| return np.clip((values - v_min) / max(v_max - v_min, 1e-10), 0.0, 1.0) |
| |
| def encode(self, east, north, up): |
| e_norm = self._normalize(east, self.e_min, self.e_max) |
| n_norm = self._normalize(north, self.n_min, self.n_max) |
| u_norm = self._normalize(up, self.u_min, self.u_max) |
| e_bits = binary_geohash_encode(e_norm, self.precision) |
| n_bits = binary_geohash_encode(n_norm, self.precision) |
| u_bits = binary_geohash_encode(u_norm, self.precision) |
| return np.concatenate([e_bits, n_bits, u_bits], axis=1) |
| |
| def get_bounds(self): |
| return { |
| 'e_min': self.e_min, 'e_max': self.e_max, |
| 'n_min': self.n_min, 'n_max': self.n_max, |
| 'u_min': self.u_min, 'u_max': self.u_max, |
| } |
|
|
|
|
| |
| |
| |
|
|
| @dataclass |
| class FeatureBins: |
| """Feature discretization configuration.""" |
| cog_edges: np.ndarray = field(default_factory=lambda: np.linspace(0, 360, 181)) |
| sog_edges: np.ndarray = field(default_factory=lambda: np.linspace(0, 600, 301)) |
| rot_edges: np.ndarray = field(default_factory=lambda: np.linspace(-6, 6, 121)) |
| alt_rate_edges: np.ndarray = field(default_factory=lambda: np.linspace(-6000, 6000, 121)) |
| |
| @property |
| def n_cog_bins(self): return len(self.cog_edges) - 1 |
| @property |
| def n_sog_bins(self): return len(self.sog_edges) - 1 |
| @property |
| def n_rot_bins(self): return len(self.rot_edges) - 1 |
| @property |
| def n_alt_rate_bins(self): return len(self.alt_rate_edges) - 1 |
| |
| def _digitize(self, values, edges): |
| return np.clip(np.digitize(values, edges) - 1, 0, len(edges) - 2) |
| |
| def encode_cog(self, cog): return self._digitize(cog, self.cog_edges) |
| def encode_sog(self, sog): return self._digitize(sog, self.sog_edges) |
| def encode_rot(self, rot): return self._digitize(np.clip(rot, -6, 6), self.rot_edges) |
| def encode_alt_rate(self, ar): return self._digitize(np.clip(ar, -6000, 6000), self.alt_rate_edges) |
|
|
|
|
| |
| |
| |
|
|
| def extract_temporal_features(timestamps): |
| """Extract temporal features preserving fractional second precision.""" |
| import datetime |
| |
| dts = [datetime.datetime.utcfromtimestamp(t) for t in timestamps] |
| hours = np.array([d.hour for d in dts], dtype=np.int64) |
| dows = np.array([d.weekday() for d in dts], dtype=np.int64) |
| months = np.array([d.month - 1 for d in dts], dtype=np.int64) |
| |
| |
| second_of_day = np.array([ |
| d.hour * 3600 + d.minute * 60 + d.second + d.microsecond / 1e6 |
| for d in dts |
| ], dtype=np.float64) |
| |
| dt = np.zeros(len(timestamps), dtype=np.float64) |
| dt[1:] = np.diff(timestamps) |
| |
| return { |
| 'second_of_day': second_of_day, |
| 'hour': hours, 'dow': dows, 'month': months, |
| 'dt': dt, |
| } |
|
|
|
|
| |
| |
| |
|
|
| class TrajectoryProcessor: |
| def __init__(self, resample_dt=5.0, geohash_precision=40, n_uncertainty_bins=16, |
| feature_bins=None, min_trajectory_len=20): |
| self.resample_dt = resample_dt |
| self.geohash_precision = geohash_precision |
| self.n_uncertainty_bins = n_uncertainty_bins |
| self.feature_bins = feature_bins or FeatureBins() |
| self.min_trajectory_len = min_trajectory_len |
| self.geohash_encoder = GeohashEncoder(precision=geohash_precision) |
| self._fitted = False |
| |
| def resample_trajectory(self, timestamps, lats, lons, alts): |
| t_start, t_end = timestamps[0], timestamps[-1] |
| n_points = int((t_end - t_start) / self.resample_dt) + 1 |
| if n_points < 2: |
| return timestamps, lats, lons, alts |
| t_new = np.linspace(t_start, t_start + (n_points - 1) * self.resample_dt, n_points) |
| return t_new, np.interp(t_new, timestamps, lats), np.interp(t_new, timestamps, lons), np.interp(t_new, timestamps, alts) |
| |
| def process_trajectory(self, timestamps, lats, lons, alts, metadata=None): |
| sort_idx = np.argsort(timestamps) |
| timestamps, lats, lons, alts = timestamps[sort_idx], lats[sort_idx], lons[sort_idx], alts[sort_idx] |
| |
| timestamps, lats, lons, alts = self.resample_trajectory(timestamps, lats, lons, alts) |
| if len(timestamps) < self.min_trajectory_len: |
| return None |
| |
| |
| converter = ENUConverter(lats[0], lons[0], alts[0]) |
| east, north, up = converter.to_enu(lats, lons, alts) |
| |
| |
| features = derive_features_enu(east, north, up, timestamps) |
| |
| |
| if self.geohash_encoder.e_min is not None: |
| geohash_bits = self.geohash_encoder.encode(east, north, up) |
| else: |
| geohash_bits = np.zeros((len(east), self.geohash_precision * 3), dtype=np.int64) |
| |
| |
| cog_bins = self.feature_bins.encode_cog(features['COG']) |
| sog_bins = self.feature_bins.encode_sog(features['SOG']) |
| rot_bins = self.feature_bins.encode_rot(features['ROT']) |
| alt_rate_bins = self.feature_bins.encode_alt_rate(features['alt_rate']) |
| |
| |
| from uncertainty import compute_all_uncertainties, discretize_scores, UncertaintyConfig |
| uncert_config = UncertaintyConfig(n_bins=self.n_uncertainty_bins, window=5) |
| raw_uncert = compute_all_uncertainties( |
| east, north, up, timestamps, |
| features['COG'], features['SOG'], features['ROT'], features['alt_rate'], |
| config=uncert_config, |
| ) |
| uncert_methods = sorted(raw_uncert.keys()) |
| uncert_bins_multi = np.stack([ |
| discretize_scores(raw_uncert[m], self.n_uncertainty_bins) |
| for m in uncert_methods |
| ], axis=1) |
| uncert_bins = uncert_bins_multi[:, 0] if uncert_bins_multi.shape[1] > 0 else np.zeros(len(east), dtype=np.int64) |
| |
| temporal = extract_temporal_features(timestamps) |
| |
| return { |
| 'timestamps': timestamps, 'lats': lats, 'lons': lons, 'alts': alts, |
| 'east': east, 'north': north, 'up': up, |
| 'COG': features['COG'], 'SOG': features['SOG'], |
| 'ROT': features['ROT'], 'alt_rate': features['alt_rate'], |
| 'vx': features['vx'], 'vy': features['vy'], 'vz': features['vz'], |
| 'geohash_bits': geohash_bits, |
| 'cog_bins': cog_bins, 'sog_bins': sog_bins, |
| 'rot_bins': rot_bins, 'alt_rate_bins': alt_rate_bins, |
| 'uncert_bins': uncert_bins, 'uncert_bins_multi': uncert_bins_multi, |
| 'uncert_method_names': uncert_methods, |
| 'hour': temporal['hour'], 'dow': temporal['dow'], 'month': temporal['month'], |
| 'second_of_day': temporal['second_of_day'], 'dt': temporal['dt'], |
| 'enu_origin': (converter.origin_lat, converter.origin_lon, converter.origin_alt), |
| 'metadata': metadata or {}, |
| } |
| |
| def fit_geohash(self, all_east, all_north, all_up): |
| self.geohash_encoder.fit(all_east, all_north, all_up) |
| self._fitted = True |
|
|
|
|
| |
| |
| |
|
|
| @dataclass |
| class PromptTokens: |
| BOS: int = 0; EOS: int = 1; PAD: int = 2 |
| PREDICT: int = 3; CLASSIFY: int = 4; DETECT_ANOMALY: int = 5 |
| HEAVY: int = 6; LARGE: int = 7; SMALL: int = 8 |
| ROTORCRAFT: int = 9; GLIDER: int = 10; UAV: int = 11; AIRCRAFT_UNKNOWN: int = 12 |
| CLIMB: int = 13; CRUISE: int = 14; DESCENT: int = 15 |
| APPROACH: int = 16; GROUND: int = 17; PHASE_UNKNOWN: int = 18 |
| CONUS: int = 19; EUROPE: int = 20; ASIA: int = 21; REGION_OTHER: int = 22 |
| VOCAB_SIZE: int = 23 |
|
|
|
|
| |
| |
| |
|
|
| class AirTrackDataset(Dataset): |
| def __init__(self, trajectories, seq_len=128, stride=64, task='predict'): |
| self.seq_len = seq_len |
| self.stride = stride |
| self.task = task |
| self.windows = [] |
| self.trajectories = trajectories |
| |
| for traj_idx, traj in enumerate(trajectories): |
| n_points = len(traj['timestamps']) |
| if n_points < seq_len + 1: |
| if n_points >= 20: |
| self.windows.append((traj_idx, 0, n_points)) |
| continue |
| for start in range(0, n_points - seq_len, stride): |
| end = start + seq_len + 1 |
| if end <= n_points: |
| self.windows.append((traj_idx, start, end)) |
| |
| self.prompt_tokens = PromptTokens() |
| |
| def __len__(self): |
| return len(self.windows) |
| |
| def __getitem__(self, idx): |
| traj_idx, start, end = self.windows[idx] |
| traj = self.trajectories[traj_idx] |
| sl = slice(start, end) |
| |
| task_token = self.prompt_tokens.PREDICT if self.task == 'predict' else self.prompt_tokens.CLASSIFY |
| prompt = torch.tensor([ |
| self.prompt_tokens.BOS, task_token, |
| self.prompt_tokens.AIRCRAFT_UNKNOWN, |
| self.prompt_tokens.PHASE_UNKNOWN, |
| self.prompt_tokens.REGION_OTHER, |
| ], dtype=torch.long) |
| |
| return { |
| 'geohash_bits': torch.from_numpy(traj['geohash_bits'][sl]).float(), |
| 'cog_bins': torch.from_numpy(traj['cog_bins'][sl]).long(), |
| 'sog_bins': torch.from_numpy(traj['sog_bins'][sl]).long(), |
| 'rot_bins': torch.from_numpy(traj['rot_bins'][sl]).long(), |
| 'alt_rate_bins': torch.from_numpy(traj['alt_rate_bins'][sl]).long(), |
| 'uncert_bins': torch.from_numpy(traj['uncert_bins'][sl]).long(), |
| 'uncert_bins_multi': torch.from_numpy(traj['uncert_bins_multi'][sl]).long(), |
| 'hour': torch.from_numpy(traj['hour'][sl]).long(), |
| 'dow': torch.from_numpy(traj['dow'][sl]).long(), |
| 'month': torch.from_numpy(traj['month'][sl]).long(), |
| 'second_of_day': torch.from_numpy(traj['second_of_day'][sl]).float(), |
| 'dt': torch.from_numpy(traj['dt'][sl]).float(), |
| 'prompt': prompt, |
| 'east': torch.from_numpy(traj['east'][sl]).float(), |
| 'north': torch.from_numpy(traj['north'][sl]).float(), |
| 'up': torch.from_numpy(traj['up'][sl]).float(), |
| } |
|
|
|
|
| |
| |
| |
|
|
| def load_traffic_sample(name='quickstart'): |
| import traffic.data.samples as samples |
| import pandas as pd |
| |
| data = getattr(samples, name) |
| trajectories = [] |
| flights = data if hasattr(data, '__iter__') else [data] |
| |
| for flight in flights: |
| try: |
| df = flight.data |
| except Exception: |
| continue |
| if df is None or len(df) < 20: |
| continue |
| |
| ts_series = pd.to_datetime(df['timestamp']) |
| if ts_series.dt.tz is not None: |
| ts_series = ts_series.dt.tz_convert('UTC').dt.tz_localize(None) |
| timestamps = ts_series.values.astype('int64').astype(np.float64) / 1e9 |
| lats = df['latitude'].values.astype(np.float64) |
| lons = df['longitude'].values.astype(np.float64) |
| |
| if 'altitude' in df.columns: |
| alts = df['altitude'].values.astype(np.float64) |
| elif 'baro_altitude' in df.columns: |
| alts = df['baro_altitude'].values.astype(np.float64) |
| else: |
| alts = np.zeros(len(df)) |
| |
| valid = ~(np.isnan(lats) | np.isnan(lons) | np.isnan(alts) | np.isnan(timestamps)) |
| if valid.sum() < 20: |
| continue |
| |
| trajectories.append({ |
| 'timestamps': timestamps[valid], |
| 'lats': lats[valid], 'lons': lons[valid], 'alts': alts[valid], |
| 'callsign': getattr(flight, 'callsign', 'UNKNOWN'), |
| 'icao24': getattr(flight, 'icao24', 'UNKNOWN'), |
| }) |
| |
| return trajectories |
|
|
|
|
| def build_dataset(raw_trajectories, processor, seq_len=128, stride=64, fit_geohash=True): |
| processed = [] |
| all_east, all_north, all_up = [], [], [] |
| |
| for raw in raw_trajectories: |
| result = processor.process_trajectory( |
| raw['timestamps'], raw['lats'], raw['lons'], raw['alts'], |
| metadata={k: v for k, v in raw.items() if k not in ['timestamps', 'lats', 'lons', 'alts']} |
| ) |
| if result is not None: |
| processed.append(result) |
| all_east.append(result['east']) |
| all_north.append(result['north']) |
| all_up.append(result['up']) |
| |
| if fit_geohash and processed: |
| all_e = np.concatenate(all_east) |
| all_n = np.concatenate(all_north) |
| all_u = np.concatenate(all_up) |
| processor.fit_geohash(all_e, all_n, all_u) |
| for traj in processed: |
| traj['geohash_bits'] = processor.geohash_encoder.encode( |
| traj['east'], traj['north'], traj['up'] |
| ) |
| |
| print(f"Processed {len(processed)}/{len(raw_trajectories)} trajectories") |
| dataset = AirTrackDataset(processed, seq_len=seq_len, stride=stride) |
| print(f"Created dataset with {len(dataset)} windows") |
| return dataset |
|
|
|
|
| if __name__ == '__main__': |
| print("Loading traffic sample data...") |
| raw_trajs = load_traffic_sample() |
| print(f"Loaded {len(raw_trajs)} raw trajectories") |
| |
| print("\nProcessing trajectories...") |
| processor = TrajectoryProcessor(resample_dt=5.0) |
| dataset = build_dataset(raw_trajs, processor, seq_len=64, stride=32) |
| |
| if len(dataset) > 0: |
| sample = dataset[0] |
| print("\nSample shapes:") |
| for k, v in sample.items(): |
| if isinstance(v, torch.Tensor): |
| print(f" {k}: {v.shape} ({v.dtype})") |
|
|