#!/usr/bin/python # -*- coding: utf-8 -*- # Hive Appier Framework # Copyright (c) 2008-2024 Hive Solutions Lda. # # This file is part of Hive Appier Framework. # # Hive Appier Framework is free software: you can redistribute it and/or modify # it under the terms of the Apache License as published by the Apache # Foundation, either version 2.0 of the License, or (at your option) any # later version. # # Hive Appier Framework is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # Apache License for more details. # # You should have received a copy of the Apache License along with # Hive Appier Framework. If not, see . __author__ = "João Magalhães " """ The author(s) of the module """ __copyright__ = "Copyright (c) 2008-2024 Hive Solutions Lda." """ The copyright for the module """ __license__ = "Apache License, Version 2.0" """ The license for the module """ from . import util from . import config from . import legacy from . import exceptions try: import pika except ImportError: pika = None URL = "amqp://guest:guest@localhost" """ The default URL to be used for the connection when no other URL is provided (used most of the times) """ TIMEOUT = 100 """ The time the retrieval of a connection waits before returning this avoid possible problems with the current implementation of the blocking client """ connection = None """ The global wide connection to the AMQP server that is meant to be used across sessions """ class AMQP(object): def __init__(self, url=None): self.url = url self._connection = None def get_connection(self, url=None, timeout=TIMEOUT): if self._connection: return self._connection url_c = config.conf("AMQP_URL", None) url_c = config.conf("CLOUDAMQP_URL", url_c) url_c = config.conf("RABBITMQ_URL", url_c) url = url or self.url or url_c or URL url_p = legacy.urlparse(url) username = "guest" if url_p.username == None else url_p.username password = "guest" if url_p.password == None else url_p.password parameters = _pika().ConnectionParameters( host=url_p.hostname, virtual_host=url_p.path or "/", credentials=_pika().PlainCredentials(username, password), ) parameters.socket_timeout = timeout self._connection = _pika().BlockingConnection(parameters) self._connection = _set_fixes(self._connection) return self._connection def get_connection(url=URL, timeout=TIMEOUT): global connection url = config.conf("AMQP_URL", url) url = config.conf("CLOUDAMQP_URL", url) url = config.conf("RABBITMQ_URL", url) url_p = legacy.urlparse(url) parameters = _pika().ConnectionParameters( host=url_p.hostname, virtual_host=url_p.path or "/", credentials=_pika().PlainCredentials(url_p.username, url_p.password), ) parameters.socket_timeout = timeout connection = _pika().BlockingConnection(parameters) connection = _set_fixes(connection) return connection def properties(*args, **kwargs): return _pika().BasicProperties(*args, **kwargs) def _set_fixes(connection): def disconnect(): connection.socket.close() if not hasattr(connection, "disconnect"): connection.disconnect = disconnect return connection def _pika(verify=True): if verify: util.verify( not pika == None, message="Pika library not available", exception=exceptions.OperationalError, ) return pika