sgbaird commited on
Commit
758a4c9
1 Parent(s): 18aba6e

chore: Update RGB control panel with Streamlit and MQTT implementation

Browse files
scripts/basic_rgb_led_sensor.py ADDED
@@ -0,0 +1,67 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ https://ac-microcourses.readthedocs.io/en/latest/courses/hello-world/1.4.1-onboard-led-temp.html
3
+
4
+ permalink: https://github.com/AccelerationConsortium/ac-microcourses/blob/07dc9a0286ded2e21ea64f02d8ae697717e786b9/docs/courses/hello-world/1.4.1-onboard-led-temp.ipynb
5
+ """
6
+
7
+ import paho.mqtt.client as mqtt
8
+ import json
9
+ from queue import Queue
10
+
11
+ PICO_ID = "test" # UPDATE THIS TO YOUR ID
12
+
13
+ command_topic = f"sdl-demo/picow/{PICO_ID}/GPIO/28/"
14
+ sensor_data_topic = f"sdl-demo/picow/{PICO_ID}/as7341/"
15
+
16
+ HIVEMQ_USERNAME = "sgbaird"
17
+ HIVEMQ_PASSWORD = "D.Pq5gYtejYbU#L"
18
+ HIVEMQ_HOST = "248cc294c37642359297f75b7b023374.s2.eu.hivemq.cloud"
19
+
20
+ sensor_data_queue: "Queue[dict]" = Queue()
21
+
22
+
23
+ def get_paho_client(
24
+ sensor_data_topic, hostname, username, password=None, port=8883, tls=True
25
+ ):
26
+ client = mqtt.Client(protocol=mqtt.MQTTv5) # create new instance
27
+
28
+ def on_message(client, userdata, msg):
29
+ sensor_data_queue.put(json.loads(msg.payload))
30
+
31
+ def on_connect(client, userdata, flags, rc, properties=None):
32
+ if rc != 0:
33
+ print("Connected with result code " + str(rc))
34
+ client.subscribe(sensor_data_topic, qos=1)
35
+
36
+ client.on_connect = on_connect
37
+ client.on_message = on_message
38
+
39
+ if tls:
40
+ client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS_CLIENT)
41
+ client.username_pw_set(username, password)
42
+ client.connect(hostname, port)
43
+ client.subscribe(sensor_data_topic, qos=2)
44
+
45
+ return client
46
+
47
+
48
+ def send_and_receive(client, command_topic, msg, queue_timeout=60):
49
+ client.publish(command_topic, json.dumps(msg), qos=2)
50
+
51
+ client.loop_start()
52
+
53
+ while True:
54
+ sensor_data = sensor_data_queue.get(True, queue_timeout)
55
+ client.loop_stop()
56
+ return sensor_data
57
+
58
+
59
+ client = get_paho_client(
60
+ sensor_data_topic, HIVEMQ_HOST, HIVEMQ_USERNAME, password=HIVEMQ_PASSWORD
61
+ )
62
+
63
+ command_msg = {"R": 10, "G": 10, "B": 10}
64
+ sensor_data = send_and_receive(client, command_topic, command_msg, queue_timeout=30)
65
+ print(sensor_data)
66
+
67
+ 1 + 1
scripts/rgb_led_sensor.py CHANGED
@@ -1,28 +1,59 @@
1
- """
2
- https://ac-microcourses.readthedocs.io/en/latest/courses/hello-world/1.4.1-onboard-led-temp.html
3
-
4
- permalink: https://github.com/AccelerationConsortium/ac-microcourses/blob/07dc9a0286ded2e21ea64f02d8ae697717e786b9/docs/courses/hello-world/1.4.1-onboard-led-temp.ipynb
5
- """
6
 
7
  import paho.mqtt.client as mqtt
8
- import json
9
- from queue import Queue
 
 
10
 
11
- PICO_ID = "test" # UPDATE THIS TO YOUR ID
 
 
 
 
 
 
 
 
 
 
 
 
12
 
13
- command_topic = f"{PICO_ID}/neopixel"
14
- sensor_data_topic = f"{PICO_ID}/as7341"
15
 
16
- HIVEMQ_USERNAME = "sgbaird"
17
- HIVEMQ_PASSWORD = "D.Pq5gYtejYbU#L"
18
- HIVEMQ_HOST = "248cc294c37642359297f75b7b023374.s2.eu.hivemq.cloud"
19
 
20
- sensor_data_queue: "Queue[dict]" = Queue()
 
 
 
21
 
 
 
 
 
22
 
 
 
 
 
 
 
 
 
 
 
 
 
23
  def get_paho_client(
24
  sensor_data_topic, hostname, username, password=None, port=8883, tls=True
25
  ):
 
26
  client = mqtt.Client(protocol=mqtt.MQTTv5) # create new instance
27
 
28
  def on_message(client, userdata, msg):
@@ -40,7 +71,7 @@ def get_paho_client(
40
  client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS_CLIENT)
41
  client.username_pw_set(username, password)
42
  client.connect(hostname, port)
43
- client.subscribe(sensor_data_topic, qos=2)
44
 
45
  return client
46
 
@@ -48,18 +79,38 @@ def get_paho_client(
48
  def send_and_receive(client, command_topic, msg, queue_timeout=60):
49
  client.publish(command_topic, json.dumps(msg), qos=2)
50
 
51
- client.loop_start()
52
-
53
  while True:
54
  sensor_data = sensor_data_queue.get(True, queue_timeout)
55
- client.loop_stop()
56
  return sensor_data
57
 
58
 
59
- client = get_paho_client(
60
- sensor_data_topic, HIVEMQ_HOST, HIVEMQ_USERNAME, password=HIVEMQ_PASSWORD
61
- )
62
-
63
- command_msg = {"R": 50, "G": 20, "B": 240}
64
- sensor_data = send_and_receive(client, command_topic, command_msg, queue_timeout=30)
65
- print(sensor_data)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json
2
+ import queue
3
+ import threading
 
 
4
 
5
  import paho.mqtt.client as mqtt
6
+ import streamlit as st
7
+
8
+ # Initialize Streamlit app
9
+ st.title("RGB Command and Sensor Data Panel")
10
 
11
+ # MQTT Configuration
12
+ HIVEMQ_HOST = st.text_input(
13
+ "Enter your HiveMQ host:",
14
+ "248cc294c37642359297f75b7b023374.s2.eu.hivemq.cloud",
15
+ type="password",
16
+ )
17
+ HIVEMQ_USERNAME = st.text_input("Enter your HiveMQ username:", "sgbaird")
18
+ HIVEMQ_PASSWORD = st.text_input(
19
+ "Enter your HiveMQ password:", "D.Pq5gYtejYbU#L", type="password"
20
+ )
21
+ PORT = st.number_input(
22
+ "Enter the port number:", min_value=1, max_value=65535, value=8883
23
+ )
24
 
25
+ # User input for the Pico ID
26
+ pico_id = st.text_input("Enter your Pico ID:", "test", type="password")
27
 
28
+ max_power = 0.3
29
+ max_value = round(max_power * 255)
 
30
 
31
+ # Information about the maximum power reduction
32
+ st.info(
33
+ f"The maximum power has been reduced. The upper limit for RGB values is {max_value} instead of 255."
34
+ )
35
 
36
+ # Sliders for RGB values
37
+ R = st.slider("Select the Red value:", min_value=0, max_value=max_value, value=0)
38
+ G = st.slider("Select the Green value:", min_value=0, max_value=max_value, value=0)
39
+ B = st.slider("Select the Blue value:", min_value=0, max_value=max_value, value=0)
40
 
41
+ # Initialize session state for messages and lock state
42
+ if "messages" not in st.session_state:
43
+ st.session_state.messages = []
44
+ if "locked" not in st.session_state:
45
+ st.session_state.locked = False
46
+
47
+ # Queue to hold sensor data
48
+ sensor_data_queue = queue.Queue()
49
+
50
+
51
+ # Singleton: https://docs.streamlit.io/develop/api-reference/caching-and-state/st.cache_resource
52
+ @st.cache_resource
53
  def get_paho_client(
54
  sensor_data_topic, hostname, username, password=None, port=8883, tls=True
55
  ):
56
+
57
  client = mqtt.Client(protocol=mqtt.MQTTv5) # create new instance
58
 
59
  def on_message(client, userdata, msg):
 
71
  client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS_CLIENT)
72
  client.username_pw_set(username, password)
73
  client.connect(hostname, port)
74
+ client.loop_start() # Use a non-blocking loop
75
 
76
  return client
77
 
 
79
  def send_and_receive(client, command_topic, msg, queue_timeout=60):
80
  client.publish(command_topic, json.dumps(msg), qos=2)
81
 
 
 
82
  while True:
83
  sensor_data = sensor_data_queue.get(True, queue_timeout)
 
84
  return sensor_data
85
 
86
 
87
+ # Publish button
88
+ if st.button("Send RGB Command") and not st.session_state.locked:
89
+ if not pico_id or not HIVEMQ_HOST or not HIVEMQ_USERNAME or not HIVEMQ_PASSWORD:
90
+ st.error("Please enter all required fields.")
91
+ else:
92
+ st.session_state.locked = True
93
+ command_topic = f"{pico_id}/neopixel"
94
+ sensor_data_topic = f"{pico_id}/as7341"
95
+
96
+ client = get_paho_client(
97
+ sensor_data_topic,
98
+ HIVEMQ_HOST,
99
+ HIVEMQ_USERNAME,
100
+ password=HIVEMQ_PASSWORD,
101
+ port=int(PORT),
102
+ tls=True,
103
+ )
104
+
105
+ command_msg = {"R": R, "G": G, "B": B}
106
+ sensor_data = send_and_receive(
107
+ client, command_topic, command_msg, queue_timeout=30
108
+ )
109
+
110
+ st.session_state.locked = False
111
+ st.success("Command sent successfully!")
112
+ st.write("Sensor Data Received:", sensor_data)
113
+
114
+ # Display received messages
115
+ for message in st.session_state.messages:
116
+ st.write(f"Received message: {message}")