Spaces:
No application file
No application file
| """Default classes for Comm and CommManager, for usage in IPython. | |
| """ | |
| # Copyright (c) IPython Development Team. | |
| # Distributed under the terms of the Modified BSD License. | |
| import logging | |
| import uuid | |
| from traitlets.utils.importstring import import_item | |
| import comm | |
| logger = logging.getLogger("Comm") | |
| class BaseComm: | |
| """Class for communicating between a Frontend and a Kernel | |
| Must be subclassed with a publish_msg method implementation which | |
| sends comm messages through the iopub channel. | |
| """ | |
| def __init__( | |
| self, | |
| target_name="comm", | |
| data=None, | |
| metadata=None, | |
| buffers=None, | |
| comm_id=None, | |
| primary=True, | |
| target_module=None, | |
| topic=None, | |
| _open_data=None, | |
| _close_data=None, | |
| **kwargs, | |
| ): | |
| super().__init__(**kwargs) | |
| self.comm_id = comm_id if comm_id else uuid.uuid4().hex | |
| self.primary = primary | |
| self.target_name = target_name | |
| self.target_module = target_module | |
| self.topic = topic if topic else ("comm-%s" % self.comm_id).encode("ascii") | |
| self._open_data = _open_data if _open_data else {} | |
| self._close_data = _close_data if _close_data else {} | |
| self._msg_callback = None | |
| self._close_callback = None | |
| self._closed = True | |
| if self.primary: | |
| # I am primary, open my peer. | |
| self.open(data=data, metadata=metadata, buffers=buffers) | |
| else: | |
| self._closed = False | |
| def publish_msg(self, msg_type, data=None, metadata=None, buffers=None, **keys): | |
| raise NotImplementedError("publish_msg Comm method is not implemented") | |
| def __del__(self): | |
| """trigger close on gc""" | |
| self.close(deleting=True) | |
| # publishing messages | |
| def open(self, data=None, metadata=None, buffers=None): # noqa | |
| """Open the frontend-side version of this comm""" | |
| if data is None: | |
| data = self._open_data | |
| comm_manager = comm.get_comm_manager() | |
| if comm_manager is None: | |
| raise RuntimeError("Comms cannot be opened without a comm_manager.") | |
| comm_manager.register_comm(self) | |
| try: | |
| self.publish_msg( | |
| "comm_open", | |
| data=data, | |
| metadata=metadata, | |
| buffers=buffers, | |
| target_name=self.target_name, | |
| target_module=self.target_module, | |
| ) | |
| self._closed = False | |
| except Exception: | |
| comm_manager.unregister_comm(self) | |
| raise | |
| def close(self, data=None, metadata=None, buffers=None, deleting=False): | |
| """Close the frontend-side version of this comm""" | |
| if self._closed: | |
| # only close once | |
| return | |
| self._closed = True | |
| if data is None: | |
| data = self._close_data | |
| self.publish_msg( | |
| "comm_close", | |
| data=data, | |
| metadata=metadata, | |
| buffers=buffers, | |
| ) | |
| if not deleting: | |
| # If deleting, the comm can't be registered | |
| comm.get_comm_manager().unregister_comm(self) | |
| def send(self, data=None, metadata=None, buffers=None): | |
| """Send a message to the frontend-side version of this comm""" | |
| self.publish_msg( | |
| "comm_msg", | |
| data=data, | |
| metadata=metadata, | |
| buffers=buffers, | |
| ) | |
| # registering callbacks | |
| def on_close(self, callback): | |
| """Register a callback for comm_close | |
| Will be called with the `data` of the close message. | |
| Call `on_close(None)` to disable an existing callback. | |
| """ | |
| self._close_callback = callback | |
| def on_msg(self, callback): | |
| """Register a callback for comm_msg | |
| Will be called with the `data` of any comm_msg messages. | |
| Call `on_msg(None)` to disable an existing callback. | |
| """ | |
| self._msg_callback = callback | |
| # handling of incoming messages | |
| def handle_close(self, msg): | |
| """Handle a comm_close message""" | |
| logger.debug("handle_close[%s](%s)", self.comm_id, msg) | |
| if self._close_callback: | |
| self._close_callback(msg) | |
| def handle_msg(self, msg): | |
| """Handle a comm_msg message""" | |
| logger.debug("handle_msg[%s](%s)", self.comm_id, msg) | |
| if self._msg_callback: | |
| from IPython import get_ipython # type:ignore | |
| shell = get_ipython() | |
| if shell: | |
| shell.events.trigger("pre_execute") | |
| self._msg_callback(msg) | |
| if shell: | |
| shell.events.trigger("post_execute") | |
| class CommManager: | |
| """Default CommManager singleton implementation for Comms in the Kernel""" | |
| # Public APIs | |
| def __init__(self): | |
| self.comms = {} | |
| self.targets = {} | |
| def register_target(self, target_name, f): | |
| """Register a callable f for a given target name | |
| f will be called with two arguments when a comm_open message is received with `target`: | |
| - the Comm instance | |
| - the `comm_open` message itself. | |
| f can be a Python callable or an import string for one. | |
| """ | |
| if isinstance(f, str): | |
| f = import_item(f) | |
| self.targets[target_name] = f | |
| def unregister_target(self, target_name, f): | |
| """Unregister a callable registered with register_target""" | |
| return self.targets.pop(target_name) | |
| def register_comm(self, comm): | |
| """Register a new comm""" | |
| comm_id = comm.comm_id | |
| self.comms[comm_id] = comm | |
| return comm_id | |
| def unregister_comm(self, comm): | |
| """Unregister a comm, and close its counterpart""" | |
| # unlike get_comm, this should raise a KeyError | |
| comm = self.comms.pop(comm.comm_id) | |
| def get_comm(self, comm_id): | |
| """Get a comm with a particular id | |
| Returns the comm if found, otherwise None. | |
| This will not raise an error, | |
| it will log messages if the comm cannot be found. | |
| """ | |
| try: | |
| return self.comms[comm_id] | |
| except KeyError: | |
| logger.warning("No such comm: %s", comm_id) | |
| if logger.isEnabledFor(logging.DEBUG): | |
| # don't create the list of keys if debug messages aren't enabled | |
| logger.debug("Current comms: %s", list(self.comms.keys())) | |
| # Message handlers | |
| def comm_open(self, stream, ident, msg): | |
| """Handler for comm_open messages""" | |
| from comm import create_comm | |
| content = msg["content"] | |
| comm_id = content["comm_id"] | |
| target_name = content["target_name"] | |
| f = self.targets.get(target_name, None) | |
| comm = create_comm( | |
| comm_id=comm_id, | |
| primary=False, | |
| target_name=target_name, | |
| ) | |
| self.register_comm(comm) | |
| if f is None: | |
| logger.error("No such comm target registered: %s", target_name) | |
| else: | |
| try: | |
| f(comm, msg) | |
| return | |
| except Exception: | |
| logger.error("Exception opening comm with target: %s", target_name, exc_info=True) | |
| # Failure. | |
| try: | |
| comm.close() | |
| except Exception: | |
| logger.error( | |
| """Could not close comm during `comm_open` failure | |
| clean-up. The comm may not have been opened yet.""", | |
| exc_info=True, | |
| ) | |
| def comm_msg(self, stream, ident, msg): | |
| """Handler for comm_msg messages""" | |
| content = msg["content"] | |
| comm_id = content["comm_id"] | |
| comm = self.get_comm(comm_id) | |
| if comm is None: | |
| return | |
| try: | |
| comm.handle_msg(msg) | |
| except Exception: | |
| logger.error("Exception in comm_msg for %s", comm_id, exc_info=True) | |
| def comm_close(self, stream, ident, msg): | |
| """Handler for comm_close messages""" | |
| content = msg["content"] | |
| comm_id = content["comm_id"] | |
| comm = self.get_comm(comm_id) | |
| if comm is None: | |
| return | |
| self.comms[comm_id]._closed = True | |
| del self.comms[comm_id] | |
| try: | |
| comm.handle_close(msg) | |
| except Exception: | |
| logger.error("Exception in comm_close for %s", comm_id, exc_info=True) | |
| __all__ = ["CommManager", "BaseComm"] | |