File size: 3,734 Bytes
e00b837
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#!/usr/bin/python
# -*- coding: utf-8 -*-

# Hive Appier Framework
# Copyright (c) 2008-2024 Hive Solutions Lda.
#
# This file is part of Hive Appier Framework.
#
# Hive Appier Framework is free software: you can redistribute it and/or modify
# it under the terms of the Apache License as published by the Apache
# Foundation, either version 2.0 of the License, or (at your option) any
# later version.
#
# Hive Appier Framework is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# Apache License for more details.
#
# You should have received a copy of the Apache License along with
# Hive Appier Framework. If not, see <http://www.apache.org/licenses/>.

__author__ = "João Magalhães <joamag@hive.pt>"
""" The author(s) of the module """

__copyright__ = "Copyright (c) 2008-2024 Hive Solutions Lda."
""" The copyright for the module """

__license__ = "Apache License, Version 2.0"
""" The license for the module """

from . import util
from . import config
from . import legacy
from . import exceptions

try:
    import pika
except ImportError:
    pika = None

URL = "amqp://guest:guest@localhost"
""" The default URL to be used for the connection when
no other URL is provided (used most of the time) """

TIMEOUT = 100
""" The time the retrieval of a connection waits before
returning, this avoids possible problems with the current
implementation of the blocking client """

connection = None
""" The global wide connection to the AMQP server
that is meant to be used across sessions """


class AMQP(object):
    """
    Holder of a blocking AMQP connection, the connection is
    created on the first call to `get_connection()` and then
    re-used by the following calls on the same instance.
    """

    def __init__(self, url=None):
        """
        :type url: String
        :param url: Optional URL of the AMQP server, used when no
        URL is passed directly to `get_connection()`.
        """

        self.url = url
        self._connection = None

    def get_connection(self, url=None, timeout=TIMEOUT):
        """
        Retrieves the connection associated with the instance,
        creating it in case it does not exist yet.

        The URL is resolved using the following order: the `url`
        argument, the `url` of the instance, the configuration
        values (`AMQP_URL`, `CLOUDAMQP_URL` and `RABBITMQ_URL`) and
        finally the module level `URL` default.

        Only the hostname, the path (as virtual host) and the
        credentials of the URL are forwarded to pika.

        :type url: String
        :param url: Optional URL that takes precedence over every
        other source of the URL.
        :type timeout: int
        :param timeout: Value set as the `socket_timeout` of the
        connection parameters.
        :rtype: BlockingConnection
        :return: The (possibly cached) connection, that is ensured to
        have a `disconnect()` method.
        """

        # re-uses the connection created by a previous call, in such
        # case both the URL and the timeout arguments are ignored
        if self._connection:
            return self._connection

        # resolves the configuration based URL, each lookup uses the
        # previous value as its default, so (assuming `config.conf`
        # returns the default for unset names) the later names win
        url_c = config.conf("AMQP_URL", None)
        url_c = config.conf("CLOUDAMQP_URL", url_c)
        url_c = config.conf("RABBITMQ_URL", url_c)
        url = url or self.url or url_c or URL
        url_p = legacy.urlparse(url)

        # falls back to the guest credentials whenever the URL does
        # not contain the user information (identity test against
        # None is the proper way of checking for a missing value)
        username = "guest" if url_p.username is None else url_p.username
        password = "guest" if url_p.password is None else url_p.password

        parameters = _pika().ConnectionParameters(
            host=url_p.hostname,
            virtual_host=url_p.path or "/",
            credentials=_pika().PlainCredentials(username, password),
        )
        parameters.socket_timeout = timeout

        # creates the connection and stores it in the instance so that
        # it can be re-used, `_set_fixes()` adds the `disconnect()`
        # method in case the connection does not provide one
        self._connection = _pika().BlockingConnection(parameters)
        self._connection = _set_fixes(self._connection)
        return self._connection


def get_connection(url=URL, timeout=TIMEOUT):
    """
    Creates a new blocking connection to the AMQP server and
    stores it in the module level `connection` global, replacing
    any value previously stored there.

    The configuration values `AMQP_URL`, `CLOUDAMQP_URL` and
    `RABBITMQ_URL` are looked up in sequence, each one using the
    previous result as its default, the `url` argument is the
    default of the first lookup.

    Only the hostname, the path (as virtual host) and the
    credentials of the URL are forwarded to pika.

    :type url: String
    :param url: URL used as the default value for the
    configuration lookups.
    :type timeout: int
    :param timeout: Value set as the `socket_timeout` of the
    connection parameters.
    :rtype: BlockingConnection
    :return: The newly created connection, that is ensured to
    have a `disconnect()` method.
    """

    global connection
    url = config.conf("AMQP_URL", url)
    url = config.conf("CLOUDAMQP_URL", url)
    url = config.conf("RABBITMQ_URL", url)
    url_p = legacy.urlparse(url)

    # falls back to the guest credentials whenever the URL does not
    # contain the user information, avoiding the creation of plain
    # credentials with None values (same as `AMQP.get_connection()`)
    username = "guest" if url_p.username is None else url_p.username
    password = "guest" if url_p.password is None else url_p.password

    parameters = _pika().ConnectionParameters(
        host=url_p.hostname,
        virtual_host=url_p.path or "/",
        credentials=_pika().PlainCredentials(username, password),
    )
    parameters.socket_timeout = timeout

    # creates the connection, `_set_fixes()` adds the `disconnect()`
    # method in case the connection does not provide one
    connection = _pika().BlockingConnection(parameters)
    connection = _set_fixes(connection)
    return connection


def properties(*args, **kwargs):
    """
    Builds a pika `BasicProperties` instance, forwarding all of
    the provided positional and keyword arguments unchanged.

    :rtype: BasicProperties
    :return: The properties object created by pika.
    """

    basic_properties = _pika().BasicProperties
    return basic_properties(*args, **kwargs)


def _set_fixes(connection):
    def disconnect():
        connection.socket.close()

    if not hasattr(connection, "disconnect"):
        connection.disconnect = disconnect
    return connection


def _pika(verify=True):
    """
    Retrieves the (optionally imported) pika module, verifying
    by default that the import has been successful.

    :type verify: bool
    :param verify: If the availability of the pika library should
    be verified, when unset the value of the module level `pika`
    name is returned as is (None if the import has failed).
    :rtype: Module
    :return: The value of the module level `pika` name.
    """

    if verify:
        # delegates the failure handling to `util.verify()`, requesting
        # an operational error in case pika could not be imported
        util.verify(
            pika is not None,
            message="Pika library not available",
            exception=exceptions.OperationalError,
        )
    return pika