Spaces:
Runtime error
Runtime error
File size: 1,971 Bytes
bc0521a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
import pika
import threading
credentials = pika.PlainCredentials('guest', 'guest')
parameters = pika.ConnectionParameters(
host='ubuntu',
port=5672, # default RabbitMQ port
virtual_host='/', # default vhost, change if needed
credentials=credentials
)
def queue_listener(queue_name, host="localhost"):
def decorator(func):
def start_consumer():
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue=queue_name, durable=True)
def callback(ch, method, properties, body):
try:
func(body.decode()) # Gọi hàm xử lý
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"Error handling message: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue=queue_name, on_message_callback=callback)
print(f"[+] Listening on queue '{queue_name}'")
channel.start_consuming()
# chạy consumer trên thread riêng (không chặn main thread)
threading.Thread(target=start_consumer, daemon=True).start()
return func
return decorator
@queue_listener("hello-python")
def process_message(msg):
print(f"[x] Received: {msg}")
# publish a message to the queue
def publish_message(queue_name, message):
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# channel.queue_declare(queue=queue_name, durable=True)
channel.basic_publish(
exchange='',
routing_key=queue_name,
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
)
)
print(f"[x] Sent: {message}")
publish_message("hello-python", "RabbitMQ is running!") |