"""
Meta-feature extraction module for discrete observational datasets.
Extracts 34 features across five tiers:
- Tier 1: Basic descriptors (6 features)
- Tier 2: Information-theoretic (8 features)
- Tier 3: Dependency structure (8 features)
- Tier 4: CI test landmark probes (6 features)
- Tier 5: Distribution shape (6 features)
"""
import numpy as np
import pandas as pd
from scipy.stats import entropy, chi2_contingency
from itertools import combinations
import warnings
import logging
warnings.filterwarnings('ignore')
logger = logging.getLogger(__name__)
def extract_all_features(df, n_probe_triplets=100, alpha=0.05):
"""Extract all meta-features from a discrete dataset.
Args:
df: pd.DataFrame with integer-encoded discrete columns
n_probe_triplets: number of random (X,Y,Z) triplets for CI probes
alpha: significance level for dependency tests
Returns:
dict of feature_name -> float
"""
features = {}
# Tier 1: Basic descriptors
features.update(_basic_features(df))
# Tier 2: Information-theoretic
features.update(_info_theory_features(df))
# Tier 3: Dependency structure
features.update(_dependency_features(df, alpha=alpha))
# Tier 4: CI test landmark probes
features.update(_ci_probe_features(df, n_probes=n_probe_triplets, alpha=alpha))
# Tier 5: Distribution shape
features.update(_distribution_features(df))
return features
def _basic_features(df):
"""Tier 1: Basic dataset descriptors."""
n_samples, n_vars = df.shape
cardinalities = df.nunique().values
return {
'n_samples': n_samples,
'n_variables': n_vars,
'n_over_p': n_samples / max(n_vars, 1),
'avg_cardinality': cardinalities.mean(),
'max_cardinality': cardinalities.max(),
'min_cardinality': cardinalities.min(),
}
def _info_theory_features(df):
"""Tier 2: Information-theoretic features."""
n, p = df.shape
# Per-variable entropy
entropies = []
for col in df.columns:
vc = df[col].value_counts(normalize=True)
entropies.append(entropy(vc))
entropies = np.array(entropies)
# Pairwise mutual information (subsample if too many pairs)
cols = list(range(p))
all_pairs = list(combinations(cols, 2))
# Limit pairs for large datasets
max_pairs = min(len(all_pairs), 500)
if len(all_pairs) > max_pairs:
rng = np.random.RandomState(42)
pair_indices = rng.choice(len(all_pairs), max_pairs, replace=False)
pairs = [all_pairs[i] for i in pair_indices]
else:
pairs = all_pairs
mis = []
norm_mis = []
for i, j in pairs:
mi = _mutual_information(df.iloc[:, i].values, df.iloc[:, j].values)
mis.append(mi)
# Normalized MI
denom = np.sqrt(entropies[i] * entropies[j])
if denom > 1e-10:
norm_mis.append(mi / denom)
else:
norm_mis.append(0.0)
mis = np.array(mis)
norm_mis = np.array(norm_mis)
return {
'mean_entropy': entropies.mean(),
'std_entropy': entropies.std(),
'max_entropy': entropies.max(),
'mean_pairwise_MI': mis.mean(),
'std_pairwise_MI': mis.std(),
'max_pairwise_MI': mis.max(),
'mean_normalized_MI': norm_mis.mean(),
'frac_high_MI_pairs': (mis > 0.05).mean(), # fraction of pairs above a 0.05-nat threshold for "meaningful" MI
}
def _dependency_features(df, alpha=0.05):
"""Tier 3: Dependency structure features via chi-squared tests."""
n, p = df.shape
cols = list(range(p))
all_pairs = list(combinations(cols, 2))
# Limit pairs
max_pairs = min(len(all_pairs), 500)
if len(all_pairs) > max_pairs:
rng = np.random.RandomState(42)
pair_indices = rng.choice(len(all_pairs), max_pairs, replace=False)
pairs = [all_pairs[i] for i in pair_indices]
else:
pairs = all_pairs
chi2_stats = []
pvals = []
cramers_vs = []
for i, j in pairs:
try:
ct = pd.crosstab(df.iloc[:, i], df.iloc[:, j])
if ct.shape[0] < 2 or ct.shape[1] < 2:
continue
chi2, pval, dof, expected = chi2_contingency(ct)
chi2_stats.append(chi2)
pvals.append(pval)
# Cramér's V
min_dim = min(ct.shape[0], ct.shape[1]) - 1
if min_dim > 0 and n > 0:
v = np.sqrt(chi2 / (n * min_dim))
cramers_vs.append(v)
except Exception:
continue
chi2_stats = np.array(chi2_stats) if chi2_stats else np.array([0.0])
pvals = np.array(pvals) if pvals else np.array([1.0])
cramers_vs = np.array(cramers_vs) if cramers_vs else np.array([0.0])
return {
'density_proxy': (pvals < alpha).mean(),
'mean_chi2_stat': chi2_stats.mean(),
'std_chi2_stat': chi2_stats.std(),
'max_chi2_stat': chi2_stats.max(),
'mean_cramers_v': cramers_vs.mean(),
'max_cramers_v': cramers_vs.max(),
'frac_weak_deps': (cramers_vs < 0.1).mean(),
'frac_strong_deps': (cramers_vs > 0.3).mean(),
}
def _ci_probe_features(df, n_probes=100, alpha=0.05):
"""Tier 4: Conditional independence test landmark probes.
Sample random (X, Y, Z) triplets:
- Test X ⊥ Y (marginal)
- Test X ⊥ Y | Z (conditional)
Summarize test statistics.
"""
n, p = df.shape
if p < 3:
return {
'mean_pval_marginal': 0.5,
'frac_dep_marginal': 0.5,
'mean_pval_conditional': 0.5,
'frac_dep_conditional': 0.5,
'v_structure_proxy': 0.0,
'faithfulness_proxy': 0.0,
}
rng = np.random.RandomState(42)
n_probes = min(n_probes, p * (p - 1) * (p - 2) // 6) # cap at the number of distinct unordered triplets
pvals_marginal = []
pvals_conditional = []
for _ in range(n_probes):
try:
idxs = rng.choice(p, size=3, replace=False)
i, j, k = idxs
# Marginal test: X ⊥ Y
ct = pd.crosstab(df.iloc[:, i], df.iloc[:, j])
if ct.shape[0] >= 2 and ct.shape[1] >= 2:
_, pval, _, _ = chi2_contingency(ct)
pvals_marginal.append(pval)
# Conditional test: X ⊥ Y | Z
# Stratify by Z values
z_vals = df.iloc[:, k].unique()
cond_pvals = []
for z_val in z_vals:
mask = df.iloc[:, k] == z_val
if mask.sum() < 5:
continue
ct_cond = pd.crosstab(df.iloc[:, i][mask], df.iloc[:, j][mask])
if ct_cond.shape[0] >= 2 and ct_cond.shape[1] >= 2:
try:
_, pval_c, _, _ = chi2_contingency(ct_cond)
cond_pvals.append(pval_c)
except Exception:
pass
if cond_pvals:
# Combine per-stratum p-values by their mean (a simple alternative to Fisher's method)
pvals_conditional.append(np.mean(cond_pvals))
except Exception:
continue
pvals_marginal = np.array(pvals_marginal) if pvals_marginal else np.array([0.5])
pvals_conditional = np.array(pvals_conditional) if pvals_conditional else np.array([0.5])
frac_dep_m = (pvals_marginal < alpha).mean()
frac_dep_c = (pvals_conditional < alpha).mean()
return {
'mean_pval_marginal': pvals_marginal.mean(),
'frac_dep_marginal': frac_dep_m,
'mean_pval_conditional': pvals_conditional.mean(),
'frac_dep_conditional': frac_dep_c,
'v_structure_proxy': frac_dep_m - frac_dep_c, # gap between marginal and conditional dependence rates
'faithfulness_proxy': abs(frac_dep_m - frac_dep_c), # magnitude of the marginal/conditional divergence
}
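# Explanatory gloss on the Tier 4 proxies (not used by the code): conditioning on a random
# third variable Z screens off dependence when Z lies on a chain (X -> Z -> Y) or is a common
# cause (X <- Z -> Y), but induces dependence when Z is a collider (X -> Z <- Y). So the
# marginal-minus-conditional gap grows with screening-off structure and shrinks (or goes
# negative) when colliders dominate; 'faithfulness_proxy' keeps only the size of that gap.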
def _distribution_features(df):
"""Tier 5: Distribution shape features."""
n, p = df.shape
mode_freqs = []
balance_scores = []
cardinalities = []
for col in df.columns:
vc = df[col].value_counts(normalize=True)
mode_freqs.append(vc.iloc[0]) # frequency of most common value
card = len(vc)
cardinalities.append(card)
# Balance: entropy / log(cardinality) — 1.0 = perfectly uniform
if card > 1:
h = entropy(vc)
max_h = np.log(card)
balance_scores.append(h / max_h if max_h > 0 else 0)
else:
balance_scores.append(0.0)
mode_freqs = np.array(mode_freqs)
balance_scores = np.array(balance_scores)
cardinalities = np.array(cardinalities)
return {
'mean_mode_frequency': mode_freqs.mean(),
'std_mode_frequency': mode_freqs.std(),
'mean_balance': balance_scores.mean(),
'uniformity_score': balance_scores.mean(), # alias
'frac_binary_vars': (cardinalities == 2).mean(),
'frac_high_card_vars': (cardinalities > 5).mean(),
}
def _mutual_information(x, y):
"""Compute mutual information between two discrete arrays."""
# Joint distribution
from collections import Counter
n = len(x)
joint = Counter(zip(x, y))
marginal_x = Counter(x)
marginal_y = Counter(y)
mi = 0.0
for (xi, yi), count in joint.items():
p_xy = count / n
p_x = marginal_x[xi] / n
p_y = marginal_y[yi] / n
if p_xy > 0 and p_x > 0 and p_y > 0:
mi += p_xy * np.log(p_xy / (p_x * p_y))
return max(mi, 0.0)
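# Sanity-check sketch for _mutual_information (illustrative only): a variable paired with
# itself gives MI equal to its entropy, in nats; e.g. a balanced binary variable:
#
#     x = np.array([0, 1] * 500)
#     assert abs(_mutual_information(x, x) - np.log(2)) < 1e-9   # ln 2 ≈ 0.693 nats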
# Feature names for consistent ordering
FEATURE_NAMES = [
# Tier 1: Basic
'n_samples', 'n_variables', 'n_over_p', 'avg_cardinality', 'max_cardinality', 'min_cardinality',
# Tier 2: Info-theoretic
'mean_entropy', 'std_entropy', 'max_entropy', 'mean_pairwise_MI', 'std_pairwise_MI',
'max_pairwise_MI', 'mean_normalized_MI', 'frac_high_MI_pairs',
# Tier 3: Dependency
'density_proxy', 'mean_chi2_stat', 'std_chi2_stat', 'max_chi2_stat',
'mean_cramers_v', 'max_cramers_v', 'frac_weak_deps', 'frac_strong_deps',
# Tier 4: CI probes
'mean_pval_marginal', 'frac_dep_marginal', 'mean_pval_conditional',
'frac_dep_conditional', 'v_structure_proxy', 'faithfulness_proxy',
# Tier 5: Distribution
'mean_mode_frequency', 'std_mode_frequency', 'mean_balance', 'uniformity_score',
'frac_binary_vars', 'frac_high_card_vars',
]
def features_to_vector(features_dict):
"""Convert feature dict to ordered numpy vector."""
return np.array([features_dict.get(name, 0.0) for name in FEATURE_NAMES])
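# Sketch of a typical downstream use (assumes `datasets`, a hypothetical list of pre-loaded
# discrete DataFrames; the meta-learner consuming the matrix lives outside this module):
#
#     X_meta = np.vstack([features_to_vector(extract_all_features(d)) for d in datasets])
#     # X_meta has shape (len(datasets), len(FEATURE_NAMES))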
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
from causal_selection.data.generator import load_bn_model, sample_dataset
model = load_bn_model('asia')
df = sample_dataset(model, 1000, seed=0)
print(f"Extracting features from ASIA (N=1000)...")
features = extract_all_features(df)
for name in FEATURE_NAMES:
val = features.get(name, 'MISSING')
if isinstance(val, float):
print(f" {name:30s}: {val:10.4f}")
else:
print(f" {name:30s}: {val}")
print(f"\nTotal features: {len(FEATURE_NAMES)}")