| | |
| | """Generate an offline-ready dataset from the CPTAC ovarian cohort. |
| | |
| | This utility downloads (if necessary) the CPTAC ovarian multi-modal dataset |
| | and converts the proteomics, transcriptomics, and clinical tables into a |
| | normalized feature table compatible with the RLDT offline pipeline. |
| | """ |
| | from __future__ import annotations |
| |
|
| | import argparse |
| | import json |
| | import os |
| | import sys |
| | from pathlib import Path |
| | from typing import Dict, Iterable, List, Tuple |
| |
|
| | |
| | |
# NOTE(review): these env vars presumably steer pandas away from its pyarrow
# backend — confirm against RL0910.pandas_compat. setdefault keeps any
# explicit user override intact; must run BEFORE pandas is imported.
os.environ.setdefault("PANDAS_USE_PYARROW_BACKEND", "0")
os.environ.setdefault("PANDAS_USE_PYARROW_EXTENSION_ARRAY", "0")

import numpy as np
from RL0910.pandas_compat import get_pandas  # project shim that selects a pandas build
pd = get_pandas()  # module-wide pandas handle used by every helper below
| |
|
| |
|
| | def _import_cptac(): |
| | """Import the cptac package with helpful diagnostics.""" |
| | try: |
| | import cptac |
| | except Exception as exc: |
| | if exc.__class__.__name__ == "NoInternetError": |
| | print( |
| | "[ERROR] CPTAC index missing. Connect to the internet and rerun,\n" |
| | "or execute `python -m cptac download` once to bootstrap the index.", |
| | file=sys.stderr, |
| | ) |
| | sys.exit(1) |
| | raise |
| |
|
| | from cptac.exceptions import ( |
| | NoInternetError, |
| | DataSourceNotFoundError, |
| | MissingFileError, |
| | ) |
| |
|
| | return cptac, NoInternetError, DataSourceNotFoundError, MissingFileError |
| |
|
| |
|
| | def _ensure_dataset( |
| | cptac_module, |
| | no_internet_error, |
| | datasource_error, |
| | missing_file_error, |
| | ): |
| | """Load the CPTAC ovarian dataset, downloading it if necessary.""" |
| | download_attempted = False |
| | try: |
| | dataset = cptac_module.Ov() |
| | return dataset |
| | except no_internet_error as err: |
| | print(f"[ERROR] {err}", file=sys.stderr) |
| | print( |
| | "Please connect to the CPTAC data portal and rerun, or download the dataset manually.", |
| | file=sys.stderr, |
| | ) |
| | sys.exit(1) |
| | except (datasource_error, missing_file_error, FileNotFoundError): |
| | download_attempted = True |
| | except Exception as err: |
| | print(f"[WARN] Initial CPTAC load failed: {err}", file=sys.stderr) |
| | download_attempted = True |
| |
|
| | if download_attempted: |
| | print("Attempting to download CPTAC ovarian assets (requires internet)...") |
| | last_error: Exception | None = None |
| | for cancer_key in ("ov", "ovarian", "Ovarian"): |
| | try: |
| | cptac_module.download_cancer(cancer_key) |
| | last_error = None |
| | break |
| | except Exception as inner: |
| | last_error = inner |
| |
|
| | if last_error is not None: |
| | print(f"[ERROR] Unable to download CPTAC ovarian dataset: {last_error}", file=sys.stderr) |
| | sys.exit(1) |
| |
|
| | try: |
| | dataset = cptac_module.Ov() |
| | return dataset |
| | except Exception as err: |
| | print(f"[ERROR] CPTAC dataset still unavailable after download: {err}", file=sys.stderr) |
| | sys.exit(1) |
| |
|
| | raise RuntimeError("Unexpected CPTAC import state") |
| |
|
| |
|
| | def _standardize_index(df: pd.DataFrame) -> pd.DataFrame: |
| | """Return a copy with a flat string index keyed by patient identifier.""" |
| | out = df.copy() |
| | if isinstance(out.index, pd.MultiIndex): |
| | out.index = out.index.get_level_values(0) |
| | out.index = out.index.astype(str) |
| | out = out[~out.index.duplicated(keep="first")] |
| | return out |
| |
|
| |
|
| | def _flatten_columns(df: pd.DataFrame) -> pd.DataFrame: |
| | """Flatten MultiIndex columns to single strings.""" |
| | if isinstance(df.columns, pd.MultiIndex): |
| | flat = [] |
| | for col in df.columns: |
| | pieces = [str(level) for level in col if level not in (None, "", "nan")] |
| | label = "_".join(pieces).strip("_") or "feature" |
| | flat.append(label) |
| | df = df.copy() |
| | df.columns = flat |
| | else: |
| | df = df.copy() |
| | df.columns = [str(c) for c in df.columns] |
| | return df |
| |
|
| |
|
| | def _safe_label(label: str) -> str: |
| | """Create a filesystem and YAML friendly slug.""" |
| | cleaned = "".join(ch if ch.isalnum() else "_" for ch in label) |
| | cleaned = cleaned.strip("_").lower() |
| | return cleaned or "feature" |
| |
|
| |
|
| | def _unique_name(base: str, registry: set[str]) -> str: |
| | name = base |
| | counter = 2 |
| | while name in registry: |
| | name = f"{base}_{counter}" |
| | counter += 1 |
| | registry.add(name) |
| | return name |
| |
|
| |
|
| | def _select_top_variance(df: pd.DataFrame, top_n: int) -> pd.DataFrame: |
| | if top_n <= 0 or df.empty: |
| | return df |
| | if df.shape[1] <= top_n: |
| | return df |
| | variances = df.var(axis=0, skipna=True) |
| | keep = variances.sort_values(ascending=False).head(top_n).index |
| | return df.loc[:, keep] |
| |
|
| |
|
| | def _prepare_modality( |
| | raw_df: pd.DataFrame, |
| | modality_name: str, |
| | prefix: str, |
| | top_n: int, |
| | min_coverage: float, |
| | name_registry: set[str], |
| | ) -> Tuple[pd.DataFrame, Dict[str, Dict[str, float | str]]]: |
| | """Convert a CPTAC table into numeric features and collect statistics.""" |
| | if raw_df is None or raw_df.empty: |
| | return pd.DataFrame(), {} |
| |
|
| | df = _flatten_columns(_standardize_index(raw_df)) |
| | if df.empty: |
| | return df, {} |
| |
|
| | numeric = {} |
| | for col in df.columns: |
| | series = df[col] |
| | if pd.api.types.is_numeric_dtype(series): |
| | numeric[col] = pd.to_numeric(series, errors="coerce") |
| | else: |
| | numeric[col] = series.astype("category").cat.codes.replace(-1, np.nan) |
| | numeric_df = pd.DataFrame(numeric, index=df.index).astype(float) |
| | if numeric_df.empty: |
| | return numeric_df, {} |
| |
|
| | coverage = numeric_df.notna().mean() |
| | keep_cols = coverage[coverage >= min_coverage].index.tolist() |
| | numeric_df = numeric_df.loc[:, keep_cols] |
| | if numeric_df.empty: |
| | return numeric_df, {} |
| |
|
| | medians = numeric_df.median() |
| | filled = numeric_df.fillna(medians) |
| | filtered = _select_top_variance(filled, top_n) |
| | medians = medians.loc[filtered.columns] |
| | coverage = coverage.loc[filtered.columns] |
| | variances = filtered.var(axis=0, skipna=True) |
| |
|
| | rename_map: Dict[str, str] = {} |
| | stats: Dict[str, Dict[str, float | str]] = {} |
| | for original in filtered.columns: |
| | base = _safe_label(original) |
| | unique = _unique_name(f"{prefix}_{base}", name_registry) |
| | state_name = f"state_{unique}" |
| | rename_map[original] = state_name |
| | stats[state_name] = { |
| | "original_name": original, |
| | "modality": modality_name, |
| | "coverage": float(coverage.get(original, float("nan"))), |
| | "median": float(medians.get(original, float("nan"))), |
| | "variance": float(variances.get(original, float("nan"))), |
| | } |
| |
|
| | prepared = filtered.rename(columns=rename_map) |
| | return prepared, stats |
| |
|
| |
|
| | def _combine_modalities(frames: Iterable[pd.DataFrame]) -> pd.DataFrame: |
| | indices = pd.Index([]) |
| | dataframes = [df for df in frames if not df.empty] |
| | for df in dataframes: |
| | indices = indices.union(df.index) |
| | combined = pd.DataFrame(index=indices.sort_values()) |
| | for df in dataframes: |
| | combined = combined.join(df, how="left") |
| | return combined |
| |
|
| |
|
| | def _normalize_features( |
| | df: pd.DataFrame, |
| | apply_minmax: bool, |
| | ) -> Tuple[pd.DataFrame, Dict[str, Dict[str, float]]]: |
| | stats: Dict[str, Dict[str, float]] = {} |
| | if df.empty: |
| | return df, stats |
| |
|
| | if apply_minmax: |
| | mins = df.min(axis=0) |
| | maxs = df.max(axis=0) |
| | denom = (maxs - mins).replace(0, np.nan) |
| | scaled = (df - mins) / denom |
| | scaled = scaled.clip(0.0, 1.0) |
| | scaled = scaled.fillna(0.5) |
| | for col in scaled.columns: |
| | stats[col] = { |
| | "min": float(mins.get(col, float("nan"))), |
| | "max": float(maxs.get(col, float("nan"))), |
| | } |
| | return scaled, stats |
| |
|
| | filled = df.copy() |
| | medians = filled.median(axis=0) |
| | filled = filled.fillna(medians) |
| | for col in filled.columns: |
| | stats[col] = { |
| | "mean": float(filled[col].mean()), |
| | "std": float(filled[col].std(ddof=0)), |
| | } |
| | return filled, stats |
| |
|
| |
|
| | def _derive_actions( |
| | risk: pd.Series, |
| | desired_actions: int, |
| | ) -> Tuple[pd.Series, List[str]]: |
| | if risk.empty: |
| | return pd.Series(dtype=int), ["Undefined"] |
| |
|
| | unique_values = risk.nunique(dropna=True) |
| | n_actions = max(1, min(desired_actions, unique_values)) |
| |
|
| | if n_actions <= 1: |
| | actions = pd.Series(0, index=risk.index, dtype=int) |
| | return actions, ["Single bucket"] |
| |
|
| | quantiles = pd.qcut( |
| | risk.rank(method="first"), |
| | q=n_actions, |
| | labels=False, |
| | duplicates="drop", |
| | ).astype(int) |
| | action_names = [f"Risk quantile {i + 1}/{len(quantiles.unique())}" for i in range(len(quantiles.unique()))] |
| | return quantiles, action_names |
| |
|
| |
|
| | def _yaml_quote(value: str) -> str: |
| | if value == "": |
| | return "''" |
| | if any(ch in value for ch in ":{}[]&,#|>!-?%*@\"'\t\n\r "): |
| | return json.dumps(value, ensure_ascii=False) |
| | return value |
| |
|
| |
|
def build_dataset(args: argparse.Namespace) -> None:
    """Run the full pipeline: load CPTAC ovarian tables, engineer features, write files.

    Produces four artifacts under ``args.output_dir``: a CSV table, an NPZ
    archive, a YAML schema, and a JSON metadata catalog. Exits the process
    (status 1) when no usable features survive filtering.
    """
    cptac_module, no_internet_error, datasource_error, missing_file_error = _import_cptac()
    dataset = _ensure_dataset(cptac_module, no_internet_error, datasource_error, missing_file_error)

    # Seed NumPy up front so any stochastic steps are reproducible.
    np.random.seed(args.seed)

    proteomics = dataset.get_proteomics()
    transcriptomics = dataset.get_transcriptomics()
    clinical = dataset.get_clinical()

    # Shared registry keeps feature names unique across all three modalities.
    name_registry: set[str] = set()
    prot_df, prot_stats = _prepare_modality(
        proteomics,
        "proteomics",
        "prot",
        args.max_proteins,
        args.min_coverage,
        name_registry,
    )
    rna_df, rna_stats = _prepare_modality(
        transcriptomics,
        "transcriptomics",
        "rna",
        args.max_transcripts,
        args.min_coverage,
        name_registry,
    )
    clin_df, clin_stats = _prepare_modality(
        clinical,
        "clinical",
        "clin",
        args.max_clinical,
        # Clinical tables are sparser; relax the coverage floor (never below 0.35).
        max(0.35, args.min_coverage * 0.5),
        name_registry,
    )

    combined = _combine_modalities([prot_df, rna_df, clin_df])
    if combined.empty:
        print(
            "[ERROR] No features satisfied the filtering criteria. Consider lowering --min-coverage or increasing --max-* limits.",
            file=sys.stderr,
        )
        sys.exit(1)

    # Impute the gaps left by outer-joining modalities with column medians.
    medians = combined.median(axis=0)
    combined = combined.fillna(medians)

    normalized, scaling_stats = _normalize_features(combined, not args.no_normalize)
    state_columns = list(normalized.columns)
    if not state_columns:
        print("[ERROR] Normalized feature matrix is empty after preprocessing.", file=sys.stderr)
        sys.exit(1)

    # Surrogate signal: the mean of normalized features is the per-patient risk
    # score; reward in [-1, 1] maps lower risk to higher reward.
    risk_score = normalized[state_columns].mean(axis=1)
    actions, action_names = _derive_actions(risk_score, args.actions)
    rewards = (0.5 - risk_score) * 2.0
    rewards = rewards.clip(-1.0, 1.0)

    output_dir = Path(args.output_dir).expanduser().resolve()
    output_dir.mkdir(parents=True, exist_ok=True)

    # Single-timestep, terminal episodes: one offline record per patient.
    dataset_df = pd.DataFrame(
        {
            "patient_id": normalized.index.astype(str),
            "timestep": 0,
            "action": actions.reindex(normalized.index).fillna(0).astype(int),
            "reward": rewards.reindex(normalized.index).astype(float),
            "terminal": True,
        }
    )
    dataset_df = dataset_df.join(normalized[state_columns])

    csv_path = output_dir / "ovarian_offline_dataset.csv"
    dataset_df.to_csv(csv_path, index=False)

    npz_path = output_dir / "ovarian_offline_dataset.npz"
    np.savez(
        npz_path,
        states=normalized[state_columns].to_numpy(dtype=np.float32),
        actions=dataset_df["action"].to_numpy(dtype=np.int64),
        rewards=dataset_df["reward"].to_numpy(dtype=np.float32),
        terminals=np.ones(len(dataset_df), dtype=np.bool_),
        patient_ids=dataset_df["patient_id"].to_numpy(dtype="U"),
        timesteps=dataset_df["timestep"].to_numpy(dtype=np.int64),
        state_columns=np.array(state_columns),
        action_names=np.array(action_names, dtype="U"),
        risk_scores=risk_score.reindex(normalized.index).to_numpy(dtype=np.float32),
    )

    # Merge per-modality stats, then fold in the per-column scaling stats.
    feature_catalog = {**prot_stats, **rna_stats, **clin_stats}
    for column, stats in feature_catalog.items():
        stats.update(scaling_stats.get(column, {}))

    feature_labels = {
        column: f"{stats['modality']}::{stats['original_name']}"
        for column, stats in feature_catalog.items()
    }

    # YAML is emitted by hand so the script needs no yaml dependency.
    schema_lines = [
        "data_type: tabular",
        "mapping:",
        "  trajectory_id: patient_id",
        "  timestep: timestep",
        "  action: action",
        "  reward: reward",
        "  terminal: terminal",
        "  feature_cols:",
    ]
    for col in state_columns:
        schema_lines.append(f"    - {col}")

    if args.no_normalize:
        schema_lines.extend([
            "normalization:",
            "  method: none",
        ])
    else:
        schema_lines.extend([
            "normalization:",
            "  method: minmax",
            "  clip_min: 0.0",
            "  clip_max: 1.0",
        ])

    schema_lines.append("action_names:")
    for name in action_names:
        schema_lines.append(f"  - {_yaml_quote(name)}")

    schema_lines.append("feature_names:")
    for col in state_columns:
        schema_lines.append(f"  - {_yaml_quote(feature_labels.get(col, col))}")

    schema_lines.extend(
        [
            "reward_spec:",
            "  expression: normalized_risk_score",
            "  window_agg: last",
        ]
    )

    schema_path = output_dir / "ovarian_offline_schema.yaml"
    schema_path.write_text("\n".join(schema_lines) + "\n", encoding="utf-8")

    metadata = {
        "dataset": "CPTAC Ovarian (proteomics + transcriptomics + clinical)",
        "patients": int(dataset_df["patient_id"].nunique()),
        "records": int(len(dataset_df)),
        "features": len(state_columns),
        "actions": {
            "count": len(action_names),
            "names": action_names,
            "definition": "Risk quantiles derived from the normalized multi-modal feature mean",
        },
        "reward": {
            "range": [-1.0, 1.0],
            "definition": "Centered risk score (higher is better) from combined normalized features",
        },
        "normalization": "minmax" if not args.no_normalize else "none",
        "feature_catalog": feature_catalog,
        "files": {
            "csv": csv_path.name,
            "npz": npz_path.name,
            "schema": schema_path.name,
        },
    }

    metadata_path = output_dir / "ovarian_offline_metadata.json"
    metadata_path.write_text(json.dumps(metadata, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")

    print("✔ Dataset prepared")
    print(f" • CSV table : {csv_path}")
    print(f" • Numpy archive : {npz_path}")
    print(f" • Schema YAML : {schema_path}")
    print(f" • Metadata JSON : {metadata_path}")
| |
|
| |
|
def parse_args(argv: List[str] | None = None) -> argparse.Namespace:
    """Parse and validate the command-line options for the dataset builder.

    Raises SystemExit (via ``parser.error``) when --min-coverage is outside
    (0, 1] or --actions is not positive.
    """
    parser = argparse.ArgumentParser(
        description="Download and format the CPTAC ovarian cohort for RLDT offline workflows",
    )
    add = parser.add_argument
    add(
        "--output-dir",
        default="RL0910/data/cptac_ovarian",
        help="Directory to store the generated dataset files (default: %(default)s)",
    )
    add(
        "--max-proteins",
        type=int,
        default=75,
        help="Maximum number of proteomic features to retain (variance-ranked).",
    )
    add(
        "--max-transcripts",
        type=int,
        default=75,
        help="Maximum number of transcriptomic features to retain (variance-ranked).",
    )
    add(
        "--max-clinical",
        type=int,
        default=40,
        help="Maximum number of engineered clinical features to retain.",
    )
    add(
        "--min-coverage",
        type=float,
        default=0.65,
        help="Minimum fraction of patients required for a feature to be kept (0-1).",
    )
    add(
        "--actions",
        type=int,
        default=5,
        help="Number of action buckets derived from the risk score quantiles.",
    )
    add(
        "--no-normalize",
        action="store_true",
        help="Disable min-max normalization (features remain on their native scale).",
    )
    add(
        "--seed",
        type=int,
        default=42,
        help="Random seed used when deriving surrogate actions.",
    )

    args = parser.parse_args(argv)
    if not 0 < args.min_coverage <= 1:
        parser.error("--min-coverage must be within (0, 1].")
    if args.actions <= 0:
        parser.error("--actions must be a positive integer.")
    return args
| |
|
| |
|
def main(argv: List[str] | None = None) -> int:
    """CLI entry point; returns the process exit status (0 on success)."""
    build_dataset(parse_args(argv))
    return 0
| |
|
| |
|
if __name__ == "__main__":
    # SystemExit carries main()'s integer status back to the shell.
    raise SystemExit(main())
| |
|