#!/usr/bin/python # -*- coding: utf-8 -*- # Hive Appier Framework # Copyright (c) 2008-2024 Hive Solutions Lda. # # This file is part of Hive Appier Framework. # # Hive Appier Framework is free software: you can redistribute it and/or modify # it under the terms of the Apache License as published by the Apache # Foundation, either version 2.0 of the License, or (at your option) any # later version. # # Hive Appier Framework is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # Apache License for more details. # # You should have received a copy of the Apache License along with # Hive Appier Framework. If not, see . __author__ = "João Magalhães " """ The author(s) of the module """ __copyright__ = "Copyright (c) 2008-2024 Hive Solutions Lda." """ The copyright for the module """ __license__ = "Apache License, Version 2.0" """ The license for the module """ import inspect from . import legacy from . import async_old class AwaitWrapper(object): _is_generator = True def __init__(self, generator, generate=False): if generate: generator = self.generate(generator) self.generator = generator self.is_generator = legacy.is_generator(generator) def __await__(self): if self.is_generator: return self._await_generator() else: return self._await_basic() def __iter__(self): return self def __next__(self): return next(self.generator) def next(self): return self.__next__() def generate(self, value): yield value def _await_generator(self): value = yield from self.generator return value def _await_basic(self): return self.generator yield class CoroutineWrapper(object): def __init__(self, coroutine): self.coroutine = coroutine self._buffer = None def __iter__(self): return self def __next__(self): if self._buffer: return self._buffer.pop(0) return self.coroutine.send(None) def next(self): return self.__next__() def restore(self, value): if self._buffer == None: self._buffer = [] self._buffer.append(value) class AyncgenWrapper(object): def __init__(self, async_iter): self.async_iter = async_iter self.current = None self._buffer = None def __iter__(self): return self def __next__(self): if self._buffer: return self._buffer.pop(0) try: if self.current == None: self.current = self.async_iter.asend(None) try: return next(self.current) except StopIteration as exception: self.current = None if not exception.args: return None return exception.args[0] except StopAsyncIteration: # @UndefinedVariable raise StopIteration() def next(self): return self.__next__() def restore(self, value): if self._buffer == None: self._buffer = [] self._buffer.append(value) def await_wrap(generator): return AwaitWrapper(generator) def await_yield(value): return AwaitWrapper(value, generate=True) def ensure_generator(value): if legacy.is_generator(value): return True, value if hasattr(value, "_generator"): return True, value if hasattr(inspect, "iscoroutine") and inspect.iscoroutine( value ): # @UndefinedVariable return True, CoroutineWrapper(value) if hasattr(inspect, "isasyncgen") and inspect.isasyncgen( value ): # @UndefinedVariable return True, AyncgenWrapper(value) return False, value def is_coroutine(callable): if hasattr(callable, "_is_coroutine"): return True if hasattr(inspect, "iscoroutinefunction") and inspect.iscoroutinefunction( callable ): # @UndefinedVariable return True return False def is_coroutine_object(generator): if legacy.is_generator(generator): return True if hasattr(inspect, "iscoroutine") and inspect.iscoroutine( generator ): # @UndefinedVariable return True return False def is_coroutine_native(generator): if hasattr(inspect, "iscoroutine") and inspect.iscoroutine( generator ): # @UndefinedVariable return True return False def to_coroutine(callable, *args, **kwargs): # sets the original reference to the future variable, this # should never be the final result, otherwise error occurs future = None # yields the complete set of values from the generator created # by the call to the to coroutine converter of the callable for value in async_old.to_coroutine(callable, *args, **kwargs): yield value future = value # returns the final result, retrieving it from the future object # that has been returned from the coroutine generator function return future.result()