Spaces:
Runtime error
Runtime error
from fmpy import * | |
from fmpy import read_model_description, extract | |
from fmpy.fmi2 import FMU2Slave | |
import numpy as np | |
import shutil | |
import pandas as pd | |
import random | |
import plotly.graph_objects as go | |
import json | |
from fmpy import * | |
from fmpy import read_model_description, extract | |
from fmpy.fmi2 import FMU2Slave | |
import numpy as np | |
import shutil | |
import pandas as pd | |
import random | |
import plotly.graph_objects as go | |
import json | |
# Patient profile table, loaded once at import time.
df_profile = pd.read_csv("profile_processed.csv")


def getProfileFromID(id):
    """Return the profile values of the patient whose ``ID`` column equals *id*.

    The first matching row is used and every column after the first one is
    returned as a plain list. Callers in this file read positions 0-3 as
    age, weight, height and gender.

    Raises:
        IndexError: if no row of ``df_profile`` has the requested ID.
    """
    matches = df_profile[df_profile.ID == id]
    if matches.empty:
        # Same exception type pandas would raise from ``iloc[0, ...]`` on an
        # empty frame, but with a message that names the actual problem.
        raise IndexError(f"no patient profile found for ID {id!r}")
    return matches.iloc[0, 1:].to_list()
def simulation(id, kp, ki, kd, bis_target=40, min_noise=50, max_noise=150):
    """Run a closed-loop PID co-simulation of the BIS index for one patient.

    The patient profile is looked up with ``getProfileFromID``, written to the
    FMU together with fixed model constants, and the FMU is stepped from
    t=0 to t=7000 with a step of 1. At every step a PID controller computes
    the infusion rate from the BIS error; an impulsive disturbance is added
    during selected time windows.

    Args:
        id: patient ID passed to ``getProfileFromID``.
        kp, ki, kd: PID gains.
        bis_target: BIS set point used before t=2400 and from t=4500 on
            (the set point is 60 in between).
        min_noise, max_noise: bounds for the randomly drawn disturbance
            amplitude. They are truncated to integers and swapped if given
            in the wrong order.

    Returns:
        A tuple ``(result, impulsive_noise)`` where ``result`` is a NumPy
        structured array with fields ``time``, ``BIS`` and ``Infusion`` and
        ``impulsive_noise`` is the amplitude that was drawn.
    """
    profile = getProfileFromID(id)
    age = profile[0]
    weight = profile[1]
    height = profile[2]
    gender = profile[3]

    # Draw the disturbance amplitude before touching the FMU so that invalid
    # bounds fail early. ``random.randint`` needs integer bounds in ascending
    # order, which UI sliders do not guarantee.
    noise_low, noise_high = sorted((int(min_noise), int(max_noise)))
    impulsive_noise = random.randint(noise_low, noise_high)
    print("noise level:", impulsive_noise)

    fmu_path = 'Pharmacokinetics_4_comportmental_model_PI_ref_FMU_base4_OAAS_lnx.fmu'
    model_description = read_model_description(fmu_path)
    # Map variable names to the value references used by getReal/setReal.
    vrs = {
        variable.name: variable.valueReference
        for variable in model_description.modelVariables
    }

    start_time = 0.0
    stop_time = 7000
    step_size = 1

    rows = []  # list to record the results
    unzipdir = extract(fmu_path)
    try:
        fmu = FMU2Slave(guid=model_description.guid,
                        unzipDirectory=unzipdir,
                        modelIdentifier=model_description.coSimulation.modelIdentifier,
                        instanceName='instance1')
        # initialize
        fmu.instantiate()
        try:
            fmu.setupExperiment(startTime=start_time)
            fmu.enterInitializationMode()
            fmu.exitInitializationMode()

            # NOTE(review): these values are written after initialization
            # mode has been left; confirm the FMU accepts them at this point.
            fmu.setReal([vrs["amesim_interface.Age_year"]], [age])
            fmu.setReal([vrs["amesim_interface.BIS0"]], [95.6])
            fmu.setReal([vrs["amesim_interface.BISmin"]], [8.9])
            fmu.setReal([vrs["amesim_interface.Drug_concentration_mgmL"]], [20])
            fmu.setReal([vrs["amesim_interface.EC50"]], [2.23])
            fmu.setReal([vrs["amesim_interface.Gamma"]], [1.58])
            fmu.setReal([vrs["amesim_interface.Gender_0male_1female"]], [gender])
            fmu.setReal([vrs["amesim_interface.Height_cm"]], [height])
            fmu.setReal([vrs["amesim_interface.Infusion_rate_mLh"]], [200])
            fmu.setReal([vrs["amesim_interface.Weight_kg"]], [weight])

            vr_input = vrs["amesim_interface.Infusion_rate_mLh"]
            vr_output = vrs["amesim_interface.BIS_Index"]

            time = start_time
            integral = 0
            target = bis_target
            last_error = 0

            # simulation loop
            while time < stop_time:
                # Set-point schedule. NOTE(review): the integrator is zeroed
                # on every step inside these windows (i.e. on every step
                # from t=2400 on), not only at the set-point change -
                # confirm this is intended.
                if 2.4e3 <= time < 4.5e3:
                    target = 60
                    integral = 0
                if time >= 4.5e3:
                    target = bis_target
                    integral = 0

                # The FMU output is only read once the first steps are done;
                # before that the initial value 95.6 is used.
                bis = fmu.getReal([int(vr_output)])[0] if time > step_size else 95.6

                error = bis - target
                integral = integral + error
                derivative = error - last_error
                last_error = error
                infusion_rate = np.clip(kp*error + ki*integral + kd*derivative, 0, 200)

                # add impulsive noise to infusion rate: active during the
                # first 50 time units of every 15th block of 100 time units
                n = time // 100
                window_start = 100 * n
                window_end = window_start + 50
                if window_start < time < window_end and n % 15 == 0:
                    infusion_rate += impulsive_noise

                fmu.setReal([vr_input], [int(infusion_rate)])
                # perform one step
                fmu.doStep(currentCommunicationPoint=time, communicationStepSize=step_size)
                # advance the time
                time += step_size
                # read back the infusion rate actually held by the FMU
                applied_rate = fmu.getReal([int(vr_input)])[0]
                # append the results (BIS is the value read before the step)
                rows.append((time, bis, applied_rate))

            fmu.terminate()
        finally:
            # Always release the native instance, even if a step failed.
            fmu.freeInstance()
    finally:
        # Always remove the extracted FMU directory.
        shutil.rmtree(unzipdir, ignore_errors=True)

    result = np.array(rows, dtype=np.dtype([('time', np.float64), ('BIS', np.float64), ('Infusion', np.float64)]))
    return result, impulsive_noise
def plot_result(result, show_original):
    """Build the BIS and infusion-rate figures for a simulation result.

    Args:
        result: structured array (or anything ``pd.DataFrame`` accepts) with
            ``BIS`` and ``Infusion`` fields.
        show_original: when true, overlay the baseline previously stored in
            ``result_impulsive.npy``; when false, store *result* as the new
            baseline in that file.

    Returns:
        A tuple ``(fig1, fig2)``: the BIS figure and the infusion-rate figure.
    """
    df = pd.DataFrame(result)

    # BIS trace
    trace1 = go.Scatter(x=df.index, y=df['BIS'], mode='lines', name='BIS')
    fig1 = go.Figure(data=trace1)
    fig1.update_layout(height=400, width=1200, title_text="BIS evolution")

    # Infusion-rate trace
    trace2 = go.Scatter(x=df.index, y=df['Infusion'], mode='lines', name='Infusion rate')
    fig2 = go.Figure(data=trace2)
    fig2.update_layout(height=400, width=1200, title_text="Infusion rate evolution")

    if show_original:
        try:
            result_baseline = np.load("result_impulsive.npy")
        except FileNotFoundError:
            # No baseline has been saved yet: return the plots without the
            # overlay instead of failing the whole request.
            return fig1, fig2
        df_original = pd.DataFrame(result_baseline)
        fig1.add_trace(go.Scatter(x=df_original.index, y=df_original['BIS'], mode='lines', name='BIS original', line=dict(color="red"), opacity=0.5))
        fig2.add_trace(go.Scatter(x=df_original.index, y=df_original['Infusion'], mode='lines', name='Infusion rate original', line=dict(color="red"), opacity=0.5))
    else:
        # This run becomes the baseline for later comparisons.
        np.save("result_impulsive.npy", result)
    return fig1, fig2
def gradio_display_profile(id):
    """Return a one-row DataFrame describing the patient with the given ID.

    The columns are ``age``, ``weight``, ``height`` and ``gender``; the
    gender code 0 is shown as "Male", anything else as "Female".
    """
    profile = getProfileFromID(id)
    if profile[3] == 0:
        gender_label = "Male"
    else:
        gender_label = "Female"
    return pd.DataFrame({
        "age": [profile[0]],
        "weight": [profile[1]],
        "height": [profile[2]],
        "gender": [gender_label],
    })
def gradio_simulation(id, kp, ki, kd, show_original, bis_target, min_noise, max_noise):
    """Run one simulation and return its two figures plus the noise level.

    Returns:
        ``(fig1, fig2, noise_level)`` - the BIS figure, the infusion-rate
        figure and the disturbance amplitude reported by ``simulation``.
    """
    outcome = simulation(id, kp, ki, kd, bis_target, min_noise, max_noise)
    trace_data, drawn_noise = outcome
    bis_figure, infusion_figure = plot_result(trace_data, show_original)
    return bis_figure, infusion_figure, drawn_noise
def gradio_save(id, kp, ki, kd, bis_target, min_noise, max_noise):
    """Simulate once more with the given gains and store the run as JSON.

    The file ``saved_data/patient-{id}.json`` receives the patient profile,
    the BIS trace and the noise level as ``inputs`` and the PID gains as
    ``outputs``. The ``saved_data`` directory is created when missing.

    NOTE(review): this calls ``simulation`` again, which draws a new noise
    level, so the stored trace is not the one from an earlier "Simulate"
    click - confirm this is acceptable.

    Returns:
        The status string ``"Saved"``.
    """
    import os  # local import: only needed to create the output directory

    result, noise_level = simulation(id, kp, ki, kd, bis_target, min_noise, max_noise)
    patient_profile = getProfileFromID(id)
    # Assuming patient_profile is a list of 4 numbers, bis_trace is a list of
    # 7000 floats, and kp, ki, kd are floats
    data = {
        'inputs': {
            'patient_profile': {
                'age': patient_profile[0],
                'weight': patient_profile[1],
                'height': patient_profile[2],
                'gender': patient_profile[3],
            },
            'bis_trace': result['BIS'].tolist(),
            'noise_level': noise_level,
        },
        'outputs': {
            'kp': kp,
            'ki': ki,
            'kd': kd,
        },
    }
    # open(..., 'w') does not create missing parent directories.
    os.makedirs('saved_data', exist_ok=True)
    with open(f'saved_data/patient-{id}.json', 'w') as f:
        json.dump(data, f)
    return "Saved"
import gradio as gr

# Gradio UI: controls in a narrow left column, the two plots on the right.
with gr.Blocks() as demo:
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("# BIS Target")
            bis_target = gr.Slider(minimum=0, maximum=100, step=1, value=30, label="BIS target")

            gr.Markdown("# Impulsive noise range")
            min_noise = gr.Slider(minimum=0, maximum=50, step=1, value=50, label="noise min")
            max_noise = gr.Slider(minimum=0, maximum=150, step=1, value=150, label="noise max")

            gr.Markdown("# Patient profile")
            # Named patient_id rather than id so the builtin is not shadowed
            # at module level.
            patient_id = gr.Number(value=1, precision=0, label="Patient ID")
            profile_output = gr.Dataframe(value=gradio_display_profile(1), label="Patient profile")
            # Refresh the profile table whenever the patient ID changes.
            patient_id.change(gradio_display_profile, inputs=[patient_id], outputs=[profile_output])

            gr.Markdown("# PID parameters")
            # NOTE(review): a nested gr.Blocks() is used purely as a
            # container here; kept as in the original layout.
            with gr.Blocks():
                kp = gr.Slider(minimum=0, maximum=20, value=4, label="kp")
                ki = gr.Slider(minimum=0, maximum=1, value=0.01, label="ki")
                kd = gr.Slider(minimum=0, maximum=200, value=0, label="kd")
            button = gr.Button("Simulate")
            show_original = gr.Checkbox(label="Show original")
            # gradio_simulation returns the drawn noise level as a third
            # value, so it needs an output component of its own.
            noise_output = gr.Number(label="Impulsive noise level", interactive=False)

            gr.Markdown("# Save the best parameters")
            save_result = gr.Button("Save")
            save_output = gr.Textbox(label="Save status")
        with gr.Column(scale=5):
            plot1 = gr.Plot(label="BIS evolution")
            plot2 = gr.Plot(label="Infusion rate evolution")

    # The number of outputs must match the number of values each handler returns.
    button.click(
        gradio_simulation,
        inputs=[patient_id, kp, ki, kd, show_original, bis_target, min_noise, max_noise],
        outputs=[plot1, plot2, noise_output],
    )
    save_result.click(
        gradio_save,
        inputs=[patient_id, kp, ki, kd, bis_target, min_noise, max_noise],
        outputs=[save_output],
    )

demo.launch()