| """ |
| Message queue - buffer system for async message processing. |
| """ |
| from typing import Optional, List |
| from collections import deque |
| import asyncio |
| import json |
| from datetime import datetime |
|
|
|
|
| class MessageQueue: |
| """ |
| In-memory message queue with async support. |
| In production, use Redis for distributed processing. |
| """ |
| |
| def __init__(self, max_size: int = 10000): |
| self._queue = deque() |
| self._max_size = max_size |
| self._lock = asyncio.Lock() |
| |
| async def enqueue(self, message) -> bool: |
| """ |
| Add a message to the queue. |
| |
| Args: |
| message: Message object to queue |
| |
| Returns: |
| True if enqueued successfully |
| """ |
| async with self._lock: |
| if len(self._queue) >= self._max_size: |
| return False |
| |
| |
| msg_data = { |
| "id": message.id, |
| "sender_id": message.sender_id, |
| "receiver_id": message.receiver_id, |
| "group_id": message.group_id, |
| "content": message.content, |
| "type": message.type, |
| "created_at": message.created_at.isoformat() if message.created_at else None |
| } |
| self._queue.append(msg_data) |
| return True |
| |
| async def dequeue(self) -> Optional[dict]: |
| """ |
| Remove and return the oldest message from queue. |
| |
| Returns: |
| Message data dict or None if empty |
| """ |
| async with self._lock: |
| if not self._queue: |
| return None |
| return self._queue.popleft() |
| |
| async def peek(self) -> Optional[dict]: |
| """View the oldest message without removing it.""" |
| async with self._lock: |
| if not self._queue: |
| return None |
| return self._queue[0] |
| |
| async def get_batch(self, size: int = 100) -> List[dict]: |
| """Get multiple messages at once for batch processing.""" |
| async with self._lock: |
| batch = [] |
| for _ in range(min(size, len(self._queue))): |
| if self._queue: |
| batch.append(self._queue.popleft()) |
| return batch |
| |
| async def clear(self): |
| """Clear all messages from queue.""" |
| async with self._lock: |
| self._queue.clear() |
| |
| def size(self) -> int: |
| """Get current queue size (non-async).""" |
| return len(self._queue) |
| |
| async def is_empty(self) -> bool: |
| """Check if queue is empty.""" |
| async with self._lock: |
| return len(self._queue) == 0 |
|
|
|
|
| |
| message_queue = MessageQueue() |