reach-vb's picture
reach-vb HF staff
6d3bea4b268fcd6555b43f7754e3b8a3bbf7a5122b1e4f66294f0383b953f763
3962680
raw
history blame
6.36 kB
class Callback:
    """
    Base class and interface for callback mechanism

    This class can be used directly for monitoring file transfers by
    providing ``callback=Callback(hooks=...)`` (see the ``hooks`` argument,
    below), or subclassed for more specialised behaviour.

    Parameters
    ----------
    size: int (optional)
        Nominal quantity for the value that corresponds to a complete
        transfer, e.g., total number of tiles or total number of
        bytes
    value: int (0)
        Starting internal counter value
    hooks: dict or None
        A dict of named functions to be called on each update. The signature
        of these must be ``f(size, value, **kwargs)``
    kwargs:
        Stored and passed as keyword arguments to every hook invocation.
    """

    def __init__(self, size=None, value=0, hooks=None, **kwargs):
        self.size = size
        self.value = value
        self.hooks = hooks or {}
        self.kw = kwargs

    def set_size(self, size):
        """
        Set the internal maximum size attribute

        Usually called if not initially set at instantiation. Note that this
        triggers a ``call()``.

        Parameters
        ----------
        size: int
        """
        self.size = size
        self.call()

    def absolute_update(self, value):
        """
        Set the internal value state

        Triggers ``call()``

        Parameters
        ----------
        value: int
        """
        self.value = value
        self.call()

    def relative_update(self, inc=1):
        """
        Delta increment the internal counter

        Triggers ``call()``

        Parameters
        ----------
        inc: int
        """
        self.value += inc
        self.call()

    def call(self, hook_name=None, **kwargs):
        """
        Execute hook(s) with current state

        Each function is passed the internal size and current value

        Parameters
        ----------
        hook_name: str or None
            If given, execute only this hook and return its result; an
            unknown name is silently ignored. Otherwise every hook is run.
        kwargs: passed on to (all) hook(s)
        """
        if not self.hooks:
            return
        # Per-call kwargs override the ones stored at construction time.
        kw = self.kw.copy()
        kw.update(kwargs)
        if hook_name:
            if hook_name not in self.hooks:
                return
            return self.hooks[hook_name](self.size, self.value, **kw)
        for hook in self.hooks.values():
            hook(self.size, self.value, **kw)

    def wrap(self, iterable):
        """
        Wrap an iterable to call ``relative_update`` on each iteration

        The counter is incremented just before each item is yielded.

        Parameters
        ----------
        iterable: Iterable
            The iterable that is being wrapped
        """
        for item in iterable:
            self.relative_update()
            yield item

    def branch(self, path_1, path_2, kwargs):
        """
        Set callbacks for child transfers

        If this callback is operating at a higher level, e.g., put, which may
        trigger transfers that can also be monitored. The passed kwargs are
        to be *mutated* to add ``callback=``, if this class supports branching
        to children.

        Parameters
        ----------
        path_1: str
            Child's source path
        path_2: str
            Child's destination path
        kwargs: dict
            arguments passed to child method, e.g., put_file.

        Returns
        -------
        None; this base implementation leaves ``kwargs`` untouched.
        """
        return None

    def no_op(self, *_, **__):
        """Accept any arguments and do nothing."""
        pass

    def __getattr__(self, item):
        """
        If undefined methods are called on this class, nothing happens

        Special (double-underscore) names are excluded from this fallback:
        protocols such as copying and pickling probe for optional hooks like
        ``__setstate__`` and must see a genuine ``AttributeError``, otherwise
        the no-op would be called in their place and the instance state lost.
        """
        if item.startswith("__") and item.endswith("__"):
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {item!r}"
            )
        return self.no_op

    @classmethod
    def as_callback(cls, maybe_callback=None):
        """Transform callback=... into Callback instance

        For the special value of ``None``, return the global instance of
        ``NoOpCallback``. This is an alternative to including
        ``callback=_DEFAULT_CALLBACK`` directly in a method signature.
        Any other value is returned unchanged.
        """
        if maybe_callback is None:
            return _DEFAULT_CALLBACK
        return maybe_callback
class NoOpCallback(Callback):
    """
    Callback variant that never runs any hook

    ``call`` is overridden to ignore whatever it is given, so every update
    that would normally trigger hooks is silent.
    """

    def call(self, *args, **kwargs):
        """Ignore all arguments; always return ``None``."""
        pass
class DotPrinterCallback(Callback):
    """
    Simple example Callback implementation

    Almost identical to Callback with a hook that prints a char; here we
    demonstrate how the outer layer may print "#" and the inner layer "."

    Parameters
    ----------
    chr_to_print: str ("#")
        Character written to standard output on every ``call()``
    kwargs:
        Forwarded to ``Callback.__init__``
    """

    def __init__(self, chr_to_print="#", **kwargs):
        self.chr = chr_to_print
        super().__init__(**kwargs)

    def branch(self, path_1, path_2, kwargs):
        """Mutate kwargs to add new instance with different print char"""
        kwargs["callback"] = DotPrinterCallback(".")

    def call(self, hook_name=None, **kwargs):
        """Just outputs a character

        ``hook_name`` and ``kwargs`` are accepted only so the signature
        matches ``Callback.call``; both are ignored.
        """
        print(self.chr, end="")
class TqdmCallback(Callback):
    """
    A callback to display a progress bar using tqdm

    The bar is created when ``set_size`` is called, or lazily on the first
    update if no size was ever announced.

    Parameters
    ----------
    tqdm_kwargs : dict, (optional)
        Any argument accepted by the tqdm constructor.
        See the `tqdm doc <https://tqdm.github.io/docs/tqdm/#__init__>`_.
        Will be forwarded to tqdm.

    Raises
    ------
    ImportError
        If tqdm is not installed.

    Examples
    --------
    >>> import fsspec
    >>> from fsspec.callbacks import TqdmCallback
    >>> fs = fsspec.filesystem("memory")
    >>> path2distant_data = "/your-path"
    >>> fs.upload(
    ...     ".",
    ...     path2distant_data,
    ...     recursive=True,
    ...     callback=TqdmCallback(),
    ... )

    You can forward args to tqdm using the ``tqdm_kwargs`` parameter.

    >>> fs.upload(
    ...     ".",
    ...     path2distant_data,
    ...     recursive=True,
    ...     callback=TqdmCallback(tqdm_kwargs={"desc": "Your tqdm description"}),
    ... )
    """

    def __init__(self, tqdm_kwargs=None, *args, **kwargs):
        # Assigned before anything below can raise, so that ``close`` and
        # ``__del__`` always find a real attribute instead of falling through
        # to ``Callback.__getattr__``.
        self.tqdm = None
        try:
            import tqdm
        except ImportError as exce:
            raise ImportError(
                "Using TqdmCallback requires tqdm to be installed"
            ) from exce
        self._tqdm = tqdm
        self._tqdm_kwargs = tqdm_kwargs or {}
        super().__init__(*args, **kwargs)

    def _new_bar(self):
        """Create a progress bar whose total is the currently known size."""
        return self._tqdm.tqdm(total=self.size, **self._tqdm_kwargs)

    def set_size(self, size):
        """
        Record the expected total and start a fresh progress bar

        Parameters
        ----------
        size: int
        """
        self.size = size
        # Close any bar from an earlier call rather than abandoning it open.
        self.close()
        self.tqdm = self._new_bar()

    def relative_update(self, inc=1):
        """
        Advance the counter and the progress bar by ``inc``

        Parameters
        ----------
        inc: int
        """
        self.value += inc
        if self.tqdm is None:
            # No size was announced yet; show a bar anyway.
            self.tqdm = self._new_bar()
        self.tqdm.update(inc)

    def absolute_update(self, value):
        """
        Set the counter to ``value`` and move the progress bar to match

        Hooks are still triggered through the base implementation.

        Parameters
        ----------
        value: int
        """
        super().absolute_update(value)
        if self.tqdm is None:
            self.tqdm = self._new_bar()
        # tqdm only offers incremental updates, so apply the difference
        # between the requested position and the bar's current one.
        self.tqdm.update(value - self.tqdm.n)

    def close(self):
        """Close the progress bar, if one was ever created."""
        # Read the instance dict directly: on an instance whose ``__init__``
        # never ran, plain attribute access would be answered by the base
        # class's catch-all ``__getattr__`` instead of failing.
        bar = vars(self).get("tqdm")
        if bar is not None:
            bar.close()
        self.tqdm = None

    def __del__(self):
        self.close()
# Shared do-nothing instance; ``Callback.as_callback`` returns it when it is
# given ``None``.
_DEFAULT_CALLBACK = NoOpCallback()