#!/usr/bin/python
# -*- coding: utf-8 -*-

# Hive Appier Framework
# Copyright (c) 2008-2024 Hive Solutions Lda.
#
# This file is part of Hive Appier Framework.
#
# Hive Appier Framework is free software: you can redistribute it and/or modify
# it under the terms of the Apache License as published by the Apache
# Foundation, either version 2.0 of the License, or (at your option) any
# later version.
#
# Hive Appier Framework is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# Apache License for more details.
#
# You should have received a copy of the Apache License along with
# Hive Appier Framework. If not, see <http://www.apache.org/licenses/>.

__author__ = "João Magalhães "
""" The author(s) of the module """

__copyright__ = "Copyright (c) 2008-2024 Hive Solutions Lda."
""" The copyright for the module """

__license__ = "Apache License, Version 2.0"
""" The license for the module """

import pickle
import traceback
import threading

from . import config
from . import legacy
from . import redisdb
from . import component
from . import exceptions


class Bus(component.Component):
    """
    Abstract event bus, defining the bind/unbind/trigger
    interface that the concrete bus implementations must
    provide.
    """

    def __init__(self, name="bus", owner=None, *args, **kwargs):
        """
        Builds the bus component and, unless disabled, loads it.

        :type name: String
        :param name: The name of the component.
        :type owner: Object
        :param owner: The owner of the component, forwarded to
        the parent component constructor.
        :type load: bool
        :param load: Keyword only flag (defaults to true) that
        controls if the load operation is run at construction.
        """

        component.Component.__init__(self, name=name, owner=owner, *args, **kwargs)
        load = kwargs.pop("load", True)
        if load:
            self.load(*args, **kwargs)

    @classmethod
    def new(cls, *args, **kwargs):
        """
        Alternative constructor, forwards all of the arguments
        to the class constructor and returns the new instance.
        """

        return cls(*args, **kwargs)

    def bind(self, name, method):
        """
        Registers the method as an handler of the event with the
        given name, must be implemented by the concrete classes.
        """

        raise exceptions.NotImplementedError()

    def unbind(self, name, method=None):
        """
        Removes the method (or every method if none is given) from
        the handlers of the event with the given name, must be
        implemented by the concrete classes.
        """

        raise exceptions.NotImplementedError()

    def trigger(self, name, *args, **kwargs):
        """
        Raises the event with the given name passing the arguments
        to the handlers, must be implemented by the concrete classes.
        """

        raise exceptions.NotImplementedError()


class MemoryBus(Bus):
    """
    Bus implementation that keeps the handlers in a local
    dictionary and calls them synchronously on trigger.
    """

    def __init__(self, name="memory", owner=None, *args, **kwargs):
        Bus.__init__(self, name=name, owner=owner, *args, **kwargs)

    def bind(self, name, method):
        """
        Appends the method to the list of handlers of the event.

        :type name: String
        :param name: The name of the event to bind to.
        :type method: Function
        :param method: The callable to be called on trigger.
        """

        self._events.setdefault(name, []).append(method)

    def unbind(self, name, method=None):
        """
        Removes one handler (or all of them if no method is given)
        from the event with the given name.

        A `ValueError` is raised (by the list removal) in case the
        given method is not bound to the event.
        """

        methods = self._events.get(name, [])
        if method:
            methods.remove(method)
        else:
            del methods[:]

    def trigger(self, name, *args, **kwargs):
        """
        Calls, in binding order, every handler of the event with
        the given name using the provided arguments.
        """

        methods = self._events.get(name, [])
        for method in methods:
            method(*args, **kwargs)

    def _load(self, *args, **kwargs):
        Bus._load(self, *args, **kwargs)
        self._events = dict()

    def _unload(self, *args, **kwargs):
        Bus._unload(self, *args, **kwargs)
        self._events = None

    def _get_state(self):
        state = Bus._get_state(self)
        state.update(_events=self._events)
        return state

    def _set_state(self, state):
        Bus._set_state(self, state)

        # the stored events may be unset (none) in case the state
        # was retrieved while the bus was unloaded, in that situation
        # there's nothing to be bound back
        _events = state.get("_events", None) or {}
        for name, methods in legacy.iteritems(_events):
            for method in methods:
                self.bind(name, method)


class RedisBus(Bus):
    """
    Bus implementation that publishes the events to Redis
    pub/sub channels and dispatches the received messages to
    the locally bound handlers from a listener thread.
    """

    SERIALIZER = pickle
    """ The serializer to be used for the values contained in
    the bus (used on top of the class) """

    GLOBAL_CHANNEL = "global"
    """ The name of the global channel to which all of the agents
    should be subscribed, for global communication """

    def __init__(self, name="redis", owner=None, *args, **kwargs):
        Bus.__init__(self, name=name, owner=owner, *args, **kwargs)
        self._delay = kwargs.pop("delay", True)

    def bind(self, name, method):
        """
        Appends the method to the handlers of the event and
        subscribes to the Redis channel associated with it.

        :type name: String
        :param name: The name of the event to bind to.
        :type method: Function
        :param method: The callable to be called when a message
        for the event is received.
        """

        self._events.setdefault(name, []).append(method)
        channel = self._to_channel(name)
        self._pubsub.subscribe(channel)

    def unbind(self, name, method=None):
        """
        Removes one handler (or all of them if no method is given)
        from the event, unsubscribing from the associated Redis
        channel once no handlers remain for it.

        A `ValueError` is raised (by the list removal) in case the
        given method is not bound to the event.
        """

        methods = self._events.get(name, [])
        if method:
            methods.remove(method)
        else:
            del methods[:]

        # in case there are still handlers bound to the event the
        # subscription must be kept, otherwise the remaining handlers
        # would stop receiving the messages for the event
        if methods:
            return

        channel = self._to_channel(name)
        self._pubsub.unsubscribe(channel)

    def trigger(self, name, *args, **kwargs):
        """
        Serializes the arguments and publishes them to the Redis
        channel associated with the event with the given name.
        """

        channel = self._to_channel(name)
        data = self._serializer.dumps(dict(args=args, kwargs=kwargs))
        self._redis.publish(channel, data)

    def _load(self, *args, **kwargs):
        Bus._load(self, *args, **kwargs)

        # resolves the channel prefix, in increasing priority: the owner
        # name, the BUS_NAME and BUS_SCOPE configurations and the
        # explicit "name" keyword argument
        self._name = config.conf("BUS_NAME", self.owner.name_i)
        self._name = config.conf("BUS_SCOPE", self._name)
        self._name = kwargs.pop("name", self._name)
        self._serializer = kwargs.pop("serializer", self.__class__.SERIALIZER)
        self._global_channel = kwargs.pop(
            "global_channel", self.__class__.GLOBAL_CHANNEL
        )
        self._events = dict()
        self._open()

    def _unload(self, *args, **kwargs):
        Bus._unload(self, *args, **kwargs)
        self._events = None
        self._close()

    def _get_state(self):
        state = Bus._get_state(self)
        state.update(_events=self._events)
        return state

    def _set_state(self, state):
        Bus._set_state(self, state)

        # the stored events may be unset (none) in case the state
        # was retrieved while the bus was unloaded, in that situation
        # there's nothing to be bound back
        _events = state.get("_events", None) or {}
        for name, methods in legacy.iteritems(_events):
            for method in methods:
                self.bind(name, method)

    def _open(self):
        """
        Opens the Redis connection, subscribes to the global
        channel and starts the listener thread.
        """

        channel = self._to_channel(self._global_channel)
        self._redis = redisdb.get_connection()

        # runs a ping before any other operation so that a
        # connection problem is raised as soon as possible
        self._redis.ping()
        self._pubsub = self._redis.pubsub()
        self._pubsub.subscribe(channel)
        self._listener = RedisListener(self)
        self._listener.start()

    def _close(self):
        """
        Unsubscribes the pub/sub object, waits for the listener
        thread to finish and clears the connection references.
        """

        # NOTE(review): the unsubscribe with no channels is presumably
        # what makes the listen loop finish, allowing the join
        # to return - confirm against the Redis client in use
        self._pubsub.unsubscribe()
        self._listener.join()
        self._redis = None
        self._pubsub = None
        self._listener = None

    def _loop(self, safe=True):
        """
        Main loop of the listener, handles every item yielded by
        the pub/sub object until the bus is no longer loaded.

        Exceptions raised while handling an item are logged and
        do not stop the loop.

        :type safe: bool
        :param safe: Forwarded to the tick operation, controls
        if the handlers are scheduled in the owner or called
        directly from the listener thread.
        """

        for item in self._pubsub.listen():
            try:
                if not self.loaded:
                    break
                self._tick(item, safe=safe)
            except Exception as exception:
                self.logger.critical("Unhandled Redis loop exception raised")
                self.logger.error(exception)
                lines = traceback.format_exc().splitlines()
                for line in lines:
                    self.logger.warning(line)

    def _tick(self, item, safe=True):
        """
        Handles a single pub/sub item, ignoring anything that is
        not a message and dispatching the unserialized arguments
        to the handlers bound to the event.

        :type item: Dictionary
        :param item: The pub/sub item, from which the "channel",
        "type" and "data" values are read.
        :type safe: bool
        :param safe: If the handlers should be passed to the
        schedule operation of the owner instead of being called
        directly.
        """

        channel = item.get("channel", None)
        channel = legacy.str(channel)
        kind = item.get("type", None)
        data = item.get("data", None)

        # control items (not messages) carry no event payload
        # and as such are ignored
        if kind != "message":
            return

        # removes the prefix from the channel name (if any) to
        # obtain the name of the event
        if ":" in channel:
            _prefix, name = channel.split(":", 1)
        else:
            name = channel

        # NOTE(security): with the default (pickle) serializer the
        # loading of the data may execute arbitrary code, so the Redis
        # server and everything able to publish to it must be trusted
        data = self._serializer.loads(data)

        # the events may be unset if the bus has been unloaded
        # while the message was being handled
        methods = self._events.get(name, []) if self._events else []
        for method in methods:
            if safe:
                self.owner.schedule(
                    method,
                    args=data["args"],
                    kwargs=data["kwargs"],
                    timeout=-1,
                    safe=True,
                )
            else:
                method(*data["args"], **data["kwargs"])

    def _to_channel(self, name):
        """
        Builds the Redis channel name for the event by prefixing
        it with the bus name and a colon separator.
        """

        return self._name + ":" + name


class RedisListener(threading.Thread):
    """
    Daemon thread that runs the loop of the Redis bus that
    has been provided at construction.
    """

    def __init__(self, bus):
        threading.Thread.__init__(self, name="RedisListener")
        self.daemon = True
        self._bus = bus

    def run(self):
        self._bus._loop()