In [None]:
import os

from ase import units
from dask.distributed import Client
from dask_jobqueue import SLURMCluster
from dotenv import load_dotenv
from prefect import flow, task
from prefect_dask import DaskTaskRunner

from mlip_arena.models import REGISTRY, MLIPEnum
from mlip_arena.tasks.md import run as MD
from mlip_arena.tasks.stability.input import get_atoms_from_db

# Populate os.environ from a local .env file (python-dotenv) before any
# credentials are read below.
load_dotenv()

# Optional credentials; each is None when the variable is unset.
# NOTE(review): neither name is referenced directly later in this notebook —
# presumably the Hugging Face / Materials Project clients read them from the
# environment themselves; confirm before removing.
HF_TOKEN = os.getenv("HF_TOKEN")
MP_API_KEY = os.getenv("MP_API_KEY")

In [None]:
# Shape of each SLURM allocation backing one Dask worker job
# (passed as `-N` / `-G` in the extra directives below).
nodes_per_alloc = 1
gpus_per_alloc = 4
ntasks = 1  # NOTE(review): not referenced anywhere below

# Conda environment activated inside every worker job. Overridable through the
# environment so the notebook is not tied to one user's scratch directory; the
# default is the path this notebook was originally written against.
conda_env_path = os.environ.get(
    "MLIP_ARENA_CONDA_ENV", "/pscratch/sd/c/cyrusyc/.conda/mlip-arena"
)

cluster_kwargs = dict(
    # One worker process with a single thread per job.
    cores=1,
    memory="64 GB",
    processes=1,
    shebang="#!/bin/bash",
    account="matgen",
    walltime="03:00:00",
    # Rendered as `--mem=0`, which asks SLURM for all memory on the node.
    job_mem="0",
    job_script_prologue=[
        "source ~/.bashrc",
        "module load python",
        f"source activate {conda_env_path}",
    ],
    # Suppress dask-jobqueue's default -n / --cpus-per-task / -J lines; the job
    # name and node/GPU shape are given explicitly below instead.
    job_directives_skip=["-n", "--cpus-per-task", "-J"],
    job_extra_directives=[
        "-J stability-npt",
        "-q preempt",
        # Lets SLURM start the job earlier with a limit as short as 30 min.
        "--time-min=00:30:00",
        # NOTE(review): presumably the site convention for total desired
        # runtime across preemption requeues — verify with the cluster docs.
        "--comment=12:00:00",
        f"-N {nodes_per_alloc}",
        "-C gpu",
        f"-G {gpus_per_alloc}",
    ],
)

cluster = SLURMCluster(**cluster_kwargs)
# Show the generated sbatch script so the directives can be checked by eye.
print(cluster.job_script())
# Let Dask scale between 5 and 10 worker jobs depending on load.
cluster.adapt(minimum_jobs=5, maximum_jobs=10)
client = Client(cluster)

In [None]:
from mlip_arena.tasks.utils import get_calculator

# Allowlist of MLIPEnum member names to benchmark; `compress` skips any
# registered model whose name is not listed here.
selected_models = [
    "MACE-MP(M)", "CHGNet", "M3GNet",
    "MatterSim", "eqV2(OMat)", "MACE-MPA",
    "ORBv2", "SevenNet", "ALIGNN",
]


@task
def run_one(
    atoms,
    model,
):
    """Run one NPT compression MD simulation for a structure/model pair.

    Args:
        atoms: Structure to simulate. ``atoms.info["material_id"]`` is used in
            the trajectory filename when present (``"random"`` otherwise).
        model: ``MLIPEnum`` member. Its ``name`` selects the calculator, and its
            ``REGISTRY`` family is used as the trajectory directory.

    Returns:
        Whatever the ``MD`` task returns for this run.
    """
    # Per-attempt 600 s timeout with up to 2 in-task retries; refresh_cache=True
    # makes Prefect re-run the simulation instead of reusing a cached result.
    md_task = MD.with_options(timeout_seconds=600, retries=2, refresh_cache=True)
    calculator = get_calculator(model.name, calculator_kwargs=None)

    family = REGISTRY[model.name]["family"]
    material_id = atoms.info.get("material_id", "random")
    formula = atoms.get_chemical_formula()
    traj_path = f"{family}/{model.name}_{material_id}_{formula}_npt.traj"

    # NOTE(review): pfactor follows the ptime**2 * B form used by ASE's NPT
    # integrator (ptime = 75 fs, B = 100 GPa); presumably MD forwards these
    # kwargs to it — confirm in mlip_arena.tasks.md.
    thermostat_barostat = dict(
        ttime=25 * units.fs,
        pfactor=((75 * units.fs) ** 2) * 1e2 * units.GPa,
    )

    return md_task(
        atoms=atoms,
        calculator=calculator,
        ensemble="npt",
        dynamics="nose-hoover",
        time_step=None,
        dynamics_kwargs=thermostat_barostat,
        total_time=1e4,  # fs, i.e. 10 ps
        # NOTE(review): [start, end] pairs, presumably ramped over total_time.
        temperature=[300, 3000],  # K
        pressure=[0, 5e2 * units.GPa],  # 500 GPa over 10 ps = 50 GPa / ps
        traj_file=traj_path,
        traj_interval=10,
    )


@flow
def compress():
    """Fan out NPT compression runs over every (structure, eligible model) pair.

    Structures are read from ``random-mixture.db`` via ``get_atoms_from_db``.
    A model is eligible when its name is in ``selected_models`` and its
    ``REGISTRY`` entry lists ``"stability"`` among its ``"gpu-tasks"``.
    Each pair is submitted as a ``run_one`` task (600 s timeout, 2 retries).

    Returns:
        One entry per successfully submitted run, in submission order. Results
        are gathered with ``raise_on_failure=False``, so a failed run does not
        abort the flow.
    """
    # Eligibility depends only on the model, so resolve it once up front
    # rather than re-scanning MLIPEnum for every structure.
    eligible_models = [
        model
        for model in MLIPEnum
        if model.name in selected_models
        and "stability" in REGISTRY[model.name]["gpu-tasks"]
    ]
    # The task options are loop-invariant too.
    # NOTE(review): this 600 s budget covers calculator setup plus the whole
    # MD call inside run_one, which itself has a 600 s per-attempt timeout with
    # retries, so an inner retry triggered by an MD timeout cannot finish
    # within it.
    submit_run = run_one.with_options(
        timeout_seconds=600, retries=2, refresh_cache=False
    )

    futures = []
    # To download the database automatically, run `huggingface-cli login`
    # or set HF_TOKEN.
    for atoms in get_atoms_from_db("random-mixture.db", force_download=False):
        for model in eligible_models:
            try:
                future = submit_run.submit(atoms.copy(), model)
            except Exception as exc:
                # Best-effort sweep: skip this pair, but make the failure
                # visible instead of silently swallowing it (a bare `except`
                # would also have eaten KeyboardInterrupt/SystemExit).
                print(
                    f"Skipping {model.name} on {atoms.get_chemical_formula()}: "
                    f"submission failed with {exc!r}"
                )
                continue
            futures.append(future)

    return [future.result(raise_on_failure=False) for future in futures]

In [None]:
# Run the flow on the Dask cluster created above. log_prints=True sends
# print() output from the flow to Prefect's logs.
flow_task_runner = DaskTaskRunner(address=client.scheduler.address)
compress.with_options(task_runner=flow_task_runner, log_prints=True)()