import asyncio
import threading

from .log import logger


def create_event_loop() -> asyncio.AbstractEventLoop:
    """Return a usable event loop for the calling thread, creating one if needed.

    The current event loop is reused when one is available and still open.
    When ``asyncio.get_event_loop()`` raises ``RuntimeError`` (no loop is
    available for this thread) or the loop it returns has been closed, a new
    loop is created and set as the current event loop.

    Returns:
        asyncio.AbstractEventLoop: The current or newly created event loop.
    """
    try:
        # Keep the try body minimal: only this call is expected to raise.
        current_loop = asyncio.get_event_loop()
    except RuntimeError:
        # asyncio reports "no current event loop" with RuntimeError.
        current_loop = None

    if current_loop is not None and not current_loop.is_closed():
        return current_loop

    # This path can be reached from any thread, so report the actual thread
    # instead of assuming it is the main one.
    thread_name = threading.current_thread().name
    logger.info(f"Creating a new event loop in thread {thread_name!r}.")
    new_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(new_loop)
    return new_loop