| |
| """Generate an offline-ready dataset from the CPTAC ovarian cohort. |
| |
| This utility downloads (if necessary) the CPTAC ovarian multi-modal dataset |
| and converts the proteomics, transcriptomics, and clinical tables into a |
| normalized feature table compatible with the RLDT offline pipeline. |
| """ |
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import os |
| import sys |
| from pathlib import Path |
| from typing import Dict, Iterable, List, Tuple |
|
|
| |
| |
# Force classic NumPy-backed pandas before pandas is imported: the offline
# pipeline expects ndarray-backed frames rather than pyarrow extension
# arrays. setdefault preserves any explicit override from the environment.
os.environ.setdefault("PANDAS_USE_PYARROW_BACKEND", "0")
os.environ.setdefault("PANDAS_USE_PYARROW_EXTENSION_ARRAY", "0")


import numpy as np
from RL0910.pandas_compat import get_pandas
pd = get_pandas()  # module-wide pandas handle from the project's compatibility shim
|
|
|
|
def _import_cptac():
    """Import the cptac package with helpful diagnostics.

    Returns the module together with the three exception classes callers
    need. Terminates the process with exit code 1 when cptac reports a
    missing offline index during import.
    """
    try:
        import cptac
    except Exception as exc:
        # cptac.exceptions cannot be imported until cptac itself loads,
        # so the import-time NoInternetError is matched by class name.
        if exc.__class__.__name__ != "NoInternetError":
            raise
        print(
            "[ERROR] CPTAC index missing. Connect to the internet and rerun,\n"
            "or execute `python -m cptac download` once to bootstrap the index.",
            file=sys.stderr,
        )
        sys.exit(1)

    from cptac.exceptions import (
        NoInternetError,
        DataSourceNotFoundError,
        MissingFileError,
    )

    return cptac, NoInternetError, DataSourceNotFoundError, MissingFileError
|
|
|
|
def _ensure_dataset(
    cptac_module,
    no_internet_error,
    datasource_error,
    missing_file_error,
):
    """Load the CPTAC ovarian dataset, downloading it if necessary.

    The exception classes are passed in (rather than imported here) because
    ``cptac.exceptions`` is only importable after ``cptac`` itself loads.
    Returns the ``cptac.Ov`` dataset instance, or terminates the process
    with exit code 1 when the data cannot be obtained.
    """
    download_attempted = False
    try:
        dataset = cptac_module.Ov()
        return dataset
    except no_internet_error as err:
        # Offline with no cached assets: nothing further can be done here.
        print(f"[ERROR] {err}", file=sys.stderr)
        print(
            "Please connect to the CPTAC data portal and rerun, or download the dataset manually.",
            file=sys.stderr,
        )
        sys.exit(1)
    except (datasource_error, missing_file_error, FileNotFoundError):
        # Assets simply missing locally; fall through to the download path.
        download_attempted = True
    except Exception as err:
        # Unknown failure: still worth one download-and-retry attempt.
        print(f"[WARN] Initial CPTAC load failed: {err}", file=sys.stderr)
        download_attempted = True

    if download_attempted:
        print("Attempting to download CPTAC ovarian assets (requires internet)...")
        last_error: Exception | None = None
        # The cohort key spelling has varied across cptac releases;
        # try the known variants until one succeeds.
        for cancer_key in ("ov", "ovarian", "Ovarian"):
            try:
                cptac_module.download_cancer(cancer_key)
                last_error = None
                break
            except Exception as inner:
                last_error = inner

        if last_error is not None:
            print(f"[ERROR] Unable to download CPTAC ovarian dataset: {last_error}", file=sys.stderr)
            sys.exit(1)

        try:
            dataset = cptac_module.Ov()
            return dataset
        except Exception as err:
            print(f"[ERROR] CPTAC dataset still unavailable after download: {err}", file=sys.stderr)
            sys.exit(1)

    # Defensive only: every path above returns or exits, so this is unreachable.
    raise RuntimeError("Unexpected CPTAC import state")
|
|
|
|
| def _standardize_index(df: pd.DataFrame) -> pd.DataFrame: |
| """Return a copy with a flat string index keyed by patient identifier.""" |
| out = df.copy() |
| if isinstance(out.index, pd.MultiIndex): |
| out.index = out.index.get_level_values(0) |
| out.index = out.index.astype(str) |
| out = out[~out.index.duplicated(keep="first")] |
| return out |
|
|
|
|
| def _flatten_columns(df: pd.DataFrame) -> pd.DataFrame: |
| """Flatten MultiIndex columns to single strings.""" |
| if isinstance(df.columns, pd.MultiIndex): |
| flat = [] |
| for col in df.columns: |
| pieces = [str(level) for level in col if level not in (None, "", "nan")] |
| label = "_".join(pieces).strip("_") or "feature" |
| flat.append(label) |
| df = df.copy() |
| df.columns = flat |
| else: |
| df = df.copy() |
| df.columns = [str(c) for c in df.columns] |
| return df |
|
|
|
|
| def _safe_label(label: str) -> str: |
| """Create a filesystem and YAML friendly slug.""" |
| cleaned = "".join(ch if ch.isalnum() else "_" for ch in label) |
| cleaned = cleaned.strip("_").lower() |
| return cleaned or "feature" |
|
|
|
|
| def _unique_name(base: str, registry: set[str]) -> str: |
| name = base |
| counter = 2 |
| while name in registry: |
| name = f"{base}_{counter}" |
| counter += 1 |
| registry.add(name) |
| return name |
|
|
|
|
| def _select_top_variance(df: pd.DataFrame, top_n: int) -> pd.DataFrame: |
| if top_n <= 0 or df.empty: |
| return df |
| if df.shape[1] <= top_n: |
| return df |
| variances = df.var(axis=0, skipna=True) |
| keep = variances.sort_values(ascending=False).head(top_n).index |
| return df.loc[:, keep] |
|
|
|
|
def _prepare_modality(
    raw_df: pd.DataFrame,
    modality_name: str,
    prefix: str,
    top_n: int,
    min_coverage: float,
    name_registry: set[str],
) -> Tuple[pd.DataFrame, Dict[str, Dict[str, float | str]]]:
    """Convert a CPTAC table into numeric features and collect statistics.

    Pipeline: flatten index/columns -> coerce every column to float
    (non-numeric columns become category codes) -> drop columns below the
    coverage floor -> median-impute -> keep the top-N columns by variance
    -> rename survivors to unique ``state_{prefix}_...`` labels.

    Returns the prepared feature frame plus per-feature provenance stats
    (original name, modality, coverage, median, variance); both are empty
    when nothing survives the filters.
    """
    if raw_df is None or raw_df.empty:
        return pd.DataFrame(), {}

    df = _flatten_columns(_standardize_index(raw_df))
    if df.empty:
        return df, {}

    numeric = {}
    for col in df.columns:
        series = df[col]
        if pd.api.types.is_numeric_dtype(series):
            numeric[col] = pd.to_numeric(series, errors="coerce")
        else:
            # Label-encode non-numeric columns; code -1 marks a missing
            # value, so map it back to NaN before imputation.
            numeric[col] = series.astype("category").cat.codes.replace(-1, np.nan)
    numeric_df = pd.DataFrame(numeric, index=df.index).astype(float)
    if numeric_df.empty:
        return numeric_df, {}

    # Coverage = fraction of patients with an observed (non-NaN) value.
    coverage = numeric_df.notna().mean()
    keep_cols = coverage[coverage >= min_coverage].index.tolist()
    numeric_df = numeric_df.loc[:, keep_cols]
    if numeric_df.empty:
        return numeric_df, {}

    # Impute with per-feature medians BEFORE the variance ranking so that
    # missingness does not distort the variance estimates.
    medians = numeric_df.median()
    filled = numeric_df.fillna(medians)
    filtered = _select_top_variance(filled, top_n)
    # Restrict the stats series to the columns that survived the ranking.
    medians = medians.loc[filtered.columns]
    coverage = coverage.loc[filtered.columns]
    variances = filtered.var(axis=0, skipna=True)

    rename_map: Dict[str, str] = {}
    stats: Dict[str, Dict[str, float | str]] = {}
    for original in filtered.columns:
        base = _safe_label(original)
        # name_registry is shared across modalities, guaranteeing globally
        # unique state names even when slugs collide.
        unique = _unique_name(f"{prefix}_{base}", name_registry)
        state_name = f"state_{unique}"
        rename_map[original] = state_name
        stats[state_name] = {
            "original_name": original,
            "modality": modality_name,
            "coverage": float(coverage.get(original, float("nan"))),
            "median": float(medians.get(original, float("nan"))),
            "variance": float(variances.get(original, float("nan"))),
        }

    prepared = filtered.rename(columns=rename_map)
    return prepared, stats
|
|
|
|
| def _combine_modalities(frames: Iterable[pd.DataFrame]) -> pd.DataFrame: |
| indices = pd.Index([]) |
| dataframes = [df for df in frames if not df.empty] |
| for df in dataframes: |
| indices = indices.union(df.index) |
| combined = pd.DataFrame(index=indices.sort_values()) |
| for df in dataframes: |
| combined = combined.join(df, how="left") |
| return combined |
|
|
|
|
| def _normalize_features( |
| df: pd.DataFrame, |
| apply_minmax: bool, |
| ) -> Tuple[pd.DataFrame, Dict[str, Dict[str, float]]]: |
| stats: Dict[str, Dict[str, float]] = {} |
| if df.empty: |
| return df, stats |
|
|
| if apply_minmax: |
| mins = df.min(axis=0) |
| maxs = df.max(axis=0) |
| denom = (maxs - mins).replace(0, np.nan) |
| scaled = (df - mins) / denom |
| scaled = scaled.clip(0.0, 1.0) |
| scaled = scaled.fillna(0.5) |
| for col in scaled.columns: |
| stats[col] = { |
| "min": float(mins.get(col, float("nan"))), |
| "max": float(maxs.get(col, float("nan"))), |
| } |
| return scaled, stats |
|
|
| filled = df.copy() |
| medians = filled.median(axis=0) |
| filled = filled.fillna(medians) |
| for col in filled.columns: |
| stats[col] = { |
| "mean": float(filled[col].mean()), |
| "std": float(filled[col].std(ddof=0)), |
| } |
| return filled, stats |
|
|
|
|
| def _derive_actions( |
| risk: pd.Series, |
| desired_actions: int, |
| ) -> Tuple[pd.Series, List[str]]: |
| if risk.empty: |
| return pd.Series(dtype=int), ["Undefined"] |
|
|
| unique_values = risk.nunique(dropna=True) |
| n_actions = max(1, min(desired_actions, unique_values)) |
|
|
| if n_actions <= 1: |
| actions = pd.Series(0, index=risk.index, dtype=int) |
| return actions, ["Single bucket"] |
|
|
| quantiles = pd.qcut( |
| risk.rank(method="first"), |
| q=n_actions, |
| labels=False, |
| duplicates="drop", |
| ).astype(int) |
| action_names = [f"Risk quantile {i + 1}/{len(quantiles.unique())}" for i in range(len(quantiles.unique()))] |
| return quantiles, action_names |
|
|
|
|
| def _yaml_quote(value: str) -> str: |
| if value == "": |
| return "''" |
| if any(ch in value for ch in ":{}[]&,#|>!-?%*@\"'\t\n\r "): |
| return json.dumps(value, ensure_ascii=False) |
| return value |
|
|
|
|
def build_dataset(args: argparse.Namespace) -> None:
    """End-to-end pipeline: load the CPTAC ovarian cohort and emit offline files.

    Writes four artifacts into ``args.output_dir``:
      * ovarian_offline_dataset.csv   - flat table (ids, action, reward, states)
      * ovarian_offline_dataset.npz   - the same arrays in NumPy form
      * ovarian_offline_schema.yaml   - column mapping for the RLDT loader
      * ovarian_offline_metadata.json - provenance and feature statistics

    Terminates the process with exit code 1 when no usable features remain
    after filtering.
    """
    cptac_module, no_internet_error, datasource_error, missing_file_error = _import_cptac()
    dataset = _ensure_dataset(cptac_module, no_internet_error, datasource_error, missing_file_error)

    # Seed NumPy for reproducibility of any stochastic steps downstream.
    np.random.seed(args.seed)

    proteomics = dataset.get_proteomics()
    transcriptomics = dataset.get_transcriptomics()
    clinical = dataset.get_clinical()

    # A single shared registry keeps feature names unique across modalities.
    name_registry: set[str] = set()
    prot_df, prot_stats = _prepare_modality(
        proteomics,
        "proteomics",
        "prot",
        args.max_proteins,
        args.min_coverage,
        name_registry,
    )
    rna_df, rna_stats = _prepare_modality(
        transcriptomics,
        "transcriptomics",
        "rna",
        args.max_transcripts,
        args.min_coverage,
        name_registry,
    )
    # Clinical tables are sparser, so their coverage floor is relaxed.
    clin_df, clin_stats = _prepare_modality(
        clinical,
        "clinical",
        "clin",
        args.max_clinical,
        max(0.35, args.min_coverage * 0.5),
        name_registry,
    )

    combined = _combine_modalities([prot_df, rna_df, clin_df])
    if combined.empty:
        print(
            "[ERROR] No features satisfied the filtering criteria. Consider lowering --min-coverage or increasing --max-* limits.",
            file=sys.stderr,
        )
        sys.exit(1)

    # Patients missing an entire modality inherit that modality's medians here.
    medians = combined.median(axis=0)
    combined = combined.fillna(medians)

    normalized, scaling_stats = _normalize_features(combined, not args.no_normalize)
    state_columns = list(normalized.columns)
    if not state_columns:
        print("[ERROR] Normalized feature matrix is empty after preprocessing.", file=sys.stderr)
        sys.exit(1)

    # Surrogate risk score: per-patient mean of all normalized features.
    risk_score = normalized[state_columns].mean(axis=1)
    actions, action_names = _derive_actions(risk_score, args.actions)
    # Map risk to a reward in [-1, 1]; lower risk -> higher reward.
    rewards = (0.5 - risk_score) * 2.0
    rewards = rewards.clip(-1.0, 1.0)

    output_dir = Path(args.output_dir).expanduser().resolve()
    output_dir.mkdir(parents=True, exist_ok=True)

    # One single-step, terminal trajectory per patient.
    dataset_df = pd.DataFrame(
        {
            "patient_id": normalized.index.astype(str),
            "timestep": 0,
            "action": actions.reindex(normalized.index).fillna(0).astype(int),
            "reward": rewards.reindex(normalized.index).astype(float),
            "terminal": True,
        }
    )
    dataset_df = dataset_df.join(normalized[state_columns])

    csv_path = output_dir / "ovarian_offline_dataset.csv"
    dataset_df.to_csv(csv_path, index=False)

    npz_path = output_dir / "ovarian_offline_dataset.npz"
    np.savez(
        npz_path,
        states=normalized[state_columns].to_numpy(dtype=np.float32),
        actions=dataset_df["action"].to_numpy(dtype=np.int64),
        rewards=dataset_df["reward"].to_numpy(dtype=np.float32),
        terminals=np.ones(len(dataset_df), dtype=np.bool_),
        patient_ids=dataset_df["patient_id"].to_numpy(dtype="U"),
        timesteps=dataset_df["timestep"].to_numpy(dtype=np.int64),
        state_columns=np.array(state_columns),
        action_names=np.array(action_names, dtype="U"),
        risk_scores=risk_score.reindex(normalized.index).to_numpy(dtype=np.float32),
    )

    # Merge per-modality provenance stats with the scaling statistics
    # (scaling keys match the renamed state columns).
    feature_catalog = {**prot_stats, **rna_stats, **clin_stats}
    for column, stats in feature_catalog.items():
        stats.update(scaling_stats.get(column, {}))

    feature_labels = {
        column: f"{stats['modality']}::{stats['original_name']}"
        for column, stats in feature_catalog.items()
    }

    # The schema YAML is emitted by hand to avoid a yaml dependency.
    schema_lines = [
        "data_type: tabular",
        "mapping:",
        " trajectory_id: patient_id",
        " timestep: timestep",
        " action: action",
        " reward: reward",
        " terminal: terminal",
        " feature_cols:",
    ]
    for col in state_columns:
        schema_lines.append(f" - {col}")

    if args.no_normalize:
        schema_lines.extend([
            "normalization:",
            " method: none",
        ])
    else:
        schema_lines.extend([
            "normalization:",
            " method: minmax",
            " clip_min: 0.0",
            " clip_max: 1.0",
        ])

    schema_lines.append("action_names:")
    for name in action_names:
        schema_lines.append(f" - {_yaml_quote(name)}")

    schema_lines.append("feature_names:")
    for col in state_columns:
        schema_lines.append(f" - {_yaml_quote(feature_labels.get(col, col))}")

    schema_lines.extend(
        [
            "reward_spec:",
            " expression: normalized_risk_score",
            " window_agg: last",
        ]
    )

    schema_path = output_dir / "ovarian_offline_schema.yaml"
    schema_path.write_text("\n".join(schema_lines) + "\n", encoding="utf-8")

    # Machine-readable provenance alongside the dataset files.
    metadata = {
        "dataset": "CPTAC Ovarian (proteomics + transcriptomics + clinical)",
        "patients": int(dataset_df["patient_id"].nunique()),
        "records": int(len(dataset_df)),
        "features": len(state_columns),
        "actions": {
            "count": len(action_names),
            "names": action_names,
            "definition": "Risk quantiles derived from the normalized multi-modal feature mean",
        },
        "reward": {
            "range": [-1.0, 1.0],
            "definition": "Centered risk score (higher is better) from combined normalized features",
        },
        "normalization": "minmax" if not args.no_normalize else "none",
        "feature_catalog": feature_catalog,
        "files": {
            "csv": csv_path.name,
            "npz": npz_path.name,
            "schema": schema_path.name,
        },
    }

    metadata_path = output_dir / "ovarian_offline_metadata.json"
    metadata_path.write_text(json.dumps(metadata, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")

    print("✔ Dataset prepared")
    print(f" • CSV table : {csv_path}")
    print(f" • Numpy archive : {npz_path}")
    print(f" • Schema YAML : {schema_path}")
    print(f" • Metadata JSON : {metadata_path}")
|
|
|
|
def parse_args(argv: List[str] | None = None) -> argparse.Namespace:
    """Parse command-line options and validate numeric ranges.

    Raises SystemExit (via ``parser.error``) on out-of-range values.
    """
    cli = argparse.ArgumentParser(
        description="Download and format the CPTAC ovarian cohort for RLDT offline workflows",
    )
    cli.add_argument(
        "--output-dir",
        default="RL0910/data/cptac_ovarian",
        help="Directory to store the generated dataset files (default: %(default)s)",
    )
    # The three per-modality feature caps share the same shape.
    for flag, cap, text in (
        ("--max-proteins", 75, "Maximum number of proteomic features to retain (variance-ranked)."),
        ("--max-transcripts", 75, "Maximum number of transcriptomic features to retain (variance-ranked)."),
        ("--max-clinical", 40, "Maximum number of engineered clinical features to retain."),
    ):
        cli.add_argument(flag, type=int, default=cap, help=text)
    cli.add_argument(
        "--min-coverage",
        type=float,
        default=0.65,
        help="Minimum fraction of patients required for a feature to be kept (0-1).",
    )
    cli.add_argument(
        "--actions",
        type=int,
        default=5,
        help="Number of action buckets derived from the risk score quantiles.",
    )
    cli.add_argument(
        "--no-normalize",
        action="store_true",
        help="Disable min-max normalization (features remain on their native scale).",
    )
    cli.add_argument(
        "--seed",
        type=int,
        default=42,
        help="Random seed used when deriving surrogate actions.",
    )

    namespace = cli.parse_args(argv)
    if not 0 < namespace.min_coverage <= 1:
        cli.error("--min-coverage must be within (0, 1].")
    if namespace.actions <= 0:
        cli.error("--actions must be a positive integer.")
    return namespace
|
|
|
|
def main(argv: List[str] | None = None) -> int:
    """CLI entry point: parse options, build the dataset, return exit status 0."""
    build_dataset(parse_args(argv))
    return 0
|
|
|
|
| if __name__ == "__main__": |
| raise SystemExit(main()) |
|
|