Spaces:
Runtime error
Runtime error
"""https://chatgpt.com/share/19aa181c-051e-4387-9991-578603dd4590""" | |
import json | |
import queue | |
import threading | |
import paho.mqtt.client as mqtt | |
import streamlit as st | |
# ---- Page header ----
st.title("Actuator Control Panel")

# ---- Broker connection settings ----
# The host and the password use the masked ("password") input type.
HIVEMQ_HOST = st.text_input(
    label="Enter your HiveMQ host:", value="", type="password"
)
HIVEMQ_USERNAME = st.text_input(label="Enter your HiveMQ username:", value="")
HIVEMQ_PASSWORD = st.text_input(
    label="Enter your HiveMQ password:", value="", type="password"
)
PORT = st.number_input(
    label="Enter the port number:",
    min_value=1,
    max_value=65535,
    value=8883,
)

# ---- Target device ----
# The Pico ID is inserted into the MQTT topic names; it is masked as well.
pico_id = st.text_input(label="Enter your Pico ID:", value="", type="password")

# ---- Command payload ----
# Position to send to the actuator, limited by the slider to 1.1 - 1.9.
position = st.slider(
    label="Select the position value:",
    min_value=1.1,
    max_value=1.9,
    value=1.5,
)

# ---- Per-session storage for received responses ----
if "messages" not in st.session_state:
    st.session_state["messages"] = []

# ---- Hand-off buffer filled by the MQTT on_message callback ----
message_queue = queue.Queue()
# NOTE: this factory is NOT wrapped in st.cache_resource
# (https://docs.streamlit.io/develop/api-reference/caching-and-state/st.cache_resource),
# so every call builds and connects a brand-new client.
def get_paho_client(hostname, username, password=None, port=8883, tls=True):
    """Create an MQTT v5 client, connect it and start its network loop.

    Args:
        hostname: Broker host name to connect to.
        username: User name for broker authentication.
        password: Password for broker authentication (may be None).
        port: Broker TCP port; defaults to 8883.
        tls: When true, TLS is enabled with paho's default settings.

    Returns:
        The paho client, with its background network loop already running.
        Incoming message payloads are decoded and pushed onto the
        module-level ``message_queue``.
    """

    def _report_connection(_client, _userdata, _flags, rc, properties=None):
        # Invoked when the broker answers the connection request (CONNACK).
        if rc != 0:
            status = "Connected with result code " + str(rc)
        else:
            status = "Connected successfully"
        print(status)

    def _enqueue_payload(_client, _userdata, msg):
        # Invoked for every PUBLISH received from the broker.
        message_queue.put(msg.payload.decode())

    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, protocol=mqtt.MQTTv5)
    client.on_connect = _report_connection
    client.on_message = _enqueue_payload

    # Encrypt the connection when requested.
    if tls:
        client.tls_set()

    # Credentials must be set before connecting.
    client.username_pw_set(username, password)

    # Open the connection, then service it from paho's background thread.
    client.connect(hostname, port)
    client.loop_start()
    return client
def send_command(client, pico_id, position, timeout=10.0):
    """Publish a position command for one Pico and report the outcome as text.

    The command is sent as JSON (``{"position": <position>}``) with QoS 1 to
    ``digital-pipette/picow/<pico_id>/L16-R``.

    Args:
        client: Connected paho MQTT client used to publish.
        pico_id: Device identifier inserted into the topic name.
        position: Value placed under the ``"position"`` key of the payload.
        timeout: Maximum number of seconds to wait for the publish to
            complete. ``None`` waits indefinitely (the previous behaviour).

    Returns:
        A human-readable status string. It starts with ``"Command sent:"`` on
        success, ``"Failed to send command:"`` when the publish was rejected
        or not confirmed in time, and ``"An error occurred:"`` when an
        exception was raised while publishing.
    """
    command_topic = f"digital-pipette/picow/{pico_id}/L16-R"
    command = {"position": position}
    try:
        result = client.publish(command_topic, json.dumps(command), qos=1)
        # Bound the wait: without a timeout an unreachable broker would block
        # the caller (and therefore the whole Streamlit script) forever.
        result.wait_for_publish(timeout=timeout)
        if not result.is_published():
            return f"Failed to send command: not confirmed within {timeout} s"
        if result.rc == mqtt.MQTT_ERR_SUCCESS:
            return f"Command sent: {command} to topic {command_topic}"
        return f"Failed to send command: {result.rc}"
    except Exception as e:
        # Deliberately broad: callers expect a status string, never a raise.
        return f"An error occurred: {e}"
# Register interest in the replies a given Pico publishes.
def subscribe_to_response(client, pico_id):
    """Subscribe ``client`` (QoS 1) to the response topic of ``pico_id``.

    The topic is ``digital-pipette/picow/<pico_id>/response``. Nothing is
    returned; the result of ``client.subscribe`` is discarded.
    """
    client.subscribe(f"digital-pipette/picow/{pico_id}/response", qos=1)
# Publish button: connect, send the command, then wait briefly for the reply.
#
# The reply is collected here, on the script thread. Touching
# st.session_state or requesting a rerun from a hand-made background thread is
# not supported by Streamlit, and spawning a never-ending listener thread on
# every click also leaked one thread and one MQTT connection per click.
RESPONSE_TIMEOUT_S = 10.0  # upper bound on how long one click waits for a reply

if st.button("Send Command"):
    if not (pico_id and HIVEMQ_HOST and HIVEMQ_USERNAME and HIVEMQ_PASSWORD):
        st.error("Please enter all required fields.")
    else:
        try:
            client = get_paho_client(
                HIVEMQ_HOST,
                HIVEMQ_USERNAME,
                password=HIVEMQ_PASSWORD,
                port=int(PORT),
                tls=True,
            )
        except OSError as exc:
            # DNS, socket and TLS failures all derive from OSError.
            client = None
            st.error(f"Could not connect to the MQTT broker: {exc}")

        if client is not None:
            try:
                subscribe_to_response(client, pico_id)
                status_msg = send_command(client, pico_id, position)
                if status_msg.startswith("Command sent"):
                    st.success(status_msg)
                    with st.spinner("Waiting for a response..."):
                        try:
                            reply = message_queue.get(timeout=RESPONSE_TIMEOUT_S)
                        except queue.Empty:
                            reply = None
                    if reply is None:
                        st.warning(
                            f"No response received within {RESPONSE_TIMEOUT_S} s."
                        )
                    elif reply:
                        # Empty payloads are ignored, as before.
                        st.session_state.messages.append(reply)
                else:
                    # send_command reports failures as text; do not show them
                    # as a success.
                    st.error(status_msg)
            finally:
                # Release the connection and paho's network thread so repeated
                # clicks do not accumulate background resources.
                client.disconnect()
                client.loop_stop()
# Render every response stored for this session, in arrival order.
for payload in st.session_state.messages:
    line = f"Received message: {payload}"
    st.write(line)