|
import numpy as np |
|
import gradio as gr |
|
import pandas as pd |
|
from sklearn.preprocessing import MinMaxScaler |
|
from surrogate import CrabNetSurrogateModel, PARAM_BOUNDS |
|
from pydantic import ( |
|
BaseModel, |
|
ValidationError, |
|
ValidationInfo, |
|
field_validator, |
|
model_validator, |
|
) |
|
|
|
model = CrabNetSurrogateModel() |
|
|
|
|
|
example_parameterization = { |
|
"N": 3, |
|
"alpha": 0.5, |
|
"d_model": 512, |
|
"dim_feedforward": 2048, |
|
"dropout": 0.1, |
|
"emb_scaler": 0.5, |
|
"epochs_step": 10, |
|
"eps": 0.000001, |
|
"fudge": 0.02, |
|
"heads": 4, |
|
"k": 6, |
|
"lr": 0.001, |
|
"pe_resolution": 5000, |
|
"ple_resolution": 5000, |
|
"pos_scaler": 0.5, |
|
"weight_decay": 0, |
|
"batch_size": 32, |
|
"out_hidden4": 128, |
|
"betas1": 0.9, |
|
"betas2": 0.999, |
|
"bias": False, |
|
"criterion": "RobustL1", |
|
"elem_prop": "mat2vec", |
|
"train_frac": 0.5, |
|
} |
|
|
|
example_results = model.surrogate_evaluate([example_parameterization]) |
|
example_result = example_results[0] |
|
|
|
|
|
scalers = {} |
|
for param_info in PARAM_BOUNDS: |
|
if param_info["type"] == "range": |
|
scaler = MinMaxScaler() |
|
|
|
scaler.fit([[bound] for bound in param_info["bounds"]]) |
|
scalers[param_info["name"]] = scaler |
|
|
|
|
|
BLINDED_PARAM_BOUNDS = [ |
|
{"name": "x1", "type": "range", "bounds": [0.0, 1.0]}, |
|
{"name": "x2", "type": "range", "bounds": [0.0, 1.0]}, |
|
{"name": "x3", "type": "range", "bounds": [0.0, 1.0]}, |
|
{"name": "x4", "type": "range", "bounds": [0.0, 1.0]}, |
|
{"name": "x5", "type": "range", "bounds": [0.0, 1.0]}, |
|
{"name": "x6", "type": "range", "bounds": [0.0, 1.0]}, |
|
{"name": "x7", "type": "range", "bounds": [0.0, 1.0]}, |
|
{"name": "x8", "type": "range", "bounds": [0.0, 1.0]}, |
|
{"name": "x9", "type": "range", "bounds": [0.0, 1.0]}, |
|
{"name": "x10", "type": "range", "bounds": [0.0, 1.0]}, |
|
{"name": "x11", "type": "range", "bounds": [0.0, 1.0]}, |
|
{"name": "x12", "type": "range", "bounds": [0.0, 1.0]}, |
|
{"name": "x13", "type": "range", "bounds": [0.0, 1.0]}, |
|
{"name": "x14", "type": "range", "bounds": [0.0, 1.0]}, |
|
{"name": "x15", "type": "range", "bounds": [0.0, 1.0]}, |
|
{"name": "x16", "type": "range", "bounds": [0.0, 1.0]}, |
|
{"name": "x17", "type": "range", "bounds": [0.0, 1.0]}, |
|
{"name": "x18", "type": "range", "bounds": [0.0, 1.0]}, |
|
{"name": "x19", "type": "range", "bounds": [0.0, 1.0]}, |
|
{"name": "x20", "type": "range", "bounds": [0.0, 1.0]}, |
|
{"name": "c1", "type": "choice", "values": ["c1_0", "c1_1"]}, |
|
{"name": "c2", "type": "choice", "values": ["c2_0", "c2_1"]}, |
|
{"name": "c3", "type": "choice", "values": ["c3_0", "c3_1"]}, |
|
{"name": "fidelity1", "type": "range", "bounds": [0.0, 1.0]}, |
|
] |
|
|
|
|
|
class BlindedParameterization(BaseModel): |
|
x1: float |
|
x2: float |
|
x3: float |
|
x4: float |
|
x5: float |
|
x6: float |
|
x7: float |
|
x8: float |
|
x9: float |
|
x10: float |
|
x11: float |
|
x12: float |
|
x13: float |
|
x14: float |
|
x15: float |
|
x16: float |
|
x17: float |
|
x18: float |
|
x19: float |
|
x20: float |
|
c1: str |
|
c2: str |
|
c3: str |
|
fidelity1: float |
|
|
|
@field_validator("*") |
|
def check_bounds(cls, v: int, info: ValidationInfo) -> int: |
|
param = next( |
|
(item for item in BLINDED_PARAM_BOUNDS if item["name"] == info.field_name), |
|
None, |
|
) |
|
if param is None: |
|
return v |
|
|
|
if param["type"] == "range": |
|
min_val, max_val = param["bounds"] |
|
if not min_val <= v <= max_val: |
|
raise ValueError( |
|
f"{info.field_name} must be between {min_val} and {max_val}" |
|
) |
|
elif param["type"] == "choice": |
|
if v not in param["values"]: |
|
raise ValueError(f"{info.field_name} must be one of {param['values']}") |
|
|
|
return v |
|
|
|
@model_validator(mode="after") |
|
def check_constraints(self) -> "BlindedParameterization": |
|
if self.x19 > self.x20: |
|
raise ValueError( |
|
f"Received x19={self.x19} which should be less than x20={self.x20}" |
|
) |
|
if self.x6 + self.x15 > 1.0: |
|
raise ValueError( |
|
f"Received x6={self.x6} and x15={self.x15} which should sum to less than or equal to 1.0" |
|
) |
|
|
|
|
|
|
|
def convert_to_blinded(params): |
|
blinded_params = {} |
|
numeric_index = 1 |
|
choice_index = 1 |
|
for param in PARAM_BOUNDS: |
|
if param["type"] == "range": |
|
key = f"x{numeric_index}" if param["name"] != "train_frac" else "fidelity1" |
|
blinded_params[key] = scalers[param["name"]].transform( |
|
[[params[param["name"]]]] |
|
)[0][0] |
|
numeric_index += 1 if param["name"] != "train_frac" else 0 |
|
elif param["type"] == "choice": |
|
key = f"c{choice_index}" |
|
choice_index = param["values"].index(params[param["name"]]) |
|
blinded_params[key] = f"{key}_{choice_index}" |
|
choice_index += 1 |
|
return blinded_params |
|
|
|
|
|
|
|
def convert_from_blinded(blinded_params): |
|
original_params = {} |
|
numeric_index = 1 |
|
choice_index = 1 |
|
for param in PARAM_BOUNDS: |
|
if param["type"] == "range": |
|
key = f"x{numeric_index}" if param["name"] != "train_frac" else "fidelity1" |
|
original_params[param["name"]] = scalers[param["name"]].inverse_transform( |
|
[[blinded_params[key]]] |
|
)[0][0] |
|
numeric_index += 1 if param["name"] != "train_frac" else 0 |
|
elif param["type"] == "choice": |
|
key = f"c{choice_index}" |
|
choice_value = blinded_params[key].split("_")[-1] |
|
original_params[param["name"]] = param["values"][int(choice_value)] |
|
choice_index += 1 |
|
return original_params |
|
|
|
|
|
def evaluate(*args): |
|
|
|
blinded_params = dict(zip([param["name"] for param in BLINDED_PARAM_BOUNDS], args)) |
|
original_params = convert_from_blinded(blinded_params) |
|
BlindedParameterization(**blinded_params) |
|
|
|
params_list = [original_params] |
|
results = model.surrogate_evaluate(params_list) |
|
results_list = [list(result.values()) for result in results] |
|
return results_list |
|
|
|
|
|
def get_interface(param_info, numeric_index, choice_index): |
|
key = param_info["name"] |
|
default_value = example_parameterization[key] |
|
if param_info["type"] == "range": |
|
|
|
scaler = scalers[key] |
|
scaler.fit([[bound] for bound in param_info["bounds"]]) |
|
scaled_value = scaler.transform([[default_value]])[0][0] |
|
scaled_bounds = scaler.transform([[bound] for bound in param_info["bounds"]]) |
|
label = f"fidelity1" if key == "train_frac" else f"x{numeric_index}" |
|
return ( |
|
gr.Slider( |
|
value=scaled_value, |
|
minimum=scaled_bounds[0][0], |
|
maximum=scaled_bounds[1][0], |
|
label=label, |
|
step=(scaled_bounds[1][0] - scaled_bounds[0][0]) / 100, |
|
), |
|
numeric_index + 1, |
|
choice_index, |
|
) |
|
elif param_info["type"] == "choice": |
|
return ( |
|
gr.Radio( |
|
choices=[ |
|
f"c{choice_index}_{i}" for i in range(len(param_info["values"])) |
|
], |
|
label=f"c{choice_index}", |
|
value=f"c{choice_index}_{param_info['values'].index(default_value)}", |
|
), |
|
numeric_index, |
|
choice_index + 1, |
|
) |
|
|
|
|
|
|
|
blinded_results = evaluate(*[0.5] * 20, "c1_0", "c2_0", "c3_0", 0.5) |
|
|
|
numeric_index = 1 |
|
choice_index = 1 |
|
inputs = [] |
|
for param in PARAM_BOUNDS: |
|
input, numeric_index, choice_index = get_interface( |
|
param, numeric_index, choice_index |
|
) |
|
inputs.append(input) |
|
|
|
iface = gr.Interface( |
|
title="CrabNetSurrogateModel", |
|
fn=evaluate, |
|
inputs=inputs, |
|
outputs=gr.Numpy( |
|
value=np.array([list(example_result.values())]), |
|
headers=[f"y{i+1}" for i in range(len(example_result))], |
|
col_count=(len(example_result), "fixed"), |
|
datatype=["number"] * len(example_result), |
|
), |
|
description=""" |
|
## Objectives |
|
|
|
**Minimize `y1`, `y2`, `y3`, and `y4`** |
|
|
|
### Correlations |
|
|
|
- `y1` and `y2` are correlated |
|
- `y1` is anticorrelated with `y3` |
|
- `y2` is anticorrelated with `y3` |
|
|
|
### Noise |
|
|
|
`y1`, `y2`, and `y3` are stochastic with heteroskedastic, parameter-free |
|
noise, whereas `y4` is deterministic, but still considered 'black-box'. In |
|
other words, repeat calls with the same input arguments will result in |
|
different values for `y1`, `y2`, and `y3`, but the same value for `y4`. |
|
|
|
### Objective thresholds |
|
|
|
If `y1` is greater than 0.2, the result is considered "bad" no matter how |
|
good the other values are. If `y2` is greater than 0.7, the result is |
|
considered "bad" no matter how good the other values are. If `y3` is greater |
|
than 1800, the result is considered "bad" no matter how good the other |
|
values are. If `y4` is greater than 40e6, the result is considered "bad" no |
|
matter how good the other values are. |
|
|
|
## Search Space |
|
|
|
### Fidelity |
|
|
|
`fidelity1` is a fidelity parameter. The lowest fidelity is 0, and the |
|
highest fidelity is 1. The higher the fidelity, the more expensive the |
|
evaluation, and the higher the quality. |
|
|
|
NOTE: `fidelity1` and `y3` are correlated. |
|
|
|
### Constraints |
|
|
|
- x<sub>19</sub> < x<sub>20</sub> |
|
- x<sub>6</sub> + x<sub>15</sub> β€ 1.0 |
|
|
|
### Parameter bounds |
|
|
|
- 0 β€ x<sub>i</sub> β€ 1 for i β {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, |
|
14, 15, 16, 17, 18, 19, 20} |
|
- c<sub>1</sub> β {c1_0, c1_1} |
|
- c<sub>2</sub> β {c2_0, c2_1} |
|
- c<sub>3</sub> β {c3_0, c3_1, c3_2} |
|
- 0 β€ fidelity1 β€ 1 |
|
|
|
## Notion of best |
|
|
|
Thresholded Pareto front hypervolume vs. running cost for three different |
|
budgets, and averaged over 10 search campaigns. |
|
|
|
References: |
|
|
|
(1) Baird, S. G.; Liu, M.; Sparks, T. D. High-Dimensional Bayesian |
|
Optimization of 23 Hyperparameters over 100 Iterations for an |
|
Attention-Based Network to Predict Materials Property: A Case Study on |
|
CrabNet Using Ax Platform and SAASBO. Computational Materials Science |
|
2022, 211, 111505. https://doi.org/10.1016/j.commatsci.2022.111505. |
|
(2) Baird, S. G.; Parikh, J. N.; Sparks, T. D. Materials Science |
|
Optimization Benchmark Dataset for High-Dimensional, Multi-Objective, |
|
Multi-Fidelity Optimization of CrabNet Hyperparameters. ChemRxiv March |
|
7, 2023. https://doi.org/10.26434/chemrxiv-2023-9s6r7. |
|
""", |
|
) |
|
iface.launch() |
|
|