npv2k1 committed on
Commit 4390951
1 Parent(s): 4a485db

feat: implement asynchronous Kafka and Redis pub/sub mechanisms with message handling

src/modules/transporter/kafka.py ADDED
@@ -0,0 +1,73 @@
+ import asyncio
+ from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
+ from functools import wraps
+ from typing import Callable, Dict
+
+
+ class AsyncKafkaPubSub:
+     def __init__(self, bootstrap_servers="ubuntu:9092", group_id="default-py-group"):
+         self.bootstrap_servers = bootstrap_servers
+         self.group_id = group_id
+         self.handlers: Dict[str, Callable[[str], None]] = {}
+         self._consumer_started = False
+         self._lock = asyncio.Lock()
+         self._producer = None
+
+     def subscribe(self, topic: str):
+         print(f"Subscribing to topic: {topic}")
+         def decorator(func: Callable[[str], None]):
+             self.handlers[topic] = func
+             asyncio.create_task(self._ensure_consumer())
+             return func
+         return decorator
+
+     async def publish(self, topic: str, message: str):
+         if not self._producer:
+             self._producer = AIOKafkaProducer(bootstrap_servers=self.bootstrap_servers)
+             await self._producer.start()
+         await self._producer.send_and_wait(topic, message.encode())
+
+     async def _ensure_consumer(self):
+         async with self._lock:
+             if self._consumer_started:
+                 return
+             self._consumer_started = True
+             await self._start_consumer()
+
+     async def _start_consumer(self):
+         consumer = AIOKafkaConsumer(
+             *self.handlers.keys(),
+             bootstrap_servers=self.bootstrap_servers,
+             group_id=self.group_id,
+             auto_offset_reset='latest',
+         )
+         await consumer.start()
+         print(f"Subscribed to: {', '.join(self.handlers.keys())}")
+
+         async def listen():
+             try:
+                 async for msg in consumer:
+                     print(f"Received message on topic '{msg.topic}': {msg.value.decode()}")
+                     topic = msg.topic
+                     data = msg.value.decode()
+                     handler = self.handlers.get(topic)
+                     if handler:
+                         await self._maybe_async(handler, data)
+             finally:
+                 await consumer.stop()
+
+         asyncio.create_task(listen())
+
+     async def _maybe_async(self, func: Callable, *args, **kwargs):
+         result = func(*args, **kwargs)
+         if asyncio.iscoroutine(result):
+             await result
+
+
+ kafka_pubsub = AsyncKafkaPubSub()
+
+
+ @kafka_pubsub.subscribe("chat")
+ def handle_chat(msg):
+     print(f"[chat kafka] {msg}")
+
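Note that subscribe() schedules _ensure_consumer() with asyncio.create_task(), so this module has to be imported while an event loop is already running. Below is a minimal driver sketch, not part of this commit: the main() entry point, the import path, the sleep durations, and the assumption that a Kafka broker is reachable at the class default ubuntu:9092 are all illustrative.

# Hypothetical driver, not part of this commit. subscribe() calls
# asyncio.create_task(), so the module must be imported inside a running loop.
import asyncio

async def main():
    # Import inside the running loop so the decorator's create_task() succeeds.
    from src.modules.transporter.kafka import kafka_pubsub

    await asyncio.sleep(1)   # give the background consumer time to join the group
    await kafka_pubsub.publish("chat", "hello from the producer")
    await asyncio.sleep(5)   # keep the loop alive so handle_chat() can print

if __name__ == "__main__":
    asyncio.run(main())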
src/modules/transporter/rabbitmq.py CHANGED
@@ -2,6 +2,7 @@ import pika
  import threading
 
  credentials = pika.PlainCredentials('guest', 'guest')
+
  parameters = pika.ConnectionParameters(
      host='ubuntu',
      port=5672, # default RabbitMQ port
@@ -39,6 +40,12 @@ def process_message(msg):
      print(f"[x] Received: {msg}")
 
 
+ @queue_listener("test")
+ def test_queue_listener(msg):
+     # This function is just for testing the queue listener
+     print(f"[test] Received message: {msg}")
+
+
  # publish a message to the queue
  def publish_message(queue_name, message):
      connection = pika.BlockingConnection(parameters)
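The new @queue_listener("test") handler can be exercised with the module's own publish_message(queue_name, message) helper visible in the context lines. A hypothetical smoke test, assuming the package is importable as src.modules.transporter.rabbitmq, the listener threads in that module are running, and a RabbitMQ broker is reachable with the configured guest credentials:

# Hypothetical smoke test, not part of this commit: sends a message to the new
# "test" queue via the module's publish_message(queue_name, message) helper.
from src.modules.transporter.rabbitmq import publish_message

publish_message("test", "hello from the smoke test")
# If the listener for the "test" queue is running, test_queue_listener should
# print something like: [test] Received message: hello from the smoke test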
src/modules/transporter/redis_client.py ADDED
@@ -0,0 +1,79 @@
+
+ import redis
+
+ # Connect to Redis
+ r = redis.Redis(host='ubuntu', port=6379, decode_responses=True)
+
+ # Set a value
+ r.set('foo', 'bar')
+
+ # Get a value
+ value = r.get('foo')
+ print(value)  # Output: bar
+
+
+ print("Redis client is running...", r.ping())
+
+
+ import asyncio
+ from redis.asyncio import Redis
+ from functools import wraps
+ from typing import Callable, Dict
+
+
+ class AsyncRedisPubSub:
+     def __init__(self, host="ubuntu", port=6379, db=0):
+         self.redis = Redis(host=host, port=port, db=db)
+         self.channel_handlers: Dict[str, Callable[[str], None]] = {}
+         self._subscriber_started = False
+         self._lock = asyncio.Lock()
+
+     def subscribe(self, channel: str):
+         def decorator(func: Callable[[str], None]):
+             self.channel_handlers[channel] = func
+             asyncio.create_task(self._ensure_subscriber())
+             return func
+         return decorator
+
+     async def publish(self, channel: str, message: str):
+         await self.redis.publish(channel, message)
+
+     async def _ensure_subscriber(self):
+         async with self._lock:
+             if self._subscriber_started:
+                 return
+             self._subscriber_started = True
+             await self._start_subscriber()
+
+     async def _start_subscriber(self):
+         pubsub = self.redis.pubsub()
+         await pubsub.subscribe(*self.channel_handlers.keys())
+         print(f"Subscribed to: {', '.join(self.channel_handlers.keys())}")
+
+         async def listen():
+             async for message in pubsub.listen():
+                 if message["type"] != "message":
+                     continue
+                 channel = message["channel"].decode()
+                 data = message["data"].decode()
+                 handler = self.channel_handlers.get(channel)
+                 if handler:
+                     await self._maybe_async(handler, data)
+
+         asyncio.create_task(listen())
+
+     async def _maybe_async(self, func: Callable, *args, **kwargs):
+         result = func(*args, **kwargs)
+         if asyncio.iscoroutine(result):
+             await result
+
+
+ pubsub = AsyncRedisPubSub()
+
+ @pubsub.subscribe("chat")
+ def handle_chat(msg):
+     print(f"[chat redis] Received: {msg}")
+
+ @pubsub.subscribe("news")
+ def handle_news(msg):
+     print(f"[news] Breaking: {msg}")
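As with the Kafka module, the subscribe() decorators here call asyncio.create_task() at import time, so redis_client must be imported from inside a running loop (and the module-level redis.Redis calls need a reachable server). A hedged usage sketch, not part of this commit; the import path, the sleeps, and the assumption of a Redis server at ubuntu:6379 are illustrative.

# Hypothetical driver, not part of this commit: import redis_client inside the
# running loop, then publish to the two channels its example handlers subscribe to.
import asyncio

async def main():
    from src.modules.transporter.redis_client import pubsub

    await asyncio.sleep(0.5)  # let the background subscriber attach to the channels
    await pubsub.publish("chat", "hello")
    await pubsub.publish("news", "redis pub/sub is up")
    await asyncio.sleep(1)    # keep the loop alive so the handlers can run

if __name__ == "__main__":
    asyncio.run(main())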