Spaces:
Sleeping
Sleeping
# Copyright (c) Streamlit Inc. (2018-2022) Snowflake Inc. (2022) | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
from __future__ import annotations | |
import asyncio | |
import time | |
import traceback | |
from dataclasses import dataclass, field | |
from enum import Enum | |
from typing import TYPE_CHECKING, Awaitable, Dict, NamedTuple, Optional, Tuple, Type | |
from typing_extensions import Final | |
from streamlit import config | |
from streamlit.logger import get_logger | |
from streamlit.proto.BackMsg_pb2 import BackMsg | |
from streamlit.proto.ForwardMsg_pb2 import ForwardMsg | |
from streamlit.runtime.app_session import AppSession | |
from streamlit.runtime.caching import ( | |
get_data_cache_stats_provider, | |
get_resource_cache_stats_provider, | |
) | |
from streamlit.runtime.caching.storage.local_disk_cache_storage import ( | |
LocalDiskCacheStorageManager, | |
) | |
from streamlit.runtime.forward_msg_cache import ( | |
ForwardMsgCache, | |
create_reference_msg, | |
populate_hash_if_needed, | |
) | |
from streamlit.runtime.legacy_caching.caching import _mem_caches | |
from streamlit.runtime.media_file_manager import MediaFileManager | |
from streamlit.runtime.media_file_storage import MediaFileStorage | |
from streamlit.runtime.memory_session_storage import MemorySessionStorage | |
from streamlit.runtime.runtime_util import is_cacheable_msg | |
from streamlit.runtime.script_data import ScriptData | |
from streamlit.runtime.scriptrunner.script_cache import ScriptCache | |
from streamlit.runtime.session_manager import ( | |
ActiveSessionInfo, | |
SessionClient, | |
SessionClientDisconnectedError, | |
SessionManager, | |
SessionStorage, | |
) | |
from streamlit.runtime.state import ( | |
SCRIPT_RUN_WITHOUT_ERRORS_KEY, | |
SessionStateStatProvider, | |
) | |
from streamlit.runtime.stats import StatsManager | |
from streamlit.runtime.uploaded_file_manager import UploadedFileManager | |
from streamlit.runtime.websocket_session_manager import WebsocketSessionManager | |
from streamlit.watcher import LocalSourcesWatcher | |
if TYPE_CHECKING: | |
from streamlit.runtime.caching.storage import CacheStorageManager | |
# Wait for the script run result for 60s and if no result is available give up | |
SCRIPT_RUN_CHECK_TIMEOUT: Final = 60 | |
LOGGER: Final = get_logger(__name__) | |
class RuntimeStoppedError(Exception): | |
"""Raised by operations on a Runtime instance that is stopped.""" | |
class RuntimeConfig: | |
"""Config options for StreamlitRuntime.""" | |
# The filesystem path of the Streamlit script to run. | |
script_path: str | |
# The (optional) command line that Streamlit was started with | |
# (e.g. "streamlit run app.py") | |
command_line: Optional[str] | |
# The storage backend for Streamlit's MediaFileManager. | |
media_file_storage: MediaFileStorage | |
# The upload file manager | |
uploaded_file_manager: UploadedFileManager | |
# The cache storage backend for Streamlit's st.cache_data. | |
cache_storage_manager: CacheStorageManager = field( | |
default_factory=LocalDiskCacheStorageManager | |
) | |
# The SessionManager class to be used. | |
session_manager_class: Type[SessionManager] = WebsocketSessionManager | |
# The SessionStorage instance for the SessionManager to use. | |
session_storage: SessionStorage = field(default_factory=MemorySessionStorage) | |
class RuntimeState(Enum): | |
INITIAL = "INITIAL" | |
NO_SESSIONS_CONNECTED = "NO_SESSIONS_CONNECTED" | |
ONE_OR_MORE_SESSIONS_CONNECTED = "ONE_OR_MORE_SESSIONS_CONNECTED" | |
STOPPING = "STOPPING" | |
STOPPED = "STOPPED" | |
class AsyncObjects(NamedTuple): | |
"""Container for all asyncio objects that Runtime manages. | |
These cannot be initialized until the Runtime's eventloop is assigned. | |
""" | |
# The eventloop that Runtime is running on. | |
eventloop: asyncio.AbstractEventLoop | |
# Set after Runtime.stop() is called. Never cleared. | |
must_stop: asyncio.Event | |
# Set when a client connects; cleared when we have no connected clients. | |
has_connection: asyncio.Event | |
# Set after a ForwardMsg is enqueued; cleared when we flush ForwardMsgs. | |
need_send_data: asyncio.Event | |
# Completed when the Runtime has started. | |
started: asyncio.Future[None] | |
# Completed when the Runtime has stopped. | |
stopped: asyncio.Future[None] | |
class Runtime: | |
_instance: Optional[Runtime] = None | |
def instance(cls) -> Runtime: | |
"""Return the singleton Runtime instance. Raise an Error if the | |
Runtime hasn't been created yet. | |
""" | |
if cls._instance is None: | |
raise RuntimeError("Runtime hasn't been created!") | |
return cls._instance | |
def exists(cls) -> bool: | |
"""True if the singleton Runtime instance has been created. | |
When a Streamlit app is running in "raw mode" - that is, when the | |
app is run via `python app.py` instead of `streamlit run app.py` - | |
the Runtime will not exist, and various Streamlit functions need | |
to adapt. | |
""" | |
return cls._instance is not None | |
def __init__(self, config: RuntimeConfig): | |
"""Create a Runtime instance. It won't be started yet. | |
Runtime is *not* thread-safe. Its public methods are generally | |
safe to call only on the same thread that its event loop runs on. | |
Parameters | |
---------- | |
config | |
Config options. | |
""" | |
if Runtime._instance is not None: | |
raise RuntimeError("Runtime instance already exists!") | |
Runtime._instance = self | |
# Will be created when we start. | |
self._async_objs: Optional[AsyncObjects] = None | |
# The task that runs our main loop. We need to save a reference | |
# to it so that it doesn't get garbage collected while running. | |
self._loop_coroutine_task: Optional[asyncio.Task[None]] = None | |
self._main_script_path = config.script_path | |
self._command_line = config.command_line or "" | |
self._state = RuntimeState.INITIAL | |
# Initialize managers | |
self._message_cache = ForwardMsgCache() | |
self._uploaded_file_mgr = config.uploaded_file_manager | |
self._media_file_mgr = MediaFileManager(storage=config.media_file_storage) | |
self._cache_storage_manager = config.cache_storage_manager | |
self._script_cache = ScriptCache() | |
self._session_mgr = config.session_manager_class( | |
session_storage=config.session_storage, | |
uploaded_file_manager=self._uploaded_file_mgr, | |
script_cache=self._script_cache, | |
message_enqueued_callback=self._enqueued_some_message, | |
) | |
self._stats_mgr = StatsManager() | |
self._stats_mgr.register_provider(get_data_cache_stats_provider()) | |
self._stats_mgr.register_provider(get_resource_cache_stats_provider()) | |
self._stats_mgr.register_provider(_mem_caches) | |
self._stats_mgr.register_provider(self._message_cache) | |
self._stats_mgr.register_provider(self._uploaded_file_mgr) | |
self._stats_mgr.register_provider(SessionStateStatProvider(self._session_mgr)) | |
def state(self) -> RuntimeState: | |
return self._state | |
def message_cache(self) -> ForwardMsgCache: | |
return self._message_cache | |
def uploaded_file_mgr(self) -> UploadedFileManager: | |
return self._uploaded_file_mgr | |
def cache_storage_manager(self) -> CacheStorageManager: | |
return self._cache_storage_manager | |
def media_file_mgr(self) -> MediaFileManager: | |
return self._media_file_mgr | |
def stats_mgr(self) -> StatsManager: | |
return self._stats_mgr | |
def stopped(self) -> Awaitable[None]: | |
"""A Future that completes when the Runtime's run loop has exited.""" | |
return self._get_async_objs().stopped | |
# NOTE: A few Runtime methods listed as threadsafe (get_client and | |
# is_active_session) currently rely on the implementation detail that | |
# WebsocketSessionManager's get_active_session_info and is_active_session methods | |
# happen to be threadsafe. This may change with future SessionManager implementations, | |
# at which point we'll need to formalize our thread safety rules for each | |
# SessionManager method. | |
def get_client(self, session_id: str) -> Optional[SessionClient]: | |
"""Get the SessionClient for the given session_id, or None | |
if no such session exists. | |
Notes | |
----- | |
Threading: SAFE. May be called on any thread. | |
""" | |
session_info = self._session_mgr.get_active_session_info(session_id) | |
if session_info is None: | |
return None | |
return session_info.client | |
async def start(self) -> None: | |
"""Start the runtime. This must be called only once, before | |
any other functions are called. | |
When this coroutine returns, Streamlit is ready to accept new sessions. | |
Notes | |
----- | |
Threading: UNSAFE. Must be called on the eventloop thread. | |
""" | |
# Create our AsyncObjects. We need to have a running eventloop to | |
# instantiate our various synchronization primitives. | |
async_objs = AsyncObjects( | |
eventloop=asyncio.get_running_loop(), | |
must_stop=asyncio.Event(), | |
has_connection=asyncio.Event(), | |
need_send_data=asyncio.Event(), | |
started=asyncio.Future(), | |
stopped=asyncio.Future(), | |
) | |
self._async_objs = async_objs | |
self._loop_coroutine_task = asyncio.create_task( | |
self._loop_coroutine(), name="Runtime.loop_coroutine" | |
) | |
await async_objs.started | |
def stop(self) -> None: | |
"""Request that Streamlit close all sessions and stop running. | |
Note that Streamlit won't stop running immediately. | |
Notes | |
----- | |
Threading: SAFE. May be called from any thread. | |
""" | |
async_objs = self._get_async_objs() | |
def stop_on_eventloop(): | |
if self._state in (RuntimeState.STOPPING, RuntimeState.STOPPED): | |
return | |
LOGGER.debug("Runtime stopping...") | |
self._set_state(RuntimeState.STOPPING) | |
async_objs.must_stop.set() | |
async_objs.eventloop.call_soon_threadsafe(stop_on_eventloop) | |
def is_active_session(self, session_id: str) -> bool: | |
"""True if the session_id belongs to an active session. | |
Notes | |
----- | |
Threading: SAFE. May be called on any thread. | |
""" | |
return self._session_mgr.is_active_session(session_id) | |
def connect_session( | |
self, | |
client: SessionClient, | |
user_info: Dict[str, Optional[str]], | |
existing_session_id: Optional[str] = None, | |
session_id_override: Optional[str] = None, | |
) -> str: | |
"""Create a new session (or connect to an existing one) and return its unique ID. | |
Parameters | |
---------- | |
client | |
A concrete SessionClient implementation for communicating with | |
the session's client. | |
user_info | |
A dict that contains information about the session's user. For now, | |
it only (optionally) contains the user's email address. | |
{ | |
"email": "example@example.com" | |
} | |
existing_session_id | |
The ID of an existing session to reconnect to. If one is not provided, a new | |
session is created. Note that whether the Runtime's SessionManager supports | |
reconnecting to an existing session depends on the SessionManager that this | |
runtime is configured with. | |
session_id_override | |
The ID to assign to a new session being created with this method. Setting | |
this can be useful when the service that a Streamlit Runtime is running in | |
wants to tie the lifecycle of a Streamlit session to some other session-like | |
object that it manages. Only one of existing_session_id and | |
session_id_override should be set. | |
Returns | |
------- | |
str | |
The session's unique string ID. | |
Notes | |
----- | |
Threading: UNSAFE. Must be called on the eventloop thread. | |
""" | |
assert not ( | |
existing_session_id and session_id_override | |
), "Only one of existing_session_id and session_id_override should be set!" | |
if self._state in (RuntimeState.STOPPING, RuntimeState.STOPPED): | |
raise RuntimeStoppedError(f"Can't connect_session (state={self._state})") | |
session_id = self._session_mgr.connect_session( | |
client=client, | |
script_data=ScriptData(self._main_script_path, self._command_line or ""), | |
user_info=user_info, | |
existing_session_id=existing_session_id, | |
session_id_override=session_id_override, | |
) | |
self._set_state(RuntimeState.ONE_OR_MORE_SESSIONS_CONNECTED) | |
self._get_async_objs().has_connection.set() | |
return session_id | |
def create_session( | |
self, | |
client: SessionClient, | |
user_info: Dict[str, Optional[str]], | |
existing_session_id: Optional[str] = None, | |
session_id_override: Optional[str] = None, | |
) -> str: | |
"""Create a new session (or connect to an existing one) and return its unique ID. | |
Notes | |
----- | |
This method is simply an alias for connect_session added for backwards | |
compatibility. | |
""" | |
LOGGER.warning("create_session is deprecated! Use connect_session instead.") | |
return self.connect_session( | |
client=client, | |
user_info=user_info, | |
existing_session_id=existing_session_id, | |
session_id_override=session_id_override, | |
) | |
def close_session(self, session_id: str) -> None: | |
"""Close and completely shut down a session. | |
This differs from disconnect_session in that it always completely shuts down a | |
session, permanently losing any associated state (session state, uploaded files, | |
etc.). | |
This function may be called multiple times for the same session, | |
which is not an error. (Subsequent calls just no-op.) | |
Parameters | |
---------- | |
session_id | |
The session's unique ID. | |
Notes | |
----- | |
Threading: UNSAFE. Must be called on the eventloop thread. | |
""" | |
session_info = self._session_mgr.get_session_info(session_id) | |
if session_info: | |
self._message_cache.remove_refs_for_session(session_info.session) | |
self._session_mgr.close_session(session_id) | |
self._on_session_disconnected() | |
def disconnect_session(self, session_id: str) -> None: | |
"""Disconnect a session. It will stop producing ForwardMsgs. | |
Differs from close_session because disconnected sessions can be reconnected to | |
for a brief window (depending on the SessionManager/SessionStorage | |
implementations used by the runtime). | |
This function may be called multiple times for the same session, | |
which is not an error. (Subsequent calls just no-op.) | |
Parameters | |
---------- | |
session_id | |
The session's unique ID. | |
Notes | |
----- | |
Threading: UNSAFE. Must be called on the eventloop thread. | |
""" | |
session_info = self._session_mgr.get_active_session_info(session_id) | |
if session_info: | |
# NOTE: Ideally, we'd like to keep ForwardMsgCache refs for a session around | |
# when a session is disconnected (and defer their cleanup until the session | |
# is garbage collected), but this would be difficult to do as the | |
# ForwardMsgCache is not thread safe, and we have no guarantee that the | |
# garbage collector will only run on the eventloop thread. Because of this, | |
# we clean up refs now and accept the risk that we're deleting cache entries | |
# that will be useful once the browser tab reconnects. | |
self._message_cache.remove_refs_for_session(session_info.session) | |
self._session_mgr.disconnect_session(session_id) | |
self._on_session_disconnected() | |
def handle_backmsg(self, session_id: str, msg: BackMsg) -> None: | |
"""Send a BackMsg to an active session. | |
Parameters | |
---------- | |
session_id | |
The session's unique ID. | |
msg | |
The BackMsg to deliver to the session. | |
Notes | |
----- | |
Threading: UNSAFE. Must be called on the eventloop thread. | |
""" | |
if self._state in (RuntimeState.STOPPING, RuntimeState.STOPPED): | |
raise RuntimeStoppedError(f"Can't handle_backmsg (state={self._state})") | |
session_info = self._session_mgr.get_active_session_info(session_id) | |
if session_info is None: | |
LOGGER.debug( | |
"Discarding BackMsg for disconnected session (id=%s)", session_id | |
) | |
return | |
session_info.session.handle_backmsg(msg) | |
def handle_backmsg_deserialization_exception( | |
self, session_id: str, exc: BaseException | |
) -> None: | |
"""Handle an Exception raised during deserialization of a BackMsg. | |
Parameters | |
---------- | |
session_id | |
The session's unique ID. | |
exc | |
The Exception. | |
Notes | |
----- | |
Threading: UNSAFE. Must be called on the eventloop thread. | |
""" | |
if self._state in (RuntimeState.STOPPING, RuntimeState.STOPPED): | |
raise RuntimeStoppedError( | |
f"Can't handle_backmsg_deserialization_exception (state={self._state})" | |
) | |
session_info = self._session_mgr.get_active_session_info(session_id) | |
if session_info is None: | |
LOGGER.debug( | |
"Discarding BackMsg Exception for disconnected session (id=%s)", | |
session_id, | |
) | |
return | |
session_info.session.handle_backmsg_exception(exc) | |
async def is_ready_for_browser_connection(self) -> Tuple[bool, str]: | |
if self._state not in ( | |
RuntimeState.INITIAL, | |
RuntimeState.STOPPING, | |
RuntimeState.STOPPED, | |
): | |
return True, "ok" | |
return False, "unavailable" | |
async def does_script_run_without_error(self) -> Tuple[bool, str]: | |
"""Load and execute the app's script to verify it runs without an error. | |
Returns | |
------- | |
(True, "ok") if the script completes without error, or (False, err_msg) | |
if the script raises an exception. | |
Notes | |
----- | |
Threading: UNSAFE. Must be called on the eventloop thread. | |
""" | |
# NOTE: We create an AppSession directly here instead of using the | |
# SessionManager intentionally. This isn't a "real" session and is only being | |
# used to test that the script runs without error. | |
session = AppSession( | |
script_data=ScriptData(self._main_script_path, self._command_line), | |
uploaded_file_manager=self._uploaded_file_mgr, | |
script_cache=self._script_cache, | |
message_enqueued_callback=self._enqueued_some_message, | |
local_sources_watcher=LocalSourcesWatcher(self._main_script_path), | |
user_info={"email": "test@test.com"}, | |
) | |
try: | |
session.request_rerun(None) | |
now = time.perf_counter() | |
while ( | |
SCRIPT_RUN_WITHOUT_ERRORS_KEY not in session.session_state | |
and (time.perf_counter() - now) < SCRIPT_RUN_CHECK_TIMEOUT | |
): | |
await asyncio.sleep(0.1) | |
if SCRIPT_RUN_WITHOUT_ERRORS_KEY not in session.session_state: | |
return False, "timeout" | |
ok = session.session_state[SCRIPT_RUN_WITHOUT_ERRORS_KEY] | |
msg = "ok" if ok else "error" | |
return ok, msg | |
finally: | |
session.shutdown() | |
def _set_state(self, new_state: RuntimeState) -> None: | |
LOGGER.debug("Runtime state: %s -> %s", self._state, new_state) | |
self._state = new_state | |
async def _loop_coroutine(self) -> None: | |
"""The main Runtime loop. | |
This function won't exit until `stop` is called. | |
Notes | |
----- | |
Threading: UNSAFE. Must be called on the eventloop thread. | |
""" | |
async_objs = self._get_async_objs() | |
try: | |
if self._state == RuntimeState.INITIAL: | |
self._set_state(RuntimeState.NO_SESSIONS_CONNECTED) | |
elif self._state == RuntimeState.ONE_OR_MORE_SESSIONS_CONNECTED: | |
pass | |
else: | |
raise RuntimeError(f"Bad Runtime state at start: {self._state}") | |
# Signal that we're started and ready to accept sessions | |
async_objs.started.set_result(None) | |
while not async_objs.must_stop.is_set(): | |
if self._state == RuntimeState.NO_SESSIONS_CONNECTED: # type: ignore[comparison-overlap] | |
# mypy 1.4 incorrectly thinks this if-clause is unreachable, | |
# because it thinks self._state must be INITIAL | ONE_OR_MORE_SESSIONS_CONNECTED. | |
await asyncio.wait( # type: ignore[unreachable] | |
( | |
asyncio.create_task(async_objs.must_stop.wait()), | |
asyncio.create_task(async_objs.has_connection.wait()), | |
), | |
return_when=asyncio.FIRST_COMPLETED, | |
) | |
elif self._state == RuntimeState.ONE_OR_MORE_SESSIONS_CONNECTED: | |
async_objs.need_send_data.clear() | |
for active_session_info in self._session_mgr.list_active_sessions(): | |
msg_list = active_session_info.session.flush_browser_queue() | |
for msg in msg_list: | |
try: | |
self._send_message(active_session_info, msg) | |
except SessionClientDisconnectedError: | |
self._session_mgr.disconnect_session( | |
active_session_info.session.id | |
) | |
# Yield for a tick after sending a message. | |
await asyncio.sleep(0) | |
# Yield for a few milliseconds between session message | |
# flushing. | |
await asyncio.sleep(0.01) | |
else: | |
# Break out of the thread loop if we encounter any other state. | |
break | |
await asyncio.wait( | |
( | |
asyncio.create_task(async_objs.must_stop.wait()), | |
asyncio.create_task(async_objs.need_send_data.wait()), | |
), | |
return_when=asyncio.FIRST_COMPLETED, | |
) | |
# Shut down all AppSessions. | |
for session_info in self._session_mgr.list_sessions(): | |
# NOTE: We want to fully shut down sessions when the runtime stops for | |
# now, but this may change in the future if/when our notion of a session | |
# is no longer so tightly coupled to a browser tab. | |
self._session_mgr.close_session(session_info.session.id) | |
self._set_state(RuntimeState.STOPPED) | |
async_objs.stopped.set_result(None) | |
except Exception as e: | |
async_objs.stopped.set_exception(e) | |
traceback.print_exc() | |
LOGGER.info( | |
""" | |
Please report this bug at https://github.com/streamlit/streamlit/issues. | |
""" | |
) | |
def _send_message(self, session_info: ActiveSessionInfo, msg: ForwardMsg) -> None: | |
"""Send a message to a client. | |
If the client is likely to have already cached the message, we may | |
instead send a "reference" message that contains only the hash of the | |
message. | |
Parameters | |
---------- | |
session_info : ActiveSessionInfo | |
The ActiveSessionInfo associated with websocket | |
msg : ForwardMsg | |
The message to send to the client | |
Notes | |
----- | |
Threading: UNSAFE. Must be called on the eventloop thread. | |
""" | |
msg.metadata.cacheable = is_cacheable_msg(msg) | |
msg_to_send = msg | |
if msg.metadata.cacheable: | |
populate_hash_if_needed(msg) | |
if self._message_cache.has_message_reference( | |
msg, session_info.session, session_info.script_run_count | |
): | |
# This session has probably cached this message. Send | |
# a reference instead. | |
LOGGER.debug("Sending cached message ref (hash=%s)", msg.hash) | |
msg_to_send = create_reference_msg(msg) | |
# Cache the message so it can be referenced in the future. | |
# If the message is already cached, this will reset its | |
# age. | |
LOGGER.debug("Caching message (hash=%s)", msg.hash) | |
self._message_cache.add_message( | |
msg, session_info.session, session_info.script_run_count | |
) | |
# If this was a `script_finished` message, we increment the | |
# script_run_count for this session, and update the cache | |
if ( | |
msg.WhichOneof("type") == "script_finished" | |
and msg.script_finished == ForwardMsg.FINISHED_SUCCESSFULLY | |
): | |
LOGGER.debug( | |
"Script run finished successfully; " | |
"removing expired entries from MessageCache " | |
"(max_age=%s)", | |
config.get_option("global.maxCachedMessageAge"), | |
) | |
session_info.script_run_count += 1 | |
self._message_cache.remove_expired_entries_for_session( | |
session_info.session, session_info.script_run_count | |
) | |
# Ship it off! | |
session_info.client.write_forward_msg(msg_to_send) | |
def _enqueued_some_message(self) -> None: | |
"""Callback called by AppSession after the AppSession has enqueued a | |
message. Sets the "needs_send_data" event, which causes our core | |
loop to wake up and flush client message queues. | |
Notes | |
----- | |
Threading: SAFE. May be called on any thread. | |
""" | |
async_objs = self._get_async_objs() | |
async_objs.eventloop.call_soon_threadsafe(async_objs.need_send_data.set) | |
def _get_async_objs(self) -> AsyncObjects: | |
"""Return our AsyncObjects instance. If the Runtime hasn't been | |
started, this will raise an error. | |
""" | |
if self._async_objs is None: | |
raise RuntimeError("Runtime hasn't started yet!") | |
return self._async_objs | |
def _on_session_disconnected(self) -> None: | |
"""Set the runtime state to NO_SESSIONS_CONNECTED if the last active | |
session was disconnected. | |
""" | |
if ( | |
self._state == RuntimeState.ONE_OR_MORE_SESSIONS_CONNECTED | |
and self._session_mgr.num_active_sessions() == 0 | |
): | |
self._get_async_objs().has_connection.clear() | |
self._set_state(RuntimeState.NO_SESSIONS_CONNECTED) | |