"""Gradio control panel that drives a Pioreactor through MQTT commands.

Two MQTT clients are used: ``client`` is created from environment variables
and serves the "Default" tabs; ``client_custom`` is created lazily from the
connection details typed into the "Custom" tab.
"""

import json
import os
import ssl
import time
from datetime import datetime

import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import paho.mqtt.client as mqtt
import pandas as pd
import plotly.graph_objs as go
import pytz

HOST = os.environ.get("host")
PORT = int(os.environ.get("port"))
USERNAME = os.environ.get("username")
PASSWORD = os.environ.get("password")
PIOREACTOR = os.environ.get("pioreactor")

# Topic every command is published to.
CONTROL_TOPIC = "pioreactor/control"
# Seconds to wait for a reply before giving up.
REPLY_TIMEOUT_S = 10
# Timestamp layout of the "x" field in readings replies.
_TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
# Built once: constructing the timezone per data point is wasted work.
_LOCAL_TZ = pytz.timezone("America/New_York")

client = None
client_custom = None

# Last worker status received by on_message_worker.
experiment = None
running = []
temp_mqtt_auto = None
rpm_mqtt = None
led_mqtt = None
experiments = []

# Last readings received by on_reading.
temp_graph = None
od_graph = None
norm_od_graph = None
growth_rate_graph = None


# --------------------------------------------------------------------------
# MQTT plumbing
# --------------------------------------------------------------------------

def on_connect(client, userdata, flags, rc):
    """Log the result code reported when a connection is established."""
    print("Connected with result code " + str(rc))


def create_client(host, port, username, password):
    """Create, connect and start a TLS MQTT client.

    ``port`` is coerced to ``int`` because the Gradio number field hands
    over a float, which the socket layer rejects.
    """
    port = int(port)
    # Deliberately does not print the password.
    print(f"Connecting to MQTT broker {host}:{port} as {username}")
    mqtt_client = mqtt.Client()
    mqtt_client.username_pw_set(username, password)
    mqtt_client.tls_set(tls_version=ssl.PROTOCOL_TLS)
    mqtt_client.on_connect = on_connect
    mqtt_client.connect(host, port)
    mqtt_client.loop_start()
    return mqtt_client


def on_message_worker(client, userdata, message):
    """Store the fields of a worker-status reply in the module globals."""
    global experiment, running, temp_mqtt_auto, rpm_mqtt, led_mqtt, experiments
    data = json.loads(message.payload.decode("utf-8"))
    experiment = data.get("experiment", None)
    running = data.get("running", [])
    temp_mqtt_auto = data.get("temperature_automation", None)
    rpm_mqtt = data.get("stirring", None)
    led_mqtt = data.get("leds", None)
    experiments = data.get("experiments", [])


def on_reading(client, userdata, message):
    """Store the series of a readings reply in the module globals."""
    global temp_graph, od_graph, norm_od_graph, growth_rate_graph
    data = json.loads(message.payload.decode("utf-8"))
    temp_graph = data.get("temperature", None)
    od_graph = data.get("od", None)
    norm_od_graph = data.get("normalized_od", None)
    growth_rate_graph = data.get("growth_rate", None)


def store_client():
    """Create the default client from the environment configuration."""
    global client
    client = create_client(HOST, PORT, USERNAME, PASSWORD)


def custom_client(host, port, username, password):
    """Create the custom client from user-supplied connection details."""
    global client_custom
    client_custom = create_client(host, port, username, password)


def void_client():
    """Disconnect and forget the custom client, if there is one."""
    global client_custom
    if client_custom is not None:
        client_custom.disconnect()
        # Stop the thread started by loop_start(); otherwise it is leaked
        # every time the connection details are edited.
        client_custom.loop_stop()
        client_custom = None
        print("Client disconnected")


def _ensure_custom_client(host, port, username, password):
    """Return the custom client, creating it on first use."""
    if client_custom is None:
        custom_client(host, port, username, password)
    return client_custom


def _send(mqtt_client, payload):
    """Publish ``payload`` as JSON on the control topic."""
    mqtt_client.publish(CONTROL_TOPIC, json.dumps(payload))


def _command(mqtt_client, command, experiment, reactor, **fields):
    """Publish a command with the standard header followed by ``fields``."""
    payload = {"command": command, "experiment": experiment, "reactor": reactor}
    payload.update(fields)
    _send(mqtt_client, payload)


def _wait_for(is_ready, timeout=REPLY_TIMEOUT_S, poll_interval=0.1):
    """Poll ``is_ready`` until it returns true or ``timeout`` seconds pass."""
    deadline = time.monotonic() + timeout
    while not is_ready() and time.monotonic() < deadline:
        time.sleep(poll_interval)


# --------------------------------------------------------------------------
# Shared command implementations (used by custom and default entry points)
# --------------------------------------------------------------------------

def _stirring(mqtt_client, reactor, rpm, experiment, state):
    """Start, stop or update stirring depending on ``state``."""
    if state == "start":
        _command(mqtt_client, "start_stirring", experiment, reactor, rpm=rpm)
    elif state == "stop":
        _command(mqtt_client, "stop_stirring", experiment, reactor)
    elif state == "update":
        _command(mqtt_client, "update_stirring_rpm", experiment, reactor, rpm=rpm)
    else:
        print("Invalid state")


def _temp_automation(mqtt_client, reactor, temperature, experiment, state, option):
    """Start, stop, update or restart the temperature automation."""
    automation = "thermostat" if option == "Heat To Temp" else "only_record_temperature"
    if state == "start":
        _command(
            mqtt_client, "set_temperature_automation", experiment, reactor,
            temp=temperature, automation=automation,
        )
    elif state == "stop":
        _command(
            mqtt_client, "temp_update", experiment, reactor,
            settings={"$state": "disconnected"},
        )
    elif state == "update":
        _command(
            mqtt_client, "temp_update", experiment, reactor,
            settings={"target_temperature": temperature},
        )
    elif state == "restart":
        _command(
            mqtt_client, "temp_restart", experiment, reactor,
            automation=automation, temp=temperature,
        )
    else:
        print("Invalid state")


def _toggle_job(mqtt_client, reactor, experiment, state, start_command, stop_command):
    """Publish ``start_command`` or ``stop_command`` depending on ``state``."""
    if state == "start":
        _command(mqtt_client, start_command, experiment, reactor)
    elif state == "stop":
        _command(mqtt_client, stop_command, experiment, reactor)
    else:
        print("Invalid state")


def _set_led(mqtt_client, reactor, channel, brightness, experiment):
    """Set the intensity of one LED channel."""
    _command(
        mqtt_client, "set_led_intensity", experiment, reactor,
        led=channel, brightness=brightness,
    )


def _change_experiment(mqtt_client, reactor, ch_exp, exp):
    """Ask the reactor to switch from experiment ``exp`` to ``ch_exp``."""
    _send(mqtt_client, {
        "command": "change_experiment",
        "experiment": exp,
        "experiment_new": ch_exp,
        "reactor": reactor,
    })


def _get_status(mqtt_client, reactor, exp):
    """Request the worker status and wait for the reply.

    Returns ``(experiment, running, temperature_automation, stirring, leds,
    experiments)``. All values are reset before the request so that a
    timeout yields empty values instead of the previous query's answer.
    """
    global experiment, running, temp_mqtt_auto, rpm_mqtt, led_mqtt, experiments
    topic = f"pioreactor/{reactor}/worker"
    mqtt_client.subscribe(topic)
    mqtt_client.on_message = on_message_worker

    experiment = None
    running = []
    temp_mqtt_auto = None
    rpm_mqtt = None
    led_mqtt = None
    experiments = []

    _command(mqtt_client, "get_worker", exp, reactor)
    _wait_for(lambda: experiment is not None)
    mqtt_client.unsubscribe(topic)
    return experiment, running, temp_mqtt_auto, rpm_mqtt, led_mqtt, experiments


# --------------------------------------------------------------------------
# Custom-connection entry points
# --------------------------------------------------------------------------

def stirring(rpm, host, port, username, password, pioreactor, experiment, state):
    """Control stirring over the custom connection."""
    mqtt_client = _ensure_custom_client(host, port, username, password)
    _stirring(mqtt_client, pioreactor, rpm, experiment, state)


def toggle_temperature_input(selected_option):
    """Show the temperature slider only if "Heat To Temp" is selected."""
    return gr.update(visible=selected_option == "Heat To Temp")


def temp_automation(temperature, host, port, username, password, pioreactor,
                    experiment, state, option):
    """Control the temperature automation over the custom connection."""
    mqtt_client = _ensure_custom_client(host, port, username, password)
    _temp_automation(mqtt_client, pioreactor, temperature, experiment, state, option)


def od(host, port, username, password, pioreactor, experiment, state):
    """Start or stop OD reading over the custom connection."""
    mqtt_client = _ensure_custom_client(host, port, username, password)
    _toggle_job(mqtt_client, pioreactor, experiment, state,
                "start_od_reading", "stop_od_reading")


def grf(host, port, username, password, pioreactor, experiment, state):
    """Start or stop growth-rate calculation over the custom connection."""
    mqtt_client = _ensure_custom_client(host, port, username, password)
    _toggle_job(mqtt_client, pioreactor, experiment, state,
                "start_growth_rate", "stop_growth_rate")


def led1fn(led1, host, port, username, password, pioreactor, experiment):
    """Set LED channel A over the custom connection."""
    mqtt_client = _ensure_custom_client(host, port, username, password)
    _set_led(mqtt_client, pioreactor, "A", led1, experiment)


def led2fn(led2, host, port, username, password, pioreactor, experiment):
    """Set LED channel B over the custom connection."""
    mqtt_client = _ensure_custom_client(host, port, username, password)
    _set_led(mqtt_client, pioreactor, "B", led2, experiment)


def led3fn(led3, host, port, username, password, pioreactor, experiment):
    """Set LED channel C over the custom connection."""
    mqtt_client = _ensure_custom_client(host, port, username, password)
    _set_led(mqtt_client, pioreactor, "C", led3, experiment)


def led4fn(led4, host, port, username, password, pioreactor, experiment):
    """Set LED channel D over the custom connection."""
    mqtt_client = _ensure_custom_client(host, port, username, password)
    _set_led(mqtt_client, pioreactor, "D", led4, experiment)


def add_media(media, host, port, username, password, pioreactor, experiment):
    """Pump ``media`` into the reactor over the custom connection."""
    mqtt_client = _ensure_custom_client(host, port, username, password)
    _command(mqtt_client, "pump_add_media", experiment, pioreactor, volume=media)


def remove_waste(waste, host, port, username, password, pioreactor, experiment):
    """Pump ``waste`` out of the reactor over the custom connection."""
    mqtt_client = _ensure_custom_client(host, port, username, password)
    _command(mqtt_client, "pump_remove_media", experiment, pioreactor, volume=waste)


def cycle_media(cycle, host, port, username, password, pioreactor, experiment):
    """Circulate media for ``cycle`` over the custom connection."""
    mqtt_client = _ensure_custom_client(host, port, username, password)
    _command(mqtt_client, "circulate_media", experiment, pioreactor, duration=cycle)


def get_status(host, port, username, password, pioreactor, exp):
    """Query the worker status over the custom connection."""
    mqtt_client = _ensure_custom_client(host, port, username, password)
    return _get_status(mqtt_client, pioreactor, exp)


def new_experiment(new_exp, host, port, username, password, pioreactor, exp):
    """Create experiment ``new_exp`` over the custom connection."""
    mqtt_client = _ensure_custom_client(host, port, username, password)
    _command(mqtt_client, "new_experiment", new_exp, pioreactor)


def remove_experiment(rem_exp, host, port, username, password, pioreactor, exp):
    """Delete experiment ``rem_exp`` over the custom connection."""
    mqtt_client = _ensure_custom_client(host, port, username, password)
    _command(mqtt_client, "delete_experiment", rem_exp, pioreactor)


def change_experiment(ch_exp, host, port, username, password, pioreactor, exp):
    """Switch from ``exp`` to ``ch_exp`` over the custom connection."""
    mqtt_client = _ensure_custom_client(host, port, username, password)
    _change_experiment(mqtt_client, pioreactor, ch_exp, exp)


# --------------------------------------------------------------------------
# Default-connection entry points (environment-configured reactor)
# --------------------------------------------------------------------------

def stirring_default(rpm, experiment, state):
    """Control stirring on the default reactor."""
    _stirring(client, PIOREACTOR, rpm, experiment, state)


def temp_automation_default(temperature, experiment, state, option):
    """Control the temperature automation on the default reactor."""
    _temp_automation(client, PIOREACTOR, temperature, experiment, state, option)


def od_default(experiment, state):
    """Start or stop OD reading on the default reactor."""
    _toggle_job(client, PIOREACTOR, experiment, state,
                "start_od_reading", "stop_od_reading")


def grf_default(experiment, state):
    """Start or stop growth-rate calculation on the default reactor."""
    _toggle_job(client, PIOREACTOR, experiment, state,
                "start_growth_rate", "stop_growth_rate")


def led1fn_default(led1, experiment):
    """Set LED channel A on the default reactor."""
    _set_led(client, PIOREACTOR, "A", led1, experiment)


def led2fn_default(led2, experiment):
    """Set LED channel B on the default reactor."""
    _set_led(client, PIOREACTOR, "B", led2, experiment)


def led3fn_default(led3, experiment):
    """Set LED channel C on the default reactor."""
    _set_led(client, PIOREACTOR, "C", led3, experiment)


def led4fn_default(led4, experiment):
    """Set LED channel D on the default reactor."""
    _set_led(client, PIOREACTOR, "D", led4, experiment)


def add_media_default(media, experiment):
    """Pump ``media`` into the default reactor."""
    _command(client, "pump_add_media", experiment, PIOREACTOR, volume=media)


def remove_waste_default(waste, experiment):
    """Pump ``waste`` out of the default reactor."""
    _command(client, "pump_remove_media", experiment, PIOREACTOR, volume=waste)


def cycle_media_default(cycle, experiment):
    """Circulate media for ``cycle`` on the default reactor."""
    _command(client, "circulate_media", experiment, PIOREACTOR, duration=cycle)


def get_status_default(exp):
    """Query the worker status of the default reactor."""
    return _get_status(client, PIOREACTOR, exp)


def new_experiment_default(new_exp, exp):
    """Create experiment ``new_exp`` on the default reactor."""
    _command(client, "new_experiment", new_exp, PIOREACTOR)


def remove_experiment_default(rem_exp, exp):
    """Delete experiment ``rem_exp`` on the default reactor."""
    _command(client, "delete_experiment", rem_exp, PIOREACTOR)


def change_experiment_default(ch_exp, exp):
    """Switch the default reactor from ``exp`` to ``ch_exp``."""
    _change_experiment(client, PIOREACTOR, ch_exp, exp)


# --------------------------------------------------------------------------
# Graphs
# --------------------------------------------------------------------------

def _to_local_time(timestamp):
    """Convert a UTC reading timestamp to a New York local-time string."""
    utc_time = datetime.strptime(timestamp, _TIMESTAMP_FORMAT).replace(tzinfo=pytz.utc)
    return utc_time.astimezone(_LOCAL_TZ).strftime("%Y-%m-%d %H:%M:%S")


def _build_plot(points, label):
    """Return a line plot of ``points`` (dicts with "x" and "y"), or None.

    ``None`` is returned when the series is missing or empty; an empty
    series has no "x" column to index by.
    """
    if not points:
        return None
    rows = [{**point, "x": _to_local_time(point["x"])} for point in points]
    df = pd.DataFrame(rows).set_index("x")
    figure = go.Figure()
    figure.add_trace(go.Scatter(x=df.index, y=df["y"], mode="lines", name=label))
    figure.update_layout(title=f"{label} vs Time", xaxis_title="Time", yaxis_title=label)
    return figure


def get_data_default(time_scale, exp):
    """Fetch readings from the default reactor and plot them.

    Returns four plots (temperature, OD, normalized OD, growth rate); a plot
    is ``None`` when its series did not arrive within the timeout.
    """
    global temp_graph, od_graph, norm_od_graph, growth_rate_graph
    payload = {
        "command": "get_readings",
        "experiment": exp,
        "filter_mod": 1,
        "lookback": 1000000,
        "filter_mod2": 1,
        "lookback2": 1000000,
        "filter_mod3": 1,
        "lookback3": 1000000,
        "filter_mod4": 1,
        "lookback4": 1000000,
        "amount": time_scale,
        "amount2": time_scale,
        "amount3": time_scale,
        "amount4": time_scale,
        "reactor": PIOREACTOR,
    }
    topic = f"pioreactor/{PIOREACTOR}/readings"
    client.on_message = on_reading
    client.subscribe(topic)

    temp_graph = None
    od_graph = None
    norm_od_graph = None
    growth_rate_graph = None

    _send(client, payload)
    _wait_for(lambda: temp_graph is not None)
    client.unsubscribe(topic)

    return (
        _build_plot(temp_graph, "Temperature"),
        _build_plot(od_graph, "OD"),
        _build_plot(norm_od_graph, "Normalized OD"),
        _build_plot(growth_rate_graph, "Growth Rate"),
    )


# --------------------------------------------------------------------------
# Interface
# --------------------------------------------------------------------------

with gr.Blocks() as demo:
    gr.Markdown("# Pioreactor Control Panel")

    with gr.Tab("Default"):
        with gr.Blocks():
            gr.HTML(""" Open In Colab """)
            gr.Markdown(
                "This is a control panel for the "
                "[Pioreactor](https://pioreactor.com/en-ca). You can use this to "
                "control the Pioreactor. You can also view the status of the "
                "Pioreactor and view graphs of the data. To use a custom "
                "Pioreactor, click on the Custom tab."
            )
            gr.Image("pioreactor.webp")
            gr.Markdown(
                "The experiment name is required for all commands. You can get "
                "the experiment name from the status tab. When changing "
                "experiments, remember to update the experiment name."
            )
            experiment_input = gr.Textbox(
                label="Experiment", placeholder="Get Experiment Name From Status"
            )
            experiment_input.input(fn=void_client)

        # The default client is connected once, while the UI is being built.
        client = create_client(HOST, PORT, USERNAME, PASSWORD)

        with gr.Blocks():
            gr.Markdown("# Status")
            get_status_a = gr.Button("Get Status")
            with gr.Row():
                experiment_output = gr.Textbox(label="Experiment", interactive=False)
                running_output = gr.Textbox(label="Running", interactive=False)
            with gr.Row():
                temp_output = gr.Textbox(label="Temperature Automation", interactive=False)
                rpm_output = gr.Textbox(label="RPM", interactive=False)
                led_output = gr.Textbox(label="LEDs", interactive=False)
            experiments_output = gr.Textbox(label="Experiments", interactive=False)
            get_status_a.click(
                fn=get_status_default,
                inputs=[experiment_input],
                outputs=[experiment_output, running_output, temp_output,
                         rpm_output, led_output, experiments_output],
            )

        with gr.Blocks():
            gr.Markdown("# Experiments")
            with gr.Row():
                new_experiment_a = gr.Textbox(label="New Experiment")
                new_experiment_button = gr.Button("Add Experiment")
            with gr.Row():
                remove_experiment_a = gr.Textbox(label="Remove Experiment")
                remove_experiment_button = gr.Button("Remove Experiment")
            with gr.Row():
                change_experiment_a = gr.Textbox(label="Change Experiment")
                change_experiment_button = gr.Button("Change Experiment")
            new_experiment_button.click(
                fn=new_experiment_default,
                inputs=[new_experiment_a, experiment_input],
            )
            remove_experiment_button.click(
                fn=remove_experiment_default,
                inputs=[remove_experiment_a, experiment_input],
            )
            change_experiment_button.click(
                fn=change_experiment_default,
                inputs=[change_experiment_a, experiment_input],
            )

        with gr.Blocks():
            gr.Markdown("# Stirring")
            rpm_input = gr.Slider(minimum=0, maximum=2000, step=50, label="RPM")
            with gr.Row():
                stir_state = gr.Radio(
                    choices=["start", "stop", "update"], label="State", value="start"
                )
                st = gr.Button("Send Command")
            st.click(
                fn=stirring_default,
                inputs=[rpm_input, experiment_input, stir_state],
            )

        with gr.Blocks():
            gr.Markdown("# Temperature Automation")
            # Dropdown for selecting automation type
            temp_option = gr.Dropdown(
                choices=["Record Temp Only", "Heat To Temp"],
                label="Temperature Automation",
                value="Record Temp Only",
            )
            # Slider for temperature (initially hidden)
            temperature_slider = gr.Slider(
                minimum=0, maximum=60, step=1, label="Temperature", visible=False
            )
            # Button to start automation
            with gr.Row():
                temp_state = gr.Radio(
                    choices=["start", "stop", "update", "restart"],
                    label="State",
                    value="start",
                )
                temp = gr.Button("Send Command")
            # Update visibility of the slider based on dropdown selection
            temp_option.change(
                fn=toggle_temperature_input,
                inputs=temp_option,
                outputs=temperature_slider,
            )
            temp.click(
                fn=temp_automation_default,
                inputs=[temperature_slider, experiment_input, temp_state, temp_option],
            )

        with gr.Blocks():
            with gr.Row():
                with gr.Column():
                    gr.Markdown("# OD Reading")
                    od_state = gr.Radio(
                        choices=["start", "stop"], label="State", value="start"
                    )
                    od_button = gr.Button("Send Command")
                    od_button.click(
                        fn=od_default,
                        inputs=[experiment_input, od_state],
                    )
                with gr.Column():
                    gr.Markdown("# Growth Rate")
                    gr_state = gr.Radio(
                        choices=["start", "stop"], label="State", value="start"
                    )
                    gr_button = gr.Button("Send Command")
                    gr_button.click(
                        fn=grf_default,
                        inputs=[experiment_input, gr_state],
                    )

        with gr.Blocks():
            gr.Markdown("# LEDS")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Channel A")
                    led1 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led1_button = gr.Button("Send Command")
                    led1_button.click(
                        fn=led1fn_default,
                        inputs=[led1, experiment_input],
                    )
                with gr.Column():
                    gr.Markdown("### Channel B")
                    led2 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led2_button = gr.Button("Send Command")
                    led2_button.click(
                        fn=led2fn_default,
                        inputs=[led2, experiment_input],
                    )
                with gr.Column():
                    gr.Markdown("### Channel C")
                    led3 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led3_button = gr.Button("Send Command")
                    led3_button.click(
                        fn=led3fn_default,
                        inputs=[led3, experiment_input],
                    )
                with gr.Column():
                    gr.Markdown("### Channel D")
                    led4 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led4_button = gr.Button("Send Command")
                    led4_button.click(
                        fn=led4fn_default,
                        inputs=[led4, experiment_input],
                    )

        with gr.Blocks():
            gr.Markdown("# Dosing")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Add Media")
                    add_media_ml = gr.Number(
                        label="Media (mL)", step=1, minimum=0, maximum=14
                    )
                    add_media_button = gr.Button("Send Command")
                    add_media_button.click(
                        fn=add_media_default,
                        inputs=[add_media_ml, experiment_input],
                    )
                with gr.Column():
                    gr.Markdown("### Remove Waste")
                    remove_waste_ml = gr.Number(
                        label="Waste (mL)", step=1, minimum=0, maximum=20
                    )
                    remove_waste_button = gr.Button("Send Command")
                    remove_waste_button.click(
                        fn=remove_waste_default,
                        inputs=[remove_waste_ml, experiment_input],
                    )
                with gr.Column():
                    gr.Markdown("### Cycle Media")
                    cycle_media_sec = gr.Number(
                        label="Cycle (s)", step=1, minimum=0, maximum=60
                    )
                    cycle_media_button = gr.Button("Send Command")
                    cycle_media_button.click(
                        fn=cycle_media_default,
                        inputs=[cycle_media_sec, experiment_input],
                    )

    with gr.Tab("Default Graphs"):
        with gr.Row():
            time_scale = gr.Radio(
                label="Time Scale",
                choices=["1 hour", "24 hour", "All Data"],
                value="All Data",
            )
            get_graphs = gr.Button("Get Graphs")
        plot1 = gr.Plot()
        plot2 = gr.Plot()
        plot3 = gr.Plot()
        plot4 = gr.Plot()
        # experiment_input still refers to the "Default" tab textbox here.
        get_graphs.click(
            fn=get_data_default,
            inputs=[time_scale, experiment_input],
            outputs=[plot1, plot2, plot3, plot4],
        )

    with gr.Tab("Custom"):
        gr.Markdown("Remember to change the experiment name when changing experiments.")

        # Input components. Editing any of them drops the cached custom
        # client so the next command reconnects with the new details.
        with gr.Row():
            host_input = gr.Textbox(label="Host")
            port_input = gr.Number(label="Port")
            host_input.input(fn=void_client)
            port_input.input(fn=void_client)
        with gr.Row():
            username_input = gr.Textbox(label="Username")
            # Masked so the broker password is not shown on screen.
            password_input = gr.Textbox(label="Password", type="password")
            pioreactor_input = gr.Textbox(label="Pioreactor")
            experiment_input = gr.Textbox(label="Experiment")
            username_input.input(fn=void_client)
            password_input.input(fn=void_client)
            pioreactor_input.input(fn=void_client)
            experiment_input.input(fn=void_client)

        with gr.Blocks():
            gr.Markdown("# Status")
            get_status_b = gr.Button("Get Status")
            with gr.Row():
                experiment_output = gr.Textbox(label="Experiment", interactive=False)
                running_output = gr.Textbox(label="Running", interactive=False)
            with gr.Row():
                temp_output = gr.Textbox(label="Temperature Automation", interactive=False)
                rpm_output = gr.Textbox(label="RPM", interactive=False)
                led_output = gr.Textbox(label="LEDs", interactive=False)
            experiments_output = gr.Textbox(label="Experiments", interactive=False)
            get_status_b.click(
                fn=get_status,
                inputs=[host_input, port_input, username_input, password_input,
                        pioreactor_input, experiment_input],
                outputs=[experiment_output, running_output, temp_output,
                         rpm_output, led_output, experiments_output],
            )

        with gr.Blocks():
            gr.Markdown("# Experiments")
            with gr.Row():
                new_experiment_b = gr.Textbox(label="New Experiment")
                new_experiment_button_b = gr.Button("Add Experiment")
            with gr.Row():
                remove_experiment_b = gr.Textbox(label="Remove Experiment")
                remove_experiment_button_b = gr.Button("Remove Experiment")
            with gr.Row():
                change_experiment_b = gr.Textbox(label="Change Experiment")
                change_experiment_button_b = gr.Button("Change Experiment")
            new_experiment_button_b.click(
                fn=new_experiment,
                inputs=[new_experiment_b, host_input, port_input, username_input,
                        password_input, pioreactor_input, experiment_input],
            )
            remove_experiment_button_b.click(
                fn=remove_experiment,
                inputs=[remove_experiment_b, host_input, port_input, username_input,
                        password_input, pioreactor_input, experiment_input],
            )
            change_experiment_button_b.click(
                fn=change_experiment,
                inputs=[change_experiment_b, host_input, port_input, username_input,
                        password_input, pioreactor_input, experiment_input],
            )

        with gr.Blocks():
            gr.Markdown("# Stirring")
            rpm_input = gr.Slider(minimum=0, maximum=2000, step=50, label="RPM")
            with gr.Row():
                stir_state = gr.Radio(
                    choices=["start", "stop", "update"], label="State", value="start"
                )
                st = gr.Button("Send Command")
            st.click(
                fn=stirring,
                inputs=[rpm_input, host_input, port_input, username_input,
                        password_input, pioreactor_input, experiment_input, stir_state],
            )

        with gr.Blocks():
            gr.Markdown("# Temperature Automation")
            # Dropdown for selecting automation type
            temp_option = gr.Dropdown(
                choices=["Record Temp Only", "Heat To Temp"],
                label="Temperature Automation",
                value="Record Temp Only",
            )
            # Slider for temperature (initially hidden)
            temperature_slider = gr.Slider(
                minimum=0, maximum=60, step=1, label="Temperature", visible=False
            )
            # Button to start automation
            with gr.Row():
                temp_state = gr.Radio(
                    choices=["start", "stop", "update", "restart"],
                    label="State",
                    value="start",
                )
                temp = gr.Button("Send Command")
            # Update visibility of the slider based on dropdown selection
            temp_option.change(
                fn=toggle_temperature_input,
                inputs=temp_option,
                outputs=temperature_slider,
            )
            temp.click(
                fn=temp_automation,
                inputs=[temperature_slider, host_input, port_input, username_input,
                        password_input, pioreactor_input, experiment_input,
                        temp_state, temp_option],
            )

        with gr.Blocks():
            with gr.Row():
                with gr.Column():
                    gr.Markdown("# OD Reading")
                    od_state = gr.Radio(
                        choices=["start", "stop"], label="State", value="start"
                    )
                    od_button = gr.Button("Send Command")
                    od_button.click(
                        fn=od,
                        inputs=[host_input, port_input, username_input, password_input,
                                pioreactor_input, experiment_input, od_state],
                    )
                with gr.Column():
                    gr.Markdown("# Growth Rate")
                    gr_state = gr.Radio(
                        choices=["start", "stop"], label="State", value="start"
                    )
                    gr_button = gr.Button("Send Command")
                    gr_button.click(
                        fn=grf,
                        inputs=[host_input, port_input, username_input, password_input,
                                pioreactor_input, experiment_input, gr_state],
                    )

        with gr.Blocks():
            gr.Markdown("# LEDS")
            with gr.Row():  # Row for all channels
                with gr.Column():
                    gr.Markdown("### Channel A")
                    led1 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led1_button = gr.Button("Send Command")
                    led1_button.click(
                        fn=led1fn,
                        inputs=[led1, host_input, port_input, username_input,
                                password_input, pioreactor_input, experiment_input],
                    )
                with gr.Column():
                    gr.Markdown("### Channel B")
                    led2 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led2_button = gr.Button("Send Command")
                    led2_button.click(
                        fn=led2fn,
                        inputs=[led2, host_input, port_input, username_input,
                                password_input, pioreactor_input, experiment_input],
                    )
                with gr.Column():
                    gr.Markdown("### Channel C")
                    led3 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led3_button = gr.Button("Send Command")
                    led3_button.click(
                        fn=led3fn,
                        inputs=[led3, host_input, port_input, username_input,
                                password_input, pioreactor_input, experiment_input],
                    )
                with gr.Column():
                    gr.Markdown("### Channel D")
                    led4 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led4_button = gr.Button("Send Command")
                    led4_button.click(
                        fn=led4fn,
                        inputs=[led4, host_input, port_input, username_input,
                                password_input, pioreactor_input, experiment_input],
                    )

        with gr.Blocks():
            gr.Markdown("# Dosing")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Add Media")
                    add_media_ml = gr.Number(
                        label="Media (mL)", step=1, minimum=0, maximum=14
                    )
                    add_media_button = gr.Button("Send Command")
                    add_media_button.click(
                        fn=add_media,
                        inputs=[add_media_ml, host_input, port_input, username_input,
                                password_input, pioreactor_input, experiment_input],
                    )
                with gr.Column():
                    gr.Markdown("### Remove Waste")
                    remove_waste_ml = gr.Number(
                        label="Waste (mL)", step=1, minimum=0, maximum=20
                    )
                    remove_waste_button = gr.Button("Send Command")
                    remove_waste_button.click(
                        fn=remove_waste,
                        inputs=[remove_waste_ml, host_input, port_input, username_input,
                                password_input, pioreactor_input, experiment_input],
                    )
                with gr.Column():
                    gr.Markdown("### Cycle Media")
                    cycle_media_sec = gr.Number(
                        label="Cycle (s)", step=1, minimum=0, maximum=60
                    )
                    cycle_media_button = gr.Button("Send Command")
                    cycle_media_button.click(
                        fn=cycle_media,
                        inputs=[cycle_media_sec, host_input, port_input, username_input,
                                password_input, pioreactor_input, experiment_input],
                    )

# Launch the interface
demo.launch()