File size: 4,397 Bytes
ee3c704
722435e
ee3c704
 
722435e
 
 
ee3c704
722435e
ee3c704
 
 
 
 
 
 
 
722435e
 
ee3c704
722435e
 
 
 
 
 
 
70b4505
95608ed
e970125
80dfab2
722435e
 
 
9c74df0
 
6424caa
1591d77
9c74df0
5c32678
95608ed
618e662
e18efce
bb00cfa
39c781d
95608ed
 
 
 
bb00cfa
ee3c704
bb00cfa
 
7dce64e
722435e
ee3c704
7da974d
ee3c704
722435e
3eeeacd
722435e
1ec1680
 
ee3c704
722435e
 
 
 
ee3c704
722435e
 
 
ee3c704
 
 
da5366e
95608ed
7da974d
287b9d1
6aefd93
a41de55
 
39c781d
9004c90
 
7da974d
6faade7
95608ed
ee3c704
3eeeacd
722435e
 
 
 
ee3c704
722435e
 
 
ee3c704
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import threading
import time
import os
import paho.mqtt.client as mqtt
from dotenv import load_dotenv
import state  # ูŠุญุชูˆูŠ ุนู„ู‰ `singlas_state`


# ------------------ Load .env ------------------ #
# Load broker settings from the local "keys.env" file into the environment.
load_dotenv(dotenv_path="keys.env")

mqtt_broker = os.getenv("MQTT_BROKER")
# MQTT_PORT may be missing or empty; int(None) / int("") would abort the import
# with TypeError / ValueError, so fall back to 1883 (the default port used by
# paho's Client.connect).
mqtt_port = int(os.getenv("MQTT_PORT") or "1883")
# Topic layout used below: <MQTT_PARTOPIC_state>/<tl_id>/<sub-topic>
MQTT_PARTOPIC_state = os.getenv("MQTT_PARTOPIC_state")
MQTT_SUBTOPIC_reply = os.getenv("MQTT_SUBTOPIC_reply")
MQTT_SUBTOPIC_msg = os.getenv("MQTT_SUBTOPIC_msg")

def topic_msg(tl_id):
    """Build the topic on which messages for traffic light *tl_id* are published.

    The result has the form ``<MQTT_PARTOPIC_state>/<tl_id>/<MQTT_SUBTOPIC_msg>``.
    """
    segments = (MQTT_PARTOPIC_state, tl_id, MQTT_SUBTOPIC_msg)
    return "{}/{}/{}".format(*segments)

def topic_rep(tl_id):
    """Build the topic on which replies from traffic light *tl_id* are received.

    The result has the form ``<MQTT_PARTOPIC_state>/<tl_id>/<MQTT_SUBTOPIC_reply>``.
    """
    segments = (MQTT_PARTOPIC_state, tl_id, MQTT_SUBTOPIC_reply)
    return "{}/{}/{}".format(*segments)
    

# ------------------ Get state ------------------ #
def get_State(tl_id):
    """Look up the state currently recorded for traffic light *tl_id*.

    Returns ``"FREE"`` when ``state.singlas_state`` has no entry for it.
    """
    known_states = state.singlas_state
    return known_states.get(tl_id, "FREE")

def checked_ql(ql, max_queue_length=16):
    """Return True when the reported queue length is below the road's limit.

    Args:
        ql: Queue length as received from the traffic light; any value that
            ``int()`` accepts (typically the numeric token of a ``QL <n>``
            reply).
        max_queue_length: Exclusive upper bound for an acceptable queue
            length. Defaults to 16.

    Returns:
        ``True`` if ``int(ql) < max_queue_length``; ``False`` otherwise,
        including when *ql* cannot be parsed as an integer.
    """
    try:
        queue_length = int(ql)
    except (TypeError, ValueError):
        # The value arrives over the network; a malformed reading must not
        # raise out of the MQTT callback, so treat it as "not acceptable".
        return False
    return queue_length < max_queue_length
# ------------------ Listener ------------------ #
def mqtt_listener():
    """Run the blocking MQTT client that handles traffic-light replies.

    Connects to ``mqtt_broker:mqtt_port``, subscribes to ``topic_rep(tl_id)``
    for every key of ``state.singlas_state`` and then blocks in
    ``client.loop_forever()``.

    Replies handled:

    * ``AVBL`` -- marks ``state.request[tl_id]`` as accepted, copies its
      ``'State'`` into ``state.singlas_state[tl_id]`` and publishes
      ``DONE QL?`` on ``topic_msg(tl_id)``.
    * ``QL <n>`` -- when ``checked_ql(n)`` is true, publishes
      ``"<current state> <Duration>"`` on ``topic_msg(tl_id)``.

    Malformed or unexpected replies are logged and ignored so that a single
    bad message cannot raise out of the callback.
    """

    def on_connect(client, userdata, flags, rc, properties=None):
        # Subscribing here (rather than once before the loop) renews the
        # subscriptions every time the client reconnects to the broker.
        if rc != 0:
            print(f"MQTT connection failed: {rc}")
            return
        # Subscribe to every reply topic
        for tl_id in state.singlas_state.keys():
            client.subscribe(topic_rep(tl_id))
            print(f"โœ… Subscribed to {topic_rep(tl_id)}")

    def on_message(client, userdata, message):
        # errors="replace" keeps a non-UTF-8 payload from raising here.
        msg = message.payload.decode(errors="replace")
        topic = message.topic

        # Topic layout is <prefix>/<tl_id>/<sub-topic>; this assumes the
        # prefix itself contains no '/'.
        topic_parts = topic.split('/')
        if len(topic_parts) < 2:
            print(f"Ignoring message on unexpected topic {topic}: {msg}")
            return
        tl_id = topic_parts[1]

        print(f"{state.request}-----{tl_id}")
        print(f"๐Ÿ“ฅ Reply Received on {topic}: {msg}")

        try:
            request = state.request[tl_id]
        except KeyError:
            request = None

        if msg == 'AVBL':
            if request is None:
                print(f"Ignoring AVBL from {tl_id}: no pending request")
                return
            request['accepted'] = True
            state.singlas_state[tl_id] = request['State']
            # Ask for the queue length before opening the road in an emergency.
            next_request = 'DONE QL?'
            client.publish(topic_msg(tl_id), next_request, qos=1)

        elif msg.startswith("QL"):
            fields = msg.split()
            if len(fields) < 2:
                print(f"Ignoring malformed queue-length reply from {tl_id}: {msg}")
                return
            if request is None:
                print(f"Ignoring queue-length reply from {tl_id}: no pending request")
                return
            try:
                ql_ok = checked_ql(fields[1])
            except (TypeError, ValueError):
                # Non-numeric queue length: treat as not acceptable.
                ql_ok = False
            if ql_ok:
                payload = state.singlas_state[tl_id] + " " + str(request['Duration'])
                client.publish(topic_msg(tl_id), payload, qos=1)

    client = mqtt.Client()
    # Install the callbacks before connecting so no event is missed.
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(mqtt_broker, mqtt_port)
    client.loop_forever()

# ------------------ Publisher ------------------ #
def mqtt_publisher_loop(tl_id):
    """Publish the state of traffic light *tl_id* every time it changes.

    Creates a dedicated MQTT client, starts its network loop in the
    background and then polls ``state.singlas_state[tl_id]`` every 10 ms.
    Whenever the value differs from the one published last, it is sent to
    ``topic_msg(tl_id)`` with QoS 1. This function never returns.
    """
    publisher = mqtt.Client()
    publisher.connect(mqtt_broker, mqtt_port)
    publisher.loop_start()

    previous = None  # value sent most recently; None until the first publish
    while True:
        latest = state.singlas_state[tl_id]

        if latest != previous:
            print(f"๐Ÿ“ก state {latest} on {topic_msg(tl_id)} ,state")
            publisher.publish(topic_msg(tl_id), latest, qos=1)
            previous = latest

        time.sleep(.01)


# ------------------ Startup ------------------ #
def start_mqtt():
    """Launch the MQTT worker threads, all as daemon threads.

    One thread runs ``mqtt_listener``; after that, one thread running
    ``mqtt_publisher_loop`` is started for each key of
    ``state.singlas_state``.
    """
    # Single listener shared by all traffic lights.
    listener = threading.Thread(target=mqtt_listener, daemon=True)
    listener.start()

    # One publisher per traffic light.
    for light_id in state.singlas_state:
        publisher = threading.Thread(
            target=mqtt_publisher_loop,
            args=(light_id,),
            daemon=True,
        )
        publisher.start()


'''
from fastapi import WebSocket, WebSocketDisconnect
import asyncio

@app.websocket("/ws/state")
async def websocket_get_state(websocket: WebSocket):
    await websocket.accept()
    try:
        tl_id = await websocket.receive_text()
        print(f"โœ… Client subscribed to {tl_id}")

        last_state = None
        while True:
            current_state = get_State(tl_id)
            if current_state != last_state:
                await websocket.send_text(current_state)
                last_state = current_state
            await asyncio.sleep(1)  # ูƒู„ ุซุงู†ูŠุฉ ูŠุดูŠูƒ ุงู„ุชุบูŠูŠุฑ
    except WebSocketDisconnect:
        print("โŒ Client disconnected")

        
@app.get("/test-db")
def test_db():
    try:
        connection = get_db()
        with connection.cursor() as cursor:
            cursor.execute("SELECT 1")
            result = cursor.fetchone()
        connection.close()
        return {"status": "connected", "result": result}
    except Exception as e:
        return {"status": "error", "detail": str(e)}

'''