Spaces:
Running
Running
from sklearn.metrics import roc_auc_score, roc_curve | |
import datetime | |
import os | |
import umap | |
import numpy as np | |
import matplotlib.pyplot as plt | |
import pandas as pd | |
import pickle | |
import json | |
from xgboost import XGBClassifier, XGBRegressor | |
import xgboost as xgb | |
from sklearn.metrics import roc_auc_score, mean_squared_error | |
import xgboost as xgb | |
from sklearn.svm import SVR | |
from sklearn.linear_model import LinearRegression | |
from sklearn.kernel_ridge import KernelRidge | |
import json | |
from sklearn.compose import TransformedTargetRegressor | |
from sklearn.preprocessing import MinMaxScaler | |
import torch | |
from transformers import AutoTokenizer, AutoModel | |
import sys | |
sys.path.append("models/") | |
from models.selfies_model.load import SELFIES as bart | |
from models.mhg_model import load as mhg | |
from models.smi_ted.smi_ted_light.load import load_smi_ted | |
datasets = {} | |
models = {} | |
downstream_models ={} | |
def avail_models_data(): | |
global datasets | |
global models | |
datasets = [{"Dataset": "hiv", "Input": "smiles", "Output": "HIV_active", "Path": "data/hiv", "Timestamp": "2024-06-26 11:27:37"}, | |
{"Dataset": "esol", "Input": "smiles", "Output": "ESOL predicted log solubility in mols per litre", "Path": "data/esol", "Timestamp": "2024-06-26 11:31:46"}, | |
{"Dataset": "freesolv", "Input": "smiles", "Output": "expt", "Path": "data/freesolv", "Timestamp": "2024-06-26 11:33:47"}, | |
{"Dataset": "lipo", "Input": "smiles", "Output": "y", "Path": "data/lipo", "Timestamp": "2024-06-26 11:34:37"}, | |
{"Dataset": "bace", "Input": "smiles", "Output": "Class", "Path": "data/bace", "Timestamp": "2024-06-26 11:36:40"}, | |
{"Dataset": "bbbp", "Input": "smiles", "Output": "p_np", "Path": "data/bbbp", "Timestamp": "2024-06-26 11:39:23"}, | |
{"Dataset": "clintox", "Input": "smiles", "Output": "CT_TOX", "Path": "data/clintox", "Timestamp": "2024-06-26 11:42:43"}] | |
models = [{"Name": "bart","Model Name": "SELFIES-TED","Description": "BART model for string based SELFIES modality", "Timestamp": "2024-06-21 12:32:20"}, | |
{"Name": "mol-xl","Model Name": "Molformer", "Description": "MolFormer model for string based SMILES modality", "Timestamp": "2024-06-21 12:35:56"}, | |
{"Name": "mhg", "Model Name": "MHG-GED","Description": "Molecular hypergraph model", "Timestamp": "2024-07-10 00:09:42"}, | |
{"Name": "smi-ted", "Model Name": "SMI-TED","Description": "SMILES based encoder decoder model", "Timestamp": "2024-07-10 00:09:42"}] | |
def avail_models(raw=False): | |
global models | |
models = [{"Name": "smi-ted", "Model Name": "SMI-TED","Description": "SMILES based encoder decoder model"}, | |
{"Name": "bart","Model Name": "SELFIES-TED","Description": "BART model for string based SELFIES modality"}, | |
{"Name": "mol-xl","Model Name": "Molformer", "Description": "MolFormer model for string based SMILES modality"}, | |
{"Name": "mhg", "Model Name": "MHG-GED","Description": "Molecular hypergraph model"}, | |
] | |
if raw: return models | |
else: | |
return pd.DataFrame(models).drop('Name', axis=1) | |
return models | |
def avail_downstream_models(): | |
global downstream_models | |
with open("downstream_models.json", "r") as outfile: | |
downstream_models = json.load(outfile) | |
return downstream_models | |
def avail_datasets(): | |
global datasets | |
datasets = [{"Dataset": "hiv", "Input": "smiles", "Output": "HIV_active", "Path": "data/hiv", | |
"Timestamp": "2024-06-26 11:27:37"}, | |
{"Dataset": "esol", "Input": "smiles", "Output": "ESOL predicted log solubility in mols per litre", | |
"Path": "data/esol", "Timestamp": "2024-06-26 11:31:46"}, | |
{"Dataset": "freesolv", "Input": "smiles", "Output": "expt", "Path": "data/freesolv", | |
"Timestamp": "2024-06-26 11:33:47"}, | |
{"Dataset": "lipo", "Input": "smiles", "Output": "y", "Path": "data/lipo", | |
"Timestamp": "2024-06-26 11:34:37"}, | |
{"Dataset": "bace", "Input": "smiles", "Output": "Class", "Path": "data/bace", | |
"Timestamp": "2024-06-26 11:36:40"}, | |
{"Dataset": "bbbp", "Input": "smiles", "Output": "p_np", "Path": "data/bbbp", | |
"Timestamp": "2024-06-26 11:39:23"}, | |
{"Dataset": "clintox", "Input": "smiles", "Output": "CT_TOX", "Path": "data/clintox", | |
"Timestamp": "2024-06-26 11:42:43"}] | |
return datasets | |
def reset(): | |
"""datasets = {"esol": ["smiles", "ESOL predicted log solubility in mols per litre", "data/esol", "2024-06-26 11:36:46.509324"], | |
"freesolv": ["smiles", "expt", "data/freesolv", "2024-06-26 11:37:37.393273"], | |
"lipo": ["smiles", "y", "data/lipo", "2024-06-26 11:37:37.393273"], | |
"hiv": ["smiles", "HIV_active", "data/hiv", "2024-06-26 11:37:37.393273"], | |
"bace": ["smiles", "Class", "data/bace", "2024-06-26 11:38:40.058354"], | |
"bbbp": ["smiles", "p_np", "data/bbbp","2024-06-26 11:38:40.058354"], | |
"clintox": ["smiles", "CT_TOX", "data/clintox","2024-06-26 11:38:40.058354"], | |
"sider": ["smiles","1:", "data/sider","2024-06-26 11:38:40.058354"], | |
"tox21": ["smiles",":-2", "data/tox21","2024-06-26 11:38:40.058354"] | |
}""" | |
datasets = [ | |
{"Dataset": "hiv", "Input": "smiles", "Output": "HIV_active", "Path": "data/hiv", "Timestamp": "2024-06-26 11:27:37"}, | |
{"Dataset": "esol", "Input": "smiles", "Output": "ESOL predicted log solubility in mols per litre", "Path": "data/esol", "Timestamp": "2024-06-26 11:31:46"}, | |
{"Dataset": "freesolv", "Input": "smiles", "Output": "expt", "Path": "data/freesolv", "Timestamp": "2024-06-26 11:33:47"}, | |
{"Dataset": "lipo", "Input": "smiles", "Output": "y", "Path": "data/lipo", "Timestamp": "2024-06-26 11:34:37"}, | |
{"Dataset": "bace", "Input": "smiles", "Output": "Class", "Path": "data/bace", "Timestamp": "2024-06-26 11:36:40"}, | |
{"Dataset": "bbbp", "Input": "smiles", "Output": "p_np", "Path": "data/bbbp", "Timestamp": "2024-06-26 11:39:23"}, | |
{"Dataset": "clintox", "Input": "smiles", "Output": "CT_TOX", "Path": "data/clintox", "Timestamp": "2024-06-26 11:42:43"}, | |
#{"Dataset": "sider", "Input": "smiles", "Output": "1:", "path": "data/sider", "Timestamp": "2024-06-26 11:38:40.058354"}, | |
#{"Dataset": "tox21", "Input": "smiles", "Output": ":-2", "path": "data/tox21", "Timestamp": "2024-06-26 11:38:40.058354"} | |
] | |
models = [{"Name": "bart", "Description": "BART model for string based SELFIES modality", | |
"Timestamp": "2024-06-21 12:32:20"}, | |
{"Name": "mol-xl", "Description": "MolFormer model for string based SMILES modality", | |
"Timestamp": "2024-06-21 12:35:56"}, | |
{"Name": "mhg", "Description": "MHG", "Timestamp": "2024-07-10 00:09:42"}, | |
{"Name": "spec-gru", "Description": "Spectrum modality with GRU", "Timestamp": "2024-07-10 00:09:42"}, | |
{"Name": "spec-lstm", "Description": "Spectrum modality with LSTM", "Timestamp": "2024-07-10 00:09:54"}, | |
{"Name": "3d-vae", "Description": "VAE model for 3D atom positions", "Timestamp": "2024-07-10 00:10:08"}] | |
downstream_models = [ | |
{"Name": "XGBClassifier", "Description": "XG Boost Classifier", | |
"Timestamp": "2024-06-21 12:31:20"}, | |
{"Name": "XGBRegressor", "Description": "XG Boost Regressor", | |
"Timestamp": "2024-06-21 12:32:56"}, | |
{"Name": "2-FNN", "Description": "A two layer feedforward network", | |
"Timestamp": "2024-06-24 14:34:16"}, | |
{"Name": "3-FNN", "Description": "A three layer feedforward network", | |
"Timestamp": "2024-06-24 14:38:37"}, | |
] | |
with open("datasets.json", "w") as outfile: | |
json.dump(datasets, outfile) | |
with open("models.json", "w") as outfile: | |
json.dump(models, outfile) | |
with open("downstream_models.json", "w") as outfile: | |
json.dump(downstream_models, outfile) | |
def update_data_list(list_data): | |
#datasets[list_data[0]] = list_data[1:] | |
with open("datasets.json", "w") as outfile: | |
json.dump(datasets, outfile) | |
avail_models_data() | |
def update_model_list(list_model): | |
#models[list_model[0]] = list_model[1] | |
with open("models.json", "w") as outfile: | |
json.dump(list_model, outfile) | |
avail_models_data() | |
def update_downstream_model_list(list_model): | |
#models[list_model[0]] = list_model[1] | |
with open("downstream_models.json", "w") as outfile: | |
json.dump(list_model, outfile) | |
avail_models_data() | |
avail_models_data() | |
def list_models(): | |
#print(*list(models.keys()),sep='\n') | |
data = avail_models(raw=True) | |
# Convert data to a pandas DataFrame | |
df = pd.DataFrame(data) | |
# Add a column for displaying row numbers starting from 1 | |
df.index += 1 | |
# Create dropdown widget for sorting | |
sort_dropdown = widgets.Dropdown( | |
options=['Name', 'Timestamp'], | |
value='Name', | |
description='Sort by:', | |
disabled=False, | |
) | |
# Output widget to display the table | |
output = widgets.Output() | |
# Define function to update display based on sorting | |
def update_display(change): | |
with output: | |
output.clear_output(wait=True) | |
sorted_df = df.sort_values(by=sort_dropdown.value) | |
display(sorted_df.style.set_properties(**{ | |
'text-align': 'left', 'border': '1px solid #ddd', | |
})) | |
# Attach the update_display function to the dropdown widget | |
sort_dropdown.observe(update_display, names='value') | |
# Display the dropdown and the table initially | |
display(sort_dropdown, output) | |
update_display(None) | |
def list_downstream_models(): | |
#print(*list(models.keys()),sep='\n') | |
data = avail_downstream_models() | |
# Convert data to a pandas DataFrame | |
df = pd.DataFrame(data) | |
# Add a column for displaying row numbers starting from 1 | |
df.index += 1 | |
# Create dropdown widget for sorting | |
sort_dropdown = widgets.Dropdown( | |
options=['Name', 'Timestamp'], | |
value='Timestamp', | |
description='Sort by:', | |
disabled=False, | |
) | |
# Output widget to display the table | |
output = widgets.Output() | |
# Define function to update display based on sorting | |
def update_display(change): | |
with output: | |
output.clear_output(wait=True) | |
sorted_df = df.sort_values(by=sort_dropdown.value) | |
display(sorted_df.style.set_properties(**{ | |
'text-align': 'left', 'border': '1px solid #ddd', | |
})) | |
# Attach the update_display function to the dropdown widget | |
sort_dropdown.observe(update_display, names='value') | |
# Display the dropdown and the table initially | |
display(sort_dropdown, output) | |
update_display(None) | |
def list_data(): | |
#print(*list(datasets.keys()),sep='\n') | |
data = avail_datasets() | |
# Convert data to a pandas DataFrame | |
df = pd.DataFrame(data) | |
# Add a column for displaying row numbers starting from 1 | |
df.index += 1 | |
# Create dropdown widget for sorting | |
sort_dropdown = widgets.Dropdown( | |
options=['Dataset', 'Input', 'Output', 'Path', 'Timestamp'], | |
value='Input', | |
description='Sort by:', | |
disabled=False, | |
) | |
# Output widget to display the table | |
output = widgets.Output() | |
# Define function to update display based on sorting | |
def update_display(change): | |
with output: | |
output.clear_output(wait=True) | |
sorted_df = df.sort_values(by=sort_dropdown.value) | |
display(sorted_df.style.set_properties(**{ | |
'text-align': 'left', 'border': '1px solid #ddd', | |
})) | |
# Attach the update_display function to the dropdown widget | |
sort_dropdown.observe(update_display, names='value') | |
# Display the dropdown and the table initially | |
display(sort_dropdown, output) | |
update_display(None) | |
def vizualize(roc_auc,fpr, tpr, features, labels): | |
#def vizualize(features, labels): | |
reducer = umap.UMAP(metric="jaccard", n_neighbors=20, n_components=2, low_memory=True, min_dist=0.001, verbose=False) | |
features_umap = reducer.fit_transform(features) | |
x = labels.values | |
index_0 = [index for index in range(len(x)) if x[index] == 0] | |
index_1 = [index for index in range(len(x)) if x[index] == 1] | |
class_0 = features_umap[index_0] | |
class_1 = features_umap[index_1] | |
# Function to create ROC AUC plot | |
def plot_roc_auc(): | |
plt.figure(figsize=(8, 6)) | |
plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (area = {roc_auc:.4f})') | |
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--') | |
plt.xlim([0.0, 1.0]) | |
plt.ylim([0.0, 1.05]) | |
plt.xlabel('False Positive Rate') | |
plt.ylabel('True Positive Rate') | |
plt.title('Receiver Operating Characteristic') | |
plt.legend(loc='lower right') | |
plt.show() | |
# Function to create scatter plot of the dataset distribution | |
def plot_distribution(): | |
plt.figure(figsize=(8, 6)) | |
#plt.scatter(X[:, 0], X[:, 1], c=y, cmap=plt.cm.coolwarm, edgecolors='k') | |
plt.scatter(class_1[:, 0], class_1[:, 1], c='red', label='Class 1') | |
plt.scatter(class_0[:, 0], class_0[:, 1], c='blue', label='Class 0') | |
plt.xlabel('Feature 1') | |
plt.ylabel('Feature 2') | |
plt.title('Dataset Distribution') | |
plt.show() | |
# Create tabs using ipywidgets | |
tab_contents = ['ROC AUC', 'Distribution'] | |
children = [widgets.Output(), widgets.Output()] | |
tab = widgets.Tab() | |
tab.children = children | |
for i in range(len(tab_contents)): | |
tab.set_title(i, tab_contents[i]) | |
# Display plots in their respective tabs | |
with children[0]: | |
plot_roc_auc() | |
with children[1]: | |
plot_distribution() | |
# Display the tab widget | |
display(tab) | |
def get_representation(train_data,test_data,model_type, return_tensor=True): | |
alias = {"MHG-GED": "mhg", "SELFIES-TED": "bart", "MolFormer": "mol-xl", "Molformer": "mol-xl", "SMI-TED": "smi-ted"} | |
if model_type in alias.keys(): | |
model_type = alias[model_type] | |
if model_type == "mhg": | |
model = mhg.load("models/mhg_model/pickles/mhggnn_pretrained_model_0724_2023.pickle") | |
with torch.no_grad(): | |
train_emb = model.encode(train_data) | |
x_batch = torch.stack(train_emb) | |
test_emb = model.encode(test_data) | |
x_batch_test = torch.stack(test_emb) | |
if not return_tensor: | |
x_batch = pd.DataFrame(x_batch) | |
x_batch_test = pd.DataFrame(x_batch_test) | |
elif model_type == "bart": | |
model = bart() | |
model.load() | |
x_batch = model.encode(train_data, return_tensor=return_tensor) | |
x_batch_test = model.encode(test_data, return_tensor=return_tensor) | |
elif model_type == "smi-ted": | |
model = load_smi_ted(folder='./models/smi_ted/smi_ted_light', ckpt_filename='smi-ted-Light_40.pt') | |
with torch.no_grad(): | |
x_batch = model.encode(train_data, return_torch=return_tensor) | |
x_batch_test = model.encode(test_data, return_torch=return_tensor) | |
elif model_type == "mol-xl": | |
model = AutoModel.from_pretrained("ibm/MoLFormer-XL-both-10pct", deterministic_eval=True, | |
trust_remote_code=True) | |
tokenizer = AutoTokenizer.from_pretrained("ibm/MoLFormer-XL-both-10pct", trust_remote_code=True) | |
if type(train_data) == list: | |
inputs = tokenizer(train_data, padding=True, return_tensors="pt") | |
else: | |
inputs = tokenizer(list(train_data.values), padding=True, return_tensors="pt") | |
with torch.no_grad(): | |
outputs = model(**inputs) | |
x_batch = outputs.pooler_output | |
if type(test_data) == list: | |
inputs = tokenizer(test_data, padding=True, return_tensors="pt") | |
else: | |
inputs = tokenizer(list(test_data.values), padding=True, return_tensors="pt") | |
with torch.no_grad(): | |
outputs = model(**inputs) | |
x_batch_test = outputs.pooler_output | |
if not return_tensor: | |
x_batch = pd.DataFrame(x_batch) | |
x_batch_test = pd.DataFrame(x_batch_test) | |
return x_batch, x_batch_test | |
def single_modal(model,dataset, downstream_model,params): | |
print(model) | |
alias = {"MHG-GED":"mhg", "SELFIES-TED": "bart", "MolFormer":"mol-xl", "SMI-TED": "smi-ted"} | |
data = avail_models(raw=True) | |
df = pd.DataFrame(data) | |
print(list(df["Name"].values)) | |
if alias[model] in list(df["Name"].values): | |
if model in alias.keys(): | |
model_type = alias[model] | |
else: | |
model_type = model | |
else: | |
print("Model not available") | |
return | |
data = avail_datasets() | |
df = pd.DataFrame(data) | |
print(list(df["Dataset"].values)) | |
if dataset in list(df["Dataset"].values): | |
task = dataset | |
with open(f"representation/{task}_{model_type}.pkl", "rb") as f1: | |
x_batch, y_batch, x_batch_test, y_batch_test = pickle.load(f1) | |
print(f" Representation loaded successfully") | |
else: | |
print("Custom Dataset") | |
#return | |
components = dataset.split(",") | |
train_data = pd.read_csv(components[0])[components[2]] | |
test_data = pd.read_csv(components[1])[components[2]] | |
y_batch = pd.read_csv(components[0])[components[3]] | |
y_batch_test = pd.read_csv(components[1])[components[3]] | |
x_batch, x_batch_test = get_representation(train_data,test_data,model_type) | |
print(f" Representation loaded successfully") | |
print(f" Calculating ROC AUC Score ...") | |
if downstream_model == "XGBClassifier": | |
xgb_predict_concat = XGBClassifier(**params) # n_estimators=5000, learning_rate=0.01, max_depth=10 | |
xgb_predict_concat.fit(x_batch, y_batch) | |
y_prob = xgb_predict_concat.predict_proba(x_batch_test)[:, 1] | |
roc_auc = roc_auc_score(y_batch_test, y_prob) | |
fpr, tpr, _ = roc_curve(y_batch_test, y_prob) | |
print(f"ROC-AUC Score: {roc_auc:.4f}") | |
try: | |
with open(f"plot_emb/{task}_{model_type}.pkl", "rb") as f1: | |
class_0,class_1 = pickle.load(f1) | |
except: | |
print("Generating latent plots") | |
reducer = umap.UMAP(metric='euclidean', n_neighbors=10, n_components=2, low_memory=True, min_dist=0.1, | |
verbose=False) | |
n_samples = np.minimum(1000, len(x_batch)) | |
features_umap = reducer.fit_transform(x_batch[:n_samples]) | |
x = y_batch.values[:n_samples] | |
index_0 = [index for index in range(len(x)) if x[index] == 0] | |
index_1 = [index for index in range(len(x)) if x[index] == 1] | |
class_0 = features_umap[index_0] | |
class_1 = features_umap[index_1] | |
print("Generating latent plots : Done") | |
#vizualize(roc_auc,fpr, tpr, x_batch, y_batch ) | |
result = f"ROC-AUC Score: {roc_auc:.4f}" | |
return result, roc_auc,fpr, tpr, class_0, class_1 | |
elif downstream_model == "DefaultClassifier": | |
xgb_predict_concat = XGBClassifier() # n_estimators=5000, learning_rate=0.01, max_depth=10 | |
xgb_predict_concat.fit(x_batch, y_batch) | |
y_prob = xgb_predict_concat.predict_proba(x_batch_test)[:, 1] | |
roc_auc = roc_auc_score(y_batch_test, y_prob) | |
fpr, tpr, _ = roc_curve(y_batch_test, y_prob) | |
print(f"ROC-AUC Score: {roc_auc:.4f}") | |
try: | |
with open(f"plot_emb/{task}_{model_type}.pkl", "rb") as f1: | |
class_0,class_1 = pickle.load(f1) | |
except: | |
print("Generating latent plots") | |
reducer = umap.UMAP(metric='euclidean', n_neighbors= 10, n_components=2, low_memory=True, min_dist=0.1, verbose=False) | |
n_samples = np.minimum(1000,len(x_batch)) | |
features_umap = reducer.fit_transform(x_batch[:n_samples]) | |
x = y_batch.values[:n_samples] | |
index_0 = [index for index in range(len(x)) if x[index] == 0] | |
index_1 = [index for index in range(len(x)) if x[index] == 1] | |
class_0 = features_umap[index_0] | |
class_1 = features_umap[index_1] | |
print("Generating latent plots : Done") | |
#vizualize(roc_auc,fpr, tpr, x_batch, y_batch ) | |
result = f"ROC-AUC Score: {roc_auc:.4f}" | |
return result, roc_auc,fpr, tpr, class_0, class_1 | |
elif downstream_model == "SVR": | |
regressor = SVR(**params) | |
model = TransformedTargetRegressor(regressor= regressor, | |
transformer = MinMaxScaler(feature_range=(-1, 1)) | |
).fit(x_batch,y_batch) | |
y_prob = model.predict(x_batch_test) | |
RMSE_score = np.sqrt(mean_squared_error(y_batch_test, y_prob)) | |
print(f"RMSE Score: {RMSE_score:.4f}") | |
result = f"RMSE Score: {RMSE_score:.4f}" | |
print("Generating latent plots") | |
reducer = umap.UMAP(metric='euclidean', n_neighbors=10, n_components=2, low_memory=True, min_dist=0.1, | |
verbose=False) | |
n_samples = np.minimum(1000, len(x_batch)) | |
features_umap = reducer.fit_transform(x_batch[:n_samples]) | |
x = y_batch.values[:n_samples] | |
#index_0 = [index for index in range(len(x)) if x[index] == 0] | |
#index_1 = [index for index in range(len(x)) if x[index] == 1] | |
class_0 = features_umap#[index_0] | |
class_1 = features_umap#[index_1] | |
print("Generating latent plots : Done") | |
return result, RMSE_score,y_batch_test, y_prob, class_0, class_1 | |
elif downstream_model == "Kernel Ridge": | |
regressor = KernelRidge(**params) | |
model = TransformedTargetRegressor(regressor=regressor, | |
transformer=MinMaxScaler(feature_range=(-1, 1)) | |
).fit(x_batch, y_batch) | |
y_prob = model.predict(x_batch_test) | |
RMSE_score = np.sqrt(mean_squared_error(y_batch_test, y_prob)) | |
print(f"RMSE Score: {RMSE_score:.4f}") | |
result = f"RMSE Score: {RMSE_score:.4f}" | |
print("Generating latent plots") | |
reducer = umap.UMAP(metric='euclidean', n_neighbors=10, n_components=2, low_memory=True, min_dist=0.1, | |
verbose=False) | |
n_samples = np.minimum(1000, len(x_batch)) | |
features_umap = reducer.fit_transform(x_batch[:n_samples]) | |
x = y_batch.values[:n_samples] | |
# index_0 = [index for index in range(len(x)) if x[index] == 0] | |
# index_1 = [index for index in range(len(x)) if x[index] == 1] | |
class_0 = features_umap#[index_0] | |
class_1 = features_umap#[index_1] | |
print("Generating latent plots : Done") | |
return result, RMSE_score, y_batch_test, y_prob, class_0, class_1 | |
elif downstream_model == "Linear Regression": | |
regressor = LinearRegression(**params) | |
model = TransformedTargetRegressor(regressor=regressor, | |
transformer=MinMaxScaler(feature_range=(-1, 1)) | |
).fit(x_batch, y_batch) | |
y_prob = model.predict(x_batch_test) | |
RMSE_score = np.sqrt(mean_squared_error(y_batch_test, y_prob)) | |
print(f"RMSE Score: {RMSE_score:.4f}") | |
result = f"RMSE Score: {RMSE_score:.4f}" | |
print("Generating latent plots") | |
reducer = umap.UMAP(metric='euclidean', n_neighbors=10, n_components=2, low_memory=True, min_dist=0.1, | |
verbose=False) | |
n_samples = np.minimum(1000, len(x_batch)) | |
features_umap = reducer.fit_transform(x_batch[:n_samples]) | |
x = y_batch.values[:n_samples] | |
# index_0 = [index for index in range(len(x)) if x[index] == 0] | |
# index_1 = [index for index in range(len(x)) if x[index] == 1] | |
class_0 = features_umap#[index_0] | |
class_1 = features_umap#[index_1] | |
print("Generating latent plots : Done") | |
return result, RMSE_score, y_batch_test, y_prob, class_0, class_1 | |
elif downstream_model == "DefaultRegressor": | |
regressor = SVR(kernel="rbf", degree=3, C=5, gamma="scale", epsilon=0.01) | |
model = TransformedTargetRegressor(regressor=regressor, | |
transformer=MinMaxScaler(feature_range=(-1, 1)) | |
).fit(x_batch, y_batch) | |
y_prob = model.predict(x_batch_test) | |
RMSE_score = np.sqrt(mean_squared_error(y_batch_test, y_prob)) | |
print(f"RMSE Score: {RMSE_score:.4f}") | |
result = f"RMSE Score: {RMSE_score:.4f}" | |
print("Generating latent plots") | |
reducer = umap.UMAP(metric='euclidean', n_neighbors=10, n_components=2, low_memory=True, min_dist=0.1, | |
verbose=False) | |
n_samples = np.minimum(1000, len(x_batch)) | |
features_umap = reducer.fit_transform(x_batch[:n_samples]) | |
x = y_batch.values[:n_samples] | |
# index_0 = [index for index in range(len(x)) if x[index] == 0] | |
# index_1 = [index for index in range(len(x)) if x[index] == 1] | |
class_0 = features_umap#[index_0] | |
class_1 = features_umap#[index_1] | |
print("Generating latent plots : Done") | |
return result, RMSE_score, y_batch_test, y_prob, class_0, class_1 | |
def multi_modal(model_list,dataset, downstream_model,params): | |
print(model_list) | |
data = avail_datasets() | |
df = pd.DataFrame(data) | |
list(df["Dataset"].values) | |
if dataset in list(df["Dataset"].values): | |
task = dataset | |
predefined = True | |
else: | |
predefined = False | |
components = dataset.split(",") | |
train_data = pd.read_csv(components[0])[components[2]] | |
test_data = pd.read_csv(components[1])[components[2]] | |
y_batch = pd.read_csv(components[0])[components[3]] | |
y_batch_test = pd.read_csv(components[1])[components[3]] | |
print("Custom Dataset loaded") | |
data = avail_models(raw=True) | |
df = pd.DataFrame(data) | |
list(df["Name"].values) | |
alias = {"MHG-GED":"mhg", "SELFIES-TED": "bart", "MolFormer":"mol-xl", "SMI-TED":"smi-ted"} | |
#if set(model_list).issubset(list(df["Name"].values)): | |
if set(model_list).issubset(list(alias.keys())): | |
for i, model in enumerate(model_list): | |
if model in alias.keys(): | |
model_type = alias[model] | |
else: | |
model_type = model | |
if i == 0: | |
if predefined: | |
with open(f"representation/{task}_{model_type}.pkl", "rb") as f1: | |
x_batch, y_batch, x_batch_test, y_batch_test = pickle.load(f1) | |
print(f" Loaded representation/{task}_{model_type}.pkl") | |
else: | |
x_batch, x_batch_test = get_representation(train_data, test_data, model_type) | |
x_batch = pd.DataFrame(x_batch) | |
x_batch_test = pd.DataFrame(x_batch_test) | |
else: | |
if predefined: | |
with open(f"representation/{task}_{model_type}.pkl", "rb") as f1: | |
x_batch_1, y_batch_1, x_batch_test_1, y_batch_test_1 = pickle.load(f1) | |
print(f" Loaded representation/{task}_{model_type}.pkl") | |
else: | |
x_batch_1, x_batch_test_1 = get_representation(train_data, test_data, model_type) | |
x_batch_1 = pd.DataFrame(x_batch_1) | |
x_batch_test_1 = pd.DataFrame(x_batch_test_1) | |
x_batch = pd.concat([x_batch, x_batch_1], axis=1) | |
x_batch_test = pd.concat([x_batch_test, x_batch_test_1], axis=1) | |
else: | |
print("Model not available") | |
return | |
num_columns = x_batch_test.shape[1] | |
x_batch_test.columns = [f'{i + 1}' for i in range(num_columns)] | |
num_columns = x_batch.shape[1] | |
x_batch.columns = [f'{i + 1}' for i in range(num_columns)] | |
print(f"Representations loaded successfully") | |
try: | |
with open(f"plot_emb/{task}_multi.pkl", "rb") as f1: | |
class_0, class_1 = pickle.load(f1) | |
except: | |
print("Generating latent plots") | |
reducer = umap.UMAP(metric='euclidean', n_neighbors=10, n_components=2, low_memory=True, min_dist=0.1, | |
verbose=False) | |
n_samples = np.minimum(1000, len(x_batch)) | |
features_umap = reducer.fit_transform(x_batch[:n_samples]) | |
if "Classifier" in downstream_model: | |
x = y_batch.values[:n_samples] | |
index_0 = [index for index in range(len(x)) if x[index] == 0] | |
index_1 = [index for index in range(len(x)) if x[index] == 1] | |
class_0 = features_umap[index_0] | |
class_1 = features_umap[index_1] | |
else: | |
class_0 = features_umap | |
class_1 = features_umap | |
print("Generating latent plots : Done") | |
print(f" Calculating ROC AUC Score ...") | |
if downstream_model == "XGBClassifier": | |
xgb_predict_concat = XGBClassifier(**params)#n_estimators=5000, learning_rate=0.01, max_depth=10) | |
xgb_predict_concat.fit(x_batch, y_batch) | |
y_prob = xgb_predict_concat.predict_proba(x_batch_test)[:, 1] | |
roc_auc = roc_auc_score(y_batch_test, y_prob) | |
fpr, tpr, _ = roc_curve(y_batch_test, y_prob) | |
print(f"ROC-AUC Score: {roc_auc:.4f}") | |
#vizualize(roc_auc,fpr, tpr, x_batch, y_batch ) | |
#vizualize(x_batch_test, y_batch_test) | |
print(f"ROC-AUC Score: {roc_auc:.4f}") | |
result = f"ROC-AUC Score: {roc_auc:.4f}" | |
return result, roc_auc,fpr, tpr, class_0, class_1 | |
elif downstream_model == "DefaultClassifier": | |
xgb_predict_concat = XGBClassifier()#n_estimators=5000, learning_rate=0.01, max_depth=10) | |
xgb_predict_concat.fit(x_batch, y_batch) | |
y_prob = xgb_predict_concat.predict_proba(x_batch_test)[:, 1] | |
roc_auc = roc_auc_score(y_batch_test, y_prob) | |
fpr, tpr, _ = roc_curve(y_batch_test, y_prob) | |
print(f"ROC-AUC Score: {roc_auc:.4f}") | |
#vizualize(roc_auc,fpr, tpr, x_batch, y_batch ) | |
#vizualize(x_batch_test, y_batch_test) | |
print(f"ROC-AUC Score: {roc_auc:.4f}") | |
result = f"ROC-AUC Score: {roc_auc:.4f}" | |
return result, roc_auc,fpr, tpr, class_0, class_1 | |
elif downstream_model == "SVR": | |
regressor = SVR(**params) | |
model = TransformedTargetRegressor(regressor= regressor, | |
transformer = MinMaxScaler(feature_range=(-1, 1)) | |
).fit(x_batch,y_batch) | |
y_prob = model.predict(x_batch_test) | |
RMSE_score = np.sqrt(mean_squared_error(y_batch_test, y_prob)) | |
print(f"RMSE Score: {RMSE_score:.4f}") | |
result = f"RMSE Score: {RMSE_score:.4f}" | |
return result, RMSE_score,y_batch_test, y_prob, class_0, class_1 | |
elif downstream_model == "Linear Regression": | |
regressor = LinearRegression(**params) | |
model = TransformedTargetRegressor(regressor=regressor, | |
transformer=MinMaxScaler(feature_range=(-1, 1)) | |
).fit(x_batch, y_batch) | |
y_prob = model.predict(x_batch_test) | |
RMSE_score = np.sqrt(mean_squared_error(y_batch_test, y_prob)) | |
print(f"RMSE Score: {RMSE_score:.4f}") | |
result = f"RMSE Score: {RMSE_score:.4f}" | |
return result, RMSE_score, y_batch_test, y_prob, class_0, class_1 | |
elif downstream_model == "Kernel Ridge": | |
regressor = KernelRidge(**params) | |
model = TransformedTargetRegressor(regressor=regressor, | |
transformer=MinMaxScaler(feature_range=(-1, 1)) | |
).fit(x_batch, y_batch) | |
y_prob = model.predict(x_batch_test) | |
RMSE_score = np.sqrt(mean_squared_error(y_batch_test, y_prob)) | |
print(f"RMSE Score: {RMSE_score:.4f}") | |
result = f"RMSE Score: {RMSE_score:.4f}" | |
return result, RMSE_score, y_batch_test, y_prob, class_0, class_1 | |
elif downstream_model == "DefaultRegressor": | |
regressor = SVR(kernel="rbf", degree=3, C=5, gamma="scale", epsilon=0.01) | |
model = TransformedTargetRegressor(regressor=regressor, | |
transformer=MinMaxScaler(feature_range=(-1, 1)) | |
).fit(x_batch, y_batch) | |
y_prob = model.predict(x_batch_test) | |
RMSE_score = np.sqrt(mean_squared_error(y_batch_test, y_prob)) | |
print(f"RMSE Score: {RMSE_score:.4f}") | |
result = f"RMSE Score: {RMSE_score:.4f}" | |
return result, RMSE_score, y_batch_test, y_prob, class_0, class_1 | |
def finetune_optuna(x_batch,y_batch, x_batch_test, y_test ): | |
print(f" Finetuning with Optuna and calculating ROC AUC Score ...") | |
X_train = x_batch.values | |
y_train = y_batch.values | |
X_test = x_batch_test.values | |
y_test = y_test.values | |
def objective(trial): | |
# Define parameters to be optimized | |
params = { | |
# 'objective': 'binary:logistic', | |
'eval_metric': 'auc', | |
'verbosity': 0, | |
'n_estimators': trial.suggest_int('n_estimators', 1000, 10000), | |
# 'booster': trial.suggest_categorical('booster', ['gbtree', 'gblinear', 'dart']), | |
# 'lambda': trial.suggest_loguniform('lambda', 1e-8, 1.0), | |
'alpha': trial.suggest_loguniform('alpha', 1e-8, 1.0), | |
'max_depth': trial.suggest_int('max_depth', 1, 12), | |
# 'eta': trial.suggest_loguniform('eta', 1e-8, 1.0), | |
# 'gamma': trial.suggest_loguniform('gamma', 1e-8, 1.0), | |
# 'grow_policy': trial.suggest_categorical('grow_policy', ['depthwise', 'lossguide']), | |
# "subsample": trial.suggest_float("subsample", 0.05, 1.0), | |
# "colsample_bytree": trial.suggest_float("colsample_bytree", 0.05, 1.0), | |
} | |
# Train XGBoost model | |
dtrain = xgb.DMatrix(X_train, label=y_train) | |
dtest = xgb.DMatrix(X_test, label=y_test) | |
model = xgb.train(params, dtrain) | |
# Predict probabilities | |
y_pred = model.predict(dtest) | |
# Calculate ROC AUC score | |
roc_auc = roc_auc_score(y_test, y_pred) | |
print("ROC_AUC : ", roc_auc) | |
return roc_auc | |