# File size: 1,971 Bytes
# bc0521a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import pika
import threading

# Broker login shared by the consumer and publisher connections below.
credentials = pika.PlainCredentials(username='guest', password='guest')

# Connection settings handed to every pika.BlockingConnection in this file.
parameters = pika.ConnectionParameters(
    credentials=credentials,
    virtual_host='/',  # default vhost, change if needed
    port=5672,  # default RabbitMQ port
    host='ubuntu',
)

def queue_listener(queue_name, host="localhost"):
    """Register the decorated function as a consumer of ``queue_name``.

    Applying the decorator immediately starts a daemon thread that opens a
    blocking connection with the module-level ``parameters``, declares the
    queue as durable and consumes messages one at a time
    (``prefetch_count=1``). Each message body is decoded to ``str`` and
    passed to the decorated function. The message is acked when the function
    returns and nacked without requeue when it raises.

    Args:
        queue_name: Name of the queue to declare and consume from.
        host: Currently unused; the broker address comes from the
            module-level ``parameters``. Kept in the signature so callers
            that pass it keep working.

    Returns:
        A decorator that starts the consumer thread and returns the
        decorated function unchanged.
    """
    def decorator(func):
        def start_consumer():
            connection = pika.BlockingConnection(parameters)
            try:
                channel = connection.channel()
                channel.queue_declare(queue=queue_name, durable=True)

                def callback(ch, method, properties, body):
                    try:
                        func(body.decode())  # invoke the user's handler
                    except Exception as e:
                        print(f"Error handling message: {e}")
                        ch.basic_nack(
                            delivery_tag=method.delivery_tag, requeue=False
                        )
                    else:
                        # Ack outside the try so that an ack failure is not
                        # misreported as a handler error and followed by a
                        # nack of a message that was processed successfully.
                        ch.basic_ack(delivery_tag=method.delivery_tag)

                channel.basic_qos(prefetch_count=1)
                channel.basic_consume(
                    queue=queue_name, on_message_callback=callback
                )
                print(f"[+] Listening on queue '{queue_name}'")
                channel.start_consuming()
            finally:
                # Release the socket when consuming stops or setup fails;
                # skip close() if the broker already dropped the connection.
                if connection.is_open:
                    connection.close()

        # Run the consumer on its own thread so the main thread is not blocked.
        threading.Thread(target=start_consumer, daemon=True).start()
        return func
    return decorator

@queue_listener("hello-python")
def process_message(msg):
    """Print a message received from the ``hello-python`` queue."""
    line = f"[x] Received: {msg}"
    print(line)


# publish a message to the queue
def publish_message(queue_name, message):
    """Publish ``message`` to ``queue_name`` through the default exchange.

    Opens a fresh blocking connection using the module-level ``parameters``,
    publishes one persistent message and closes the connection again.

    Args:
        queue_name: Routing key; with the default exchange (``''``) this is
            the name of the target queue.
        message: Message body passed to ``basic_publish``.
    """
    connection = pika.BlockingConnection(parameters)
    try:
        channel = connection.channel()
        # NOTE: the queue is intentionally not declared here, so it must
        # already exist on the broker; otherwise the default exchange has
        # nowhere to route the message.
        channel.basic_publish(
            exchange='',
            routing_key=queue_name,
            body=message,
            properties=pika.BasicProperties(
                delivery_mode=2,  # make message persistent
            )
        )
        print(f"[x] Sent: {message}")
    finally:
        # Always release the connection, even if publishing fails; skip
        # close() when the connection is already closed.
        if connection.is_open:
            connection.close()

# Send a test message as soon as the module is executed.
# NOTE(review): the listener runs on a daemon thread, so if this is the last
# statement of a script the process may exit before the message is consumed
# -- confirm how this module is run.
publish_message(queue_name="hello-python", message="RabbitMQ is running!")