import json import queue import paho.mqtt.client as mqtt import streamlit as st import matplotlib.pyplot as plt import numpy as np from matplotlib.patches import Rectangle # Initialize Streamlit app st.title("Light-mixing Control Panel") # Description and context st.markdown( """ This application accesses a public test demo located in Toronto, ON, Canada (as of 2024-07-27). For more context, you can refer to this [Colab notebook](https://colab.research.google.com/github/sparks-baird/self-driving-lab-demo/blob/main/notebooks/4.2-paho-mqtt-colab-sdl-demo-test.ipynb) and the [self-driving-lab-demo project](https://github.com/sparks-baird/self-driving-lab-demo). You may also be interested in the Acceleration Consortium's ["Hello World" microcourse](https://ac-microcourses.readthedocs.io/en/latest/courses/hello-world/index.html) for self-driving labs. """ ) # MQTT Configuration HIVEMQ_HOST = st.text_input( "Enter your HiveMQ host:", "248cc294c37642359297f75b7b023374.s2.eu.hivemq.cloud", type="password", ) HIVEMQ_USERNAME = st.text_input("Enter your HiveMQ username:", "sgbaird") HIVEMQ_PASSWORD = st.text_input( "Enter your HiveMQ password:", "D.Pq5gYtejYbU#L", type="password" ) PORT = st.number_input( "Enter the port number:", min_value=1, max_value=65535, value=8883 ) # User input for the Pico ID PICO_ID = st.text_input("Enter your Pico ID:", "test", type="password") command_topic = f"sdl-demo/picow/{PICO_ID}/GPIO/28/" sensor_data_topic = f"sdl-demo/picow/{PICO_ID}/as7341/" max_power = 0.35 max_value = round(max_power * 255) # Information about the maximum power reduction st.info( f"The upper limit for RGB power levels has been set to {max_value} instead of 255. NeoPixels are bright 😎" ) # Sliders for RGB values R = st.slider("Select the Red value:", min_value=0, max_value=max_value, value=0) G = st.slider("Select the Green value:", min_value=0, max_value=max_value, value=0) B = st.slider("Select the Blue value:", min_value=0, max_value=max_value, value=0) # # Initialize session state for messages and lock state if "messages" not in st.session_state: st.session_state.messages = [] # if "locked" not in st.session_state: # st.session_state.locked = False # if "command_history" not in st.session_state: # st.session_state.command_history = [] # if "sensor_data_history" not in st.session_state: # st.session_state.sensor_data_history = [] # Queue to hold sensor data sensor_data_queue = queue.Queue() # Singleton: https://docs.streamlit.io/develop/api-reference/caching-and-state/st.cache_resource @st.cache_resource def get_paho_client( sensor_data_topic, hostname, username, password=None, port=8883, tls=True ): client = mqtt.Client( mqtt.CallbackAPIVersion.VERSION2, protocol=mqtt.MQTTv5 ) # create new instance def on_message(client, userdata, msg): sensor_data_queue.put(json.loads(msg.payload)) def on_connect(client, userdata, flags, rc, properties=None): if rc != 0: print("Connected with result code " + str(rc)) client.subscribe(sensor_data_topic, qos=1) client.on_connect = on_connect client.on_message = on_message if tls: client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS_CLIENT) client.username_pw_set(username, password) client.connect(hostname, port) client.loop_start() # Use a non-blocking loop return client def send_and_receive(client, command_topic, msg, queue_timeout=60): print("Sending command...") result = client.publish(command_topic, json.dumps(msg), qos=2) result.wait_for_publish() # Ensure the message is sent if result.rc == mqtt.MQTT_ERR_SUCCESS: print(f"Command sent: {msg} to topic {command_topic}") else: print(f"Failed to send command: {result.rc}") try: sensor_data = sensor_data_queue.get(True, queue_timeout) return sensor_data except queue.Empty: st.error("No sensor data received within the timeout period.") return None # Helper function to plot discrete spectral sensor data def plot_spectra(sensor_data): """https://chatgpt.com/share/210d2fee-ca64-45a5-866e-e6df6e56bd1c""" wavelengths = np.array([410, 440, 470, 510, 550, 583, 620, 670]) intensities = np.array( [ sensor_data["ch410"], sensor_data["ch440"], sensor_data["ch470"], sensor_data["ch510"], sensor_data["ch550"], sensor_data["ch583"], sensor_data["ch620"], sensor_data["ch670"], ] ) fig, ax = plt.subplots(figsize=(10, 6)) num_points = 100 # for "fake" color bar effect # Adding rectangles for color bar effect dense_wavelengths = np.linspace(wavelengths.min(), wavelengths.max(), num_points) rect_height = max(intensities) * 0.02 # Height of the rectangles for dw in dense_wavelengths: rect = Rectangle( ( dw - (wavelengths.max() - wavelengths.min()) / num_points / 2, -rect_height * 2, ), (wavelengths.max() - wavelengths.min()) / num_points, rect_height * 3, color=plt.cm.rainbow( (dw - wavelengths.min()) / (wavelengths.max() - wavelengths.min()) ), edgecolor="none", ) ax.add_patch(rect) # Main scatter plot scatter = ax.scatter( wavelengths, intensities, c=wavelengths, cmap="rainbow", edgecolor="k" ) # Adding vertical lines from the x-axis to each point for wavelength, intensity in zip(wavelengths, intensities): ax.vlines(wavelength, 0, intensity, color="gray", linestyle="--", linewidth=1) # Adjust limits and labels with larger font size ax.set_xlim(wavelengths.min() - 10, wavelengths.max() + 10) ax.set_ylim( 0, max(intensities) + 15 ) # Ensure the lower y limit is 0 and add buffer ax.set_xticks(wavelengths) ax.set_xlabel("Wavelength (nm)", fontsize=14) ax.set_ylabel("Intensity", fontsize=14) ax.set_title("Spectral Intensity vs. Wavelength", fontsize=16) ax.tick_params(axis="both", which="major", labelsize=12) st.pyplot(fig) # Publish button # if st.button("Send RGB Command", disabled=st.session_state.locked): if st.button("Send RGB Command"): if not PICO_ID or not HIVEMQ_HOST or not HIVEMQ_USERNAME or not HIVEMQ_PASSWORD: st.error("Please enter all required fields.") else: # st.session_state.locked = True st.info("Please wait while the command is sent...") client = get_paho_client( sensor_data_topic, HIVEMQ_HOST, HIVEMQ_USERNAME, password=HIVEMQ_PASSWORD, port=int(PORT), tls=True, ) command_msg = {"R": R, "G": G, "B": B} sensor_data = send_and_receive( client, command_topic, command_msg, queue_timeout=15 ) # st.session_state.locked = False print("Waiting for sensor data...") if sensor_data: st.success("Command sent successfully!") plot_spectra(sensor_data) st.write("Sensor Data Received:", sensor_data)