Spaces:
Runtime error
Runtime error
File size: 1,912 Bytes
4a51346 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
import functools
from types import coroutine
def delegate_to_executor(*attrs):
def cls_builder(cls):
for attr_name in attrs:
setattr(cls, attr_name, _make_delegate_method(attr_name))
return cls
return cls_builder
def proxy_method_directly(*attrs):
def cls_builder(cls):
for attr_name in attrs:
setattr(cls, attr_name, _make_proxy_method(attr_name))
return cls
return cls_builder
def proxy_property_directly(*attrs):
def cls_builder(cls):
for attr_name in attrs:
setattr(cls, attr_name, _make_proxy_property(attr_name))
return cls
return cls_builder
def cond_delegate_to_executor(*attrs):
def cls_builder(cls):
for attr_name in attrs:
setattr(cls, attr_name, _make_cond_delegate_method(attr_name))
return cls
return cls_builder
def _make_delegate_method(attr_name):
@coroutine
def method(self, *args, **kwargs):
cb = functools.partial(getattr(self._file, attr_name), *args, **kwargs)
return (yield from self._loop.run_in_executor(self._executor, cb))
return method
def _make_proxy_method(attr_name):
def method(self, *args, **kwargs):
return getattr(self._file, attr_name)(*args, **kwargs)
return method
def _make_proxy_property(attr_name):
def proxy_property(self):
return getattr(self._file, attr_name)
return property(proxy_property)
def _make_cond_delegate_method(attr_name):
"""For spooled temp files, delegate only if rolled to file object"""
async def method(self, *args, **kwargs):
if self._file._rolled:
cb = functools.partial(getattr(self._file, attr_name), *args, **kwargs)
return await self._loop.run_in_executor(self._executor, cb)
else:
return getattr(self._file, attr_name)(*args, **kwargs)
return method
|