sgbaird commited on
Commit
e56c4e3
1 Parent(s): 8fda02f

chore: Add Actuator Control Panel implementation using Streamlit and MQTT and basic scripts and reqs file

Browse files
app.py ADDED
@@ -0,0 +1,84 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """This streamlit implementation is preferred over the gradio implementation"""
2
+
3
+ import json
4
+
5
+ import paho.mqtt.client as mqtt
6
+ import streamlit as st
7
+
8
+ # Initialize Streamlit app
9
+ st.title("Actuator Control Panel")
10
+
11
+ # MQTT Configuration
12
+ HIVEMQ_HOST = st.text_input("Enter your HiveMQ host:", "", type="password")
13
+ HIVEMQ_USERNAME = st.text_input("Enter your HiveMQ username:", "")
14
+ HIVEMQ_PASSWORD = st.text_input("Enter your HiveMQ password:", "", type="password")
15
+ PORT = st.number_input(
16
+ "Enter the port number:", min_value=1, max_value=65535, value=8883
17
+ )
18
+
19
+ # User input for the Pico ID
20
+ pico_id = st.text_input("Enter your Pico ID:", "", type="password")
21
+
22
+ # Slider for position value
23
+ position = st.slider(
24
+ "Select the position value:", min_value=1.1, max_value=1.9, value=1.5
25
+ )
26
+
27
+
28
+ # singleton: https://docs.streamlit.io/develop/api-reference/caching-and-state/st.cache_resource
29
+ @st.cache_resource
30
+ def get_paho_client(hostname, username, password=None, port=8883, tls=True):
31
+
32
+ client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, protocol=mqtt.MQTTv5)
33
+
34
+ # The callback for when the client receives a CONNACK response from the server.
35
+ def on_connect(client, userdata, flags, rc, properties=None):
36
+ if rc != 0:
37
+ print("Connected with result code " + str(rc))
38
+
39
+ client.on_connect = on_connect
40
+
41
+ # enable TLS for secure connection
42
+ if tls:
43
+ client.tls_set()
44
+ # set username and password
45
+ client.username_pw_set(username, password)
46
+ # connect to HiveMQ Cloud on port 8883 (default for MQTT)
47
+ client.connect(hostname, port)
48
+ client.loop_start() # Use a non-blocking loop
49
+
50
+ return client
51
+
52
+
53
+ def send_command(client, pico_id, position):
54
+ # Topic
55
+ command_topic = f"digital-pipette/picow/{pico_id}/L16-R"
56
+
57
+ # Create and send command
58
+ command = {"position": position}
59
+
60
+ try:
61
+ result = client.publish(command_topic, json.dumps(command), qos=1)
62
+ result.wait_for_publish() # Ensure the message is sent
63
+ if result.rc == mqtt.MQTT_ERR_SUCCESS:
64
+ return f"Command sent: {command} to topic {command_topic}"
65
+ else:
66
+ return f"Failed to send command: {result.rc}"
67
+ except Exception as e:
68
+ return f"An error occurred: {e}"
69
+
70
+
71
+ # Publish button
72
+ if st.button("Send Command"):
73
+ if not pico_id or not HIVEMQ_HOST or not HIVEMQ_USERNAME or not HIVEMQ_PASSWORD:
74
+ st.error("Please enter all required fields.")
75
+ else:
76
+ client = get_paho_client(
77
+ HIVEMQ_HOST,
78
+ HIVEMQ_USERNAME,
79
+ password=HIVEMQ_PASSWORD,
80
+ port=int(PORT),
81
+ tls=True,
82
+ )
83
+ success_msg = send_command(client, pico_id, position)
84
+ st.success(success_msg)
requirements.txt ADDED
@@ -0,0 +1 @@
 
 
1
+ paho-mqtt
scripts/digital_pipette_picow_basic.py ADDED
@@ -0,0 +1,28 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import utime
2
+ from machine import PWM, Pin
3
+
4
+ # Setup PWM
5
+ pwm = PWM(Pin(0)) # Use the appropriate GPIO pin
6
+ pwm.freq(50) # 50 Hz frequency
7
+
8
+
9
+ def set_position(pulse_ms):
10
+ duty = int((pulse_ms / 20.0) * 65535)
11
+ pwm.duty_u16(duty)
12
+
13
+
14
+ # Example to set the actuator to different positions
15
+ set_position(1.1) # Almost full retraction
16
+ utime.sleep(5)
17
+ set_position(1.5) # Halfway
18
+ utime.sleep(5)
19
+ set_position(1.9) # Almost full extension
20
+ utime.sleep(5)
21
+ set_position(1.1) # Almost full retraction
22
+ utime.sleep(5)
23
+ set_position(1.5) # Halfway
24
+ utime.sleep(5)
25
+
26
+ # Add your logic to set it to the desired intermediate positions
27
+
28
+ pwm.deinit() # Deinitialize PWM
scripts/single_submit.py ADDED
@@ -0,0 +1,39 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """This streamlit implementation is preferred over the gradio implementation"""
2
+
3
+ import json
4
+
5
+ import paho.mqtt.client as mqtt
6
+
7
+ # MQTT Configuration
8
+ HIVEMQ_HOST = ""
9
+ HIVEMQ_USERNAME = ""
10
+ HIVEMQ_PASSWORD = ""
11
+ PORT = 8883
12
+
13
+ # User input for the Pico ID
14
+ pico_id = ""
15
+
16
+ # Slider for position value
17
+ position = 1.35
18
+
19
+
20
+ def send_command(client, pico_id, position):
21
+ # Topic
22
+ command_topic = f"digital-pipette/picow/{pico_id}/L16-R"
23
+
24
+ # Create and send command
25
+ command = {"position": position}
26
+ client.publish(command_topic, json.dumps(command), qos=1)
27
+
28
+ return f"Command sent: {command} to topic {command_topic}"
29
+
30
+
31
+ # Initialize MQTT client
32
+ client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, protocol=mqtt.MQTTv5)
33
+ client.tls_set()
34
+ client.username_pw_set(HIVEMQ_USERNAME, HIVEMQ_PASSWORD)
35
+ client.connect(HIVEMQ_HOST, PORT, 60)
36
+
37
+ success_msg = send_command(client, pico_id, position)
38
+
39
+ client.disconnect()