|
|
""" |
|
|
Stochastic Differential Equation (SDE) Solvers |
|
|
|
|
|
Implements numerical methods for SDEs: |
|
|
dx_t = f(x_t, t)dt + g(x_t, t)dW_t |
|
|
|
|
|
where W_t is a Wiener process (Brownian motion). |
|
|
|
|
|
Methods: |
|
|
- Euler-Maruyama (order 0.5 strong convergence) |
|
|
- Milstein (order 1.0 strong convergence) |
|
|
- Runge-Kutta for SDEs |
|
|
- Jump-diffusion processes (Merton, Kou) |
|
|
|
|
|
Applications: |
|
|
- Continuous-time geopolitical dynamics |
|
|
- Financial contagion models |
|
|
- Regime transitions with stochastic shocks |
|
|
""" |
|
|
|
|
|
import numpy as np |
|
|
from typing import Callable, Optional, Tuple, List, Dict, Any |
|
|
from dataclasses import dataclass |
|
|
from scipy.stats import poisson, norm |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class SDESolution: |
|
|
""" |
|
|
Solution to an SDE. |
|
|
|
|
|
Attributes |
|
|
---------- |
|
|
t : np.ndarray |
|
|
Time points |
|
|
x : np.ndarray |
|
|
State trajectories |
|
|
method : str |
|
|
Integration method used |
|
|
""" |
|
|
t: np.ndarray |
|
|
x: np.ndarray |
|
|
method: str |
|
|
|
|
|
|
|
|
class SDESolver: |
|
|
""" |
|
|
Base class for SDE solvers. |
|
|
|
|
|
Solves: dx_t = f(x_t, t)dt + g(x_t, t)dW_t |
|
|
""" |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
drift: Callable[[np.ndarray, float], np.ndarray], |
|
|
diffusion: Callable[[np.ndarray, float], np.ndarray], |
|
|
x0: np.ndarray, |
|
|
t0: float = 0.0 |
|
|
): |
|
|
""" |
|
|
Initialize SDE solver. |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
drift : callable |
|
|
Drift function f(x, t) |
|
|
diffusion : callable |
|
|
Diffusion function g(x, t) |
|
|
x0 : np.ndarray |
|
|
Initial condition |
|
|
t0 : float |
|
|
Initial time |
|
|
""" |
|
|
self.drift = drift |
|
|
self.diffusion = diffusion |
|
|
self.x0 = np.asarray(x0) |
|
|
self.t0 = t0 |
|
|
self.dim = len(self.x0) |
|
|
|
|
|
def integrate( |
|
|
self, |
|
|
T: float, |
|
|
dt: float, |
|
|
n_paths: int = 1 |
|
|
) -> SDESolution: |
|
|
""" |
|
|
Integrate SDE. |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
T : float |
|
|
Final time |
|
|
dt : float |
|
|
Time step |
|
|
n_paths : int |
|
|
Number of sample paths |
|
|
|
|
|
Returns |
|
|
------- |
|
|
SDESolution |
|
|
Solution object |
|
|
""" |
|
|
raise NotImplementedError("Subclasses must implement integrate()") |
|
|
|
|
|
|
|
|
class EulerMaruyama(SDESolver): |
|
|
""" |
|
|
Euler-Maruyama method for SDEs. |
|
|
|
|
|
Simplest method with order 0.5 strong convergence. |
|
|
|
|
|
x_{n+1} = x_n + f(x_n, t_n)Δt + g(x_n, t_n)ΔW_n |
|
|
|
|
|
where ΔW_n ~ N(0, Δt) |
|
|
""" |
|
|
|
|
|
def integrate( |
|
|
self, |
|
|
T: float, |
|
|
dt: float, |
|
|
n_paths: int = 1 |
|
|
) -> SDESolution: |
|
|
""" |
|
|
Integrate using Euler-Maruyama. |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
T : float |
|
|
Final time |
|
|
dt : float |
|
|
Time step |
|
|
n_paths : int |
|
|
Number of paths to simulate |
|
|
|
|
|
Returns |
|
|
------- |
|
|
SDESolution |
|
|
Solution |
|
|
""" |
|
|
|
|
|
n_steps = int((T - self.t0) / dt) |
|
|
t = np.linspace(self.t0, T, n_steps + 1) |
|
|
|
|
|
|
|
|
x = np.zeros((n_paths, n_steps + 1, self.dim)) |
|
|
x[:, 0, :] = self.x0 |
|
|
|
|
|
|
|
|
sqrt_dt = np.sqrt(dt) |
|
|
|
|
|
|
|
|
for i in range(n_steps): |
|
|
t_current = t[i] |
|
|
|
|
|
for path in range(n_paths): |
|
|
x_current = x[path, i, :] |
|
|
|
|
|
|
|
|
drift_term = self.drift(x_current, t_current) * dt |
|
|
|
|
|
|
|
|
dW = np.random.randn(self.dim) * sqrt_dt |
|
|
diffusion_term = self.diffusion(x_current, t_current) * dW |
|
|
|
|
|
|
|
|
x[path, i + 1, :] = x_current + drift_term + diffusion_term |
|
|
|
|
|
return SDESolution(t=t, x=x, method='euler_maruyama') |
|
|
|
|
|
|
|
|
class Milstein(SDESolver): |
|
|
""" |
|
|
Milstein method for SDEs. |
|
|
|
|
|
Higher-order method with order 1.0 strong convergence. |
|
|
Requires derivative of diffusion term. |
|
|
|
|
|
x_{n+1} = x_n + f(x_n)Δt + g(x_n)ΔW_n |
|
|
+ 0.5 * g(x_n) * g'(x_n) * ((ΔW_n)^2 - Δt) |
|
|
|
|
|
where g'(x) = ∂g/∂x |
|
|
""" |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
drift: Callable, |
|
|
diffusion: Callable, |
|
|
diffusion_derivative: Callable, |
|
|
x0: np.ndarray, |
|
|
t0: float = 0.0 |
|
|
): |
|
|
""" |
|
|
Initialize Milstein solver. |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
drift : callable |
|
|
Drift function |
|
|
diffusion : callable |
|
|
Diffusion function |
|
|
diffusion_derivative : callable |
|
|
Derivative of diffusion: ∂g/∂x |
|
|
x0 : np.ndarray |
|
|
Initial condition |
|
|
t0 : float |
|
|
Initial time |
|
|
""" |
|
|
super().__init__(drift, diffusion, x0, t0) |
|
|
self.diffusion_derivative = diffusion_derivative |
|
|
|
|
|
def integrate( |
|
|
self, |
|
|
T: float, |
|
|
dt: float, |
|
|
n_paths: int = 1 |
|
|
) -> SDESolution: |
|
|
""" |
|
|
Integrate using Milstein method. |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
T : float |
|
|
Final time |
|
|
dt : float |
|
|
Time step |
|
|
n_paths : int |
|
|
Number of paths |
|
|
|
|
|
Returns |
|
|
------- |
|
|
SDESolution |
|
|
Solution |
|
|
""" |
|
|
n_steps = int((T - self.t0) / dt) |
|
|
t = np.linspace(self.t0, T, n_steps + 1) |
|
|
|
|
|
x = np.zeros((n_paths, n_steps + 1, self.dim)) |
|
|
x[:, 0, :] = self.x0 |
|
|
|
|
|
sqrt_dt = np.sqrt(dt) |
|
|
|
|
|
for i in range(n_steps): |
|
|
t_current = t[i] |
|
|
|
|
|
for path in range(n_paths): |
|
|
x_current = x[path, i, :] |
|
|
|
|
|
|
|
|
drift_term = self.drift(x_current, t_current) * dt |
|
|
|
|
|
|
|
|
dW = np.random.randn(self.dim) * sqrt_dt |
|
|
g = self.diffusion(x_current, t_current) |
|
|
diffusion_term = g * dW |
|
|
|
|
|
|
|
|
g_prime = self.diffusion_derivative(x_current, t_current) |
|
|
correction = 0.5 * g * g_prime * ((dW**2) - dt) |
|
|
|
|
|
|
|
|
x[path, i + 1, :] = x_current + drift_term + diffusion_term + correction |
|
|
|
|
|
return SDESolution(t=t, x=x, method='milstein') |
|
|
|
|
|
|
|
|
class StochasticRungeKutta(SDESolver): |
|
|
""" |
|
|
Stochastic Runge-Kutta method. |
|
|
|
|
|
Higher-order method for SDEs with better accuracy. |
|
|
""" |
|
|
|
|
|
def integrate( |
|
|
self, |
|
|
T: float, |
|
|
dt: float, |
|
|
n_paths: int = 1 |
|
|
) -> SDESolution: |
|
|
""" |
|
|
Integrate using stochastic Runge-Kutta. |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
T : float |
|
|
Final time |
|
|
dt : float |
|
|
Time step |
|
|
n_paths : int |
|
|
Number of paths |
|
|
|
|
|
Returns |
|
|
------- |
|
|
SDESolution |
|
|
Solution |
|
|
""" |
|
|
n_steps = int((T - self.t0) / dt) |
|
|
t = np.linspace(self.t0, T, n_steps + 1) |
|
|
|
|
|
x = np.zeros((n_paths, n_steps + 1, self.dim)) |
|
|
x[:, 0, :] = self.x0 |
|
|
|
|
|
sqrt_dt = np.sqrt(dt) |
|
|
|
|
|
for i in range(n_steps): |
|
|
t_current = t[i] |
|
|
|
|
|
for path in range(n_paths): |
|
|
x_current = x[path, i, :] |
|
|
|
|
|
|
|
|
dW = np.random.randn(self.dim) * sqrt_dt |
|
|
|
|
|
|
|
|
k1_drift = self.drift(x_current, t_current) |
|
|
k1_diff = self.diffusion(x_current, t_current) |
|
|
|
|
|
|
|
|
x_pred = x_current + k1_drift * dt + k1_diff * dW |
|
|
|
|
|
k2_drift = self.drift(x_pred, t_current + dt) |
|
|
k2_diff = self.diffusion(x_pred, t_current + dt) |
|
|
|
|
|
|
|
|
drift_term = 0.5 * (k1_drift + k2_drift) * dt |
|
|
diffusion_term = 0.5 * (k1_diff + k2_diff) * dW |
|
|
|
|
|
x[path, i + 1, :] = x_current + drift_term + diffusion_term |
|
|
|
|
|
return SDESolution(t=t, x=x, method='stochastic_rk') |
|
|
|
|
|
|
|
|
class JumpDiffusionProcess: |
|
|
""" |
|
|
Jump-diffusion process (Merton model). |
|
|
|
|
|
Combines continuous diffusion with discrete jumps: |
|
|
dx_t = μ x_t dt + σ x_t dW_t + x_t dJ_t |
|
|
|
|
|
where J_t is a compound Poisson process: |
|
|
- Jumps occur with intensity λ |
|
|
- Jump sizes Y ~ N(μ_J, σ_J^2) |
|
|
""" |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
drift: float, |
|
|
diffusion: float, |
|
|
jump_intensity: float, |
|
|
jump_mean: float, |
|
|
jump_std: float, |
|
|
x0: np.ndarray |
|
|
): |
|
|
""" |
|
|
Initialize jump-diffusion process. |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
drift : float |
|
|
Drift coefficient μ |
|
|
diffusion : float |
|
|
Diffusion coefficient σ |
|
|
jump_intensity : float |
|
|
Jump intensity λ (expected number of jumps per unit time) |
|
|
jump_mean : float |
|
|
Mean jump size (log-normal) |
|
|
jump_std : float |
|
|
Jump size standard deviation |
|
|
x0 : np.ndarray |
|
|
Initial condition |
|
|
""" |
|
|
self.drift = drift |
|
|
self.diffusion = diffusion |
|
|
self.jump_intensity = jump_intensity |
|
|
self.jump_mean = jump_mean |
|
|
self.jump_std = jump_std |
|
|
self.x0 = np.asarray(x0) |
|
|
self.dim = len(self.x0) |
|
|
|
|
|
def simulate( |
|
|
self, |
|
|
T: float, |
|
|
dt: float, |
|
|
n_paths: int = 1 |
|
|
) -> SDESolution: |
|
|
""" |
|
|
Simulate jump-diffusion paths. |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
T : float |
|
|
Final time |
|
|
dt : float |
|
|
Time step |
|
|
n_paths : int |
|
|
Number of paths |
|
|
|
|
|
Returns |
|
|
------- |
|
|
SDESolution |
|
|
Solution |
|
|
""" |
|
|
n_steps = int(T / dt) |
|
|
t = np.linspace(0, T, n_steps + 1) |
|
|
|
|
|
x = np.zeros((n_paths, n_steps + 1, self.dim)) |
|
|
x[:, 0, :] = self.x0 |
|
|
|
|
|
sqrt_dt = np.sqrt(dt) |
|
|
|
|
|
for i in range(n_steps): |
|
|
for path in range(n_paths): |
|
|
x_current = x[path, i, :] |
|
|
|
|
|
|
|
|
dW = np.random.randn(self.dim) * sqrt_dt |
|
|
continuous = self.drift * x_current * dt + self.diffusion * x_current * dW |
|
|
|
|
|
|
|
|
n_jumps = poisson.rvs(self.jump_intensity * dt) |
|
|
|
|
|
jump_total = 0.0 |
|
|
if n_jumps > 0: |
|
|
|
|
|
jump_sizes = norm.rvs( |
|
|
loc=self.jump_mean, |
|
|
scale=self.jump_std, |
|
|
size=n_jumps |
|
|
) |
|
|
|
|
|
jump_total = x_current * np.sum(np.exp(jump_sizes) - 1) |
|
|
|
|
|
|
|
|
x[path, i + 1, :] = x_current + continuous + jump_total |
|
|
|
|
|
|
|
|
x[path, i + 1, :] = np.maximum(x[path, i + 1, :], 0) |
|
|
|
|
|
return SDESolution(t=t, x=x, method='jump_diffusion') |
|
|
|
|
|
|
|
|
class GeopoliticalSDE: |
|
|
""" |
|
|
Geopolitical system as continuous-time SDE. |
|
|
|
|
|
Models geopolitical variables as SDEs with: |
|
|
- Continuous dynamics (drift + diffusion) |
|
|
- Discrete shocks (jumps) |
|
|
- Regime-dependent parameters |
|
|
""" |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
variable_names: List[str], |
|
|
drift_functions: Dict[str, Callable], |
|
|
diffusion_functions: Dict[str, Callable], |
|
|
jump_intensities: Optional[Dict[str, float]] = None |
|
|
): |
|
|
""" |
|
|
Initialize geopolitical SDE. |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
variable_names : list |
|
|
Names of state variables |
|
|
drift_functions : dict |
|
|
Drift function for each variable |
|
|
diffusion_functions : dict |
|
|
Diffusion function for each variable |
|
|
jump_intensities : dict, optional |
|
|
Jump intensities for discrete shocks |
|
|
""" |
|
|
self.variable_names = variable_names |
|
|
self.drift_functions = drift_functions |
|
|
self.diffusion_functions = diffusion_functions |
|
|
self.jump_intensities = jump_intensities or {} |
|
|
self.dim = len(variable_names) |
|
|
|
|
|
def simulate( |
|
|
self, |
|
|
x0: Dict[str, float], |
|
|
T: float, |
|
|
dt: float, |
|
|
n_paths: int = 1 |
|
|
) -> Dict[str, np.ndarray]: |
|
|
""" |
|
|
Simulate geopolitical dynamics. |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
x0 : dict |
|
|
Initial conditions {variable: value} |
|
|
T : float |
|
|
Final time |
|
|
dt : float |
|
|
Time step |
|
|
n_paths : int |
|
|
Number of paths |
|
|
|
|
|
Returns |
|
|
------- |
|
|
dict |
|
|
Simulated trajectories {variable: array} |
|
|
""" |
|
|
|
|
|
x0_array = np.array([x0[var] for var in self.variable_names]) |
|
|
|
|
|
|
|
|
n_steps = int(T / dt) |
|
|
t = np.linspace(0, T, n_steps + 1) |
|
|
|
|
|
|
|
|
trajectories = {var: np.zeros((n_paths, n_steps + 1)) for var in self.variable_names} |
|
|
|
|
|
|
|
|
for i, var in enumerate(self.variable_names): |
|
|
trajectories[var][:, 0] = x0_array[i] |
|
|
|
|
|
sqrt_dt = np.sqrt(dt) |
|
|
|
|
|
|
|
|
for step in range(n_steps): |
|
|
t_current = t[step] |
|
|
|
|
|
for path in range(n_paths): |
|
|
|
|
|
x_current = { |
|
|
var: trajectories[var][path, step] |
|
|
for var in self.variable_names |
|
|
} |
|
|
|
|
|
|
|
|
for i, var in enumerate(self.variable_names): |
|
|
|
|
|
drift = self.drift_functions[var](x_current, t_current) * dt |
|
|
|
|
|
|
|
|
dW = np.random.randn() * sqrt_dt |
|
|
diffusion = self.diffusion_functions[var](x_current, t_current) * dW |
|
|
|
|
|
|
|
|
jump = 0.0 |
|
|
if var in self.jump_intensities: |
|
|
n_jumps = poisson.rvs(self.jump_intensities[var] * dt) |
|
|
if n_jumps > 0: |
|
|
|
|
|
jump = np.random.normal(0, 0.1) * n_jumps |
|
|
|
|
|
|
|
|
new_value = x_current[var] + drift + diffusion + jump |
|
|
|
|
|
|
|
|
new_value = np.clip(new_value, 0, 1) |
|
|
|
|
|
trajectories[var][path, step + 1] = new_value |
|
|
|
|
|
return trajectories |
|
|
|
|
|
|
|
|
def ornstein_uhlenbeck_process( |
|
|
theta: float, |
|
|
mu: float, |
|
|
sigma: float, |
|
|
x0: float, |
|
|
T: float, |
|
|
dt: float, |
|
|
n_paths: int = 1 |
|
|
) -> Tuple[np.ndarray, np.ndarray]: |
|
|
""" |
|
|
Simulate Ornstein-Uhlenbeck process (mean-reverting). |
|
|
|
|
|
dx_t = θ(μ - x_t)dt + σ dW_t |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
theta : float |
|
|
Mean reversion speed |
|
|
mu : float |
|
|
Long-term mean |
|
|
sigma : float |
|
|
Volatility |
|
|
x0 : float |
|
|
Initial value |
|
|
T : float |
|
|
Final time |
|
|
dt : float |
|
|
Time step |
|
|
n_paths : int |
|
|
Number of paths |
|
|
|
|
|
Returns |
|
|
------- |
|
|
tuple |
|
|
(time_grid, paths) |
|
|
""" |
|
|
n_steps = int(T / dt) |
|
|
t = np.linspace(0, T, n_steps + 1) |
|
|
x = np.zeros((n_paths, n_steps + 1)) |
|
|
x[:, 0] = x0 |
|
|
|
|
|
sqrt_dt = np.sqrt(dt) |
|
|
|
|
|
for i in range(n_steps): |
|
|
dW = np.random.randn(n_paths) * sqrt_dt |
|
|
x[:, i + 1] = x[:, i] + theta * (mu - x[:, i]) * dt + sigma * dW |
|
|
|
|
|
return t, x |
|
|
|