digital-pipette / scripts /example_app_with_response.py
sgbaird's picture
chore: Add link to documentation in example_app_with_response.py
2152642
"""https://chatgpt.com/share/19aa181c-051e-4387-9991-578603dd4590"""
import json
import queue
import threading
import paho.mqtt.client as mqtt
import streamlit as st
# Initialize Streamlit app
st.title("Actuator Control Panel")
# MQTT Configuration
HIVEMQ_HOST = st.text_input("Enter your HiveMQ host:", "", type="password")
HIVEMQ_USERNAME = st.text_input("Enter your HiveMQ username:", "")
HIVEMQ_PASSWORD = st.text_input("Enter your HiveMQ password:", "", type="password")
PORT = st.number_input(
"Enter the port number:", min_value=1, max_value=65535, value=8883
)
# User input for the Pico ID
pico_id = st.text_input("Enter your Pico ID:", "", type="password")
# Slider for position value
position = st.slider(
"Select the position value:", min_value=1.1, max_value=1.9, value=1.5
)
# Initialize session state for messages
if "messages" not in st.session_state:
st.session_state.messages = []
# Queue to hold messages
message_queue = queue.Queue()
# Singleton: https://docs.streamlit.io/develop/api-reference/caching-and-state/st.cache_resource
@st.cache_resource
def get_paho_client(hostname, username, password=None, port=8883, tls=True):
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, protocol=mqtt.MQTTv5)
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc, properties=None):
if rc != 0:
print("Connected with result code " + str(rc))
else:
print("Connected successfully")
client.on_connect = on_connect
# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
message_queue.put(msg.payload.decode())
client.on_message = on_message
# Enable TLS for secure connection
if tls:
client.tls_set()
# Set username and password
client.username_pw_set(username, password)
# Connect to HiveMQ Cloud on port 8883 (default for MQTT)
client.connect(hostname, port)
client.loop_start() # Use a non-blocking loop
return client
def send_command(client, pico_id, position):
# Topic
command_topic = f"digital-pipette/picow/{pico_id}/L16-R"
# Create and send command
command = {"position": position}
try:
result = client.publish(command_topic, json.dumps(command), qos=1)
result.wait_for_publish() # Ensure the message is sent
if result.rc == mqtt.MQTT_ERR_SUCCESS:
return f"Command sent: {command} to topic {command_topic}"
else:
return f"Failed to send command: {result.rc}"
except Exception as e:
return f"An error occurred: {e}"
# Subscribe to the response topic
def subscribe_to_response(client, pico_id):
response_topic = f"digital-pipette/picow/{pico_id}/response"
client.subscribe(response_topic, qos=1)
# Publish button
if st.button("Send Command"):
if not pico_id or not HIVEMQ_HOST or not HIVEMQ_USERNAME or not HIVEMQ_PASSWORD:
st.error("Please enter all required fields.")
else:
client = get_paho_client(
HIVEMQ_HOST,
HIVEMQ_USERNAME,
password=HIVEMQ_PASSWORD,
port=int(PORT),
tls=True,
)
subscribe_to_response(client, pico_id)
success_msg = send_command(client, pico_id, position)
st.success(success_msg)
# Start a thread to listen for messages
def listen_for_messages():
while True:
msg = message_queue.get()
if msg:
st.session_state.messages.append(msg)
st.experimental_rerun()
threading.Thread(target=listen_for_messages, daemon=True).start()
# Display received messages
for message in st.session_state.messages:
st.write(f"Received message: {message}")