Spaces:
Sleeping
Sleeping
File size: 4,397 Bytes
ee3c704 722435e ee3c704 722435e ee3c704 722435e ee3c704 722435e ee3c704 722435e 70b4505 95608ed e970125 80dfab2 722435e 9c74df0 6424caa 1591d77 9c74df0 5c32678 95608ed 618e662 e18efce bb00cfa 39c781d 95608ed bb00cfa ee3c704 bb00cfa 7dce64e 722435e ee3c704 7da974d ee3c704 722435e 3eeeacd 722435e 1ec1680 ee3c704 722435e ee3c704 722435e ee3c704 da5366e 95608ed 7da974d 287b9d1 6aefd93 a41de55 39c781d 9004c90 7da974d 6faade7 95608ed ee3c704 3eeeacd 722435e ee3c704 722435e ee3c704 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
import threading
import time
import os
import paho.mqtt.client as mqtt
from dotenv import load_dotenv
import state  # contains `singlas_state`
# ------------------ Load .env ------------------ #
# Broker address, port and topic fragments are read from the environment
# after loading keys.env.
load_dotenv(dotenv_path="keys.env")
mqtt_broker = os.getenv("MQTT_BROKER")
# An unset or empty MQTT_PORT would make int() raise at import time;
# fall back to 1883, the standard unencrypted MQTT port.
mqtt_port = int(os.getenv("MQTT_PORT") or "1883")
# Topics are assembled as <MQTT_PARTOPIC_state>/<tl_id>/<subtopic>.
MQTT_PARTOPIC_state = os.getenv("MQTT_PARTOPIC_state")
MQTT_SUBTOPIC_reply = os.getenv("MQTT_SUBTOPIC_reply")
MQTT_SUBTOPIC_msg = os.getenv("MQTT_SUBTOPIC_msg")
def topic_msg(tl_id):
    """Return the outgoing-message topic for ``tl_id``.

    The topic has the form ``<MQTT_PARTOPIC_state>/<tl_id>/<MQTT_SUBTOPIC_msg>``.
    """
    prefix = MQTT_PARTOPIC_state
    suffix = MQTT_SUBTOPIC_msg
    return f"{prefix}/{tl_id}/{suffix}"
def topic_rep(tl_id):
    """Return the reply topic for ``tl_id``.

    The topic has the form ``<MQTT_PARTOPIC_state>/<tl_id>/<MQTT_SUBTOPIC_reply>``.
    """
    prefix = MQTT_PARTOPIC_state
    suffix = MQTT_SUBTOPIC_reply
    return f"{prefix}/{tl_id}/{suffix}"
# ------------------ Get state ------------------ #
def get_State(tl_id):
    """Return the state stored for ``tl_id``, or ``"FREE"`` when none is stored."""
    fallback = "FREE"
    known_states = state.singlas_state
    return known_states.get(tl_id, fallback)
def checked_ql(ql, max_queue_length=16):
    """Tell whether a reported queue length is below the allowed maximum.

    Args:
        ql: Queue length as received, typically the text that follows
            ``"QL"`` in a reply; anything ``int()`` accepts.
        max_queue_length: Exclusive upper bound for an acceptable queue.
            Defaults to 16.

    Returns:
        True when ``int(ql) < max_queue_length``. False otherwise, including
        when ``ql`` cannot be parsed as an integer, so a malformed payload is
        rejected instead of raising inside the MQTT callback.
    """
    try:
        queue_length = int(ql)
    except (TypeError, ValueError):
        return False
    return queue_length < max_queue_length
# ------------------ Listener ------------------ #
def mqtt_listener():
    """Listen on every reply topic and answer replies; blocks forever.

    Connects one MQTT client, subscribes to ``topic_rep(tl_id)`` for every
    key of ``state.singlas_state`` and then enters ``loop_forever()``, so it
    is meant to run in its own thread.

    Reply handling:
        * ``AVBL``: mark the pending request as accepted, copy its ``State``
          into ``state.singlas_state`` and publish ``DONE QL?``.
        * ``QL <n>``: if ``checked_ql(n)`` passes, publish the current state
          followed by the pending request's ``Duration``.

    Malformed or unexpected replies are logged and ignored so that one bad
    message cannot raise out of the callback and stop the listener.
    """

    def on_connect(client, userdata, flags, rc, *_):
        # Called after every successful (re)connection. Subscribing here
        # renews the subscriptions when loop_forever() reconnects.
        # *_ absorbs the extra argument some callback signatures pass.
        if rc != 0:
            print(f"MQTT connection failed: {rc}")
            return
        # Subscribe to all replies
        for tl_id in list(state.singlas_state):
            client.subscribe(topic_rep(tl_id))
            print(f"✅ Subscribed to {topic_rep(tl_id)}")

    def on_message(client, userdata, message):
        # Undecodable bytes are replaced rather than raising; such a payload
        # simply will not match any known reply below.
        msg = message.payload.decode(errors="replace")
        topic = message.topic
        segments = topic.split('/')
        if len(segments) < 2:
            print(f"Ignoring message on unexpected topic {topic}: {msg}")
            return
        tl_id = segments[1]
        print(f"{state.request}-----{tl_id}")
        print(f"๐ฅ Reply Received on {topic}: {msg}")

        is_available = msg == 'AVBL'
        is_queue_report = msg.startswith("QL")
        if not (is_available or is_queue_report):
            return

        # Both reply kinds need the request that triggered them.
        try:
            pending = state.request[tl_id]
        except KeyError:
            print(f"Ignoring {msg!r} for {tl_id}: no pending request")
            return

        if is_available:
            pending['accepted'] = True
            state.singlas_state[tl_id] = pending['State']
            # Ask for the queue length before the requested state is sent.
            client.publish(topic_msg(tl_id), 'DONE QL?', qos=1)
            return

        fields = msg.split()
        if len(fields) < 2:
            print(f"Ignoring queue-length reply without a value: {msg!r}")
            return
        try:
            queue_ok = checked_ql(fields[1])
        except (TypeError, ValueError):
            # A non-numeric queue length is treated as not acceptable.
            queue_ok = False
        if queue_ok:
            client.publish(
                topic_msg(tl_id),
                state.singlas_state[tl_id] + " " + str(pending['Duration']),
                qos=1,
            )

    client = mqtt.Client()
    # Callbacks must be in place before the network loop starts.
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(mqtt_broker, mqtt_port)
    client.loop_forever()
# ------------------ Publisher ------------------ #
def mqtt_publisher_loop(tl_id):
    """Publish the state of ``tl_id`` each time it changes; never returns.

    Opens a dedicated MQTT client with its network loop running in the
    background, then polls ``state.singlas_state[tl_id]`` every 10 ms and
    publishes the value on ``topic_msg(tl_id)`` (QoS 1) whenever it differs
    from the value published last.
    """
    publisher = mqtt.Client()
    publisher.connect(mqtt_broker, mqtt_port)
    publisher.loop_start()

    previously_sent = None  # nothing has been published yet
    while True:
        current_state = state.singlas_state[tl_id]
        if current_state == previously_sent:
            # Unchanged: wait for the next poll.
            time.sleep(.01)
            continue

        print(f"๐ก state {current_state} on {topic_msg(tl_id)} ,state")
        destination = topic_msg(tl_id)
        publisher.publish(destination, current_state, qos=1)
        previously_sent = current_state
        time.sleep(.01)
# ------------------ Startup ------------------ #
def start_mqtt():
    """Start the MQTT background threads.

    Starts one daemon thread running ``mqtt_listener`` and then one daemon
    thread running ``mqtt_publisher_loop`` per key of ``state.singlas_state``.
    """
    # A single listener handles the replies for all ids.
    listener_thread = threading.Thread(target=mqtt_listener, daemon=True)
    listener_thread.start()

    # Each id gets its own publisher thread.
    for light_id in state.singlas_state.keys():
        publisher_thread = threading.Thread(
            target=mqtt_publisher_loop,
            args=(light_id,),
            daemon=True,
        )
        publisher_thread.start()
'''
from fastapi import WebSocket, WebSocketDisconnect
import asyncio
@app.websocket("/ws/state")
async def websocket_get_state(websocket: WebSocket):
await websocket.accept()
try:
tl_id = await websocket.receive_text()
print(f"✅ Client subscribed to {tl_id}")
last_state = None
while True:
current_state = get_State(tl_id)
if current_state != last_state:
await websocket.send_text(current_state)
last_state = current_state
await asyncio.sleep(1) # check for a change every second
except WebSocketDisconnect:
print("โ Client disconnected")
@app.get("/test-db")
def test_db():
try:
connection = get_db()
with connection.cursor() as cursor:
cursor.execute("SELECT 1")
result = cursor.fetchone()
connection.close()
return {"status": "connected", "result": result}
except Exception as e:
return {"status": "error", "detail": str(e)}
'''
|