Spaces:
Sleeping
Sleeping
import json | |
import os | |
from pathlib import Path | |
import paho.mqtt.client as mqtt | |
import streamlit as st | |
import matplotlib.pyplot as plt | |
import numpy as np | |
from matplotlib.patches import Rectangle | |
import secrets | |
from time import time, sleep | |
# Initialize Streamlit app | |
st.title("Light-mixing Control Panel") | |
# Description and context | |
st.markdown( | |
""" | |
This application accesses a public test demo located in Toronto, ON, Canada (as of 2024-07-27). | |
For more context, you can refer to this [Colab notebook](https://colab.research.google.com/github/sparks-baird/self-driving-lab-demo/blob/main/notebooks/4.2-paho-mqtt-colab-sdl-demo-test.ipynb) | |
and the [self-driving-lab-demo project](https://github.com/sparks-baird/self-driving-lab-demo). | |
You may also be interested in the Acceleration Consortium's ["Hello World" microcourse](https://ac-microcourses.readthedocs.io/en/latest/courses/hello-world/index.html) for self-driving labs. | |
""" | |
) | |
max_power = 0.35 | |
max_value = round(max_power * 255) | |
with st.form("mqtt_form"): | |
# MQTT Configuration | |
HIVEMQ_HOST = st.text_input( | |
"Enter your HiveMQ host:", | |
"248cc294c37642359297f75b7b023374.s2.eu.hivemq.cloud", | |
type="password", | |
) | |
HIVEMQ_USERNAME = st.text_input("Enter your HiveMQ username:", "sgbaird") | |
HIVEMQ_PASSWORD = st.text_input( | |
"Enter your HiveMQ password:", "D.Pq5gYtejYbU#L", type="password" | |
) | |
PORT = st.number_input( | |
"Enter the port number:", min_value=1, max_value=65535, value=8883 | |
) | |
# User input for the Pico ID | |
PICO_ID = st.text_input("Enter your Pico ID:", "test", type="password") | |
# Information about the maximum power reduction | |
st.info( | |
f"The upper limit for RGB power levels has been set to {max_value} instead of 255. NeoPixels are bright 😎" | |
) | |
# Sliders for RGB values | |
R = st.slider("Select the Red value:", min_value=0, max_value=max_value, value=0) | |
G = st.slider("Select the Green value:", min_value=0, max_value=max_value, value=0) | |
B = st.slider("Select the Blue value:", min_value=0, max_value=max_value, value=0) | |
submit_button = st.form_submit_button(label="Send RGB Command") | |
command_topic = f"sdl-demo/picow/{PICO_ID}/GPIO/28/" | |
sensor_data_topic = f"sdl-demo/picow/{PICO_ID}/as7341/" | |
# random session id to keep track of the session and filter out old data | |
experiment_id = secrets.token_hex(4) # 4 bytes = 8 characters | |
sensor_data_file = f"sensor_data-{experiment_id}.json" | |
# TODO: Session ID using st.session_state to have history of commands and sensor data | |
# file_path = Path(sensor_data_file) | |
# file_path.unlink(missing_ok=True) | |
# Singleton: https://docs.streamlit.io/develop/api-reference/caching-and-state/st.cache_resource | |
# (on_message to be set later since filename is dynamic) | |
def get_paho_client( | |
sensor_data_topic, hostname, username, password=None, port=8883, tls=True | |
): | |
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, protocol=mqtt.MQTTv5) | |
def on_connect(client, userdata, flags, rc, properties=None): | |
if rc != 0: | |
print("Connected with result code " + str(rc)) | |
client.subscribe(sensor_data_topic, qos=1) | |
client.on_connect = on_connect | |
if tls: | |
client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS_CLIENT) | |
client.username_pw_set(username, password) | |
client.connect(hostname, port) | |
client.loop_start() # Use a non-blocking loop | |
return client | |
def send_and_receive(client, command_topic, msg, queue_timeout=15): | |
print("Sending command...") | |
result = client.publish(command_topic, json.dumps(msg), qos=2) | |
result.wait_for_publish() # Ensure the message is sent | |
if result.rc == mqtt.MQTT_ERR_SUCCESS: | |
print(f"Command sent: {msg} to topic {command_topic}") | |
else: | |
print(f"Failed to send command: {result.rc}") | |
timeout = time() + queue_timeout # Set timeout | |
while True: | |
if time() > timeout: | |
st.error("No sensor data received within the timeout period.") | |
return None | |
if os.path.exists(sensor_data_file): | |
with open(sensor_data_file, "r") as f: | |
sensor_data = json.load(f) | |
file_path = Path(sensor_data_file) | |
file_path.unlink(missing_ok=True) | |
return sensor_data | |
# Helper function to plot discrete spectral sensor data | |
def plot_spectra(sensor_data): | |
"""https://chatgpt.com/share/210d2fee-ca64-45a5-866e-e6df6e56bd1c""" | |
wavelengths = np.array([410, 440, 470, 510, 550, 583, 620, 670]) | |
intensities = np.array( | |
[ | |
sensor_data["ch410"], | |
sensor_data["ch440"], | |
sensor_data["ch470"], | |
sensor_data["ch510"], | |
sensor_data["ch550"], | |
sensor_data["ch583"], | |
sensor_data["ch620"], | |
sensor_data["ch670"], | |
] | |
) | |
fig, ax = plt.subplots(figsize=(10, 6)) | |
num_points = 100 # for "fake" color bar effect | |
# Adding rectangles for color bar effect | |
dense_wavelengths = np.linspace(wavelengths.min(), wavelengths.max(), num_points) | |
rect_height = max(intensities) * 0.02 # Height of the rectangles | |
for dw in dense_wavelengths: | |
rect = Rectangle( | |
( | |
dw - (wavelengths.max() - wavelengths.min()) / num_points / 2, | |
-rect_height * 2, | |
), | |
(wavelengths.max() - wavelengths.min()) / num_points, | |
rect_height * 3, | |
color=plt.cm.rainbow( | |
(dw - wavelengths.min()) / (wavelengths.max() - wavelengths.min()) | |
), | |
edgecolor="none", | |
) | |
ax.add_patch(rect) | |
# Main scatter plot | |
scatter = ax.scatter( | |
wavelengths, intensities, c=wavelengths, cmap="rainbow", edgecolor="k" | |
) | |
# Adding vertical lines from the x-axis to each point | |
for wavelength, intensity in zip(wavelengths, intensities): | |
ax.vlines(wavelength, 0, intensity, color="gray", linestyle="--", linewidth=1) | |
# Adjust limits and labels with larger font size | |
ax.set_xlim(wavelengths.min() - 10, wavelengths.max() + 10) | |
ax.set_ylim( | |
0, max(intensities) + 15 | |
) # Ensure the lower y limit is 0 and add buffer | |
ax.set_xticks(wavelengths) | |
ax.set_xlabel("Wavelength (nm)", fontsize=14) | |
ax.set_ylabel("Intensity", fontsize=14) | |
ax.set_title("Spectral Intensity vs. Wavelength", fontsize=16) | |
ax.tick_params(axis="both", which="major", labelsize=12) | |
st.pyplot(fig) | |
# Publish button | |
if submit_button: | |
if not PICO_ID or not HIVEMQ_HOST or not HIVEMQ_USERNAME or not HIVEMQ_PASSWORD: | |
st.error("Please enter all required fields.") | |
else: | |
st.info( | |
f"Please wait while the command {R, G, B} for experiment {experiment_id} is sent..." | |
) | |
client = get_paho_client( | |
sensor_data_topic, | |
HIVEMQ_HOST, | |
HIVEMQ_USERNAME, | |
password=HIVEMQ_PASSWORD, | |
port=int(PORT), | |
tls=True, | |
) | |
def on_message(client, userdata, msg): | |
with open(sensor_data_file, "w") as f: | |
json.dump(json.loads(msg.payload), f) | |
client.on_message = on_message | |
command_msg = {"R": R, "G": G, "B": B} | |
sensor_data = send_and_receive( | |
client, command_topic, command_msg, queue_timeout=15 | |
) | |
if sensor_data: | |
received_cmd = sensor_data["_input_message"] | |
R1 = received_cmd["R"] | |
G1 = received_cmd["G"] | |
B1 = received_cmd["B"] | |
st.success( | |
f"Command {R1, G1, B1} for experiment {experiment_id} sent successfully!" | |
) | |
plot_spectra(sensor_data) | |
st.write("Sensor Data Received:", sensor_data) | |