Spaces:
Running
Running
| from functools import partial | |
| from pathlib import Path | |
| import pandas as pd | |
| from huggingface_hub import hf_hub_download | |
| from prefect import Task, flow, task | |
| from prefect.client.schemas.objects import TaskRun | |
| from prefect.futures import wait | |
| from prefect.states import State | |
| from ase.db import connect | |
| from mlip_arena.data.local import SafeHDFStore | |
| from mlip_arena.models import REGISTRY, MLIPEnum | |
| from mlip_arena.tasks.eos import run as EOS | |
| def get_atoms_from_db(db_path: Path | str): | |
| db_path = Path(db_path) | |
| if not db_path.exists(): | |
| db_path = hf_hub_download( | |
| repo_id="atomind/mlip-arena", | |
| repo_type="dataset", | |
| subfolder=f"{Path(__file__).parent.name}", | |
| filename=str(db_path), | |
| ) | |
| with connect(db_path) as db: | |
| for row in db.select(): | |
| yield row.toatoms() | |
def save_to_hdf(
    tsk: Task, run: TaskRun, state: State, fpath: Path | str, table_name: str
):
    """Completion hook that stores an EOS task result as JSON and HDF5.

    Nothing is written when the task run's state is failed or when its
    result is not a ``dict``. Otherwise the result is flattened into a
    ``pandas.DataFrame`` and saved twice:

    * as ``<calculator>_<formula>.json`` inside a directory named after the
      calculator's ``"family"`` entry in ``REGISTRY``, next to this module;
    * appended to table ``table_name`` of an HDF5 file whose name is
      ``fpath`` with ``_<calculator>`` added to its stem.

    Saving is best-effort: any exception raised while building or writing
    the data is printed and swallowed so the hook never fails the run.

    Args:
        tsk: The task the hook is attached to (unused).
        run: The task run; its state, inputs and total run time are read.
        state: The state passed to the hook (unused; ``run.state`` is read).
        fpath: Base path of the HDF5 output file.
        table_name: Key of the HDF5 table to append to.
    """
    if run.state.is_failed():
        return

    result = run.state.result(raise_on_failure=False)
    if not isinstance(result, dict):
        return

    try:
        atoms = result["atoms"]

        # Use ``.get`` so a missing key falls through to the result instead
        # of raising KeyError before the fallback is ever considered.
        # NOTE(review): Prefect documents ``TaskRun.task_inputs`` values as
        # lists of upstream references rather than raw parameter values -
        # confirm against the installed version. Only a real string is
        # accepted here so such a list can never become the method name.
        recorded_name = run.task_inputs.get("calculator_name")
        if isinstance(recorded_name, str) and recorded_name:
            calculator_name = recorded_name
        else:
            calculator_name = result["calculator_name"]

        energies = [float(e) for e in result["eos"]["energies"]]
        formula = atoms.get_chemical_formula()

        # Scalar entries are repeated by pandas for every volume/energy row
        # (assumes ``result["eos"]["volumes"]`` is a sequence - TODO confirm).
        df = pd.DataFrame(
            {
                "method": calculator_name,
                "formula": formula,
                "total_run_time": run.total_run_time,
                "v0": result["v0"],
                "e0": result["e0"],
                "b0": result["b0"],
                "b1": result["b1"],
                "volume": result["eos"]["volumes"],
                "energy": energies,
            }
        )

        # One HDF5 file per calculator: "<stem>_<calculator><suffix>".
        fpath = Path(fpath)
        fpath = fpath.with_stem(fpath.stem + f"_{calculator_name}")

        # Per-structure JSON copy, grouped by model family.
        family_path = Path(__file__).parent / REGISTRY[calculator_name]["family"]
        family_path.mkdir(parents=True, exist_ok=True)
        df.to_json(family_path / f"{calculator_name}_{formula}.json", indent=2)

        with SafeHDFStore(fpath, mode="a") as store:
            store.append(
                table_name,
                df,
                format="table",
                data_columns=True,
                min_itemsize={"formula": 50, "method": 20},
            )
    except Exception as e:
        # Deliberately best-effort: report the problem, never fail the run.
        print(e)
def run(
    db_path: Path | str,
    out_path: Path | str,
    table_name: str,
    optimizer="FIRE",
    optimizer_kwargs=None,
    filter="FrechetCell",
    filter_kwargs=None,
    criterion=dict(fmax=0.1, steps=1000),
    max_abs_strain=0.20,
    concurrent=False,
    cache=True,
):
    """Submit an EOS task for every structure and every eligible model.

    A model from ``MLIPEnum`` is eligible when its ``REGISTRY`` entry has a
    truthy ``"npt"`` value and lists the name of this module's directory in
    its ``"cpu-tasks"`` or ``"gpu-tasks"``. Each submitted task carries a
    completion hook (``save_to_hdf``) bound to ``out_path`` and
    ``table_name``.

    Args:
        db_path: Database of structures, read with ``get_atoms_from_db``.
        out_path: Base output path handed to the ``save_to_hdf`` hook.
        table_name: HDF5 table name handed to the ``save_to_hdf`` hook.
        optimizer: Forwarded to the EOS task.
        optimizer_kwargs: Forwarded to the EOS task.
        filter: Forwarded to the EOS task.
        filter_kwargs: Forwarded to the EOS task.
        criterion: Forwarded to the EOS task (as a per-task copy).
        max_abs_strain: Forwarded to the EOS task.
        concurrent: Forwarded to the EOS task.
        cache: Forwarded as ``persist_opt`` and ``cache_opt``; when false
            the task is also configured with ``refresh_cache=True``.

    Returns:
        The results of all submitted futures whose state is completed once
        every future has been waited on.
    """
    eos_task = EOS.with_options(
        on_completion=[partial(save_to_hdf, fpath=out_path, table_name=table_name)],
        refresh_cache=not cache,
    )

    # Eligibility depends only on the registry, not on the structure, so it
    # is resolved once instead of once per structure.
    task_name = Path(__file__).parent.name
    eligible_models = []
    for mlip in MLIPEnum:
        entry = REGISTRY[mlip.name]
        if not entry["npt"]:
            continue
        supported_tasks = entry.get("cpu-tasks", []) + entry.get("gpu-tasks", [])
        if task_name in supported_tasks:
            eligible_models.append(mlip.name)

    futures = []
    for atoms in get_atoms_from_db(db_path):
        for calculator_name in eligible_models:
            futures.append(
                eos_task.submit(
                    atoms=atoms,
                    calculator_name=calculator_name,
                    calculator_kwargs={},
                    optimizer=optimizer,
                    optimizer_kwargs=optimizer_kwargs,
                    filter=filter,
                    filter_kwargs=filter_kwargs,
                    # The default dict is built once at definition time and
                    # shared by every call; give each task its own copy so
                    # nothing downstream can mutate that shared object.
                    criterion=None if criterion is None else dict(criterion),
                    max_abs_strain=max_abs_strain,
                    concurrent=concurrent,
                    persist_opt=cache,
                    cache_opt=cache,
                )
            )

    wait(futures)

    # Only futures that finished in a completed state contribute a result.
    return [
        f.result(timeout=None, raise_on_failure=False)
        for f in futures
        if f.state.is_completed()
    ]