lemesdaniel's picture
Upload folder using huggingface_hub
e00b837 verified
raw
history blame contribute delete
No virus
7.83 kB
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Hive Appier Framework
# Copyright (c) 2008-2024 Hive Solutions Lda.
#
# This file is part of Hive Appier Framework.
#
# Hive Appier Framework is free software: you can redistribute it and/or modify
# it under the terms of the Apache License as published by the Apache
# Foundation, either version 2.0 of the License, or (at your option) any
# later version.
#
# Hive Appier Framework is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# Apache License for more details.
#
# You should have received a copy of the Apache License along with
# Hive Appier Framework. If not, see <http://www.apache.org/licenses/>.
__author__ = "João Magalhães <joamag@hive.pt>"
""" The author(s) of the module """
__copyright__ = "Copyright (c) 2008-2024 Hive Solutions Lda."
""" The copyright for the module """
__license__ = "Apache License, Version 2.0"
""" The license for the module """
import json
import uuid
import heapq
import functools
from . import amqp
from . import legacy
from . import exceptions
class Queue(object):
def length(self):
raise exceptions.NotImplementedError()
def clear(self):
raise exceptions.NotImplementedError()
def push(self, value, priority=None, identify=False):
raise exceptions.NotImplementedError()
def pop(self, block=True, full=False):
raise exceptions.NotImplementedError()
def subscribe(self, callback, full=False):
raise exceptions.NotImplementedError()
def loop(self):
raise exceptions.NotImplementedError()
def unloop(self):
raise exceptions.NotImplementedError()
def ack(self):
raise exceptions.NotImplementedError()
def nack(self):
raise exceptions.NotImplementedError()
def build_value(self, value, priority=None, identify=False, reverse=False):
if identify:
identifier = self.build_identifier()
else:
identifier = None
if priority and reverse:
priority *= -1
return (priority, identifier, value), identifier
def build_identifier(self):
return str(uuid.uuid4())
class MemoryQueue(Queue):
def __init__(self):
Queue.__init__(self)
self._queue = []
def length(self):
return len(self._queue)
def clear(self):
del self._queue[:]
def push(self, value, priority=None, identify=False):
value, identifier = self.build_value(
value, priority=priority, identify=identify, reverse=True
)
heapq.heappush(self._queue, value)
return identifier
def pop(self, block=True, full=False):
priority, identifier, value = heapq.heappop(self._queue)
return (priority, identifier, value) if full else value
class MultiprocessQueue(Queue):
def __init__(self):
try:
import queue
except ImportError:
import Queue as queue
Queue.__init__(self)
self._queue = queue.PriorityQueue()
def length(self):
return self._queue.qsize()
def clear(self):
try:
import queue
except ImportError:
import Queue as queue
self._queue = queue.PriorityQueue()
def push(self, value, priority=None, identify=False):
value, identifier = self.build_value(
value, priority=priority, identify=identify, reverse=True
)
self._queue.put(value)
return identifier
def pop(self, block=True, full=False):
priority, identifier, value = self._queue.get(block)
return (priority, identifier, value) if full else value
class AMQPQueue(Queue):
def __init__(
self,
url=None,
name="default",
durable=False,
max_priority=255,
encoder="pickle",
protocol=2,
encoding="utf-8",
amqp=None,
):
self.url = url
self.name = name
self.durable = durable
self.max_priority = max_priority
self.encoder = encoder
self.protocol = protocol
self.encoding = encoding
self.amqp = amqp
self._build()
def clear(self):
self.channel.queue_purge(queue=self.name)
def push(self, value, priority=None, identify=False):
value, identifier = self.build_value(
value, priority=priority, identify=identify, reverse=False
)
body = self._dump(value)
self._add_callback(
self.channel.basic_publish,
exchange="",
routing_key=self.name,
body=body,
properties=amqp.properties(delivery_mode=2, priority=value[0] or 0),
)
return identifier
def pop(self, block=True, full=False):
_method, _properties, body = self.channel.basic_get(
queue=self.name, auto_ack=False
)
priority, identifier, value = self._load(body)
return (priority, identifier, value) if full else value
def subscribe(
self,
callback,
full=False,
auto_ack=True,
exclusive=False,
consumer_tag=None,
arguments=None,
):
def handler(channel, method, properties, body):
priority, identifier, value = self._load(body)
result = (priority, identifier, value) if full else value
ack = lambda: self.ack(delivery_tag=method.delivery_tag)
nack = lambda: self.nack(delivery_tag=method.delivery_tag)
callback(result) if auto_ack else callback(result, ack, nack)
self.channel.basic_consume(
queue=self.name,
on_message_callback=handler,
auto_ack=auto_ack,
exclusive=exclusive,
consumer_tag=consumer_tag,
arguments=arguments,
)
def ack(self, delivery_tag=None):
self.channel.basic_ack(delivery_tag=delivery_tag)
def nack(self, delivery_tag=None):
self.channel.basic_nack(delivery_tag=delivery_tag)
def loop(self):
self.channel.start_consuming()
def unloop(self):
self.channel.stop_consuming()
def _dump(self, value):
return self._dumper(value)
def _dump_pickle(self, value):
return legacy.cPickle.dumps(value, protocol=self.protocol)
def _dump_json(self, value):
body = json.dumps(value)
return legacy.bytes(body, encoding=self.encoding)
def _load(self, body):
return self._loader(body)
def _load_pickle(self, body):
if legacy.PYTHON_3:
kwargs = dict(encoding="bytes")
else:
kwargs = dict()
return legacy.cPickle.loads(body, **kwargs)
def _load_json(self, body):
if legacy.is_bytes(body):
body = body.decode(self.encoding)
return json.loads(body)
def _build(self):
if not self.amqp:
self.amqp = amqp.AMQP(url=self.url)
self.connection = self.amqp.get_connection()
self.channel = self.connection.channel()
self.channel.basic_qos(prefetch_count=1)
self.queue = self.channel.queue_declare(
queue=self.name,
durable=self.durable,
arguments={"x-max-priority": self.max_priority},
)
self._dumper = getattr(self, "_dump_" + self.encoder)
self._loader = getattr(self, "_load_" + self.encoder)
def _add_callback(self, callback, *args, **kwargs):
if hasattr(self.connection, "add_callback_threadsafe"):
self.connection.add_callback_threadsafe(
functools.partial(callback, *args, **kwargs)
)
else:
callback(*args, **kwargs)