import json import queue import threading import paho.mqtt.client as mqtt import streamlit as st # Initialize Streamlit app st.title("RGB Command and Sensor Data Panel") # MQTT Configuration HIVEMQ_HOST = st.text_input( "Enter your HiveMQ host:", "248cc294c37642359297f75b7b023374.s2.eu.hivemq.cloud", type="password", ) HIVEMQ_USERNAME = st.text_input("Enter your HiveMQ username:", "sgbaird") HIVEMQ_PASSWORD = st.text_input( "Enter your HiveMQ password:", "D.Pq5gYtejYbU#L", type="password" ) PORT = st.number_input( "Enter the port number:", min_value=1, max_value=65535, value=8883 ) # User input for the Pico ID PICO_ID = st.text_input("Enter your Pico ID:", "test", type="password") command_topic = f"sdl-demo/picow/{PICO_ID}/GPIO/28/" sensor_data_topic = f"sdl-demo/picow/{PICO_ID}/as7341/" max_power = 0.35 max_value = round(max_power * 255) # Information about the maximum power reduction st.info( f"The upper limit for RGB power levels has been set to {max_value} instead of 255. NeoPixels can get very bright 😎" ) # Sliders for RGB values R = st.slider("Select the Red value:", min_value=0, max_value=max_value, value=0) G = st.slider("Select the Green value:", min_value=0, max_value=max_value, value=0) B = st.slider("Select the Blue value:", min_value=0, max_value=max_value, value=0) # Initialize session state for messages and lock state if "messages" not in st.session_state: st.session_state.messages = [] if "locked" not in st.session_state: st.session_state.locked = False # Queue to hold sensor data sensor_data_queue = queue.Queue() # Singleton: https://docs.streamlit.io/develop/api-reference/caching-and-state/st.cache_resource @st.cache_resource def get_paho_client( sensor_data_topic, hostname, username, password=None, port=8883, tls=True ): client = mqtt.Client(protocol=mqtt.MQTTv5) # create new instance def on_message(client, userdata, msg): sensor_data_queue.put(json.loads(msg.payload)) def on_connect(client, userdata, flags, rc, properties=None): if rc != 0: print("Connected with result code " + str(rc)) client.subscribe(sensor_data_topic, qos=1) client.on_connect = on_connect client.on_message = on_message if tls: client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS_CLIENT) client.username_pw_set(username, password) client.connect(hostname, port) client.loop_start() # Use a non-blocking loop return client def send_and_receive(client, command_topic, msg, queue_timeout=60): client.publish(command_topic, json.dumps(msg), qos=2) while True: sensor_data = sensor_data_queue.get(True, queue_timeout) return sensor_data # Publish button if st.button("Send RGB Command") and not st.session_state.locked: if not PICO_ID or not HIVEMQ_HOST or not HIVEMQ_USERNAME or not HIVEMQ_PASSWORD: st.error("Please enter all required fields.") else: st.session_state.locked = True client = get_paho_client( sensor_data_topic, HIVEMQ_HOST, HIVEMQ_USERNAME, password=HIVEMQ_PASSWORD, port=int(PORT), tls=True, ) command_msg = {"R": R, "G": G, "B": B} sensor_data = send_and_receive( client, command_topic, command_msg, queue_timeout=30 ) st.session_state.locked = False st.success("Command sent successfully!") st.write("Sensor Data Received:", sensor_data) # Display received messages for message in st.session_state.messages: st.write(f"Received message: {message}")