# ecflow / generate_tpd_data.py
# Author: Bing Yan
# Initial ECFlow deployment (commit d6b782e)
"""
TPD/TPR Simulator and Dataset Generation for Heterogeneous Catalysis.
Implements 6 temperature-programmed desorption/reaction mechanisms:
1. FirstOrder - first-order desorption (Polanyi-Wigner, n=1)
2. SecondOrder - second-order/recombinative desorption (n=2)
3. LH_Surface - Langmuir-Hinshelwood bimolecular surface reaction
4. MvK - Mars-van Krevelen lattice oxygen mechanism
5. FirstOrderCovDep - first-order with coverage-dependent activation energy
6. DiffLimited - diffusion-limited desorption from porous materials
Each mechanism is solved as an ODE with a linear temperature ramp T = T0 + beta*t.
Supports multi-heating-rate generation (analogous to multi-scan-rate CVs in EC).
Data format mirrors the electrochemistry pipeline for compatibility.
"""
import os
import sys
import json
import argparse
import numpy as np
from tqdm import tqdm
from multiprocessing import Pool, cpu_count
import scipy.integrate
# =============================================================================
# Mechanism registry
# =============================================================================
# Canonical mechanism ordering; the list index doubles as the integer class
# label (mechanism_id) stored with every generated sample.
TPD_MECHANISM_LIST = [
    'FirstOrder', 'SecondOrder', 'LH_Surface', 'MvK',
    'FirstOrderCovDep', 'DiffLimited',
]
# Mechanism name -> integer id (0..5), derived from the list order above.
TPD_MECHANISM_TO_ID = {m: i for i, m in enumerate(TPD_MECHANISM_LIST)}
# Human-readable kinetic parameter names per mechanism; 'dim' equals
# len(names) for each entry.
TPD_MECHANISM_PARAMS = {
    'FirstOrder': {
        'names': ['Ed', 'log10(nu)', 'theta_0'],
        'dim': 3,
    },
    'SecondOrder': {
        'names': ['Ed', 'log10(nu)', 'theta_0'],
        'dim': 3,
    },
    'LH_Surface': {
        'names': ['Ea', 'log10(nu)', 'theta_A0', 'theta_B0'],
        'dim': 4,
    },
    'MvK': {
        'names': ['Ea_red', 'Ea_reox', 'log10(nu_red)', 'theta_O0'],
        'dim': 4,
    },
    'FirstOrderCovDep': {
        'names': ['Ed0', 'alpha_cov', 'log10(nu)', 'theta_0'],
        'dim': 4,
    },
    'DiffLimited': {
        'names': ['Ed', 'log10(nu)', 'log10(D0)', 'E_diff', 'theta_0'],
        'dim': 5,
    },
}
# =============================================================================
# Simulation constants
# =============================================================================
# We work in dimensionless temperature units: T_dimless = T_physical / T_ref.
# Activation energies are also dimensionless: Ed_dimless = Ed_physical / (R * T_ref).
# With T_ref = 1 K conceptually, T values are just Kelvin and Ed = Ea/(R) in K.
# In practice we use T in Kelvin directly and Ed in units of K (= Ea/R).
# This avoids carrying R everywhere.
T_REF = 1.0  # Reference temperature (K); energies in K (= Ea/R)
N_POINTS_DEFAULT = 500  # time steps per TPD curve
# =============================================================================
# ODE Simulators
# =============================================================================
def run_tpd_first_order(Ed, nu, theta_0, beta, T_start, T_end,
                        n_points=N_POINTS_DEFAULT):
    """
    Simulate first-order desorption along a linear temperature ramp.

    Kinetics (Polanyi-Wigner, n=1):
        d(theta)/dt = -nu * theta * exp(-Ed / T(t)),  T(t) = T_start + beta*t

    Parameters
    ----------
    Ed : float
        Dimensionless desorption energy (= Ea / R, in Kelvin).
    nu : float
        Pre-exponential factor (s^-1).
    theta_0 : float
        Initial fractional surface coverage [0, 1].
    beta : float
        Heating rate (K/s).
    T_start, T_end : float
        Temperature ramp range (K).
    n_points : int
        Number of output points.

    Returns
    -------
    dict with 'temperature', 'rate', 'time', 'coverage', 'params'
    """
    duration = (T_end - T_start) / beta
    sample_times = np.linspace(0, duration, n_points)

    def _dtheta_dt(t, y):
        # Clamp T to >= 1 K so exp(-Ed/T) stays well-defined.
        T = max(T_start + beta * t, 1.0)
        return [-nu * y[0] * np.exp(-Ed / T)]

    sol = scipy.integrate.solve_ivp(
        _dtheta_dt, [0, duration], [theta_0], t_eval=sample_times,
        method='BDF', rtol=1e-8, atol=1e-10, max_step=duration / 50,
    )
    coverage = sol.y[0]
    temperature = T_start + beta * sol.t
    # Recompute the observable desorption rate from the solution, clamping
    # tiny negative solver overshoots to physical values.
    rate = nu * np.maximum(coverage, 0) * np.exp(-Ed / np.maximum(temperature, 1.0))
    return {
        'temperature': temperature.astype(np.float32),
        'rate': rate.astype(np.float32),
        'time': sol.t.astype(np.float32),
        'coverage': coverage.astype(np.float32),
        'params': {
            'mechanism': 'FirstOrder',
            'Ed': float(Ed),
            'nu': float(nu),
            'theta_0': float(theta_0),
            'beta': float(beta),
            'T_start': float(T_start),
            'T_end': float(T_end),
        },
    }
def run_tpd_second_order(Ed, nu, theta_0, beta, T_start, T_end,
                         n_points=N_POINTS_DEFAULT):
    """
    Simulate second-order (recombinative) desorption on a linear ramp.

    Kinetics: d(theta)/dt = -nu * theta^2 * exp(-Ed / T(t)).
    Diagnostic signature: the peak shifts to lower T as theta_0 increases.
    Parameters and return layout mirror run_tpd_first_order.
    """
    duration = (T_end - T_start) / beta
    sample_times = np.linspace(0, duration, n_points)

    def _dtheta_dt(t, y):
        # Clamp T to >= 1 K so exp(-Ed/T) stays well-defined.
        T = max(T_start + beta * t, 1.0)
        return [-nu * y[0] ** 2 * np.exp(-Ed / T)]

    sol = scipy.integrate.solve_ivp(
        _dtheta_dt, [0, duration], [theta_0], t_eval=sample_times,
        method='BDF', rtol=1e-8, atol=1e-10, max_step=duration / 50,
    )
    coverage = sol.y[0]
    temperature = T_start + beta * sol.t
    # Observable rate, recomputed with clamps against solver overshoot.
    rate = nu * np.maximum(coverage, 0) ** 2 * np.exp(-Ed / np.maximum(temperature, 1.0))
    return {
        'temperature': temperature.astype(np.float32),
        'rate': rate.astype(np.float32),
        'time': sol.t.astype(np.float32),
        'coverage': coverage.astype(np.float32),
        'params': {
            'mechanism': 'SecondOrder',
            'Ed': float(Ed),
            'nu': float(nu),
            'theta_0': float(theta_0),
            'beta': float(beta),
            'T_start': float(T_start),
            'T_end': float(T_end),
        },
    }
def run_tpd_lh_surface(Ea, nu, theta_A0, theta_B0, beta, T_start, T_end,
                       n_points=N_POINTS_DEFAULT):
    """
    Simulate a Langmuir-Hinshelwood bimolecular surface reaction (TPR).

    A(ads) + B(ads) -> products; both adsorbates are consumed at the same
    rate r = nu * theta_A * theta_B * exp(-Ea / T), which is also the
    observable. 'coverage' in the returned dict has shape (n, 2) holding
    [theta_A, theta_B] per time step.
    """
    duration = (T_end - T_start) / beta
    sample_times = np.linspace(0, duration, n_points)

    def _dcoverage_dt(t, y):
        T = max(T_start + beta * t, 1.0)  # clamp T >= 1 K
        r = -nu * y[0] * y[1] * np.exp(-Ea / T)
        return [r, r]  # stoichiometric 1:1 consumption of A and B

    sol = scipy.integrate.solve_ivp(
        _dcoverage_dt, [0, duration], [theta_A0, theta_B0], t_eval=sample_times,
        method='BDF', rtol=1e-8, atol=1e-10, max_step=duration / 50,
    )
    cov_A, cov_B = sol.y[0], sol.y[1]
    temperature = T_start + beta * sol.t
    rate = nu * np.maximum(cov_A, 0) * np.maximum(cov_B, 0) * \
        np.exp(-Ea / np.maximum(temperature, 1.0))
    return {
        'temperature': temperature.astype(np.float32),
        'rate': rate.astype(np.float32),
        'time': sol.t.astype(np.float32),
        'coverage': np.stack([cov_A, cov_B], axis=-1).astype(np.float32),
        'params': {
            'mechanism': 'LH_Surface',
            'Ea': float(Ea),
            'nu': float(nu),
            'theta_A0': float(theta_A0),
            'theta_B0': float(theta_B0),
            'beta': float(beta),
            'T_start': float(T_start),
            'T_end': float(T_end),
        },
    }
def run_tpd_mvk(Ea_red, Ea_reox, nu_red, theta_O0, beta, T_start, T_end,
                n_points=N_POINTS_DEFAULT):
    """
    Simulate a Mars-van Krevelen lattice-oxygen mechanism on a linear ramp.

    Lattice oxygen is consumed by reduction and replenished by reoxidation:
        r_red  = nu_red  * theta_O       * exp(-Ea_red  / T)
        r_reox = nu_reox * (1 - theta_O) * exp(-Ea_reox / T)
        d(theta_O)/dt = r_reox - r_red
    The observable is r_red (lattice-oxygen consumption). nu_reox is pinned
    to nu_red * 0.1 to keep the parameter count low while preserving the
    two-process competition that shapes the MvK peaks.
    """
    nu_reox = nu_red * 0.1
    duration = (T_end - T_start) / beta
    sample_times = np.linspace(0, duration, n_points)

    def _dthetaO_dt(t, y):
        T = max(T_start + beta * t, 1.0)  # clamp T >= 1 K
        consumed = nu_red * y[0] * np.exp(-Ea_red / T)
        restored = nu_reox * (1.0 - y[0]) * np.exp(-Ea_reox / T)
        return [restored - consumed]

    sol = scipy.integrate.solve_ivp(
        _dthetaO_dt, [0, duration], [theta_O0], t_eval=sample_times,
        method='BDF', rtol=1e-8, atol=1e-10, max_step=duration / 50,
    )
    lattice_O = sol.y[0]
    temperature = T_start + beta * sol.t
    rate = nu_red * np.maximum(lattice_O, 0) * \
        np.exp(-Ea_red / np.maximum(temperature, 1.0))
    return {
        'temperature': temperature.astype(np.float32),
        'rate': rate.astype(np.float32),
        'time': sol.t.astype(np.float32),
        'coverage': lattice_O.astype(np.float32),
        'params': {
            'mechanism': 'MvK',
            'Ea_red': float(Ea_red),
            'Ea_reox': float(Ea_reox),
            'nu_red': float(nu_red),
            'nu_reox': float(nu_reox),
            'theta_O0': float(theta_O0),
            'beta': float(beta),
            'T_start': float(T_start),
            'T_end': float(T_end),
        },
    }
def run_tpd_first_order_covdep(Ed0, alpha_cov, nu, theta_0, beta, T_start, T_end,
                               n_points=N_POINTS_DEFAULT):
    """
    Simulate first-order desorption with a coverage-dependent barrier.

    Ed(theta) = Ed0 + alpha_cov * theta
    d(theta)/dt = -nu * theta * exp(-Ed(theta) / T(t))

    With repulsive lateral interactions (alpha_cov > 0) the effective
    barrier drops as the layer empties, sharpening the peak and shifting it
    to lower T — a signature standard Redhead analysis misreads as a
    changing Ed.

    Parameters
    ----------
    Ed0 : float
        Zero-coverage desorption energy (K, = Ea0/R).
    alpha_cov : float
        Coverage-dependence coefficient (K). Positive = repulsive.
    nu : float
        Pre-exponential factor (s^-1).
    theta_0 : float
        Initial fractional surface coverage [0, 1].
    beta : float
        Heating rate (K/s).
    """
    duration = (T_end - T_start) / beta
    sample_times = np.linspace(0, duration, n_points)

    def _dtheta_dt(t, y):
        T = max(T_start + beta * t, 1.0)  # clamp T >= 1 K
        barrier = Ed0 + alpha_cov * y[0]
        return [-nu * y[0] * np.exp(-barrier / T)]

    sol = scipy.integrate.solve_ivp(
        _dtheta_dt, [0, duration], [theta_0], t_eval=sample_times,
        method='BDF', rtol=1e-8, atol=1e-10, max_step=duration / 50,
    )
    coverage = sol.y[0]
    temperature = T_start + beta * sol.t
    barrier = Ed0 + alpha_cov * np.maximum(coverage, 0)
    rate = nu * np.maximum(coverage, 0) * np.exp(-barrier / np.maximum(temperature, 1.0))
    return {
        'temperature': temperature.astype(np.float32),
        'rate': rate.astype(np.float32),
        'time': sol.t.astype(np.float32),
        'coverage': coverage.astype(np.float32),
        'params': {
            'mechanism': 'FirstOrderCovDep',
            'Ed0': float(Ed0),
            'alpha_cov': float(alpha_cov),
            'nu': float(nu),
            'theta_0': float(theta_0),
            'beta': float(beta),
            'T_start': float(T_start),
            'T_end': float(T_end),
        },
    }
def run_tpd_diff_limited(Ed, nu, D0, E_diff, theta_0, beta, T_start, T_end,
                         n_points=N_POINTS_DEFAULT, n_shells=20):
    """
    Diffusion-limited desorption from a porous/layered material.

    Models a spherical particle discretized into n_shells concentric shells.
    Only the outermost shell desorbs (first-order kinetics); replenishment
    of the surface layer is limited by intra-particle diffusion with an
    Arrhenius-type diffusivity D(T) = D0 * exp(-E_diff / T).
    This produces characteristic broadened, asymmetric peaks with long
    high-temperature tails that traditional Redhead/Kissinger methods
    cannot fit — the apparent activation energy depends on particle size
    and diffusivity.

    Parameters
    ----------
    Ed : float
        Surface desorption energy (K, = Ea/R).
    nu : float
        Pre-exponential factor for desorption (s^-1).
    D0 : float
        Diffusion pre-exponential (s^-1, dimensionless Fourier units).
    E_diff : float
        Diffusion activation energy (K, = Ea_diff/R).
    theta_0 : float
        Initial uniform loading in all shells [0, 1].
    beta : float
        Heating rate (K/s).
    n_points : int
        Number of output points.
    n_shells : int
        Number of radial shells for the discretized diffusion.

    Returns
    -------
    dict with 'temperature', 'rate', 'time', 'coverage' (surface shell only),
    'params'.
    """
    t_end = (T_end - T_start) / beta
    t_eval = np.linspace(0, t_end, n_points)
    # Radial grid: faces at r/R = i/n_shells for i = 0..n_shells.
    # Shell 0 = center, shell n_shells-1 = surface.
    dr = 1.0 / n_shells
    r_face = np.arange(n_shells + 1) * dr
    # Shell volumes (spherical): V_i = 4/3 pi (r_face[i+1]^3 - r_face[i]^3)
    vol = (4.0 / 3.0) * np.pi * (r_face[1:] ** 3 - r_face[:-1] ** 3)
    # Interior face areas, hoisted out of the rhs: invariant across the many
    # rhs evaluations performed by the implicit solver.
    area = 4.0 * np.pi * r_face[1:-1] ** 2
    y0 = np.full(n_shells, theta_0)

    def rhs(t, y):
        T = T_start + beta * t
        if T < 1.0:
            T = 1.0
        D = D0 * np.exp(-E_diff / T)
        dydt = np.zeros(n_shells)
        # Vectorized finite-volume flux balance between adjacent shells
        # (replaces the former per-shell Python loop; identical math:
        # flux_i = D * A_i * (y_i - y_{i+1}) / dr across interior faces).
        flux = D * area * (y[:-1] - y[1:]) / dr
        dydt[:-1] -= flux / vol[:-1]
        dydt[1:] += flux / vol[1:]
        # First-order desorption from the outermost (surface) shell only.
        dydt[-1] -= nu * np.exp(-Ed / T) * y[-1]
        return dydt

    sol = scipy.integrate.solve_ivp(
        rhs, [0, t_end], y0, t_eval=t_eval,
        method='BDF', rtol=1e-8, atol=1e-10, max_step=t_end / 50,
    )
    theta_all = sol.y  # [n_shells, n_points]
    temperature = T_start + beta * sol.t
    # Observable rate = surface desorption rate (clamped against overshoot).
    theta_surf = np.maximum(theta_all[-1], 0)
    rate = nu * theta_surf * np.exp(-Ed / np.maximum(temperature, 1.0))
    return {
        'temperature': temperature.astype(np.float32),
        'rate': rate.astype(np.float32),
        'time': sol.t.astype(np.float32),
        'coverage': theta_surf.astype(np.float32),
        'params': {
            'mechanism': 'DiffLimited',
            'Ed': float(Ed),
            'nu': float(nu),
            'D0': float(D0),
            'E_diff': float(E_diff),
            'theta_0': float(theta_0),
            'beta': float(beta),
            'T_start': float(T_start),
            'T_end': float(T_end),
            'n_shells': n_shells,
        },
    }
# =============================================================================
# Simulation dispatch
# =============================================================================
def _run_single_tpd(params):
    """Run one TPD simulation by dispatching a parameter dict to its mechanism."""
    mech = params['mechanism']
    ramp_args = (params['beta'], params['T_start'], params['T_end'],
                 params.get('n_points', N_POINTS_DEFAULT))
    # mechanism name -> (simulator, ordered kinetic-parameter keys)
    dispatch = {
        'FirstOrder': (run_tpd_first_order, ('Ed', 'nu', 'theta_0')),
        'SecondOrder': (run_tpd_second_order, ('Ed', 'nu', 'theta_0')),
        'LH_Surface': (run_tpd_lh_surface,
                       ('Ea', 'nu', 'theta_A0', 'theta_B0')),
        'MvK': (run_tpd_mvk,
                ('Ea_red', 'Ea_reox', 'nu_red', 'theta_O0')),
        'FirstOrderCovDep': (run_tpd_first_order_covdep,
                             ('Ed0', 'alpha_cov', 'nu', 'theta_0')),
        'DiffLimited': (run_tpd_diff_limited,
                        ('Ed', 'nu', 'D0', 'E_diff', 'theta_0')),
    }
    if mech not in dispatch:
        raise ValueError(f"Unknown TPD mechanism: {mech}")
    simulator, keys = dispatch[mech]
    return simulator(*(params[k] for k in keys), *ramp_args)
# =============================================================================
# Parameter sampling
# =============================================================================
def _sample_common_tpd_params(rng):
"""Sample common TPD experiment parameters."""
T_start = rng.uniform(300, 400)
T_end = rng.uniform(900, 1200)
return T_start, T_end
def _estimate_T_peak(Ed, log10_nu, beta):
"""Redhead estimate of peak temperature for first-order TPD.
Solves Ed/T_peak^2 ≈ (nu/beta) * exp(-Ed/T_peak) iteratively.
Good enough for rejection sampling."""
# Initial guess: T_peak ≈ Ed / (ln(nu*Ed/beta) - ln(T_peak^2))
# Simplified: T_peak ≈ Ed / (log10_nu * ln(10) - ln(Ed/beta))
ln_nu = log10_nu * np.log(10)
T_est = Ed / (ln_nu - np.log(max(Ed / max(beta, 0.01), 1.0)))
T_est = np.clip(T_est, 100, 2000)
# One Newton step
for _ in range(3):
exp_term = np.exp(-Ed / max(T_est, 1.0))
f = Ed / (T_est ** 2) - (10 ** log10_nu / max(beta, 0.01)) * exp_term
df = -2 * Ed / (T_est ** 3) - (10 ** log10_nu / max(beta, 0.01)) * exp_term * Ed / (T_est ** 2)
if abs(df) > 1e-30:
T_est = T_est - f / df
T_est = np.clip(T_est, 100, 2000)
return T_est
def _sample_Ed_nu_beta(rng, T_start, T_end, max_attempts=50):
    """Rejection-sample (Ed, nu, beta) so the Redhead-estimated peak lands
    inside the usable part of the ramp window [T_start + 15%, T_end - 10%].

    Returns
    -------
    (Ed, nu, beta) : floats — desorption energy (K), prefactor (s^-1),
    heating rate (K/s).
    """
    T_lo = T_start + 0.15 * (T_end - T_start)
    T_hi = T_end - 0.10 * (T_end - T_start)
    for _ in range(max_attempts):
        log10_nu = rng.uniform(12, 16)
        beta = 10 ** rng.uniform(-0.5, 1.5)
        # Pick a target peak position, then invert the Redhead relation:
        # Ed ~= T_p * (ln(nu) + ln(T_p) - ln(beta) - 3.64)
        T_peak_target = rng.uniform(T_lo, T_hi)
        ln_nu = log10_nu * np.log(10)
        Ed = T_peak_target * (ln_nu + np.log(T_peak_target) - np.log(beta) - 3.64)
        if Ed < 3000 or Ed > 50000:
            continue  # physically implausible; redraw
        if T_lo <= _estimate_T_peak(Ed, log10_nu, beta) <= T_hi:
            return Ed, 10 ** log10_nu, beta
    # Fallback: a fixed prefactor plus the same Redhead inversion always
    # yields a mid-window peak.
    beta = 10 ** rng.uniform(0.0, 1.0)
    T_peak_target = rng.uniform(T_lo, T_hi)
    Ed = T_peak_target * (13.0 * np.log(10) + np.log(T_peak_target)
                          - np.log(beta) - 3.64)
    return Ed, 10 ** 13.0, beta
def sample_first_order_params(rng):
    """Draw a random FirstOrder parameter dict with an in-window peak."""
    T_start, T_end = _sample_common_tpd_params(rng)
    Ed, nu, beta = _sample_Ed_nu_beta(rng, T_start, T_end)
    return {
        'mechanism': 'FirstOrder',
        'Ed': float(Ed),
        'nu': float(nu),
        'theta_0': float(rng.uniform(0.1, 1.0)),
        'beta': float(beta),
        'T_start': float(T_start),
        'T_end': float(T_end),
    }
def sample_second_order_params(rng):
    """Draw a random SecondOrder parameter dict with an in-window peak."""
    T_start, T_end = _sample_common_tpd_params(rng)
    Ed, nu, beta = _sample_Ed_nu_beta(rng, T_start, T_end)
    return {
        'mechanism': 'SecondOrder',
        'Ed': float(Ed),
        'nu': float(nu),
        'theta_0': float(rng.uniform(0.1, 1.0)),
        'beta': float(beta),
        'T_start': float(T_start),
        'T_end': float(T_end),
    }
def sample_lh_surface_params(rng):
    """Draw a random LH_Surface (bimolecular) parameter dict."""
    T_start, T_end = _sample_common_tpd_params(rng)
    Ea, nu, beta = _sample_Ed_nu_beta(rng, T_start, T_end)
    return {
        'mechanism': 'LH_Surface',
        'Ea': float(Ea),
        'nu': float(nu),
        # Independent initial coverages for the two co-adsorbates.
        'theta_A0': float(rng.uniform(0.1, 1.0)),
        'theta_B0': float(rng.uniform(0.1, 1.0)),
        'beta': float(beta),
        'T_start': float(T_start),
        'T_end': float(T_end),
    }
def sample_mvk_params(rng):
    """Draw a random Mars-van Krevelen parameter dict.

    The reduction kinetics set the peak window; the reoxidation energy is
    drawn independently from the same Redhead-constrained sampler (its nu
    and beta draws are discarded).
    """
    T_start, T_end = _sample_common_tpd_params(rng)
    Ea_red, nu_red, beta = _sample_Ed_nu_beta(rng, T_start, T_end)
    Ea_reox = _sample_Ed_nu_beta(rng, T_start, T_end)[0]
    theta_O0 = rng.uniform(0.5, 1.0)
    return {
        'mechanism': 'MvK',
        'Ea_red': float(Ea_red),
        'Ea_reox': float(Ea_reox),
        'nu_red': float(nu_red),
        'theta_O0': float(theta_O0),
        'beta': float(beta),
        'T_start': float(T_start),
        'T_end': float(T_end),
    }
def sample_first_order_covdep_params(rng):
    """Draw a random coverage-dependent first-order parameter dict.

    Ed(theta) = Ed0 + alpha_cov * theta; alpha_cov > 0 models repulsive
    lateral interactions. Ed0 is picked via the Redhead constraint so the
    peak lands in the measurement window, then alpha_cov shifts the
    effective energy by 5-35% of Ed0 — noticeable but not overwhelming.
    """
    T_start, T_end = _sample_common_tpd_params(rng)
    Ed0, nu, beta = _sample_Ed_nu_beta(rng, T_start, T_end)
    alpha_cov = rng.uniform(0.05, 0.35) * Ed0
    theta_0 = rng.uniform(0.3, 1.0)
    return {
        'mechanism': 'FirstOrderCovDep',
        'Ed0': float(Ed0),
        'alpha_cov': float(alpha_cov),
        'nu': float(nu),
        'theta_0': float(theta_0),
        'beta': float(beta),
        'T_start': float(T_start),
        'T_end': float(T_end),
    }
def sample_diff_limited_params(rng):
    """Draw a random diffusion-limited desorption parameter dict.

    Key physics: when E_diff is comparable to Ed, diffusion is rate-limiting
    and the peak broadens with a long tail; when E_diff << Ed the curve
    approaches standard first-order. The ranges below span both regimes:
    D0 in 1e2..1e6 (Fourier units) and E_diff/Ed in 0.3..0.9.
    """
    T_start, T_end = _sample_common_tpd_params(rng)
    Ed, nu, beta = _sample_Ed_nu_beta(rng, T_start, T_end)
    D0 = 10 ** rng.uniform(2.0, 6.0)
    E_diff = rng.uniform(0.3, 0.9) * Ed
    theta_0 = rng.uniform(0.3, 1.0)
    return {
        'mechanism': 'DiffLimited',
        'Ed': float(Ed),
        'nu': float(nu),
        'D0': float(D0),
        'E_diff': float(E_diff),
        'theta_0': float(theta_0),
        'beta': float(beta),
        'T_start': float(T_start),
        'T_end': float(T_end),
    }
def sample_tpd_params(rng, mechanism=None):
    """Sample a TPD parameter dict; mechanism is drawn uniformly when None."""
    if mechanism is None:
        mechanism = rng.choice(TPD_MECHANISM_LIST)
    sampler_for = {
        'FirstOrder': sample_first_order_params,
        'SecondOrder': sample_second_order_params,
        'LH_Surface': sample_lh_surface_params,
        'MvK': sample_mvk_params,
        'FirstOrderCovDep': sample_first_order_covdep_params,
        'DiffLimited': sample_diff_limited_params,
    }
    return sampler_for[mechanism](rng)
# =============================================================================
# Noise
# =============================================================================
def _add_noise(signal, rng, noise_range=(0.001, 0.02)):
"""Add Gaussian noise to a signal. Returns (noisy_signal, sigma_noise).
Clamps result to >= 0 since desorption/reaction rates are non-negative."""
sigma_noise = rng.uniform(*noise_range)
peak = np.max(np.abs(signal)) + 1e-20
noise = sigma_noise * peak * rng.standard_normal(signal.shape)
noisy = signal + noise.astype(signal.dtype)
np.maximum(noisy, 0, out=noisy)
return noisy, float(sigma_noise)
# =============================================================================
# Multi-heating-rate sampling
# =============================================================================
def _sample_heating_rates(rng, n_rates, log_beta_range=(-0.5, 1.5)):
"""Sample log-spaced heating rates spanning the given range."""
lo, hi = log_beta_range
if n_rates == 1:
return np.array([10 ** rng.uniform(lo, hi)])
anchors = np.linspace(lo, hi, n_rates)
jitter = (hi - lo) / (n_rates - 1) * 0.3
log_betas = np.array([rng.uniform(a - jitter, a + jitter) for a in anchors])
log_betas = np.clip(log_betas, lo, hi)
log_betas.sort()
return 10 ** log_betas
# =============================================================================
# Dataset generation
# =============================================================================
def generate_sample_single(idx, outdir, seed, mechanism=None,
                           n_heating_rates=1, add_noise=True):
    """
    Generate and save a single TPD sample (single or multi-heating-rate).
    When n_heating_rates > 1, the same kinetic parameters are simulated at
    multiple heating rates and saved together.

    Parameters
    ----------
    idx : int
        Sample index; offsets the RNG seed and names the output file.
    outdir : str
        Directory that receives sample_{idx:06d}.npz.
    seed : int
        Base RNG seed (per-sample seed is seed + idx).
    mechanism : str or None
        Fixed mechanism name, or None to draw one at random.
    n_heating_rates : int
        Number of heating rates simulated for the same kinetic parameters.
    add_noise : bool
        Whether to add peak-scaled Gaussian noise to each rate curve.

    Returns
    -------
    dict
        Metadata with 'success'; on failure it contains 'error' instead of
        simulation info (exceptions are swallowed so a batch run continues).
    """
    # seed + idx makes each sample reproducible independently of worker
    # scheduling order.
    rng = np.random.default_rng(seed + idx)
    try:
        params = sample_tpd_params(rng, mechanism=mechanism)
        actual_mechanism = params['mechanism']
        mechanism_id = TPD_MECHANISM_TO_ID[actual_mechanism]
        if n_heating_rates <= 1:
            # --- Single-heating-rate path: one curve per .npz file ---
            result = _run_single_tpd(params)
            rate = result['rate'].copy()
            sigma_noise = 0.0
            if add_noise:
                rate, sigma_noise = _add_noise(rate, rng)
            save_params = dict(params)
            save_params['sigma_noise'] = sigma_noise
            np.savez_compressed(
                os.path.join(outdir, f"sample_{idx:06d}.npz"),
                temperature=result['temperature'],
                rate=rate,
                time=result['time'],
                params=save_params,
                mechanism_id=np.int32(mechanism_id),
            )
            meta = {
                'idx': idx, 'success': True,
                'mechanism': actual_mechanism,
                'mechanism_id': int(mechanism_id),
                'n_time': len(result['time']),
                'beta': float(params['beta']),
                'n_heating_rates': 1,
                'sigma_noise': sigma_noise,
            }
            return meta
        # Multi-heating-rate: same kinetic params, different heating rates
        heating_rates = _sample_heating_rates(rng, n_heating_rates)
        temperatures, rates, times = [], [], []
        for beta in heating_rates:
            # Copy so each run sees its own beta; other kinetics are shared.
            p = dict(params)
            p['beta'] = float(beta)
            result = _run_single_tpd(p)
            temp = result['temperature'].copy()
            rate = result['rate'].copy()
            if add_noise:
                rate, _ = _add_noise(rate, rng)
            temperatures.append(temp)
            rates.append(rate)
            times.append(result['time'].copy())
        # Pad to same length
        # NOTE(review): this presumes curves can come back with different
        # lengths; 'lengths' records the valid prefix of each padded row.
        max_t = max(len(t) for t in temperatures)
        n_hr = len(heating_rates)
        temp_arr = np.zeros((n_hr, max_t), dtype=np.float32)
        rate_arr = np.zeros((n_hr, max_t), dtype=np.float32)
        time_arr = np.zeros((n_hr, max_t), dtype=np.float32)
        lengths = np.zeros(n_hr, dtype=np.int32)
        for i in range(n_hr):
            t_len = len(temperatures[i])
            temp_arr[i, :t_len] = temperatures[i]
            rate_arr[i, :t_len] = rates[i]
            time_arr[i, :t_len] = times[i]
            lengths[i] = t_len
        save_params = dict(params)
        # Remove the single beta; heating_rates array is saved separately
        save_params.pop('beta', None)
        np.savez_compressed(
            os.path.join(outdir, f"sample_{idx:06d}.npz"),
            temperature=temp_arr,
            rate=rate_arr,
            time=time_arr,
            heating_rates=heating_rates.astype(np.float32),
            lengths=lengths,
            params=save_params,
            mechanism_id=np.int32(mechanism_id),
            n_heating_rates=np.int32(n_heating_rates),
        )
        meta = {
            'idx': idx, 'success': True,
            'mechanism': actual_mechanism,
            'mechanism_id': int(mechanism_id),
            'n_time_max': int(max_t),
            'heating_rates': [float(b) for b in heating_rates],
            'n_heating_rates': n_heating_rates,
        }
        return meta
    except Exception as e:
        # Deliberately broad: one failed simulation must not abort the whole
        # dataset run; the failure is recorded in the returned metadata.
        return {
            'idx': idx,
            'success': False,
            'error': str(e),
        }
def _worker_generate(args):
    """Unpack one task tuple and generate its sample.

    Lives at module level so multiprocessing.Pool can pickle it.
    """
    # args = (idx, outdir, seed, mechanism, n_heating_rates, add_noise),
    # in the positional order generate_sample_single expects.
    return generate_sample_single(*args)
def generate_dataset(
    n_samples=1000,
    outdir="data_tpd/raw",
    seed=42,
    n_workers=None,
    mechanism=None,
    multi_mechanism=False,
    n_per_mechanism=None,
    n_heating_rates=1,
    add_noise=True,
):
    """Generate a dataset of TPD simulations.

    Parameters
    ----------
    n_samples : int
        Number of samples (when multi_mechanism is True the total becomes
        n_per_mechanism * number of mechanisms instead).
    outdir : str
        Output directory; created if missing. Also receives metadata.json.
    seed : int
        Base RNG seed; each sample uses seed + idx.
    n_workers : int or None
        Worker processes; defaults to cpu_count() - 1 (at least 1).
    mechanism : str or None
        Fix a single mechanism for all samples (None = random per sample).
    multi_mechanism : bool
        Generate a balanced set across all mechanisms.
    n_per_mechanism : int or None
        Samples per mechanism when multi_mechanism; defaults to n_samples.
    n_heating_rates : int
        Heating rates per sample.
    add_noise : bool
        Add Gaussian noise to the rate curves.

    Returns
    -------
    list of dict
        Per-sample metadata, sorted by sample index.
    """
    os.makedirs(outdir, exist_ok=True)
    if n_workers is None:
        n_workers = max(1, cpu_count() - 1)
    if multi_mechanism:
        if n_per_mechanism is None:
            n_per_mechanism = n_samples
        total = n_per_mechanism * len(TPD_MECHANISM_LIST)
        print(f"Generating multi-mechanism TPD dataset: "
              f"{n_per_mechanism} per mechanism x {len(TPD_MECHANISM_LIST)} = {total}")
        args_list = []
        # Contiguous index blocks per mechanism keep output filenames grouped.
        for mech_idx, mech in enumerate(TPD_MECHANISM_LIST):
            offset = mech_idx * n_per_mechanism
            for i in range(n_per_mechanism):
                args_list.append((offset + i, outdir, seed, mech,
                                  n_heating_rates, add_noise))
        n_samples = total
    else:
        args_list = [(i, outdir, seed, mechanism, n_heating_rates, add_noise)
                     for i in range(n_samples)]
    n_workers = min(n_workers, n_samples)
    print(f"Generating {n_samples} TPD samples...")
    print(f"Output directory: {outdir}")
    print(f"Heating rates per sample: {n_heating_rates}")
    print(f"Using {n_workers} worker(s)")
    metadata = []
    if n_workers == 1:
        # Sequential path (also the easier one to debug).
        for args in tqdm(args_list, desc="Generating samples"):
            meta = _worker_generate(args)
            metadata.append(meta)
            if not meta['success']:
                print(f"\nSample {meta['idx']} failed: {meta.get('error', 'Unknown')}")
    else:
        try:
            with Pool(processes=n_workers) as pool:
                # imap_unordered: results arrive as workers finish; order is
                # restored by the sort below.
                for meta in tqdm(
                    pool.imap_unordered(_worker_generate, args_list, chunksize=max(1, n_samples // (n_workers * 4))),
                    total=n_samples,
                    desc="Generating samples",
                ):
                    metadata.append(meta)
                    if not meta['success']:
                        tqdm.write(f"Sample {meta['idx']} failed: "
                                   f"{meta.get('error', 'Unknown')}")
        except (PermissionError, OSError) as e:
            # Pool creation can fail with PermissionError/OSError (e.g. in
            # restricted environments); fall back to sequential generation.
            print(f"\nWarning: Multiprocessing failed ({e}). "
                  "Falling back to sequential...")
            metadata = []
            for args in tqdm(args_list, desc="Generating samples (sequential)"):
                meta = _worker_generate(args)
                metadata.append(meta)
                if not meta['success']:
                    print(f"\nSample {meta['idx']} failed: "
                          f"{meta.get('error', 'Unknown')}")
    metadata = sorted(metadata, key=lambda x: x['idx'])
    n_success = sum(1 for m in metadata if m['success'])
    # Tally successful samples per mechanism for the summary file.
    mech_counts = {}
    for m in metadata:
        if m['success']:
            mech = m.get('mechanism', 'Unknown')
            mech_counts[mech] = mech_counts.get(mech, 0) + 1
    summary = {
        'n_samples': n_samples,
        'n_success': n_success,
        'n_heating_rates': n_heating_rates,
        'add_noise': add_noise,
        'seed': seed,
        'n_workers': n_workers,
        'multi_mechanism': multi_mechanism,
        'mechanism_counts': mech_counts,
        'samples': metadata,
    }
    with open(os.path.join(outdir, "metadata.json"), "w") as f:
        json.dump(summary, f, indent=2)
    print(f"\nGeneration complete: {n_success}/{n_samples} successful")
    print(f"Mechanism counts: {mech_counts}")
    print(f"Metadata saved to {os.path.join(outdir, 'metadata.json')}")
    return metadata
# =============================================================================
# Main
# =============================================================================
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Generate TPD dataset for catalysis mechanism identification"
    )
    parser.add_argument("--n_samples", type=int, default=1000)
    parser.add_argument("--outdir", type=str, default="data_tpd/raw")
    parser.add_argument("--seed", type=int, default=42)
    parser.add_argument("--n_workers", type=int, default=None)
    parser.add_argument("--mechanism", type=str, default=None,
                        choices=TPD_MECHANISM_LIST)
    parser.add_argument("--multi_mechanism", action="store_true")
    parser.add_argument("--n_per_mechanism", type=int, default=None)
    parser.add_argument("--n_heating_rates", type=int, default=1)
    parser.add_argument("--no_noise", action="store_true")
    parser.add_argument("--test", action="store_true",
                        help="Run a single test simulation and plot")
    args = parser.parse_args()
    if args.test:
        # Smoke-test: one simulation per mechanism, with peak statistics.
        print(f"Running test TPD simulations for all {len(TPD_MECHANISM_LIST)} mechanisms...\n")
        rng = np.random.default_rng(42)
        for mech in TPD_MECHANISM_LIST:
            params = sample_tpd_params(rng, mechanism=mech)
            result = _run_single_tpd(params)
            peak_rate = np.max(result['rate'])
            peak_T = result['temperature'][np.argmax(result['rate'])]
            print(f"{mech}:")
            print(f" T range: [{result['temperature'][0]:.0f}, "
                  f"{result['temperature'][-1]:.0f}] K")
            print(f" Peak rate: {peak_rate:.4e} at T = {peak_T:.0f} K")
            print(f" Time steps: {len(result['time'])}")
            print(f" Params: {params}")
            print()
        try:
            import matplotlib.pyplot as plt
            # BUGFIX: use a 2x3 grid — a 2x2 grid has only 4 axes, and
            # zip(axes.flat, TPD_MECHANISM_LIST) silently dropped the last
            # two of the 6 mechanisms from the plot.
            fig, axes = plt.subplots(2, 3, figsize=(15, 8))
            for ax, mech in zip(axes.flat, TPD_MECHANISM_LIST):
                params = sample_tpd_params(rng, mechanism=mech)
                # Overlay several heating rates for the same kinetics.
                for beta in [0.5, 2.0, 10.0]:
                    p = dict(params)
                    p['beta'] = beta
                    result = _run_single_tpd(p)
                    ax.plot(result['temperature'], result['rate'],
                            label=f'beta={beta:.1f} K/s')
                ax.set_xlabel('Temperature (K)')
                ax.set_ylabel('Desorption/Reaction Rate')
                ax.set_title(mech)
                ax.legend(fontsize=8)
            plt.tight_layout()
            plt.savefig('test_tpd_simulation.png', dpi=150)
            print("Plot saved to test_tpd_simulation.png")
        except ImportError:
            print("matplotlib not available, skipping plot")
    else:
        generate_dataset(
            n_samples=args.n_samples,
            outdir=args.outdir,
            seed=args.seed,
            n_workers=args.n_workers,
            mechanism=args.mechanism,
            multi_mechanism=args.multi_mechanism,
            n_per_mechanism=args.n_per_mechanism,
            n_heating_rates=args.n_heating_rates,
            add_noise=not args.no_noise,
        )