"""
Structural Causal Models for GeoBotv1

Implements Structural Causal Models (SCMs) for geopolitical analysis,
intervention simulation, and counterfactual reasoning. Integrates with
the GeoBot 2.0 analytical framework.
"""
|
|
|
|
|
from dataclasses import dataclass, field |
|
|
from typing import Dict, List, Any, Optional, Callable, Set, Tuple |
|
|
from enum import Enum |
|
|
import numpy as np |
|
|
import networkx as nx |
|
|
|
|
|
|
|
|
class IdentificationStrategy(Enum):
    """
    Ways in which a causal effect can be identified.

    Each member's value is the short string tag used for that strategy.
    """

    # Adjustment-based strategies.
    BACKDOOR_ADJUSTMENT = "backdoor"
    FRONTDOOR_ADJUSTMENT = "frontdoor"

    # Instrument-based strategy.
    INSTRUMENTAL_VARIABLES = "iv"

    # General do-calculus derivation.
    DO_CALCULUS = "do_calculus"

    # Effect read off directly from the model's structural equations.
    STRUCTURAL_EQUATIONS = "structural"
|
|
|
|
|
|
|
|
@dataclass
class StructuralEquation:
    """
    A single structural assignment of an SCM.

    As implemented by :meth:`evaluate`, the noise enters additively:

        X := f(Pa_X) + U_X

    Attributes
    ----------
    variable : str
        Name of the variable this equation defines.
    parents : List[str]
        Names of the variable's parents in the causal graph.
    function : Callable
        Structural function ``f``; receives a mapping from parent name to
        parent value and returns the noise-free value of the variable.
    noise_dist : Callable
        Sampler for the exogenous noise ``U_X``; called with a sample count
        and expected to return an indexable sequence of that many draws.
    description : str
        Free-text description of the equation.
    """

    variable: str
    parents: List[str]
    function: Callable[[Dict[str, float]], float]
    noise_dist: Callable[[int], np.ndarray]
    description: str = ""

    def evaluate(self, parent_values: Dict[str, float], noise: Optional[float] = None) -> float:
        """
        Compute the variable's value for one set of parent values.

        Parameters
        ----------
        parent_values : Dict[str, float]
            Value of each parent variable, keyed by name.
        noise : Optional[float]
            Noise term to add. When ``None``, a single draw is taken from
            ``noise_dist``.

        Returns
        -------
        float
            ``function(parent_values)`` plus the noise term.
        """
        # The draw (if any) happens before the structural function is called.
        disturbance = self.noise_dist(1)[0] if noise is None else noise
        return self.function(parent_values) + disturbance
|
|
|
|
|
|
|
|
@dataclass
class Intervention:
    """
    A causal intervention ``do(X = x)``.

    Attributes
    ----------
    variable : str
        Name of the variable that is forced to a value.
    value : float
        The value the variable is set to.
    description : str
        Free-text description of the intervention.
    """

    variable: str
    value: float
    description: str = ""

    def __repr__(self) -> str:
        """Render the intervention in do-notation, e.g. ``do(X = 1.0)``."""
        return "do({} = {})".format(self.variable, self.value)
|
|
|
|
|
|
|
|
@dataclass
class Counterfactual:
    """
    A counterfactual query.

    "What would Y be if we had done X = x, given that we observed Z = z?"

    Attributes
    ----------
    query_variable : str
        Name of the variable whose counterfactual value is asked for.
    intervention : Intervention
        The hypothetical intervention.
    observations : Dict[str, float]
        Observed (factual) values keyed by variable name; empty by default.
    """

    query_variable: str
    intervention: Intervention
    observations: Dict[str, float] = field(default_factory=dict)

    def __repr__(self) -> str:
        """Render as ``Y_{do(X = x)} | Z1=z1, Z2=z2``."""
        observed = ", ".join(
            "{}={}".format(name, val) for name, val in self.observations.items()
        )
        return "{}_{{{}}} | {}".format(self.query_variable, self.intervention, observed)
|
|
|
|
|
|
|
|
@dataclass
class CausalEffect:
    """
    Result of a causal effect estimation.

    Attributes
    ----------
    treatment : str
        Name of the treatment variable.
    outcome : str
        Name of the outcome variable.
    effect : float
        Estimated average causal effect (ACE).
    std_error : Optional[float]
        Standard error of the estimate, if available.
    confidence_interval : Optional[Tuple[float, float]]
        (lower, upper) bounds of the confidence interval, if available.
    identification_strategy : Optional[IdentificationStrategy]
        How the effect was identified, if recorded.
    """

    treatment: str
    outcome: str
    effect: float
    std_error: Optional[float] = None
    confidence_interval: Optional[Tuple[float, float]] = None
    identification_strategy: Optional[IdentificationStrategy] = None

    def __repr__(self) -> str:
        """Render the effect to three decimals, with the interval if one is set."""
        # The interval is labelled "95% CI" regardless of how it was computed.
        interval = (
            f", 95% CI: {self.confidence_interval}" if self.confidence_interval else ""
        )
        return f"ACE({self.treatment} → {self.outcome}) = {self.effect:.3f}{interval}"
|
|
|
|
|
|
|
|
class StructuralCausalModel:
    """
    Structural Causal Model for geopolitical analysis.

    An SCM consists of:
    1. Causal graph G (DAG)
    2. Structural equations for each variable
    3. Exogenous noise distributions

    Enables:
    - Intervention simulation (do-operator)
    - Counterfactual reasoning
    - Causal effect identification

    Attributes
    ----------
    name : str
        Name of the model.
    graph : nx.DiGraph
        Causal graph; an edge ``p -> v`` is added for every parent ``p`` of
        an equation defining ``v``.
    equations : Dict[str, StructuralEquation]
        Structural equation of each endogenous variable, keyed by name.
    exogenous_variables : Set[str]
        Names of variables registered as exogenous.
    exogenous_distributions : Dict[str, Callable[[int], np.ndarray]]
        Sampler of each exogenous variable registered via ``add_exogenous``.
    """

    def __init__(self, name: str = "GeopoliticalSCM"):
        """
        Initialize an empty SCM.

        Parameters
        ----------
        name : str
            Name of SCM
        """
        self.name = name
        self.graph = nx.DiGraph()
        self.equations: Dict[str, StructuralEquation] = {}
        self.exogenous_variables: Set[str] = set()
        self.exogenous_distributions: Dict[str, Callable[[int], np.ndarray]] = {}

    def add_equation(self, equation: StructuralEquation) -> None:
        """
        Add structural equation to model.

        Registers the equation under its variable name (replacing any
        previous equation for that variable) and adds the variable and one
        edge per parent to the causal graph.

        Parameters
        ----------
        equation : StructuralEquation
            Structural equation
        """
        self.equations[equation.variable] = equation

        self.graph.add_node(equation.variable)
        for parent in equation.parents:
            self.graph.add_edge(parent, equation.variable)

    def add_exogenous(self, variable: str, distribution: Callable[[int], np.ndarray]) -> None:
        """
        Add exogenous variable.

        Parameters
        ----------
        variable : str
            Variable name
        distribution : Callable
            Sampler for the variable; called with the number of samples and
            expected to return that many draws. It is stored and used by
            the simulation routines.
        """
        self.exogenous_variables.add(variable)
        # Keep the sampler: simulation draws the variable from it.
        self.exogenous_distributions[variable] = distribution
        self.graph.add_node(variable)

    def topological_order(self) -> List[str]:
        """
        Get topological ordering of variables.

        Returns
        -------
        List[str]
            Topologically sorted variables

        Raises
        ------
        ValueError
            If the causal graph contains a cycle.
        """
        try:
            return list(nx.topological_sort(self.graph))
        except (nx.NetworkXUnfeasible, nx.NetworkXError) as exc:
            # networkx reports a cycle as NetworkXUnfeasible, which is not a
            # NetworkXError subclass; NetworkXError is kept for compatibility.
            raise ValueError("Graph contains cycles - not a valid DAG") from exc

    def _draw_samples(
        self,
        n_samples: int,
        fixed: Dict[str, Any],
        noise: Optional[Dict[str, float]] = None
    ) -> Dict[str, np.ndarray]:
        """
        Generate samples for every variable in topological order.

        Parameters
        ----------
        n_samples : int
            Number of samples
        fixed : Dict[str, Any]
            Variables held at a constant value instead of being generated.
        noise : Optional[Dict[str, float]]
            Additive noise value to use (instead of sampling) for the
            structural equations of the listed variables.

        Returns
        -------
        Dict[str, np.ndarray]
            One array of length ``n_samples`` per variable in the graph.

        Raises
        ------
        ValueError
            If the graph is cyclic, or a variable is neither fixed,
            exogenous, nor defined by an equation.
        """
        noise = noise or {}
        data: Dict[str, np.ndarray] = {}

        for var in self.topological_order():
            if var in fixed:
                data[var] = np.full(n_samples, fixed[var])
            elif var in self.exogenous_variables:
                sampler = self.exogenous_distributions.get(var)
                if sampler is None:
                    # Name was put into exogenous_variables without a
                    # sampler; fall back to standard normal draws.
                    data[var] = np.random.randn(n_samples)
                else:
                    data[var] = np.asarray(sampler(n_samples))
            elif var in self.equations:
                eq = self.equations[var]
                fixed_noise = noise.get(var)
                values = np.zeros(n_samples)
                # Structural functions take one scalar value per parent, so
                # the equation is evaluated sample by sample.
                for i in range(n_samples):
                    parent_vals = {p: data[p][i] for p in eq.parents}
                    values[i] = eq.evaluate(parent_vals, fixed_noise)
                data[var] = values
            else:
                raise ValueError(f"No equation for variable {var}")

        return data

    def simulate(
        self,
        n_samples: int = 1000,
        interventions: Optional[List[Intervention]] = None,
        random_state: Optional[int] = None
    ) -> Dict[str, np.ndarray]:
        """
        Simulate from SCM.

        Parameters
        ----------
        n_samples : int
            Number of samples
        interventions : Optional[List[Intervention]]
            Interventions to apply; each intervened variable is held at its
            intervention value for all samples.
        random_state : Optional[int]
            Random seed. When given, NumPy's global generator is seeded.

        Returns
        -------
        Dict[str, np.ndarray]
            Simulated data for each variable

        Raises
        ------
        ValueError
            If the graph is cyclic, or a non-intervened variable is neither
            exogenous nor defined by an equation.
        """
        if random_state is not None:
            np.random.seed(random_state)

        forced = {iv.variable: iv.value for iv in interventions} if interventions else {}
        return self._draw_samples(n_samples, forced)

    def intervene(
        self,
        interventions: List[Intervention],
        n_samples: int = 1000,
        random_state: Optional[int] = None
    ) -> Dict[str, np.ndarray]:
        """
        Simulate interventions using do-operator.

        Parameters
        ----------
        interventions : List[Intervention]
            Interventions to apply
        n_samples : int
            Number of samples
        random_state : Optional[int]
            Random seed

        Returns
        -------
        Dict[str, np.ndarray]
            Post-intervention data
        """
        return self.simulate(n_samples, interventions, random_state)

    def _mean_contrast(
        self,
        treatment: str,
        outcome: str,
        control: float,
        treated: float,
        n_samples: int
    ) -> float:
        """Return E[outcome | do(treatment=treated)] - E[outcome | do(treatment=control)]."""
        means = []
        for level in (control, treated):
            data = self.intervene([Intervention(variable=treatment, value=level)], n_samples)
            means.append(np.mean(data[outcome]))
        return means[1] - means[0]

    def estimate_causal_effect(
        self,
        treatment: str,
        outcome: str,
        n_samples: int = 10000,
        treatment_values: Optional[List[float]] = None,
        n_replicates: int = 100,
        replicate_samples: int = 1000
    ) -> CausalEffect:
        """
        Estimate average causal effect of treatment on outcome.

        The effect is the difference in mean outcome between two simulated
        interventions on the treatment. Its standard error is the standard
        deviation of the same contrast over ``n_replicates`` independent
        re-simulations of ``replicate_samples`` samples each. With the
        defaults, the standard error therefore describes estimates built
        from 1000 samples, not from ``n_samples``.

        Parameters
        ----------
        treatment : str
            Treatment variable
        outcome : str
            Outcome variable
        n_samples : int
            Number of simulation samples for the point estimate
        treatment_values : Optional[List[float]]
            Treatment values to compare (default [0, 1]). The first entry
            is the control level and the second the treated level; further
            entries are ignored.
        n_replicates : int
            Number of re-simulations used for the standard error. With a
            value below 1 no standard error or interval is computed.
        replicate_samples : int
            Number of samples in each re-simulation.

        Returns
        -------
        CausalEffect
            Estimated causal effect

        Raises
        ------
        ValueError
            If fewer than two treatment values are supplied.
        KeyError
            If ``outcome`` is not a variable of the model.
        """
        if treatment_values is None:
            treatment_values = [0.0, 1.0]
        if len(treatment_values) < 2:
            raise ValueError(
                "treatment_values must contain at least two values (control, treated)"
            )
        control, treated = treatment_values[0], treatment_values[1]

        ace = self._mean_contrast(treatment, outcome, control, treated, n_samples)

        std_error = None
        ci = None
        if n_replicates > 0:
            replicate_effects = [
                self._mean_contrast(treatment, outcome, control, treated, replicate_samples)
                for _ in range(n_replicates)
            ]
            std_error = np.std(replicate_effects)
            # Normal-approximation 95% interval around the point estimate.
            ci = (
                ace - 1.96 * std_error,
                ace + 1.96 * std_error
            )

        return CausalEffect(
            treatment=treatment,
            outcome=outcome,
            effect=ace,
            std_error=std_error,
            confidence_interval=ci,
            identification_strategy=IdentificationStrategy.STRUCTURAL_EQUATIONS
        )

    def counterfactual_query(
        self,
        query: Counterfactual,
        n_samples: int = 10000
    ) -> Dict[str, Any]:
        """
        Answer counterfactual query.

        Three-step process:
        1. Abduction: Infer exogenous variables from observations
        2. Action: Apply intervention
        3. Prediction: Compute outcome

        Abduction is exact where the observations allow it:

        - an observed variable that is not a descendant of the intervened
          variable keeps its observed value;
        - for an observed descendant whose parents are all observed, the
          additive noise is recovered as ``observed - f(observed parents)``
          and reused under the intervention.

        For every other variable the noise is sampled from its prior, so
        the answer is then only an approximation. With no observations the
        result is the plain interventional distribution.

        Parameters
        ----------
        query : Counterfactual
            Counterfactual query
        n_samples : int
            Number of samples for approximation

        Returns
        -------
        Dict[str, Any]
            Counterfactual results: ``query`` (string form), and the
            ``expected_value``, ``std``, ``median`` and ``quantiles``
            (5%, 25%, 75%, 95%) of the query variable.

        Raises
        ------
        KeyError
            If the query variable is not a variable of the model.
        """
        observed = query.observations
        target = query.intervention.variable

        # Only descendants of the intervened variable can change value.
        if target in self.graph:
            affected = nx.descendants(self.graph, target)
        else:
            affected = set()

        # Action: the intervened variable is forced to its new value.
        pinned: Dict[str, Any] = {target: query.intervention.value}
        inferred_noise: Dict[str, float] = {}

        # Abduction.
        for var, value in observed.items():
            if var == target or var not in self.graph:
                continue
            eq = self.equations.get(var)
            if var not in affected or eq is None or var in self.exogenous_variables:
                pinned[var] = value
            elif all(p in observed for p in eq.parents):
                factual_parents = {p: observed[p] for p in eq.parents}
                inferred_noise[var] = value - eq.function(factual_parents)

        # Prediction.
        data = self._draw_samples(n_samples, pinned, inferred_noise)
        outcome = data[query.query_variable]

        return {
            'query': str(query),
            'expected_value': float(np.mean(outcome)),
            'std': float(np.std(outcome)),
            'median': float(np.median(outcome)),
            'quantiles': {
                '5%': float(np.quantile(outcome, 0.05)),
                '25%': float(np.quantile(outcome, 0.25)),
                '75%': float(np.quantile(outcome, 0.75)),
                '95%': float(np.quantile(outcome, 0.95)),
            }
        }

    def find_backdoor_paths(self, treatment: str, outcome: str) -> List[List[str]]:
        """
        Find backdoor paths from treatment to outcome.

        A backdoor path is taken to be a simple path between treatment and
        outcome in the undirected version of the graph whose first step
        goes to a parent of the treatment.

        Parameters
        ----------
        treatment : str
            Treatment variable
        outcome : str
            Outcome variable

        Returns
        -------
        List[List[str]]
            List of backdoor paths, each a list of variable names from
            treatment to outcome. Empty if either variable is not in the
            graph.
        """
        skeleton = self.graph.to_undirected()

        try:
            all_paths = list(nx.all_simple_paths(skeleton, treatment, outcome))
        except nx.NodeNotFound:
            return []

        treatment_parents = set(self.graph.predecessors(treatment))
        return [
            path for path in all_paths
            if len(path) > 1 and path[1] in treatment_parents
        ]

    def find_backdoor_adjustment_set(
        self,
        treatment: str,
        outcome: str
    ) -> Optional[Set[str]]:
        """
        Find a backdoor adjustment set.

        The set is built from every intermediate variable on any backdoor
        path, minus the descendants of the treatment. It is not guaranteed
        to be minimal.

        Parameters
        ----------
        treatment : str
            Treatment variable
        outcome : str
            Outcome variable

        Returns
        -------
        Optional[Set[str]]
            Backdoor adjustment set; empty when there are no backdoor
            paths. This implementation never returns None.
        """
        backdoor_paths = self.find_backdoor_paths(treatment, outcome)
        if not backdoor_paths:
            return set()

        candidates: Set[str] = set()
        for path in backdoor_paths:
            # Drop the endpoints (treatment and outcome).
            candidates.update(path[1:-1])

        candidates -= nx.descendants(self.graph, treatment)
        return candidates

    def plot_graph(self, filename: Optional[str] = None) -> None:
        """
        Plot causal graph.

        Prints a notice instead of plotting when an ImportError is raised.

        Parameters
        ----------
        filename : Optional[str]
            File to save plot to; when omitted the plot is shown instead.
        """
        try:
            import matplotlib.pyplot as plt

            pos = nx.spring_layout(self.graph)
            nx.draw(
                self.graph,
                pos,
                with_labels=True,
                node_color='lightblue',
                node_size=1500,
                font_size=10,
                font_weight='bold',
                arrows=True,
                arrowsize=20
            )

            if filename:
                plt.savefig(filename)
            else:
                plt.show()

        except ImportError:
            print("matplotlib not available for plotting")
|
|
|
|
|
|
|
|
def estimate_causal_effect(
    scm: StructuralCausalModel,
    treatment: str,
    outcome: str,
    adjustment_set: Optional[Set[str]] = None,
    n_samples: int = 10000
) -> CausalEffect:
    """
    Estimate the causal effect of ``treatment`` on ``outcome`` in ``scm``.

    The adjustment set only gates the call: the estimate itself is
    delegated to ``scm.estimate_causal_effect`` and does not use the set.

    Parameters
    ----------
    scm : StructuralCausalModel
        Structural causal model
    treatment : str
        Treatment variable
    outcome : str
        Outcome variable
    adjustment_set : Optional[Set[str]]
        Variables to adjust for. When None, the model is asked for a
        backdoor adjustment set via ``find_backdoor_adjustment_set``.
    n_samples : int
        Number of samples, forwarded to the model's estimator.

    Returns
    -------
    CausalEffect
        Estimated causal effect

    Raises
    ------
    ValueError
        If no adjustment set was given and the model reports none (None).
    """
    # The model is consulted only when the caller supplied no set.
    if adjustment_set is None and scm.find_backdoor_adjustment_set(treatment, outcome) is None:
        raise ValueError("Cannot identify causal effect - no valid adjustment set")

    return scm.estimate_causal_effect(treatment, outcome, n_samples)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_sanctions_scm() -> StructuralCausalModel:
    """
    Build the example SCM for sanctions analysis.

    Causal chain: sanctions -> trade_disruption -> economic_growth ->
    regime_stability.

    Variables:
    - sanctions: Binary sanctions imposed
    - trade_disruption: Trade flow disruption
    - economic_growth: Economic growth rate
    - regime_stability: Regime stability score

    ``sanctions`` appears only as a parent: it gets no equation here and is
    not registered as exogenous.

    Returns
    -------
    StructuralCausalModel
        Sanctions SCM
    """
    def small_noise(n):
        # Gaussian noise with standard deviation 0.1, shared by all equations.
        return np.random.randn(n) * 0.1

    # (variable, parents, structural function, description), in insertion order.
    specs = [
        (
            "trade_disruption",
            ["sanctions"],
            lambda p: 0.7 * p["sanctions"],
            "Sanctions directly reduce trade",
        ),
        (
            "economic_growth",
            ["trade_disruption"],
            lambda p: 0.05 - 0.4 * p["trade_disruption"],
            "Trade disruption reduces growth",
        ),
        (
            "regime_stability",
            ["economic_growth"],
            lambda p: 0.7 + 0.5 * p["economic_growth"],
            "Economic growth affects regime stability",
        ),
    ]

    model = StructuralCausalModel(name="SanctionsSCM")
    for variable, parents, function, description in specs:
        model.add_equation(StructuralEquation(
            variable=variable,
            parents=parents,
            function=function,
            noise_dist=small_noise,
            description=description,
        ))
    return model
|
|
|
|
|
|
|
|
def create_conflict_escalation_scm() -> StructuralCausalModel:
    """
    Build the example SCM for conflict escalation.

    ``military_buildup`` drives ``diplomatic_tension``, and both drive
    ``conflict_risk``.

    Variables:
    - military_buildup: Military force buildup
    - diplomatic_tension: Diplomatic relations tension
    - conflict_risk: Risk of armed conflict

    ``military_buildup`` appears only as a parent: it gets no equation here
    and is not registered as exogenous.

    Returns
    -------
    StructuralCausalModel
        Conflict escalation SCM
    """
    def small_noise(n):
        # Gaussian noise with standard deviation 0.05, shared by both equations.
        return np.random.randn(n) * 0.05

    tension_eq = StructuralEquation(
        variable="diplomatic_tension",
        parents=["military_buildup"],
        function=lambda p: 0.3 + 0.6 * p["military_buildup"],
        noise_dist=small_noise,
        description="Military buildup increases diplomatic tension",
    )
    risk_eq = StructuralEquation(
        variable="conflict_risk",
        parents=["military_buildup", "diplomatic_tension"],
        function=lambda p: 0.1 + 0.4 * p["military_buildup"] + 0.3 * p["diplomatic_tension"],
        noise_dist=small_noise,
        description="Both military buildup and tension increase conflict risk",
    )

    model = StructuralCausalModel(name="ConflictEscalationSCM")
    for equation in (tension_eq, risk_eq):
        model.add_equation(equation)
    return model
|
|
|