|
|
""" |
|
|
Hawkes Processes and Point Process Models for Conflict Modeling |
|
|
|
|
|
Hawkes processes are self-exciting point processes where past events increase |
|
|
the probability of future events. Critical for modeling: |
|
|
- Conflict contagion and escalation dynamics |
|
|
- Terrorist attack clustering |
|
|
- Diplomatic incident cascades |
|
|
- Arms race dynamics |
|
|
- Protest contagion |
|
|
|
|
|
Mathematical foundation: |
|
|
λ(t) = μ + ∫_{-∞}^t φ(t - s) dN(s) |
|
|
|
|
|
where: |
|
|
- λ(t): instantaneous event rate (intensity) |
|
|
- μ: baseline rate |
|
|
- φ(t): excitation kernel (how past events affect current rate) |
|
|
- N(s): counting process of past events |
|
|
|
|
|
Key concepts: |
|
|
- Branching ratio: Expected number of offspring events per parent |
|
|
- If branching ratio < 1: process is stable (subcritical) |
|
|
- If branching ratio ≥ 1: process is explosive (supercritical) |
|
|
""" |
|
|
|
|
|
import numpy as np |
|
|
from scipy import optimize, stats, integrate |
|
|
from typing import List, Tuple, Optional, Callable, Dict |
|
|
from dataclasses import dataclass |
|
|
import warnings |
|
|
|
|
|
|
|
|
@dataclass
class HawkesParameters:
    """Parameters for a Hawkes process with exponential kernel.

    The three values parameterize the intensity
    λ(t) = μ + α ∑_{t_i < t} exp(-β(t - t_i)).
    """

    mu: float  # baseline intensity μ
    alpha: float  # excitation amplitude α
    beta: float  # exponential decay rate β

    @property
    def branching_ratio(self) -> float:
        """Expected number of offspring per event (α / β).

        Returns:
            ``alpha / beta`` when ``beta`` is positive. When ``beta`` is not
            positive the kernel never decays, so the ratio is reported as
            ``inf`` if ``alpha`` is positive and ``0.0`` otherwise, instead
            of raising ``ZeroDivisionError`` or returning a negative value.
        """
        if self.beta > 0:
            return float(self.alpha / self.beta)
        return float("inf") if self.alpha > 0 else 0.0

    @property
    def is_stable(self) -> bool:
        """Check if process is subcritical (branching ratio strictly below 1)."""
        # bool() so numpy scalars stored in the fields still yield a plain bool.
        return bool(self.branching_ratio < 1.0)
|
|
|
|
|
|
|
|
@dataclass
class HawkesFitResult:
    """Outcome of fitting a univariate Hawkes process by maximum likelihood."""

    # Estimated (mu, alpha, beta).
    params: HawkesParameters

    # Maximized log-likelihood and the information criteria derived from it.
    log_likelihood: float
    aic: float
    bic: float

    # Size of the data set the fit was run on.
    n_events: int
    time_span: float

    # Optional diagnostics; default to None when not supplied.
    intensity_trace: Optional[np.ndarray] = None
    times: Optional[np.ndarray] = None
|
|
|
|
|
|
|
|
class UnivariateHawkesProcess:
    """
    Univariate (1-dimensional) Hawkes Process with an exponential kernel.

    Intensity function:
        λ(t) = μ + α ∑_{t_i < t} exp(-β(t - t_i))

    This is a self-exciting process where each event increases future intensity.

    Example:
        >>> hawkes = UnivariateHawkesProcess()
        >>> events = hawkes.simulate(mu=0.5, alpha=0.8, beta=1.5, T=100.0)
        >>> result = hawkes.fit(events, T=100.0)
        >>> print(f"Branching ratio: {result.params.branching_ratio:.3f}")
        >>> prediction = hawkes.predict_intensity(events, result.params, t=105.0)
    """

    def __init__(self, kernel: str = 'exponential'):
        """
        Initialize Hawkes process.

        Args:
            kernel: Excitation kernel type. Only 'exponential' is implemented
                by this class; any other value is stored but triggers a
                warning, because every computation uses the exponential kernel.
        """
        if kernel != 'exponential':
            warnings.warn(
                f"Kernel '{kernel}' is not implemented; "
                "the exponential kernel will be used."
            )
        self.kernel = kernel

    def simulate(self, mu: float, alpha: float, beta: float, T: float,
                 max_events: int = 10000) -> np.ndarray:
        """
        Simulate Hawkes process using Ogata's thinning algorithm.

        The thinning bound is the intensity immediately *after* the current
        time (including an event accepted at that instant). For alpha >= 0
        and beta > 0 the intensity only decays until the next event, so this
        is a valid upper bound; other parameter signs are not supported.

        Args:
            mu: Baseline intensity
            alpha: Excitation amplitude
            beta: Decay rate
            T: Time horizon
            max_events: Maximum number of events to generate

        Returns:
            Array of event times in increasing order, all within (0, T]
        """
        events = []
        t = 0.0
        # excitation = Σ_{t_i <= t} exp(-β (t - t_i)), i.e. the kernel sum
        # just after time t. Updated recursively, so each step is O(1).
        excitation = 0.0

        while t < T and len(events) < max_events:
            lambda_star = mu + alpha * excitation
            if lambda_star <= 0:
                break

            # 1 - u lies in (0, 1], which keeps log() finite.
            u = np.random.uniform()
            wait = -np.log(1.0 - u) / lambda_star
            t = t + wait

            if t > T:
                break

            # Decay the excitation over the waiting time, then thin.
            excitation *= np.exp(-beta * wait)
            lambda_t = mu + alpha * excitation
            D = np.random.uniform()

            if D * lambda_star <= lambda_t:
                events.append(t)
                excitation += 1.0

        return np.array(events)

    def fit(self, events: np.ndarray, T: float,
            initial_guess: Optional[Tuple[float, float, float]] = None) -> "HawkesFitResult":
        """
        Fit Hawkes process parameters using maximum likelihood.

        Args:
            events: Array of event times
            T: Time horizon (observation period end)
            initial_guess: Initial parameter guess (mu, alpha, beta)

        Returns:
            HawkesFitResult with estimated parameters. A warning is issued if
            the optimizer reports that it did not converge.
        """
        events = np.sort(np.asarray(events, dtype=float))
        n_events = len(events)

        if initial_guess is None:
            # Start at branching ratio 0.5 regardless of the event rate. The
            # objective returns a flat penalty for alpha / beta >= 1, so a
            # rate-scaled alpha would leave the optimizer stuck whenever the
            # data has two or more events per unit time. With ratio 0.5 the
            # stationary rate mu / (1 - 0.5) matches the empirical rate.
            beta_init = 1.0
            alpha_init = 0.5 * beta_init
            mu_init = max(0.5 * n_events / T, 1e-6)
            initial_guess = (mu_init, alpha_init, beta_init)

        def neg_log_likelihood(params):
            mu, alpha, beta = params

            # Infeasible parameters get a large finite penalty.
            if mu <= 0 or alpha <= 0 or beta <= 0:
                return 1e10

            # Restrict the search to the subcritical (stable) regime.
            if alpha / beta >= 1.0:
                return 1e10

            return -self._log_likelihood(events, T, mu, alpha, beta)

        bounds = [(1e-6, None), (1e-6, None), (1e-6, None)]
        result = optimize.minimize(
            neg_log_likelihood,
            x0=initial_guess,
            method='L-BFGS-B',
            bounds=bounds
        )

        if not result.success:
            warnings.warn(f"Optimization did not converge: {result.message}")

        mu_opt, alpha_opt, beta_opt = result.x
        log_likelihood = -result.fun

        n_params = 3
        aic = -2 * log_likelihood + 2 * n_params
        # max(..., 1) avoids log(0) = -inf when there are no events.
        bic = -2 * log_likelihood + np.log(max(n_events, 1)) * n_params

        params = HawkesParameters(mu=mu_opt, alpha=alpha_opt, beta=beta_opt)

        return HawkesFitResult(
            params=params,
            log_likelihood=log_likelihood,
            aic=aic,
            bic=bic,
            n_events=n_events,
            time_span=T
        )

    def predict_intensity(self, events: np.ndarray, params: "HawkesParameters",
                          t: float) -> float:
        """
        Predict intensity at time t given past events.

        Args:
            events: Event times; only those strictly before t contribute
            params: Hawkes parameters (mu, alpha, beta)
            t: Time to predict intensity

        Returns:
            Intensity λ(t)
        """
        return self._compute_intensity(t, events, params.mu, params.alpha, params.beta)

    def _compute_intensity(self, t: float, events: List[float],
                           mu: float, alpha: float, beta: float) -> float:
        """Compute intensity at time t from the events strictly before t."""
        events_array = np.asarray(events, dtype=float)
        past_events = events_array[events_array < t]

        if past_events.size == 0:
            return mu

        excitation = alpha * np.sum(np.exp(-beta * (t - past_events)))
        return mu + excitation

    def _log_likelihood(self, events: np.ndarray, T: float,
                        mu: float, alpha: float, beta: float) -> float:
        """
        Compute log-likelihood for Hawkes process.

        LL = ∑_i log(λ(t_i)) - ∫_0^T λ(s) ds

        Args:
            events: Event times, sorted in non-decreasing order (``fit``
                sorts before calling). The recursion below relies on it.
            T: End of the observation window
            mu, alpha, beta: Exponential-kernel parameters

        Returns:
            The log-likelihood, or ``-inf`` if any event intensity is <= 0.
        """
        events = np.asarray(events, dtype=float)

        if events.size == 0:
            return -mu * T

        # O(n) recursion for the kernel sum at each event:
        #   decayed = Σ_{t_j < t_prev} exp(-β (t_prev - t_j))
        #   tied    = number of events already seen at exactly t_prev
        # Events sharing a timestamp do not excite one another (strict
        # t_j < t_i), so they are folded into `decayed` only once time moves on.
        log_sum = 0.0
        decayed = 0.0
        tied = 0
        t_prev = events[0]
        for t_i in events:
            if t_i > t_prev:
                decayed = (decayed + tied) * np.exp(-beta * (t_i - t_prev))
                tied = 0
                t_prev = t_i
            lambda_i = mu + alpha * decayed
            if lambda_i <= 0:
                return -np.inf
            log_sum += np.log(lambda_i)
            tied += 1

        # Closed-form compensator: ∫_0^T λ = μT + (α/β) Σ_i (1 - exp(-β (T - t_i)))
        integral = mu * T
        integral += alpha * np.sum(1 - np.exp(-beta * (T - events))) / beta

        return log_sum - integral
|
|
|
|
|
|
|
|
class MultivariateHawkesProcess:
    """
    Multivariate Hawkes Process for multiple interacting event streams.

    For K event types, the intensity of type k is computed here with
    exponential kernels as:
        λ_k(t) = μ_k + ∑_{j=1}^K α_{kj} ∑_{t_i^j < t} exp(-β_{kj}(t - t_i^j))

    This captures cross-excitation between different event types.
    Example: Conflict in country A affects conflict probability in country B.

    Example:
        >>> # Model 3 countries with mutual excitation
        >>> hawkes = MultivariateHawkesProcess(n_dimensions=3)
        >>> events = hawkes.simulate(
        ...     mu=np.array([0.5, 0.3, 0.4]),
        ...     alpha=np.array([[0.2, 0.1, 0.05],
        ...                     [0.15, 0.3, 0.1],
        ...                     [0.1, 0.1, 0.25]]),
        ...     beta=np.ones((3, 3)),
        ...     T=100.0
        ... )
        >>> result = hawkes.fit(events, T=100.0)
    """

    def __init__(self, n_dimensions: int, kernel: str = 'exponential'):
        """
        Initialize multivariate Hawkes process.

        Args:
            n_dimensions: Number of event types (dimensions)
            kernel: Excitation kernel type. Only 'exponential' is implemented
                by this class; any other value is stored but triggers a
                warning, because every computation uses the exponential kernel.
        """
        if kernel != 'exponential':
            warnings.warn(
                f"Kernel '{kernel}' is not implemented; "
                "the exponential kernel will be used."
            )
        self.n_dimensions = n_dimensions
        self.kernel = kernel

    def simulate(self, mu: np.ndarray, alpha: np.ndarray, beta: np.ndarray,
                 T: float, max_events: int = 10000) -> List[List[float]]:
        """
        Simulate multivariate Hawkes process by thinning.

        The thinning bound is the total intensity immediately *after* the
        current time (including an event accepted at that instant). For
        alpha >= 0 and beta > 0 the intensity only decays until the next
        event, so this is a valid upper bound; other signs are not supported.

        Args:
            mu: Baseline intensities, shape (K,)
            alpha: Excitation matrix, shape (K, K)
                   alpha[i,j] = effect of event type j on type i
            beta: Decay rates, shape (K, K)
            T: Time horizon
            max_events: Maximum total events

        Returns:
            List of event lists, one per dimension
        """
        K = self.n_dimensions
        mu = np.asarray(mu, dtype=float)
        alpha = np.asarray(alpha, dtype=float)
        beta = np.asarray(beta, dtype=float)

        events = [[] for _ in range(K)]
        total_events = 0
        t = 0.0
        # decayed[k, j] = Σ_{t_i^j <= t} exp(-β_kj (t - t_i^j)): kernel sums
        # just after time t, updated recursively instead of recomputed.
        decayed = np.zeros((K, K))

        while t < T and total_events < max_events:
            lambda_star = np.sum(mu + np.sum(alpha * decayed, axis=1))
            if lambda_star <= 0:
                break

            # 1 - u lies in (0, 1], which keeps log() finite.
            u = np.random.uniform()
            wait = -np.log(1.0 - u) / lambda_star
            t = t + wait

            if t > T:
                break

            decayed *= np.exp(-beta * wait)
            intensities_t = mu + np.sum(alpha * decayed, axis=1)
            total_intensity = np.sum(intensities_t)

            D = np.random.uniform()
            if D * lambda_star <= total_intensity:
                # Attribute the accepted event to a dimension in proportion
                # to each dimension's share of the total intensity.
                probs = intensities_t / total_intensity
                dimension = np.random.choice(K, p=probs)
                events[dimension].append(float(t))
                decayed[:, dimension] += 1.0
                total_events += 1

        return events

    def fit(self, events: List[List[float]], T: float) -> Dict:
        """
        Fit multivariate Hawkes process by maximum likelihood.

        Args:
            events: List of event lists, one per dimension
            T: Time horizon

        Returns:
            Dictionary with keys 'mu', 'alpha', 'beta', 'branching_matrix',
            'spectral_radius', 'log_likelihood', 'aic', 'bic', 'n_events'
            (per-dimension counts) and 'converged'.

        Raises:
            ValueError: If ``events`` does not hold exactly one sequence per
                dimension.
        """
        K = self.n_dimensions
        if len(events) != K:
            raise ValueError(
                f"Expected {K} event sequences (one per dimension), got {len(events)}"
            )

        events_arrays = [np.asarray(e, dtype=float) for e in events]
        n_events = [len(e) for e in events_arrays]
        total_events = sum(n_events)

        # Start from a rate-independent branching matrix whose rows sum to
        # 0.5 (diagonal 0.3, remainder spread over the off-diagonal entries),
        # so its spectral radius is at most 0.5. A rate-scaled alpha can begin
        # inside the flat 1e10 penalty region below and stall the optimizer.
        beta_init = np.ones((K, K))
        off_diagonal = 0.2 / max(K - 1, 1)
        branching_init = off_diagonal * np.ones((K, K)) + (0.3 - off_diagonal) * np.eye(K)
        alpha_init = branching_init * beta_init
        mu_init = np.array([max(0.5 * n / T, 1e-6) for n in n_events])

        def pack_params(mu, alpha, beta):
            return np.concatenate([mu.flatten(), alpha.flatten(), beta.flatten()])

        def unpack_params(x):
            mu = x[:K]
            alpha = x[K:K + K*K].reshape(K, K)
            beta = x[K + K*K:].reshape(K, K)
            return mu, alpha, beta

        def neg_log_likelihood(x):
            mu, alpha, beta = unpack_params(x)

            # Infeasible parameters get a large finite penalty.
            if np.any(mu <= 0) or np.any(alpha < 0) or np.any(beta <= 0):
                return 1e10

            # Keep the search inside the stable regime.
            branching_ratios = alpha / beta
            if np.max(np.linalg.eigvals(branching_ratios).real) >= 0.99:
                return 1e10

            return -self._log_likelihood(events_arrays, T, mu, alpha, beta)

        x0 = pack_params(mu_init, alpha_init, beta_init)
        bounds = [(1e-6, None)] * len(x0)

        result = optimize.minimize(
            neg_log_likelihood,
            x0=x0,
            method='L-BFGS-B',
            bounds=bounds
        )

        if not result.success:
            warnings.warn(f"Optimization did not converge: {result.message}")

        mu_opt, alpha_opt, beta_opt = unpack_params(result.x)
        log_likelihood = -result.fun

        n_params = len(x0)
        aic = -2 * log_likelihood + 2 * n_params
        # max(..., 1) avoids log(0) = -inf when there are no events.
        bic = -2 * log_likelihood + np.log(max(total_events, 1)) * n_params

        branching_matrix = alpha_opt / beta_opt

        return {
            'mu': mu_opt,
            'alpha': alpha_opt,
            'beta': beta_opt,
            'branching_matrix': branching_matrix,
            'spectral_radius': np.max(np.abs(np.linalg.eigvals(branching_matrix))),
            'log_likelihood': log_likelihood,
            'aic': aic,
            'bic': bic,
            'n_events': n_events,
            'converged': result.success
        }

    def predict_intensities(self, events: List[List[float]],
                            mu: np.ndarray, alpha: np.ndarray, beta: np.ndarray,
                            t: float) -> np.ndarray:
        """
        Predict intensities for all dimensions at time t.

        Args:
            events: Event lists, one per dimension; only events strictly
                before t contribute
            mu, alpha, beta: Parameters
            t: Time to predict

        Returns:
            Intensity vector, shape (K,)
        """
        return self._compute_intensities(t, events, mu, alpha, beta)

    def _compute_intensities(self, t: float, events: List[List[float]],
                             mu: np.ndarray, alpha: np.ndarray,
                             beta: np.ndarray) -> np.ndarray:
        """Compute intensity vector at time t from events strictly before t."""
        K = self.n_dimensions
        alpha = np.asarray(alpha, dtype=float)
        beta = np.asarray(beta, dtype=float)
        # Float copy: an integer-typed mu would otherwise truncate the
        # excitation added below, and the caller's array must not be mutated.
        intensities = np.array(mu, dtype=float)

        for j in range(K):
            events_j = np.asarray(events[j], dtype=float)
            lags = t - events_j[events_j < t]
            if lags.size == 0:
                continue
            # Row k of the (K, n) matrix holds exp(-β_kj · lag) for source j.
            kernel = np.exp(-beta[:, j][:, None] * lags[None, :])
            intensities += alpha[:, j] * kernel.sum(axis=1)

        return intensities

    @staticmethod
    def _decayed_counts(targets: np.ndarray, sources: np.ndarray,
                        decay: float, chunk_size: int = 1024) -> np.ndarray:
        """Return Σ_{s < t} exp(-decay · (t - s)) for every t in ``targets``.

        Targets are processed in chunks so the temporary lag matrix stays
        bounded at ``chunk_size × len(sources)`` entries.
        """
        sums = np.zeros(len(targets))
        if len(targets) == 0 or len(sources) == 0:
            return sums

        for start in range(0, len(targets), chunk_size):
            block = targets[start:start + chunk_size]
            lags = block[:, None] - sources[None, :]
            past = lags > 0
            # Zero the non-past lags before exponentiating (prevents overflow
            # on future events), then mask their exp(0) = 1 contribution out.
            sums[start:start + chunk_size] = (np.exp(-decay * (lags * past)) * past).sum(axis=1)

        return sums

    def _log_likelihood(self, events: List[np.ndarray], T: float,
                        mu: np.ndarray, alpha: np.ndarray,
                        beta: np.ndarray) -> float:
        """Compute log-likelihood for multivariate process.

        LL = ∑_k ∑_i log λ_k(t_i^k) - ∑_k ∫_0^T λ_k(s) ds

        Returns ``-inf`` if any event intensity is not positive.
        """
        K = self.n_dimensions
        events = [np.asarray(e, dtype=float) for e in events]
        log_sum = 0.0

        # Only λ_k is needed at the events of dimension k.
        for k in range(K):
            if events[k].size == 0:
                continue

            lambda_k = np.full(events[k].shape, float(mu[k]))
            for j in range(K):
                lambda_k += alpha[k, j] * self._decayed_counts(
                    events[k], events[j], beta[k, j]
                )

            if np.any(lambda_k <= 0):
                return -np.inf
            log_sum += np.sum(np.log(lambda_k))

        # Closed-form compensator of the exponential kernels.
        integral = np.sum(mu) * T

        for k in range(K):
            for j in range(K):
                if events[j].size > 0:
                    integral += alpha[k, j] * np.sum(
                        (1 - np.exp(-beta[k, j] * (T - events[j]))) / beta[k, j]
                    )

        return log_sum - integral
|
|
|
|
|
|
|
|
class ConflictContagionModel:
    """
    Specialized Hawkes model for geopolitical conflict contagion.

    Features:
    - Models both self-excitation (conflict escalation within a country)
    - Models cross-excitation (conflict spreading between countries)
    - Incorporates spatial/network structure
    - Estimates contagion risk and early warning indicators

    Example:
        >>> countries = ['Syria', 'Iraq', 'Turkey']
        >>> model = ConflictContagionModel(countries=countries)
        >>>
        >>> # Fit to historical conflict events
        >>> events = {
        ...     'Syria': [1.2, 5.3, 10.1, ...],
        ...     'Iraq': [3.4, 8.9, ...],
        ...     'Turkey': [12.3, ...]
        ... }
        >>> result = model.fit(events, T=365.0)  # 1 year
        >>>
        >>> # Predict contagion risk
        >>> risk = model.contagion_risk(events, result, t=370.0)
        >>> print(f"Syria conflict risk in next 5 days: {risk['Syria']:.2%}")
    """

    def __init__(self, countries: List[str]):
        """
        Initialize conflict contagion model.

        Args:
            countries: List of country names; their order fixes the
                dimension index of each country in the underlying process
        """
        self.countries = countries
        self.n_countries = len(countries)
        self.hawkes = MultivariateHawkesProcess(n_dimensions=self.n_countries)

    def fit(self, events: Dict[str, List[float]], T: float) -> Dict:
        """
        Fit contagion model to conflict events.

        Args:
            events: Dictionary mapping country name to list of event times;
                must contain every country the model was built with
            T: Observation period

        Returns:
            The dictionary returned by the multivariate fit, extended with
            'countries', 'self_excitation', 'cross_excitation_mean',
            'most_contagious_source' and 'most_vulnerable_target'.
        """
        # Order the event lists by the model's country order.
        events_list = [events[country] for country in self.countries]

        result = self.hawkes.fit(events_list, T)

        alpha = result['alpha']
        self_excitation = np.diag(alpha)
        off_diagonal = alpha[~np.eye(self.n_countries, dtype=bool)]

        result['countries'] = self.countries
        result['self_excitation'] = self_excitation
        # A single-country model has no cross terms; report 0.0 rather than
        # the nan (and RuntimeWarning) that the mean of an empty array gives.
        result['cross_excitation_mean'] = (
            np.mean(off_diagonal) if off_diagonal.size > 0 else 0.0
        )

        # alpha[i, j] is the effect of country j on country i, so column sums
        # measure what a country exports and row sums what it receives.
        outgoing_contagion = np.sum(alpha, axis=0) - self_excitation
        incoming_contagion = np.sum(alpha, axis=1) - self_excitation

        result['most_contagious_source'] = self.countries[np.argmax(outgoing_contagion)]
        result['most_vulnerable_target'] = self.countries[np.argmax(incoming_contagion)]

        return result

    def contagion_risk(self, events: Dict[str, List[float]],
                       params: Dict, t: float, horizon: float = 5.0) -> Dict[str, float]:
        """
        Estimate contagion risk over next time period.

        The intensity at time t is held constant over the horizon, so the
        result is the Poisson probability 1 - exp(-λ(t) · horizon). This is
        an approximation: decay of the excitation and any further events
        within the horizon are not modelled.

        Args:
            events: Historical events
            params: Fitted parameters (uses the 'mu', 'alpha', 'beta' entries)
            t: Current time
            horizon: Risk horizon (time units)

        Returns:
            Dictionary mapping country to probability of at least one event
        """
        events_list = [events[country] for country in self.countries]

        intensities = self.hawkes.predict_intensities(
            events_list, params['mu'], params['alpha'], params['beta'], t
        )

        risks = {}
        for i, country in enumerate(self.countries):
            expected_events = intensities[i] * horizon
            risks[country] = float(1 - np.exp(-expected_events))

        return risks

    def identify_contagion_pathways(self, params: Dict, threshold: float = 0.1) -> List[Tuple[str, str, float]]:
        """
        Identify significant contagion pathways between countries.

        Args:
            params: Fitted parameters (uses the 'alpha' and 'beta' entries)
            threshold: Minimum branching ratio to report (exclusive)

        Returns:
            List of (source, target, branching_ratio) tuples, strongest first
        """
        branching = np.asarray(params['alpha']) / np.asarray(params['beta'])

        # branching[i, j] is the effect of country j (source) on i (target).
        pathways = [
            (self.countries[j], self.countries[i], float(branching[i, j]))
            for i in range(self.n_countries)
            for j in range(self.n_countries)
            if i != j and branching[i, j] > threshold
        ]

        pathways.sort(key=lambda x: x[2], reverse=True)

        return pathways
|
|
|
|
|
|
|
|
def estimate_branching_ratio(events: np.ndarray, T: float) -> float:
    """
    Quick estimate of branching ratio for stability assessment.

    Fits a default univariate Hawkes process to the events and reports the
    branching ratio of the fitted parameters.

    Args:
        events: Event times
        T: Time horizon

    Returns:
        Estimated branching ratio
    """
    fitted = UnivariateHawkesProcess().fit(events, T)
    return fitted.params.branching_ratio
|
|
|
|
|
|
|
|
def detect_explosive_regime(events: np.ndarray, T: float, window: float = 10.0,
                            threshold: float = 0.9) -> List[Tuple[float, float]]:
    """
    Detect time periods where the process is near-critical or explosive.

    A rolling window of length ``window`` is advanced in steps of
    ``window / 2``. For every window holding more than five events, the
    branching ratio is estimated from those events (shifted so the window
    starts at time 0) and the window is reported if the estimate reaches
    ``threshold``.

    Args:
        events: Event times
        T: Total time horizon
        window: Rolling window size (must be positive)
        threshold: Minimum estimated branching ratio for a window to be
            reported (default 0.9)

    Returns:
        List of (window_end_time, branching_ratio) for flagged windows

    Raises:
        ValueError: If ``window`` is not positive (the scan could not advance).
    """
    if window <= 0:
        raise ValueError("window must be positive")

    events = np.sort(np.asarray(events, dtype=float))
    explosive_periods = []

    # Derive each window end from its index rather than accumulating
    # `t += step`, so rounding error cannot build up and drop the last window.
    step = window / 2
    n_windows = int((T - window) / step + 1e-9) + 1 if T >= window else 0

    for i in range(n_windows):
        t = window + i * step
        window_events = events[(events >= t - window) & (events <= t)]

        # Too few events give an unreliable fit; skip sparse windows.
        if len(window_events) > 5:
            br = estimate_branching_ratio(window_events - (t - window), window)

            if br >= threshold:
                explosive_periods.append((t, br))

    return explosive_periods
|
|
|