Hugo HE
add custom BIS target and noise level
5d762cb
from fmpy import *
from fmpy import read_model_description, extract
from fmpy.fmi2 import FMU2Slave
import numpy as np
import shutil
import pandas as pd
import random
import plotly.graph_objects as go
import json
from fmpy import *
from fmpy import read_model_description, extract
from fmpy.fmi2 import FMU2Slave
import numpy as np
import shutil
import pandas as pd
import random
import plotly.graph_objects as go
import json
df_profile = pd.read_csv("profile_processed.csv")
def getProfileFromID(id):
return df_profile[df_profile.ID==id].iloc[0, 1:].to_list()
def simulation(id, kp, ki, kd, bis_target=40, min_noise=50, max_noise=150):
profile = getProfileFromID(id)
age = profile[0]
weight = profile[1]
height = profile[2]
gender = profile[3]
vrs = {}
fmu = 'Pharmacokinetics_4_comportmental_model_PI_ref_FMU_base4_OAAS_lnx.fmu'
model_description = read_model_description(fmu)
for variable in model_description.modelVariables:
vrs[variable.name] = variable.valueReference
start_time = 0.0
stop_time = 7000
step_size = 1
unzipdir = extract(fmu)
fmu = FMU2Slave(guid=model_description.guid,
unzipDirectory=unzipdir,
modelIdentifier=model_description.coSimulation.modelIdentifier,
instanceName='instance1')
# initialize
fmu.instantiate()
fmu.setupExperiment(startTime=start_time)
fmu.enterInitializationMode()
fmu.exitInitializationMode()
fmu.setReal([vrs["amesim_interface.Age_year"]], [age])
fmu.setReal([vrs["amesim_interface.BIS0"]], [95.6])
fmu.setReal([vrs["amesim_interface.BISmin"]], [8.9])
fmu.setReal([vrs["amesim_interface.Drug_concentration_mgmL"]], [20])
fmu.setReal([vrs["amesim_interface.EC50"]], [2.23])
fmu.setReal([vrs["amesim_interface.Gamma"]], [1.58])
fmu.setReal([vrs["amesim_interface.Gender_0male_1female"]], [gender])
fmu.setReal([vrs["amesim_interface.Height_cm"]], [height])
fmu.setReal([vrs["amesim_interface.Infusion_rate_mLh"]], [200])
fmu.setReal([vrs["amesim_interface.Weight_kg"]], [weight])
vr_input = vrs["amesim_interface.Infusion_rate_mLh"]
vr_output = vrs["amesim_interface.BIS_Index"]
rows = [] # list to record the results
time = start_time
infusion_rate = 200
i = 0
target = bis_target
last_error = 0
# simulation loop
impulsive_noise = random.randint(min_noise, max_noise)
print("noise level:", impulsive_noise)
while time < stop_time:
if time >= 2.4e3 and time < 4.5e3:
target = 60
p = 0
i = 0
if time >= 4.5e3:
target = bis_target
p = 0
i = 0
bis = fmu.getReal([int(vr_output)])[0] if time > step_size else 95.6
p = bis - target
i = i + p
d = p - last_error
last_error = p
infusion_rate = np.clip(kp*p + ki*i + kd*d, 0, 200)
# add impulsive noise to infusion rate
n = time // 100
start = 100 * n
end = start + 50
if (time > start and time < end and n % 15 == 0):
infusion_rate += impulsive_noise
fmu.setReal([vr_input], [int(infusion_rate)])
# perform one step
fmu.doStep(currentCommunicationPoint=time, communicationStepSize=step_size)
# advance the time
time += step_size
# get the values for 'inputs' and 'outputs[4]'
inputs, outputs = fmu.getReal([int(vr_input), int(vr_output)])
# append the results
rows.append((time, bis, inputs))
fmu.terminate()
fmu.freeInstance()
shutil.rmtree(unzipdir, ignore_errors=True)
result = np.array(rows, dtype=np.dtype([('time', np.float64), ('BIS', np.float64), ('Infusion', np.float64)]))
return result, impulsive_noise
def plot_result(result, show_original):
df = pd.DataFrame(result)
trace1 = go.Scatter(x=df.index, y=df['BIS'], mode='lines', name='BIS')
fig1 = go.Figure(data=trace1)
fig1.update_layout(height=400, width=1200, title_text="BIS evolution")
# Add a line trace for column_2 in the second subplot
trace2 = go.Scatter(x=df.index, y=df['Infusion'], mode='lines', name='Infusion rate')
fig2 = go.Figure(data=trace2)
fig2.update_layout(height=400, width=1200, title_text="Infusion rate evolution")
if show_original:
result_baseline = np.load("result_impulsive.npy")
df_original = pd.DataFrame(result_baseline)
fig1.add_trace(go.Scatter(x=df_original.index, y=df_original['BIS'], mode='lines', name='BIS original', line=dict(color="red"), opacity=0.5))
fig2.add_trace(go.Scatter(x=df_original.index, y=df_original['Infusion'], mode='lines', name='Infusion rate original', line=dict(color="red"), opacity=0.5))
else:
np.save("result_impulsive.npy", result)
return fig1, fig2
def gradio_display_profile(id):
profile = getProfileFromID(id)
gender = "Male" if profile[3] == 0 else "Female"
data = {}
data["age"] = [profile[0]]
data["weight"] = [profile[1]]
data["height"] = [profile[2]]
data["gender"] = [gender]
df = pd.DataFrame(data)
return df
def gradio_simulation(id, kp, ki, kd, show_original, bis_target, min_noise, max_noise):
result, noise_level = simulation(id, kp, ki, kd, bis_target, min_noise, max_noise)
fig1, fig2 = plot_result(result, show_original)
return fig1, fig2, noise_level
def gradio_save(id, kp, ki, kd, bis_target, min_noise, max_noise):
result, noise_level = simulation(id, kp, ki, kd, bis_target, min_noise, max_noise)
patient_profile = getProfileFromID(id)
# Assuming patient_profile is a list of 4 integers, bis_trace is a list of 7000 floats, and kp, ki, kd are floats
data = {
'inputs': {
'patient_profile': {
'age': patient_profile[0],
'weight': patient_profile[1],
'height': patient_profile[2],
'gender': patient_profile[3]
},
'bis_trace': result['BIS'].tolist(),
'noise_level': noise_level
},
'outputs': {
'kp': kp,
'ki': ki,
'kd': kd
}
}
with open(f'saved_data/patient-{id}.json', 'w') as f:
json.dump(data, f)
return "Saved"
import gradio as gr
with gr.Blocks() as demo:
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("# BIS Target")
bis_target = gr.Slider(minimum=0, maximum=100, step=1, value=30, label="BIS target")
gr.Markdown("# Impulsive noise range")
min_noise = gr.Slider(minimum=0, maximum=50, step=1, value=50, label="noise min")
max_noise = gr.Slider(minimum=0, maximum=150, step=1, value=150, label="noise max")
gr.Markdown("# Patient profile")
id = gr.Number(value=1, precision=0, label="Patient ID")
profile_output = gr.Dataframe(value=gradio_display_profile(1), label="Patient profile")
id.change(gradio_display_profile, inputs=[id], outputs=[profile_output])
# with gr.Blocks():
# with gr.Accordion("noise range"):
# min_pul = gr.Slider(minimum=0, maximum=50, step=1, value=50, label="noise min")
# max_pul = gr.Slider(minimum=0, maximum=150, step=1, value=150, label="noise max")
gr.Markdown("# PID parameters")
with gr.Blocks():
kp = gr.Slider(minimum=0, maximum=20, value=4, label="kp")
ki = gr.Slider(minimum=0, maximum=1, value=0.01, label="ki")
kd = gr.Slider(minimum=0, maximum=200, value=0, label="kd")
button = gr.Button("Simulate")
show_original = gr.Checkbox(label="Show original")
gr.Markdown("# Save the best parameters")
save_result = gr.Button("Save")
save_output = gr.Textbox(label="Save status")
with gr.Column(scale=5):
plot1 = gr.Plot(label="BIS evolution")
plot2 = gr.Plot(label="Infusion rate evolution")
button.click(gradio_simulation, inputs=[id, kp, ki, kd, show_original, bis_target, min_noise, max_noise], outputs=[plot1, plot2])
save_result.click(gradio_save, inputs=[id, kp, ki, kd, bis_target, min_noise, max_noise], outputs=[save_output])
demo.launch()