lemesdaniel's picture
Upload folder using huggingface_hub
e00b837 verified
raw
history blame contribute delete
No virus
3.73 kB
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Hive Appier Framework
# Copyright (c) 2008-2024 Hive Solutions Lda.
#
# This file is part of Hive Appier Framework.
#
# Hive Appier Framework is free software: you can redistribute it and/or modify
# it under the terms of the Apache License as published by the Apache
# Foundation, either version 2.0 of the License, or (at your option) any
# later version.
#
# Hive Appier Framework is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# Apache License for more details.
#
# You should have received a copy of the Apache License along with
# Hive Appier Framework. If not, see <http://www.apache.org/licenses/>.
__author__ = "João Magalhães <joamag@hive.pt>"
""" The author(s) of the module """
__copyright__ = "Copyright (c) 2008-2024 Hive Solutions Lda."
""" The copyright for the module """
__license__ = "Apache License, Version 2.0"
""" The license for the module """
from . import util
from . import config
from . import legacy
from . import exceptions
try:
import pika
except ImportError:
pika = None
URL = "amqp://guest:guest@localhost"
""" The default URL to be used for the connection when
no other URL is provided (used most of the time) """
TIMEOUT = 100
""" The time the retrieval of a connection waits before
returning, this avoids possible problems with the current
implementation of the blocking client """
connection = None
""" The global wide connection to the AMQP server
that is meant to be used across sessions """
class AMQP(object):
    """
    Wrapper around a lazily created blocking connection to
    an AMQP server, built through the pika library.

    The connection is created on the first call to the
    `get_connection()` method and then re-used by the
    following calls made on the same instance.
    """

    def __init__(self, url=None):
        """
        Constructor of the class.

        :type url: String
        :param url: The URL of the AMQP server to be used by
        default when no URL is provided at connection time.
        """

        self.url = url
        self._connection = None

    def get_connection(self, url=None, timeout=TIMEOUT):
        """
        Retrieves the connection associated with the instance,
        creating it in case none has been created before.

        The URL is chosen (in order) from the provided argument,
        the URL set in the instance, the values resolved through
        `config.conf` (`AMQP_URL`, `CLOUDAMQP_URL` and `RABBITMQ_URL`,
        each one passed as the second argument of the next lookup)
        and finally the default `URL` constant.

        :type url: String
        :param url: The URL of the AMQP server to connect to.
        :type timeout: int
        :param timeout: The value that is set as the `socket_timeout`
        attribute of the connection parameters.
        :rtype: BlockingConnection
        :return: The (possibly cached) connection to the server.
        """

        # in case a connection has already been created by a
        # previous call returns it immediately (cached value)
        if self._connection:
            return self._connection

        # resolves the URL taking into account the configuration
        # values, each lookup receives the previous value as its
        # second argument (presumably the fallback, verify against
        # the config module)
        url_c = config.conf("AMQP_URL", None)
        url_c = config.conf("CLOUDAMQP_URL", url_c)
        url_c = config.conf("RABBITMQ_URL", url_c)
        url = url or self.url or url_c or URL
        url_p = legacy.urlparse(url)

        # defaults the credentials to the "guest" values in
        # case they are not explicitly defined in the URL
        username = "guest" if url_p.username is None else url_p.username
        password = "guest" if url_p.password is None else url_p.password

        # NOTE(review): the path is used verbatim as the virtual host
        # (leading slash included), confirm this is the naming that
        # is expected by the target brokers
        pika_m = _pika()
        kwargs = dict(
            host=url_p.hostname,
            virtual_host=url_p.path or "/",
            credentials=pika_m.PlainCredentials(username, password),
        )

        # forwards the port only when the URL defines one, otherwise
        # the default value of the pika library is used, previously
        # the port part of the URL was ignored
        if url_p.port is not None:
            kwargs["port"] = url_p.port

        parameters = pika_m.ConnectionParameters(**kwargs)
        parameters.socket_timeout = timeout
        self._connection = pika_m.BlockingConnection(parameters)
        self._connection = _set_fixes(self._connection)
        return self._connection
def get_connection(url=URL, timeout=TIMEOUT):
    """
    Creates a new blocking connection to the AMQP server and
    stores it in the module level `connection` variable.

    A new connection is created on every call, the previously
    stored global connection is replaced and is not closed here.

    The provided URL is passed as the second argument of the
    `config.conf` lookups (`AMQP_URL`, `CLOUDAMQP_URL` and
    `RABBITMQ_URL`), so presumably a configured value takes
    precedence over the argument (verify against the config module).

    :type url: String
    :param url: The URL of the AMQP server to connect to.
    :type timeout: int
    :param timeout: The value that is set as the `socket_timeout`
    attribute of the connection parameters.
    :rtype: BlockingConnection
    :return: The newly created connection to the server.
    """

    global connection

    url = config.conf("AMQP_URL", url)
    url = config.conf("CLOUDAMQP_URL", url)
    url = config.conf("RABBITMQ_URL", url)
    url_p = legacy.urlparse(url)

    # defaults the credentials to the "guest" values in case
    # they are not defined in the URL, avoiding passing invalid
    # (unset) values to the credentials object
    username = "guest" if url_p.username is None else url_p.username
    password = "guest" if url_p.password is None else url_p.password

    # NOTE(review): the path is used verbatim as the virtual host
    # (leading slash included), confirm this is the naming that
    # is expected by the target brokers
    pika_m = _pika()
    kwargs = dict(
        host=url_p.hostname,
        virtual_host=url_p.path or "/",
        credentials=pika_m.PlainCredentials(username, password),
    )

    # forwards the port only when the URL defines one, otherwise
    # the default value of the pika library is used, previously
    # the port part of the URL was ignored
    if url_p.port is not None:
        kwargs["port"] = url_p.port

    parameters = pika_m.ConnectionParameters(**kwargs)
    parameters.socket_timeout = timeout
    connection = pika_m.BlockingConnection(parameters)
    connection = _set_fixes(connection)
    return connection
def properties(*args, **kwargs):
    """
    Builds a `BasicProperties` object from the pika library,
    forwarding every positional and named argument to it.

    :rtype: BasicProperties
    :return: The properties object created with the arguments.
    """

    properties_c = _pika().BasicProperties
    return properties_c(*args, **kwargs)
def _set_fixes(connection):
    """
    Ensures that the provided connection exposes a `disconnect`
    callable, adding one that closes the `socket` attribute of
    the connection in case no such attribute exists.

    :type connection: Connection
    :param connection: The connection to be (possibly) patched.
    :rtype: Connection
    :return: The same connection object that has been provided.
    """

    # the connection already provides the operation and
    # so nothing is changed in it (returns immediately)
    if hasattr(connection, "disconnect"):
        return connection

    def disconnect():
        # the socket is resolved at call time, so that the
        # value currently set in the connection is closed
        socket = connection.socket
        socket.close()

    setattr(connection, "disconnect", disconnect)
    return connection
def _pika(verify=True):
    """
    Retrieves the reference to the pika module, optionally
    verifying that it has been properly imported.

    The verification is delegated to `util.verify`, which receives
    `exceptions.OperationalError` as the exception to be used
    (presumably raised when the library is not available).

    :type verify: bool
    :param verify: If the availability of the pika library
    should be verified before returning it.
    :rtype: Module
    :return: The pika module or an invalid (unset) value in case
    it's not available and no verification is requested.
    """

    if verify:
        util.verify(
            pika is not None,
            message="Pika library not available",
            exception=exceptions.OperationalError,
        )
    return pika