Shyamnath's picture
Push core package and essential files
469eae6
raw
history blame
1.75 kB
"""
Base class for the in-memory buffer used for database transactions
"""
import asyncio
from typing import Optional
from litellm._logging import verbose_proxy_logger
from litellm._service_logger import ServiceLogging
# Used for tracking metrics for the in-memory buffer, redis buffer, and pod lock manager.
service_logger_obj = ServiceLogging()
from litellm.constants import MAX_IN_MEMORY_QUEUE_FLUSH_COUNT, MAX_SIZE_IN_MEMORY_QUEUE
class BaseUpdateQueue:
"""Base class for in memory buffer for database transactions"""
def __init__(self):
self.update_queue = asyncio.Queue()
self.MAX_SIZE_IN_MEMORY_QUEUE = MAX_SIZE_IN_MEMORY_QUEUE
async def add_update(self, update):
"""Enqueue an update."""
verbose_proxy_logger.debug("Adding update to queue: %s", update)
await self.update_queue.put(update)
await self._emit_new_item_added_to_queue_event(
queue_size=self.update_queue.qsize()
)
async def flush_all_updates_from_in_memory_queue(self):
"""Get all updates from the queue."""
updates = []
while not self.update_queue.empty():
# Circuit breaker to ensure we're not stuck dequeuing updates. Protect CPU utilization
if len(updates) >= MAX_IN_MEMORY_QUEUE_FLUSH_COUNT:
verbose_proxy_logger.warning(
"Max in memory queue flush count reached, stopping flush"
)
break
updates.append(await self.update_queue.get())
return updates
async def _emit_new_item_added_to_queue_event(
self,
queue_size: Optional[int] = None,
):
"""placeholder, emit event when a new item is added to the queue"""
pass