Spaces:
Running
Running
import numpy as np | |
import gradio as gr | |
import pickle | |
import plotly.graph_objects as go | |
import plotly.express as px | |
import pandas as pd | |
import tempfile | |
import os | |
# Split the styling into two separate functions for clarity and simplicity | |
def style_feasible_column(val): | |
"""Style for the Feasible? column""" | |
if val == 'Success': | |
return 'color:white;background-color: lightgreen' | |
elif val == 'Failed': | |
return 'color:white;background-color: lightcoral' | |
return '' | |
def style_size_column(val, target_size=3.0): | |
"""Style for the Size column based on proximity to target""" | |
try: | |
val_float = float(val) | |
distance = val_float - target_size # Signed distance from target | |
abs_distance = abs(distance) | |
# Calculate width percentage based on distance | |
max_distance = 2.5 | |
width_pct = 100 - min(abs_distance / max_distance * 100, 100) | |
# Determine color based on value position relative to target | |
if distance < 0: | |
color = f"rgba(0, 128, 128, {min(1.0, 0.4 + 0.6*(1-abs_distance/max_distance))})" # Teal for below | |
else: | |
color = f"rgba(230, 97, 0, {min(1.0, 0.4 + 0.6*(1-abs_distance/max_distance))})" # Orange for above | |
# Text styling based on proximity to target | |
if abs_distance > 3: | |
text_color = "grey" | |
elif abs_distance > 1: | |
text_color = "black" | |
else: | |
text_color = "white" | |
font_weight = "bold" if abs_distance < 0.5 else "normal" | |
# Create gradient style | |
return ( | |
f"background: linear-gradient(90deg, {color} {width_pct}%, transparent {width_pct}%); " | |
f"color: {text_color}; " | |
f"font-weight: {font_weight}; " | |
) | |
except (ValueError, TypeError): | |
return '' | |
# Simulation function for electrospraying | |
def sim_espray_constrained(x, noise_se=None): | |
# Ensure x is a numpy array with float data type | |
x = np.array(x, dtype=float) | |
# Ensure x is a 2D array | |
if x.ndim == 1: | |
x = x.reshape(1, -1) | |
# Define the equations | |
conc = x[:, 0] | |
flow_rate = x[:, 1] | |
voltage = x[:, 2] | |
solvent = x[:, 3] | |
diameter = (np.sqrt(conc) * np.sqrt(flow_rate)) / np.log2(voltage) * 10 + 0.4 + solvent # Diameter in micrometers | |
if noise_se is not None: | |
diameter = diameter + noise_se * np.random.randn(*diameter.shape) | |
exp_con = (np.log(flow_rate) * (solvent - 0.5) + 1.40 >= 0).astype(float) | |
return np.column_stack((diameter, exp_con)) | |
# Initialize experiment data | |
X_init = np.array([[0.5, 15, 10, 0], | |
[0.5, 0.1, 10, 1], | |
[3, 20, 15, 0], | |
[1, 20, 10, 1], | |
[0.2, 0.02, 10, 1]]) | |
Y_init = sim_espray_constrained(X_init) | |
exp_record_df = pd.DataFrame(X_init, columns=['Concentration (%w/v)', 'Flow Rate (mL/h)', 'Voltage (kV)', 'Solvent']) | |
exp_record_df['Size (um)'] = Y_init[:, 0] | |
exp_record_df['Solvent'] = ['DMAc' if x == 0 else 'CHCl3' for x in exp_record_df['Solvent']] | |
exp_record_df['Feasible?'] = ['Success' if x == 1 else 'Failed' for x in Y_init[:, 1]] | |
# Replace the static prior_experiments_display with a function | |
def generate_prior_experiments_display(target_size=3.0): | |
"""Generate styled prior experiments display based on target size""" | |
return exp_record_df.style\ | |
.map(style_feasible_column, subset=['Feasible?'])\ | |
.map(lambda x: style_size_column(x, target_size), subset=['Size (um)'])\ | |
.format(precision=3) | |
# Functions for data processing and visualization | |
def import_results(target_size=3): | |
strategies = ['qEI', 'qEI_vi_mixed_con', 'qEICF_vi_mixed_con', 'rnd'] | |
file_name_dict = { | |
'0.5': 'best_distances_0_5.pkl', | |
'3': 'best_distances_3_0.pkl', | |
'22': 'best_distances_22_0.pkl' | |
} | |
# Load results from pickle file based on target size | |
with open(file_name_dict[str(target_size)], 'rb') as f: | |
best_distances = pickle.load(f) | |
# vstack all values in best_distances | |
best_distances_vstack = {k: np.vstack(best_distances[k]) for k in strategies} | |
best_distances_all_trials = -np.vstack([best_distances_vstack[k] for k in strategies]) | |
best_distances_all_trials_df = pd.DataFrame(best_distances_all_trials) | |
best_distances_all_trials_df['strategy'] = np.repeat(['Vanilla BO', 'Constrained BO', 'CCBO', 'Random'], 20) | |
best_distances_all_trials_df['trial'] = list(range(20)) * len(strategies) | |
best_distances_df_long = pd.melt(best_distances_all_trials_df, id_vars=['strategy', 'trial'], var_name='iteration', value_name='regret') | |
return best_distances_df_long | |
def calc_human_performance(df, target_size=3.0): | |
# Make a copy of the dataframe to avoid modifying the original | |
df_copy = df.copy() | |
# convert back solvent to 0 and 1 | |
df_copy['Solvent'] = [0 if x == 'DMAc' else 1 for x in df_copy['Solvent']] | |
ROUNDS = len(df_copy) // 2 | |
# Ensure all values are numeric | |
numeric_cols = ['Concentration (%w/v)', 'Flow Rate (mL/h)', 'Voltage (kV)', 'Solvent'] | |
for col in numeric_cols: | |
df_copy[col] = pd.to_numeric(df_copy[col]) | |
X_human = df_copy[numeric_cols].values | |
X_human_init = X_init.copy() | |
Y_human_init = Y_init.copy() | |
best_human_distance = [] | |
for iter in range(ROUNDS + 1): | |
Y_distance = -np.abs(Y_human_init[:, 0] - target_size) | |
best_human_distance.append(np.ma.masked_array(Y_distance, mask=~Y_human_init[:, 1].astype(bool)).max()) | |
# Check if we have more data for this iteration | |
if 2 * iter < len(X_human): | |
# Get the slice of new experiments | |
new_x = X_human[2 * iter:min(2 * (iter + 1), len(X_human))] | |
# Add the new experiments to our dataset | |
X_human_init = np.vstack([X_human_init, new_x]) | |
Y_human_init = np.vstack([Y_human_init, sim_espray_constrained(new_x)]) | |
return -np.array(best_human_distance) | |
def plot_results(exp_data_df, target_size=3.0): | |
# Extract human performance | |
best_human_distance = calc_human_performance(exp_data_df, target_size) | |
# Import results | |
best_distances_df_long = import_results(target_size) | |
fig = go.Figure() | |
strategies = best_distances_df_long['strategy'].unique() | |
for strategy in strategies: | |
strategy_data = best_distances_df_long[best_distances_df_long['strategy'] == strategy] | |
# Calculate mean and standard error | |
mean_regret = strategy_data.groupby('iteration')['regret'].mean() | |
std_regret = strategy_data.groupby('iteration')['regret'].std() | |
# Calculate standard error (SE = SD/√n) | |
n_trials = strategy_data.groupby('iteration')['regret'].count() | |
se_regret = std_regret / np.sqrt(n_trials) | |
iterations = mean_regret.index | |
color = px.colors.qualitative.Set2[strategies.tolist().index(strategy)] | |
# Add trace for mean line | |
mean_trace = go.Scatter( | |
x=iterations, | |
y=mean_regret, | |
mode='lines', | |
name=strategy, | |
line=dict(width=2, color=color) | |
) | |
fig.add_trace(mean_trace) | |
# Add trace for shaded area (standard error) | |
fig.add_trace(go.Scatter( | |
x=list(iterations) + list(iterations[::-1]), | |
y=list(mean_regret + se_regret) + list((mean_regret - se_regret)[::-1]), | |
fill='toself', | |
fillcolor=mean_trace.line.color.replace('rgb', 'rgba').replace(')', ',0.2)'), | |
line=dict(color='rgba(255,255,255,0)'), | |
showlegend=False, | |
name=f'{strategy} (standard error)' | |
)) | |
# Add trace for human performance | |
fig.add_trace(go.Scatter( | |
x=list(range(len(best_human_distance))), | |
y=best_human_distance, | |
mode='lines+markers', | |
name='Human', | |
line=dict(width=2, color='brown') | |
)) | |
fig.update_layout( | |
title='Performance Comparison', | |
xaxis_title='Iteration', | |
yaxis_title='Regret (μm)', | |
legend_title='Strategy', | |
template='plotly_white', | |
legend=dict( | |
x=0.01, | |
y=0.01, | |
bgcolor='rgba(255, 255, 255, 0.5)', | |
bordercolor='rgba(0, 0, 0, 0.5)', | |
borderwidth=1 | |
) | |
) | |
return fig | |
# Add function to calculate AUC | |
def calculate_auc(human_performance_values): | |
"""Calculate the Area Under the Curve for a user's performance""" | |
# Simple trapezoidal integration | |
if len(human_performance_values) <= 1: | |
return 0 | |
# AUC calculation using trapezoidal rule | |
auc_value = np.trapezoid(human_performance_values, dx=1) | |
return round(auc_value, 4) | |
# Prediction function - simplified signature by removing unnecessary text params | |
def predict(state, target_size, conc1, flow_rate1, voltage1, solvent1, conc2, flow_rate2, voltage2, solvent2): | |
# Get current results storage from state or initialize if None | |
if state is None: | |
results_storage = pd.DataFrame(columns=['Concentration (%w/v)', 'Flow Rate (mL/h)', 'Voltage (kV)', 'Solvent', 'Size (um)', 'Feasible?']) | |
else: | |
results_storage = state.copy() | |
solvent_value1 = 0 if solvent1 == 'DMAc' else 1 | |
solvent_value2 = 0 if solvent2 == 'DMAc' else 1 | |
# Process inputs and get predictions | |
inputs1 = np.array([[conc1, flow_rate1, voltage1, solvent_value1]]) | |
inputs2 = np.array([[conc2, flow_rate2, voltage2, solvent_value2]]) | |
results1 = sim_espray_constrained(inputs1) | |
results2 = sim_espray_constrained(inputs2) | |
# Format and store results | |
results_df = pd.DataFrame([ | |
[conc1, flow_rate1, voltage1, solvent_value1, results1[0, 0], results1[0, 1]], | |
[conc2, flow_rate2, voltage2, solvent_value2, results2[0, 0], results2[0, 1]] | |
], columns=['Concentration (%w/v)', 'Flow Rate (mL/h)', 'Voltage (kV)', 'Solvent', 'Size (um)', 'Feasible?']) | |
results_df['Solvent'] = ['DMAc' if x == 0 else 'CHCl3' for x in results_df['Solvent']] | |
results_df['Feasible?'] = ['Success' if x == 1 else 'Failed' for x in results_df['Feasible?']] | |
results_storage = pd.concat([results_storage, results_df], ignore_index=True) | |
# Apply each styling function to its specific column | |
results_display = results_storage.style\ | |
.map(style_feasible_column, subset=['Feasible?'])\ | |
.map(lambda x: style_size_column(x, target_size), subset=['Size (um)'])\ | |
.format(precision=3) | |
# Check if user has completed 5 rounds (10 experiments) | |
completed = len(results_storage) >= 10 | |
message = "" | |
auc_value = 0 | |
usr_level = "" | |
if completed: | |
# Calculate AUC | |
human_performance = calc_human_performance(results_storage, target_size) | |
auc_value = calculate_auc(human_performance) | |
# Set CCBO value based on target size | |
if target_size == 3.0: | |
ccbo_value = 1.40 | |
elif target_size == 0.5: | |
ccbo_value = 0.92 | |
else: | |
ccbo_value = 8.51 | |
# Calculate performance as a percentage of CCBO value | |
performance_percentage = (auc_value / ccbo_value) * 100 | |
if performance_percentage > 300: | |
usr_level = "randomly playing!" | |
elif performance_percentage > 150: | |
usr_level = "a beginner." | |
elif performance_percentage > 100: | |
usr_level = "an intermediate user." | |
elif performance_percentage > 65: | |
usr_level = "an advanced user." | |
else: | |
usr_level = "... come on, you must have cheated (or you are extremely lucky)!" | |
message = f"🎉 Congratulations! You've completed all 5 rounds. Your performance AUC is ** {auc_value:.2f} ** and CCBO was ** {ccbo_value} **. You seems to be {usr_level} Now you can download your results or click reset to try again!" | |
# Return updated state and UI updates | |
return ( | |
results_storage, | |
gr.DataFrame(value=generate_prior_experiments_display(target_size), label="Prior Experiments"), | |
gr.DataFrame(value=results_display, label="Your Results"), | |
plot_results(results_storage, target_size), | |
gr.update(visible=completed), # Show download button when completed | |
gr.update(value=message, visible=completed), # Show message when completed | |
gr.update(value=auc_value), # Update AUC value | |
gr.update(visible=completed), # Show result file component | |
gr.update(interactive=False if completed else True) # Disable target selection once completed | |
) | |
# Reset results function | |
def reset_results(state, target_size): | |
results_storage = pd.DataFrame(columns=['Concentration (%w/v)', 'Flow Rate (mL/h)', 'Voltage (kV)', 'Solvent', 'Size (um)', 'Feasible?']) | |
# Generate the plot for empty results | |
empty_plot = plot_results(results_storage, target_size) | |
# Apply each styling function to its specific column | |
styled_results = results_storage.style\ | |
.map(style_feasible_column, subset=['Feasible?'])\ | |
.map(lambda x: style_size_column(x, target_size), subset=['Size (um)'])\ | |
.format(precision=3) | |
return ( | |
results_storage, # results_state | |
gr.DataFrame(value=generate_prior_experiments_display(target_size), label="Prior Experiments"), # prior_experiments | |
gr.DataFrame(value=styled_results, label="Your Results"), # results_df | |
empty_plot, # perf_plot | |
gr.update(visible=False), # download_btn visibility | |
gr.update(value="", visible=False), # completion_message | |
gr.update(value=0), # auc_state | |
gr.update(visible=False), # result_file visibility | |
gr.update(interactive=True) # Enable target selection | |
) | |
# Function to prepare results for download | |
def prepare_results_for_download(results, target_size): | |
"""Prepare results dataframe for download and save to CSV""" | |
if results is None or len(results) == 0: | |
return None | |
# Calculate human performance | |
human_performance = calc_human_performance(results, target_size) | |
auc_value = calculate_auc(human_performance) | |
# Add a summary row with AUC | |
summary_df = pd.DataFrame({ | |
'Concentration (%w/v)': ["Performance AUC:"], | |
'Flow Rate (mL/h)': [auc_value], | |
'Voltage (kV)': [""], | |
'Solvent': [""], | |
'Size (um)': [""], | |
'Feasible?': [""] | |
}) | |
# Combine results with summary | |
combined_df = pd.concat([results, summary_df], ignore_index=True) | |
combined_df = pd.concat([pd.DataFrame([{"Concentration (%w/v)": f"Target size: {target_size} μm"}]), combined_df], ignore_index=True) | |
# Save to temporary file | |
temp_dir = tempfile.gettempdir() | |
output_path = os.path.join(temp_dir, f"electrospray_results_{str(target_size).replace('.', '_')}.csv") | |
combined_df.to_csv(output_path, index=False) | |
return output_path | |
# Application description | |
description = "<h3>Welcome, challenger! 🎉</h3><p> If you think you may perform better than <strong>CCBO</strong>, try this interactive game to optimize electrospray!</p><p> Rules are simple:</p> <ul><li>🔍 Examine! Prior experiments are on the right (or below on your phone), always remeber the target you've selected! </li><li>⚠️ Be aware! Experiment may <u><i><strong>fail</strong></i></u> due to incompatible parameters, they don't count towards your optimization!</li><li>💡 Propose! Set your parameters, you have <strong>2</strong> chances in each round, use them wisely!</li><li>🚀 <strong>Submit</strong> to see the results, reflect and improve your selection!</li><li>🔄 Repeat! Run the process for <strong>5</strong> rounds to see if you can beat CCBO!</li></ul></p><p>Your data will not be stored, so feel free to play again, good luck! 🍀</p><p>Impressed by CCBO? Check our <a href='https://github.com/FrankWanger/CCBO'>implementation</a> and <a href='https://arxiv.org/abs/2411.10471'>paper!</a></p>" | |
# Create Gradio interface | |
with gr.Blocks() as demo: | |
# Add state component to store user-specific results | |
results_state = gr.State() | |
auc_state = gr.State(value=0) | |
with gr.Row(): | |
# Input parameters column | |
with gr.Column(): | |
gr.Markdown("## Human vs CCBO Campaign - Optimize Electrospray") | |
gr.Markdown(description) | |
# Add target size selection with new 22.0 option | |
target_size = gr.Radio( | |
[3.0, 0.5, 22.0], | |
label="🎯 Select Target Size (μm)", | |
value=3.0, | |
info="Choose the particle size you want to optimize for" | |
) | |
with gr.Row(): | |
with gr.Column(): | |
gr.Markdown("### Experiment 1") | |
conc1 = gr.Slider(minimum=0.05, maximum=5.0, value=1.2, step=0.001, label="Concentration (%w/v)") | |
flow_rate1 = gr.Slider(minimum=0.01, maximum=60.0, value=20.0, step=0.001, label="Flow Rate (mL/h)") | |
voltage1 = gr.Slider(minimum=10.0, maximum=18.0, value=15.0, step=0.001, label="Voltage (kV)") | |
solvent1 = gr.Dropdown(['DMAc', 'CHCl3'], value='DMAc', label='Solvent') | |
with gr.Column(): | |
gr.Markdown("### Experiment 2") | |
conc2 = gr.Slider(minimum=0.05, maximum=5.0, value=2.8, step=0.001, label="Concentration (%w/v)") | |
flow_rate2 = gr.Slider(minimum=0.01, maximum=60.0, value=20.0, step=0.001, label="Flow Rate (mL/h)") | |
voltage2 = gr.Slider(minimum=10.0, maximum=18.0, value=15.0, step=0.001, label="Voltage (kV)") | |
solvent2 = gr.Dropdown(['DMAc', 'CHCl3'], value='CHCl3', label='Solvent') | |
# Group all buttons in a single row | |
with gr.Row(): | |
#make submit btn highlight color | |
submit_btn = gr.Button("🚀 Submit", variant="primary") | |
reset_btn = gr.Button("Reset") | |
download_btn = gr.Button("📥 Download Results", visible=False) | |
# Add notification component (initially hidden) | |
completion_message = gr.Markdown(visible=False) | |
# File output component | |
result_file = gr.File(label="Download Results CSV", visible=False) | |
# Results display column | |
with gr.Column(): | |
prior_experiments = gr.DataFrame(value=generate_prior_experiments_display(target_size), label="Prior Experiments") | |
results_df = gr.DataFrame(label="Your Results") | |
perf_plot = gr.Plot(label="Performance Comparison") | |
# Connect the submit button to the predict function | |
submit_btn.click( | |
fn=predict, | |
inputs=[ | |
results_state, | |
target_size, | |
conc1, flow_rate1, voltage1, solvent1, | |
conc2, flow_rate2, voltage2, solvent2 | |
], | |
outputs=[ | |
results_state, prior_experiments, results_df, perf_plot, | |
download_btn, completion_message, auc_state, result_file, | |
target_size | |
] | |
) | |
# Connect the reset button to the reset_results function | |
reset_btn.click( | |
fn=reset_results, | |
inputs=[results_state, target_size], | |
outputs=[ | |
results_state, prior_experiments, results_df, perf_plot, | |
download_btn, completion_message, auc_state, result_file, | |
target_size | |
] | |
) | |
# Connect download button to file download | |
download_btn.click( | |
fn=prepare_results_for_download, | |
inputs=[results_state, target_size], | |
outputs=[result_file] | |
) | |
# When target size changes, reset the application | |
target_size.change( | |
fn=reset_results, | |
inputs=[results_state, target_size], | |
outputs=[ | |
results_state, prior_experiments, results_df, perf_plot, | |
download_btn, completion_message, auc_state, result_file, | |
target_size | |
] | |
) | |
demo.launch() |