Spaces:
Sleeping
Sleeping
""" | |
Custom Logger that handles batching logic | |
Use this if you want your logs to be stored in memory and flushed periodically. | |
""" | |
import asyncio | |
import time | |
from typing import List, Optional | |
import litellm | |
from litellm._logging import verbose_logger | |
from litellm.integrations.custom_logger import CustomLogger | |
class CustomBatchLogger(CustomLogger): | |
def __init__( | |
self, | |
flush_lock: Optional[asyncio.Lock] = None, | |
batch_size: Optional[int] = None, | |
flush_interval: Optional[int] = None, | |
**kwargs, | |
) -> None: | |
""" | |
Args: | |
flush_lock (Optional[asyncio.Lock], optional): Lock to use when flushing the queue. Defaults to None. Only used for custom loggers that do batching | |
""" | |
self.log_queue: List = [] | |
self.flush_interval = flush_interval or litellm.DEFAULT_FLUSH_INTERVAL_SECONDS | |
self.batch_size: int = batch_size or litellm.DEFAULT_BATCH_SIZE | |
self.last_flush_time = time.time() | |
self.flush_lock = flush_lock | |
super().__init__(**kwargs) | |
async def periodic_flush(self): | |
while True: | |
await asyncio.sleep(self.flush_interval) | |
verbose_logger.debug( | |
f"CustomLogger periodic flush after {self.flush_interval} seconds" | |
) | |
await self.flush_queue() | |
async def flush_queue(self): | |
if self.flush_lock is None: | |
return | |
async with self.flush_lock: | |
if self.log_queue: | |
verbose_logger.debug( | |
"CustomLogger: Flushing batch of %s events", len(self.log_queue) | |
) | |
await self.async_send_batch() | |
self.log_queue.clear() | |
self.last_flush_time = time.time() | |
async def async_send_batch(self, *args, **kwargs): | |
pass | |