import asyncio
import json
import os
import time
from datetime import datetime

import paho.mqtt.client as mqtt
import pandas as pd
import pytz
import streamlit as st

# The web app that connects to the PioReactor MQTT broker and controls the reactor.
# Author: Enrui (Edison) Lin

# Latest worker snapshot. Written by on_message_worker (MQTT network thread) and
# read by the script while it polls for a reply.
experiment = None
running = []
temp_mqtt_auto = None
rpm_mqtt = None
led_mqtt = None
experiments = []

# Latest graph payloads, written by on_message.
temp_graph = None
od_graph = None
norm_od_graph = None
growth_rate_graph = None
all_graph = False

REQUEST_INTERVAL = 120  # seconds slept between periodic reading requests
CONTROL_TOPIC = "pioreactor/control"
WORKER_TIMEOUT = 10  # seconds to wait for a reply to "get_worker"
POLL_INTERVAL = 0.2  # seconds between checks while waiting for that reply
LED_CHANNELS = ("A", "B", "C", "D")

if "experiment" not in st.session_state:
    st.session_state["experiment"] = None
    st.session_state["jobs"] = {
        "temperature_automation": False,
        "growth_rate_calculating": False,
        "stirring": False,
        "od_reading": False,
        "led_automation": False,
        "dosing_automation": False,
    }
    st.session_state["client"] = None

# Create a form to get the MQTT broker details
st.title("PioReactor MQTT Client")

# Let the user choose between typed-in credentials and environment variables
use_custom_credentials = st.checkbox("Use Custom Credentials")

if use_custom_credentials:
    with st.form("mqtt_form"):
        hivemq_host = st.text_input("HiveMQ Host", type="password")
        hivemq_username = st.text_input("HiveMQ Username", type="password")
        hivemq_password = st.text_input("HiveMQ Password", type="password")
        hivemq_port = st.number_input("HiveMQ Port", min_value=1, max_value=65535, step=1, value=8883)
        reactor = st.text_input("PioReactor")
        submit_button = st.form_submit_button("Connect")
else:
    hivemq_host = os.getenv("host")
    hivemq_username = os.getenv("username")
    hivemq_password = os.getenv("password")
    # Fall back to the same default port the form offers instead of crashing on int(None).
    hivemq_port = int(os.getenv("port") or 8883)
    reactor = os.getenv("reactor")

    # Button to (re)connect with the environment credentials
    if st.session_state["experiment"] is None:
        submit_button = st.button("Connect")
    else:
        submit_button = st.button("Reconnect")


def create_mqtt_client(hivemq_host, hivemq_port, hivemq_username, hivemq_password):
    """Build a TLS-enabled MQTT client with credentials set; does not connect.

    hivemq_host and hivemq_port are accepted for signature compatibility but are
    not used here; the caller passes them to ``client.connect``.
    """
    client = mqtt.Client()

    # Set authentication and TLS settings
    client.username_pw_set(hivemq_username, hivemq_password)
    client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS_CLIENT)

    # NOTE(review): this callback runs on the MQTT network thread; confirm that
    # Streamlit renders st.success/st.error issued from outside the script thread.
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            st.success("Connected to HiveMQ successfully!")
        else:
            st.error(f"Failed to connect, return code {rc}")

    client.on_connect = on_connect
    return client


def _decode_payload(message):
    """Return the message payload parsed as a JSON object, or None if it is not one."""
    try:
        data = json.loads(message.payload.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None
    return data if isinstance(data, dict) else None


def on_message_worker(client, userdata, message):
    """Store the worker snapshot carried by a message on the worker topic."""
    global experiment, running, temp_mqtt_auto, rpm_mqtt, led_mqtt, experiments
    data = _decode_payload(message)
    if data is None:
        # A malformed message must not raise inside the network thread.
        return
    running = data.get("running") or []
    temp_mqtt_auto = data.get("temperature_automation", None)
    rpm_mqtt = data.get("stirring", None)
    led_mqtt = data.get("leds", None)
    experiments = data.get("experiments") or []
    # Assigned last: the script polls on `experiment`, so every other field of
    # the snapshot must already be in place when it becomes non-None.
    experiment = data.get("experiment", None)


def on_message(client, userdata, message):
    """Store the graph data carried by a readings message."""
    global temp_graph, od_graph, norm_od_graph, growth_rate_graph
    data = _decode_payload(message)
    if data is None:
        return
    temp_graph = data.get("temperature", None)
    od_graph = data.get("od", None)
    norm_od_graph = data.get("normalized_od", None)
    growth_rate_graph = data.get("growth_rate", None)


async def publish_temperature_request(client, experiment, filter_mod_N, lookback, reactor):
    """Publish a temperature request to the request topic."""
    payload = {
        "command": "get_temperature_readings",
        "experiment": experiment,
        "filter_mod": filter_mod_N,
        "lookback": lookback,
        "reactor": reactor,
    }
    client.publish(CONTROL_TOPIC, json.dumps(payload))


async def periodic_temp_reading(client, reactor, experiment, placeholder):
    """Request temperature readings every REQUEST_INTERVAL seconds until cancelled."""
    global temp_graph
    try:
        while True:
            await publish_temperature_request(client, experiment, 1, 100000, reactor)
            placeholder.write(temp_graph)
            await asyncio.sleep(REQUEST_INTERVAL)
    except asyncio.CancelledError:
        pass


async def run_mqtt_client(client, placeholder):
    """Subscribe to the reading topics and start the periodic temperature requests."""
    client.subscribe(f"pioreactor/{reactor}/temp_reading")
    client.subscribe(f"pioreactor/{reactor}/od_reading")
    client.subscribe(f"pioreactor/{reactor}/growth_rate")
    client.on_message = on_message
    client.loop_start()
    await periodic_temp_reading(client, reactor, st.session_state["experiment"], placeholder)


def _publish_command(client, payload, qos=1):
    """Serialize payload as JSON and publish it on the control topic."""
    client.publish(CONTROL_TOPIC, json.dumps(payload), qos=qos)


def _request_worker_state(client, reactor, qos=0, timeout=WORKER_TIMEOUT):
    """Send "get_worker" and wait for on_message_worker to record the reply.

    The client's network loop must already be running, otherwise no reply can be
    received. Returns True when a reply naming an experiment arrived in time.
    """
    global experiment, running
    worker_topic = f"pioreactor/{reactor}/worker"
    # Reset and subscribe BEFORE publishing so a fast reply is neither missed nor wiped.
    experiment = None
    running = []
    client.subscribe(worker_topic)
    client.message_callback_add(worker_topic, on_message_worker)
    _publish_command(client, {"command": "get_worker", "reactor": reactor}, qos=qos)
    deadline = time.monotonic() + timeout
    while experiment is None and time.monotonic() < deadline:
        time.sleep(POLL_INTERVAL)
    return experiment is not None


def _sync_session_from_worker():
    """Copy the latest worker snapshot into st.session_state."""
    jobs = st.session_state["jobs"]
    # Mark running jobs first so the checks below see the current flags.
    for job in running:
        jobs[job] = True
    if jobs["temperature_automation"]:
        if temp_mqtt_auto == "thermostat":
            st.session_state["temp_auto"] = "Heat To Temp"
        elif temp_mqtt_auto == "only_record_temperature":
            st.session_state["temp_auto"] = "Record Temp Only"
        # The worker snapshot does not carry the target temperature.
        st.session_state["temp"] = None
    if jobs["stirring"]:
        st.session_state["stirring_speed"] = rpm_mqtt
    if led_mqtt is not None:
        for letter in LED_CHANNELS:
            st.session_state[f"channel_{letter.lower()}"] = int(led_mqtt.get(letter) or 0)
    st.session_state["experiments"] = experiments


def get_running_jobs(client, reactor):
    """Refresh the session state from the reactor's current worker state.

    If no reply arrives within the timeout, an error is shown, the session
    experiment is cleared and the script is rerun.
    """
    if not _request_worker_state(client, reactor):
        st.error("Failed to get info")
        st.session_state["experiment"] = None
        time.sleep(3)
        st.rerun()
    st.session_state["experiment"] = experiment
    _sync_session_from_worker()


async def get_graph(placeholder):
    """Periodically request temperature readings and write the latest into placeholder."""
    global temp_graph
    try:
        st.session_state["client"].loop_start()
        st.session_state["client"].subscribe(f"pioreactor/{reactor}/temperature")
        st.session_state["client"].message_callback_add(f"pioreactor/{reactor}/temperature", on_message)
        while True:
            await publish_temperature_request(
                st.session_state["client"], st.session_state["experiment"], 1, 100000, reactor
            )
            await asyncio.sleep(REQUEST_INTERVAL)
            placeholder.write(temp_graph)
    except asyncio.CancelledError:
        pass


def _send_command(payload, *, settle=1.0, sync_session=True):
    """Publish a control command, then re-read the worker state with the loop running.

    settle is the pause (seconds) between the command and the state request.
    With sync_session=True the session state is refreshed via get_running_jobs
    (which reruns the script on timeout); otherwise only the module-level
    snapshot is updated. Returns True when the worker replied.
    """
    mqtt_client = st.session_state["client"]
    mqtt_client.loop_start()
    _publish_command(mqtt_client, payload)
    time.sleep(settle)
    if sync_session:
        get_running_jobs(mqtt_client, reactor)
        replied = True
    else:
        replied = _request_worker_state(mqtt_client, reactor, qos=1)
    # Stop the loop only after the reply has been awaited.
    mqtt_client.loop_stop()
    return replied


def _pause_and_rerun():
    """Leave the status message visible for a moment, then rerun the script."""
    time.sleep(3)
    st.rerun()


def _slider_default(value, low, high, fallback):
    """Return value as an int clamped to [low, high], or fallback if it is unusable."""
    try:
        return min(high, max(low, int(value)))
    except (TypeError, ValueError):
        return fallback


if submit_button:
    client = create_mqtt_client(hivemq_host, hivemq_port, hivemq_username, hivemq_password)
    try:
        client.connect(hivemq_host, hivemq_port)
    except (OSError, ValueError) as err:
        st.error(f"Could not reach the MQTT broker: {err}")
    else:
        # Release the connection of an earlier session before replacing it.
        previous_client = st.session_state.get("client")
        if previous_client is not None:
            previous_client.disconnect()
            previous_client.loop_stop()

        client.loop_start()
        _request_worker_state(client, reactor)
        st.session_state["experiment"] = experiment
        client.loop_stop()

        if st.session_state["experiment"] is None:
            st.error("No experiment assigned to the reactor.")
        _sync_session_from_worker()

        # Save the client to the session state
        st.session_state["client"] = client

# Display the experiment and running jobs if available
if st.session_state["experiment"] is not None:
    jobs = st.session_state["jobs"]

    st.title(f"Experiment: {st.session_state['experiment']}")
    st.header("Running Jobs")
    job_labels = (
        ("Temperature", "temperature_automation"),
        ("Growth Rate", "growth_rate_calculating"),
        ("Stirring", "stirring"),
        ("OD Reading", "od_reading"),
        ("LED", "led_automation"),
        ("Dosing", "dosing_automation"),
    )
    for column, (label, job_key) in zip(st.columns(6), job_labels):
        with column:
            st.write(label)
            st.write(jobs[job_key])
    st.divider()

    with st.form("experiment_form"):
        new_name = st.text_input("New Experiment Name")
        submit_new_experiment = st.form_submit_button("Create New Experiment")

    change_experiment_submit = False
    delete_experiment = False
    with st.form("experiment_edit"):
        if st.session_state["experiments"]:
            experiment_change = st.selectbox("Select Experiment", st.session_state["experiments"])
            cols = st.columns(2)
            with cols[0]:
                change_experiment_submit = st.form_submit_button("Change Experiment")
            with cols[1]:
                delete_experiment = st.form_submit_button("Delete Experiment")
        else:
            st.write("No experiments available.")
            st.form_submit_button("Nothing Button")

    if submit_new_experiment:
        _send_command(
            {"command": "new_experiment", "experiment": new_name, "reactor": reactor},
            settle=0,
        )
        if new_name in st.session_state["experiments"]:
            st.success("Experiment created successfully!")
        else:
            st.error("Failed to create experiment.")
        _pause_and_rerun()

    if change_experiment_submit:
        _send_command(
            {
                "command": "change_experiment",
                "experiment": st.session_state["experiment"],
                "experiment_new": experiment_change,
                "reactor": reactor,
            },
            settle=3,
        )
        if experiment_change == st.session_state["experiment"]:
            st.success("Experiment changed successfully!")
        else:
            st.error("Failed to change experiment.")
            st.info(f"Experiment name changed to {experiment_change} from {st.session_state['experiment']}")
        _pause_and_rerun()

    if delete_experiment:
        _send_command(
            {"command": "delete_experiment", "experiment": experiment_change, "reactor": reactor},
            settle=1,
        )
        if experiment_change not in st.session_state["experiments"]:
            st.success("Experiment deleted successfully!")
        else:
            st.error("Failed to delete experiment.")
        _pause_and_rerun()

    st.divider()

    # ---- Stirring ----
    st.header("Stirring")
    with st.form("stirring_form"):
        if jobs["stirring"]:
            current_rpm = st.session_state.get("stirring_speed", None)
            if current_rpm is None:
                st.write("Current Stirring Speed Unknown")
            else:
                st.write(f"Current Stirring Speed: {current_rpm} RPM")
            # The slider needs an int inside its range; the reported speed may be
            # missing, so fall back to the default start speed.
            stirring_speed = st.slider(
                "Stirring Speed",
                min_value=100,
                max_value=3000,
                step=50,
                value=_slider_default(current_rpm, 100, 3000, 500),
            )
            cols = st.columns(2)
            with cols[0]:
                stirring = st.form_submit_button("Update Stirring Speed")
            with cols[1]:
                stop_stirring = st.form_submit_button("Stop Stirring")
        else:
            stirring_speed = st.slider("Stirring Speed", min_value=100, max_value=3000, step=50, value=500)
            stirring = st.form_submit_button("Start Stirring")
            stop_stirring = False

    if stirring:
        was_stirring = bool(jobs["stirring"])
        _send_command(
            {
                "command": "update_stirring_rpm" if was_stirring else "start_stirring",
                "reactor": reactor,
                "experiment": st.session_state["experiment"],
                "rpm": stirring_speed,
            },
            sync_session=False,
        )
        if "stirring" in running:
            if was_stirring:
                st.success("Stirring speed updated successfully!")
            else:
                st.success("Stirring started successfully!")
            jobs["stirring"] = True
            st.session_state["stirring_speed"] = stirring_speed
        else:
            st.error("Failed to update stirring speed.")
            st.session_state["experiment"] = None
            st.session_state["stirring_speed"] = None
        _pause_and_rerun()

    if stop_stirring:
        replied = _send_command(
            {
                "command": "stop_stirring",
                "reactor": reactor,
                "experiment": st.session_state["experiment"],
            },
            sync_session=False,
        )
        # Only a real reply that no longer lists the job counts as success.
        if replied and "stirring" not in running:
            st.success("Stirring stopped successfully!")
            st.session_state["stirring_speed"] = None
            jobs["stirring"] = False
        else:
            st.error("Failed to stop stirring.")
        _pause_and_rerun()

    st.divider()

    # ---- Temperature automation ----
    st.header("Temperature Automation")
    automation_options = ["Record Temp Only", "Heat To Temp"]
    temp = None
    temp_running = bool(jobs["temperature_automation"])
    known_auto = st.session_state.get("temp_auto", None)
    auto_is_known = temp_running and known_auto in automation_options

    if auto_is_known:
        temp_auto = st.selectbox(
            "Temperature Automation", automation_options, index=automation_options.index(known_auto)
        )
        submit_label = "Update Temperature Automation"
    elif temp_running:
        st.write("Current Temperature Automation Unknown, changes will result in restart of the automation to new settings.")
        temp_auto = st.selectbox("Temperature Automation", automation_options, placeholder="UNKNOWN")
        submit_label = "Restart Temperature Automation"
    else:
        temp_auto = st.selectbox("Temperature Automation", automation_options, placeholder="Select an option")
        submit_label = "Start Temperature Automation"

    with st.form("temperature_form"):
        if auto_is_known:
            st.write(f"Current Temperature Automation: {known_auto}")
        if temp_auto == "Heat To Temp":
            current_temp = st.session_state.get("temp", None) if auto_is_known else None
            if current_temp is not None:
                st.write(f"Current Temperature: {current_temp} °C")
            temp = st.slider(
                "Temperature",
                min_value=0,
                max_value=60,
                step=1,
                value=current_temp if current_temp is not None else 30,
            )
        if temp_running:
            # Nothing to update when "Record Temp Only" is already active.
            nothing_to_update = (
                auto_is_known and temp_auto == "Record Temp Only" and known_auto == "Record Temp Only"
            )
            cols = st.columns(2)
            with cols[0]:
                temp_submit = st.form_submit_button(submit_label, disabled=nothing_to_update)
            with cols[1]:
                temp_stop = st.form_submit_button("Stop Temperature Automation")
        else:
            temp_submit = st.form_submit_button(submit_label)
            temp_stop = False

    if temp_submit:
        # Convert the UI choice to the automation name used in the payload
        payload_auto = "thermostat" if temp_auto == "Heat To Temp" else "only_record_temperature"
        was_running = bool(jobs["temperature_automation"])

        if was_running and temp_auto == st.session_state.get("temp_auto", None):
            # Same automation: only update its target temperature
            payload = {
                "command": "temp_update",
                "reactor": reactor,
                "experiment": st.session_state["experiment"],
                "settings": {"target_temperature": temp},
            }
        else:
            # Different (or unknown) automation: restart it; not running: start it
            payload = {
                "command": "temp_restart" if was_running else "set_temperature_automation",
                "reactor": reactor,
                "experiment": st.session_state["experiment"],
                "automation": payload_auto,
                "temp": temp,
            }

        _send_command(payload, sync_session=False)
        if "temperature_automation" in running:
            if was_running:
                st.success("Temperature automation updated!")
            else:
                st.success("Temperature automation started successfully!")
            jobs["temperature_automation"] = True
            st.session_state["temp_auto"] = temp_auto
            st.session_state["temp"] = temp
        else:
            st.error("Failed to update temperature automation.")
            st.session_state["experiment"] = None
            jobs["temperature_automation"] = False
        _pause_and_rerun()

    if temp_stop:
        replied = _send_command(
            {
                "command": "temp_update",
                "reactor": reactor,
                "experiment": st.session_state["experiment"],
                "settings": {"$state": "disconnected"},
            },
            sync_session=False,
        )
        if replied and "temperature_automation" not in running:
            st.success("Temperature automation stopped successfully!")
            st.session_state["temp_auto"] = None
            st.session_state["temp"] = None
            jobs["temperature_automation"] = False
        else:
            st.error("Failed to stop temperature automation.")
            st.session_state["experiment"] = None
        _pause_and_rerun()

    st.divider()

    # ---- OD reading / growth rate ----
    cols = st.columns(2)
    start_od = False
    stop_od = False
    start_growth = False
    stop_growth = False
    with cols[0]:
        st.header("OD Reading")
        if jobs["od_reading"]:
            stop_od = st.button("Stop OD Reading")
        else:
            start_od = st.button("Start OD Reading")
    with cols[1]:
        st.header("Growth Rate")
        if jobs["growth_rate_calculating"]:
            stop_growth = st.button("Stop Growth Rate Calculation")
        else:
            start_growth = st.button("Start Growth Rate Calculation")

    # (pressed, command, job name, should run afterwards, settle seconds, success text, failure text)
    job_toggles = (
        (start_od, "start_od_reading", "od_reading", True, 2,
         "OD Reading started successfully!", "Failed to start OD Reading."),
        (stop_od, "stop_od_reading", "od_reading", False, 1,
         "OD Reading stopped successfully!", "Failed to stop OD Reading."),
        (start_growth, "start_growth_rate", "growth_rate_calculating", True, 1,
         "Growth Rate Calculation started successfully!", "Failed to start Growth Rate Calculation."),
        (stop_growth, "stop_growth_rate", "growth_rate_calculating", False, 1,
         "Growth Rate Calculation stopped successfully!", "Failed to stop Growth Rate Calculation."),
    )
    for pressed, command, job_name, should_run, settle, ok_text, fail_text in job_toggles:
        if not pressed:
            continue
        # get_running_jobs (inside _send_command) refreshes the job flags and the
        # session experiment from the worker's reply.
        _send_command(
            {"command": command, "reactor": reactor, "experiment": st.session_state["experiment"]},
            settle=settle,
        )
        if (job_name in running) == should_run:
            st.success(ok_text)
            jobs[job_name] = should_run
        else:
            st.error(fail_text)
        _pause_and_rerun()

    st.divider()

    # ---- LEDs ----
    st.header("LEDS")
    led_slots = {}
    led_submits = {}
    for column, letter in zip(st.columns(4), LED_CHANNELS):
        with column:
            st.write(f"Channel {letter}")
            with st.form(f"led_form_{letter.lower()}"):
                led_slots[letter] = st.empty()
                led_submits[letter] = st.form_submit_button(f"Update Channel {letter}")

    led_values = {
        letter: led_slots[letter].number_input(
            f"Channel {letter}",
            min_value=0,
            max_value=100,
            step=1,
            value=st.session_state.get(f"channel_{letter.lower()}", 0),
        )
        for letter in LED_CHANNELS
    }

    for letter in LED_CHANNELS:
        if not led_submits[letter]:
            continue
        brightness = led_values[letter]
        # Hide the inputs while the update is verified.
        for slot in led_slots.values():
            slot.empty()
        _send_command(
            {
                "command": "set_led_intensity",
                "reactor": reactor,
                "experiment": st.session_state["experiment"],
                "brightness": brightness,
                "led": letter,
            }
        )
        reported_leds = led_mqtt or {}
        if brightness == int(reported_leds.get(letter) or 0):
            st.success("LED Intensity updated!")
        else:
            st.error("Failed to update LED Intensity.")
        _pause_and_rerun()

    st.divider()

    # ---- Dosing ----
    st.header("Dosing")
    cols = st.columns(3)
    with cols[0]:
        st.subheader("Add Media")
        with st.form("media_form"):
            media_ml = st.number_input("Media (mL)", min_value=0, step=1, value=0)
            media_submit = st.form_submit_button("Add Media")
    with cols[1]:
        st.subheader("Remove Waste")
        with st.form("waste_form"):
            waste_ml = st.number_input("Waste (mL)", min_value=0, step=1, value=0)
            waste_submit = st.form_submit_button("Remove Waste")
    with cols[2]:
        st.subheader("Cycle Media")
        with st.form("cycle_form"):
            cycle_s = st.number_input("Cycle (s)", min_value=0, step=1, value=0)
            cycle_submit = st.form_submit_button("Cycle Media")

    # Dosing commands are sent without verification: the success text only
    # confirms that the command was published.
    dosing_actions = (
        (media_submit, "pump_add_media", "volume", media_ml, "Media added successfully!"),
        (waste_submit, "pump_remove_media", "volume", waste_ml, "Waste removed successfully!"),
        (cycle_submit, "circulate_media", "duration", cycle_s, "Media cycled successfully!"),
    )
    for pressed, command, amount_key, amount, done_text in dosing_actions:
        if not pressed:
            continue
        st.session_state["client"].loop_start()
        _publish_command(
            st.session_state["client"],
            {
                "command": command,
                "reactor": reactor,
                "experiment": st.session_state["experiment"],
                amount_key: amount,
            },
        )
        time.sleep(1)
        st.success(done_text)
        _pause_and_rerun()

    st.divider()

    # ---- Graphs ----
    st.header("Graphs")
    st.session_state["client"].loop_start()
    st.session_state["client"].subscribe(f"pioreactor/{reactor}/readings")
    st.session_state["client"].message_callback_add(f"pioreactor/{reactor}/readings", on_message)

    temp_graph_actual = []

    st.subheader("Temperature Graph")
    amount_of_data = st.radio("Amount of data", ["1 hour", "24 hours", "All data"], index=1, key="temperature")
    placeholder = st.empty()

    st.subheader("OD Reading Graph")
    amount_of_data2 = st.radio("Amount of data", ["1 hour", "24 hours", "All data"], index=1, key="od")
    placeholder2 = st.empty()

    st.subheader("Normalized OD Reading Graph")
    amount_of_data3 = st.radio("Amount of data", ["1 hour", "24 hours", "All data"], index=1, key="norm_od")
    placeholder3 = st.empty()

    st.subheader("Growth Rate Graph")
    amount_of_data4 = st.radio("Amount of data", ["1 hour", "24 hours", "All data"], index=1, key="growth_rate")
    placeholder4 = st.empty()

    # Redraw each chart from the cached frame while its radio selection is unchanged.
    # NOTE(review): the remainder of this loop body lies beyond the reviewed section.
    while True:
        if st.session_state.get("df", None) is not None and st.session_state.get("temp_radio", None) == amount_of_data:
            placeholder.line_chart(st.session_state["df"], x_label="Time", y_label="Temperature (°C)")
        if st.session_state.get("df2", None) is not None and st.session_state.get("od_radio", None) == amount_of_data2:
            placeholder2.line_chart(st.session_state["df2"], x_label="Time", y_label="OD Reading")
        if st.session_state.get("df3", None) is not None and st.session_state.get("norm_od_radio", None) == amount_of_data3:
            placeholder3.line_chart(st.session_state["df3"], x_label="Time", y_label="Normalized OD Reading")
        if st.session_state.get("df4", None) is not None and st.session_state.get("growth_rate_radio", None) == amount_of_data4:
            placeholder4.line_chart(st.session_state["df4"], x_label="Time", y_label="Growth Rate")
        st.session_state["temp_radio"] = amount_of_data
st.session_state["od_radio"] = amount_of_data2 st.session_state["norm_od_radio"] = amount_of_data3 st.session_state["growth_rate_radio"] = amount_of_data4 temp_graph = None od_graph = None norm_od_graph = None growth_rate_graph = None payload = { "command": "get_readings", "experiment": st.session_state["experiment"], "reactor": reactor, "filter_mod": 1, "lookback": 10000000, "filter_mod2": 1, "lookback2": 10000000, "filter_mod3": 1, "lookback3": 10000000, "filter_mod4": 1, "lookback4": 10000000, "amount": amount_of_data, "amount2": amount_of_data2, "amount3": amount_of_data3, "amount4": amount_of_data4 } payload_str = json.dumps(payload) st.session_state["client"].publish("pioreactor/control", payload_str, qos=1) timeout = 10 start_time = time.time() while (temp_graph is None or od_graph is None or norm_od_graph is None or growth_rate_graph is None) and (time.time() - start_time) < timeout: time.sleep(1) # Check if all is None # if temp_graph is None and od_graph is None and norm_od_graph is None and growth_rate_graph is None: # st.error("Failed to retrieve data.") # st.stop() if len(temp_graph) != 0: for temp in temp_graph: utc_time = datetime.strptime(temp["x"], "%Y-%m-%dT%H:%M:%S.%fZ") local_tz = pytz.timezone("America/New_York") utc_time = utc_time.replace(tzinfo=pytz.utc) local_time = utc_time.astimezone(local_tz) temp["x"] = local_time.strftime("%Y-%m-%d %H:%M:%S") df = pd.DataFrame(temp_graph) df = df.set_index("x") placeholder.line_chart(df, x_label="Time", y_label="Temperature (°C)") st.session_state["df"] = df if len(od_graph) != 0: for od in od_graph: utc_time = datetime.strptime(od["x"], "%Y-%m-%dT%H:%M:%S.%fZ") local_tz = pytz.timezone("America/New_York") utc_time = utc_time.replace(tzinfo=pytz.utc) local_time = utc_time.astimezone(local_tz) od["x"] = local_time.strftime("%Y-%m-%d %H:%M:%S") df2 = pd.DataFrame(od_graph) df2 = df2.set_index("x") placeholder2.line_chart(df2, x_label="Time", y_label="OD Reading") st.session_state["df2"] = df2 if 
len(norm_od_graph) != 0: for norm_od in norm_od_graph: utc_time = datetime.strptime(norm_od["x"], "%Y-%m-%dT%H:%M:%S.%fZ") local_tz = pytz.timezone("America/New_York") utc_time = utc_time.replace(tzinfo=pytz.utc) local_time = utc_time.astimezone(local_tz) norm_od["x"] = local_time.strftime("%Y-%m-%d %H:%M:%S") df3 = pd.DataFrame(norm_od_graph) df3 = df3.set_index("x") placeholder3.line_chart(df3, x_label="Time", y_label="Normalized OD Reading") st.session_state["df3"] = df3 if len(growth_rate_graph) != 0: for growth_rate in growth_rate_graph: utc_time = datetime.strptime(growth_rate["x"], "%Y-%m-%dT%H:%M:%S.%fZ") local_tz = pytz.timezone("America/New_York") utc_time = utc_time.replace(tzinfo=pytz.utc) local_time = utc_time.astimezone(local_tz) growth_rate["x"] = local_time.strftime("%Y-%m-%d %H:%M:%S") df4 = pd.DataFrame(growth_rate_graph) df4 = df4.set_index("x") placeholder4.line_chart(df4, x_label="Time", y_label="Growth Rate") st.session_state["df4"] = df4 # Filter the data based on the amount of data requested # Data is in 4 minute intervals time.sleep(REQUEST_INTERVAL)