Spaces:
Runtime error
Runtime error
import pika | |
import threading | |
credentials = pika.PlainCredentials('guest', 'guest') | |
parameters = pika.ConnectionParameters( | |
host='ubuntu', | |
port=5672, # default RabbitMQ port | |
virtual_host='/', # default vhost, change if needed | |
credentials=credentials | |
) | |
def queue_listener(queue_name, host="localhost"): | |
def decorator(func): | |
def start_consumer(): | |
connection = pika.BlockingConnection(parameters) | |
channel = connection.channel() | |
channel.queue_declare(queue=queue_name, durable=True) | |
def callback(ch, method, properties, body): | |
try: | |
func(body.decode()) # Gọi hàm xử lý | |
ch.basic_ack(delivery_tag=method.delivery_tag) | |
except Exception as e: | |
print(f"Error handling message: {e}") | |
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False) | |
channel.basic_qos(prefetch_count=1) | |
channel.basic_consume(queue=queue_name, on_message_callback=callback) | |
print(f"[+] Listening on queue '{queue_name}'") | |
channel.start_consuming() | |
# chạy consumer trên thread riêng (không chặn main thread) | |
threading.Thread(target=start_consumer, daemon=True).start() | |
return func | |
return decorator | |
def process_message(msg): | |
print(f"[x] Received: {msg}") | |
# publish a message to the queue | |
def publish_message(queue_name, message): | |
connection = pika.BlockingConnection(parameters) | |
channel = connection.channel() | |
# channel.queue_declare(queue=queue_name, durable=True) | |
channel.basic_publish( | |
exchange='', | |
routing_key=queue_name, | |
body=message, | |
properties=pika.BasicProperties( | |
delivery_mode=2, # make message persistent | |
) | |
) | |
print(f"[x] Sent: {message}") | |
publish_message("hello-python", "RabbitMQ is running!") |