jexzeb's picture
Upload folder using huggingface_hub
b6068b4
raw
history blame contribute delete
No virus
2.08 kB
"""Async wrappers for spooled temp files and temp directory objects"""
# Imports
import asyncio
from types import coroutine
from ..base import AsyncBase
from ..threadpool.utils import (
delegate_to_executor,
proxy_property_directly,
cond_delegate_to_executor,
)
from functools import partial
@delegate_to_executor("fileno", "rollover")
@cond_delegate_to_executor(
"close",
"flush",
"isatty",
"read",
"readline",
"readlines",
"seek",
"tell",
"truncate",
)
@proxy_property_directly("closed", "encoding", "mode", "name", "newlines")
class AsyncSpooledTemporaryFile(AsyncBase):
"""Async wrapper for SpooledTemporaryFile class"""
async def _check(self):
if self._file._rolled:
return
max_size = self._file._max_size
if max_size and self._file.tell() > max_size:
await self.rollover()
async def write(self, s):
"""Implementation to anticipate rollover"""
if self._file._rolled:
cb = partial(self._file.write, s)
return await self._loop.run_in_executor(self._executor, cb)
else:
file = self._file._file # reference underlying base IO object
rv = file.write(s)
await self._check()
return rv
async def writelines(self, iterable):
"""Implementation to anticipate rollover"""
if self._file._rolled:
cb = partial(self._file.writelines, iterable)
return await self._loop.run_in_executor(self._executor, cb)
else:
file = self._file._file # reference underlying base IO object
rv = file.writelines(iterable)
await self._check()
return rv
@delegate_to_executor("cleanup")
@proxy_property_directly("name")
class AsyncTemporaryDirectory:
"""Async wrapper for TemporaryDirectory class"""
def __init__(self, file, loop, executor):
self._file = file
self._loop = loop
self._executor = executor
async def close(self):
await self.cleanup()