File size: 5,432 Bytes
afe68b4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
from __future__ import annotations

from pathlib import Path

import numpy as np
import pandas as pd
from ase import Atoms, io, units
from ase.calculators.calculator import BaseCalculator
from ase.neighborlist import NeighborList, natural_cutoffs
from prefect import flow, task
from tqdm.auto import tqdm

from mlip_arena.models import MLIPEnum
from mlip_arena.tasks.md import run as MD
from mlip_arena.tasks.utils import get_calculator


def identify_water_molecules(atoms: Atoms) -> int:
    """Count oxygen atoms that have exactly two hydrogen neighbours.

    Neighbours are taken from an ASE ``NeighborList`` built with
    ``natural_cutoffs(atoms)`` (no self interaction, both directions), so
    "neighbour" here means whatever that list reports for the given frame.

    Note that hydrogens are not de-duplicated: a hydrogen that is reported
    as a neighbour of two different oxygens contributes to both of them.

    Args:
        atoms: Structure to analyse.

    Returns:
        Number of oxygen atoms with exactly two hydrogen entries in their
        neighbour list.
    """
    nl = NeighborList(natural_cutoffs(atoms), self_interaction=False, bothways=True)
    nl.update(atoms)

    # Read the symbols once instead of building an Atom object per lookup.
    symbols = atoms.get_chemical_symbols()

    water_molecule_count = 0
    for index, symbol in enumerate(symbols):
        if symbol != "O":
            continue
        # The periodic-image offsets returned alongside the indices are not
        # needed for counting.
        neighbor_indices, _ = nl.get_neighbors(index)
        n_hydrogens = sum(1 for idx in neighbor_indices if symbols[idx] == "H")
        if n_hydrogens == 2:
            water_molecule_count += 1

    return water_molecule_count


@task
def get_runtime_stats(traj: list[Atoms], atoms0: Atoms) -> dict:
    """Collect timing and thermodynamic statistics from an MD trajectory.

    Every frame contributes its potential/kinetic energy, temperature, mean of
    the first three stress components (NaN when stress is unavailable),
    centre-of-mass drift relative to ``atoms0`` and the water-molecule count
    from ``identify_water_molecules``. A frame for which any of these (other
    than the stress) cannot be obtained, or whose potential energy is not
    finite, is skipped as a whole.

    Throughput is accumulated per distinct ``info["restart"]`` value: for each
    one, the elapsed time is the last ``info["datetime"]`` minus the first and
    the step count is the last ``info["step"]`` minus the first.

    Args:
        traj: Trajectory frames; each analysed frame must carry ``restart``,
            ``datetime`` and ``step`` in ``atoms.info``, and ``traj[1]`` must
            carry ``target_steps``.
        atoms0: Reference structure for the centre-of-mass drift and the atom
            count.

    Returns:
        Dictionary with the atom count, accumulated time/steps, derived
        throughput figures and the per-frame series listed above.

    Raises:
        IndexError: If ``traj`` has fewer than two frames.
        KeyError: If ``traj[1].info`` has no ``target_steps`` entry.
    """
    import logging  # local import keeps this block self-contained

    restarts = []
    steps, times = [], []
    Ts, Ps, PEs, KEs = [], [], [], []
    com_drifts = []
    nproducts = []

    # The reference centre of mass is the same for every frame.
    com0 = atoms0.get_center_of_mass()
    skipped = 0

    for atoms in tqdm(traj, desc="Analyzing trajectory"):
        # Gather everything for the frame first and append only once all of
        # it is available, so the parallel lists always stay the same length.
        try:
            energy = atoms.get_potential_energy()
            if not np.isfinite(energy):
                skipped += 1
                continue

            restart = atoms.info["restart"]
            timestamp = atoms.info["datetime"]
            step = atoms.info["step"]

            kinetic_energy = atoms.get_kinetic_energy()
            temperature = atoms.get_temperature()

            # Stress is optional: record NaN when it cannot be obtained.
            # NOTE(review): this is the mean of the normal stress components,
            # not its negative — confirm the sign convention consumers expect.
            try:
                pressure = atoms.get_stress()[:3].mean()
            except Exception:
                pressure = np.nan

            com_drift = (atoms.get_center_of_mass() - com0).tolist()
            nproduct = identify_water_molecules(atoms)
        except Exception:
            # Best effort: a frame that cannot be analysed is left out.
            skipped += 1
            continue

        restarts.append(restart)
        times.append(timestamp)
        steps.append(step)
        PEs.append(energy)
        KEs.append(kinetic_energy)
        Ts.append(temperature)
        Ps.append(pressure)
        com_drifts.append(com_drift)
        nproducts.append(nproduct)

    if skipped:
        logging.getLogger(__name__).warning(
            "Skipped %d of %d trajectory frames that could not be analyzed",
            skipped,
            len(traj),
        )

    restarts = np.array(restarts)
    times = np.array(times)
    steps = np.array(steps)

    total_time_seconds = 0
    total_steps = 0

    # Accumulate elapsed time and steps separately for each restart value so
    # that gaps between restarts are not counted.
    for block in np.unique(restarts):
        in_block = restarts == block
        block_times = times[in_block]
        block_steps = steps[in_block]
        total_time_seconds += (block_times[-1] - block_times[0]).total_seconds()
        total_steps += block_steps[-1] - block_steps[0]

    target_steps = traj[1].info["target_steps"]
    natoms = len(atoms0)

    return {
        "natoms": natoms,
        "total_time_seconds": total_time_seconds,
        "total_steps": total_steps,
        "steps_per_second": total_steps / total_time_seconds
        if total_time_seconds != 0
        else 0,
        "seconds_per_step": total_time_seconds / total_steps
        if total_steps != 0
        else float("inf"),
        "seconds_per_step_per_atom": total_time_seconds / total_steps / natoms
        if total_steps != 0
        else float("inf"),
        "energies": PEs,
        "kinetic_energies": KEs,
        "temperatures": Ts,
        "pressures": Ps,
        "target_steps": target_steps,
        "final_step": steps[-1] if len(steps) != 0 else 0,
        "timestep": steps,
        "com_drifts": com_drifts,
        "nproducts": nproducts,
    }


@flow
def hydrogen_combustion(model: str | BaseCalculator, run_dir: Path):
    """Run hydrogen combustion simulation and analyze results.

    Reads ``H256O128.extxyz`` from the directory of this module, runs NVT
    Nose-Hoover MD on it through ``MD``, re-reads the written trajectory,
    analyses it with ``get_runtime_stats`` and writes a one-record JSON
    summary (including the final water yield) into ``run_dir``.

    Args:
        model: Name of an ``MLIPEnum`` member, an ``MLIPEnum`` member, or a
            ready-to-use calculator instance.
        run_dir: Directory that receives the ``.traj`` and ``.json`` files.

    Returns:
        The value returned by the ``MD`` call.

    Raises:
        AssertionError: If the input file does not yield a single ``Atoms``
            object, the trajectory has fewer than 2000 frames, or its first
            frame does not match the input positions.
    """
    atoms = io.read(Path(__file__).parent / "H256O128.extxyz")
    assert isinstance(atoms, Atoms)

    # Composition-derived values are taken up front from the input structure.
    formula = atoms.get_chemical_formula()
    symbols = atoms.get_chemical_symbols()
    # Maximum possible number of water molecules: each needs two H and one O
    # (128 for H256O128).
    max_nproducts = min(symbols.count("H") // 2, symbols.count("O"))

    model = MLIPEnum[model] if isinstance(model, str) else model
    calculator = get_calculator(model) if isinstance(model, MLIPEnum) else model

    model_name = model.name if isinstance(model, MLIPEnum) else calculator.__class__.__name__
    traj_file = run_dir / f"{model_name}_{formula}.traj"
    json_fpath = run_dir / f"{model_name}_{formula}.json"

    result = MD(
        atoms=atoms,
        calculator=calculator,
        ensemble="nvt",
        dynamics="nose-hoover",
        time_step=None,
        dynamics_kwargs=dict(ttime=25 * units.fs, pfactor=None),
        total_time=1_000_000,
        temperature=[300, 3000, 3000, 300],
        pressure=None,
        velocity_seed=0,
        traj_file=traj_file,
        traj_interval=1000,
        restart=True,
    )

    traj = io.read(traj_file, index=":")

    # Raised explicitly (rather than via ``assert``) so the checks still run
    # under ``python -O``; the exception type is kept for existing callers.
    if len(traj) < 2000:
        raise AssertionError(
            f"Trajectory has only {len(traj)} frames and is not complete."
        )

    if not np.allclose(traj[0].positions, atoms.positions):
        raise AssertionError("Initial positions do not match.")

    stats = get_runtime_stats(traj, atoms0=traj[0])

    reaction = "hydrogen"

    data = {
        "formula": formula,
        "method": model_name,
        "reaction": reaction,
        **stats,
        # Fraction of the theoretical maximum reached in the last analysed frame.
        "yield": stats["nproducts"][-1] / max_nproducts,
    }

    df = pd.DataFrame([data])
    df.to_json(json_fpath, orient="records")

    return result