reach-vb's picture
reach-vb HF staff
8643e0bff997343019440dbb643d796bcec3eb5d247e4cbf77173c5941ca163f
6017349
raw
history blame
8.58 kB
from __future__ import annotations
import os
import pickle
import time
from typing import TYPE_CHECKING
from fsspec.utils import atomic_write
try:
import ujson as json
except ImportError:
if not TYPE_CHECKING:
import json
if TYPE_CHECKING:
from typing import Any, Dict, Iterator, Literal
from typing_extensions import TypeAlias
from .cached import CachingFileSystem
Detail: TypeAlias = Dict[str, Any]
class CacheMetadata:
"""Cache metadata.
All reading and writing of cache metadata is performed by this class,
accessing the cached files and blocks is not.
Metadata is stored in a single file per storage directory in JSON format.
For backward compatibility, also reads metadata stored in pickle format
which is converted to JSON when next saved.
"""
def __init__(self, storage: list[str]):
"""
Parameters
----------
storage: list[str]
Directories containing cached files, must be at least one. Metadata
is stored in the last of these directories by convention.
"""
if not storage:
raise ValueError("CacheMetadata expects at least one storage location")
self._storage = storage
self.cached_files: list[Detail] = [{}]
# Private attribute to force saving of metadata in pickle format rather than
# JSON for use in tests to confirm can read both pickle and JSON formats.
self._force_save_pickle = False
def _load(self, fn: str) -> Detail:
"""Low-level function to load metadata from specific file"""
try:
with open(fn, "r") as f:
return json.load(f)
except ValueError:
with open(fn, "rb") as f:
return pickle.load(f)
def _save(self, metadata_to_save: Detail, fn: str) -> None:
"""Low-level function to save metadata to specific file"""
if self._force_save_pickle:
with atomic_write(fn) as f:
pickle.dump(metadata_to_save, f)
else:
with atomic_write(fn, mode="w") as f:
json.dump(metadata_to_save, f)
def _scan_locations(
self, writable_only: bool = False
) -> Iterator[tuple[str, str, bool]]:
"""Yield locations (filenames) where metadata is stored, and whether
writable or not.
Parameters
----------
writable: bool
Set to True to only yield writable locations.
Returns
-------
Yields (str, str, bool)
"""
n = len(self._storage)
for i, storage in enumerate(self._storage):
writable = i == n - 1
if writable_only and not writable:
continue
yield os.path.join(storage, "cache"), storage, writable
def check_file(
self, path: str, cfs: CachingFileSystem | None
) -> Literal[False] | tuple[Detail, str]:
"""If path is in cache return its details, otherwise return ``False``.
If the optional CachingFileSystem is specified then it is used to
perform extra checks to reject possible matches, such as if they are
too old.
"""
for (fn, base, _), cache in zip(self._scan_locations(), self.cached_files):
if path not in cache:
continue
detail = cache[path].copy()
if cfs is not None:
if cfs.check_files and detail["uid"] != cfs.fs.ukey(path):
# Wrong file as determined by hash of file properties
continue
if cfs.expiry and time.time() - detail["time"] > cfs.expiry:
# Cached file has expired
continue
fn = os.path.join(base, detail["fn"])
if os.path.exists(fn):
return detail, fn
return False
def clear_expired(self, expiry_time: int) -> tuple[list[str], bool]:
"""Remove expired metadata from the cache.
Returns names of files corresponding to expired metadata and a boolean
flag indicating whether the writable cache is empty. Caller is
responsible for deleting the expired files.
"""
expired_files = []
for path, detail in self.cached_files[-1].copy().items():
if time.time() - detail["time"] > expiry_time:
fn = detail.get("fn", "")
if not fn:
raise RuntimeError(
f"Cache metadata does not contain 'fn' for {path}"
)
fn = os.path.join(self._storage[-1], fn)
expired_files.append(fn)
self.cached_files[-1].pop(path)
if self.cached_files[-1]:
cache_path = os.path.join(self._storage[-1], "cache")
self._save(self.cached_files[-1], cache_path)
writable_cache_empty = not self.cached_files[-1]
return expired_files, writable_cache_empty
def load(self) -> None:
"""Load all metadata from disk and store in ``self.cached_files``"""
cached_files = []
for fn, _, _ in self._scan_locations():
if os.path.exists(fn):
# TODO: consolidate blocks here
loaded_cached_files = self._load(fn)
for c in loaded_cached_files.values():
if isinstance(c["blocks"], list):
c["blocks"] = set(c["blocks"])
cached_files.append(loaded_cached_files)
else:
cached_files.append({})
self.cached_files = cached_files or [{}]
def on_close_cached_file(self, f: Any, path: str) -> None:
"""Perform side-effect actions on closing a cached file.
The actual closing of the file is the responsibility of the caller.
"""
# File must be writeble, so in self.cached_files[-1]
c = self.cached_files[-1][path]
if c["blocks"] is not True and len(c["blocks"]) * f.blocksize >= f.size:
c["blocks"] = True
def pop_file(self, path: str) -> str | None:
"""Remove metadata of cached file.
If path is in the cache, return the filename of the cached file,
otherwise return ``None``. Caller is responsible for deleting the
cached file.
"""
details = self.check_file(path, None)
if not details:
return None
_, fn = details
if fn.startswith(self._storage[-1]):
self.cached_files[-1].pop(path)
self.save()
else:
raise PermissionError(
"Can only delete cached file in last, writable cache location"
)
return fn
def save(self) -> None:
"""Save metadata to disk"""
for (fn, _, writable), cache in zip(self._scan_locations(), self.cached_files):
if not writable:
continue
if os.path.exists(fn):
cached_files = self._load(fn)
for k, c in cached_files.items():
if k in cache:
if c["blocks"] is True or cache[k]["blocks"] is True:
c["blocks"] = True
else:
# self.cached_files[*][*]["blocks"] must continue to
# point to the same set object so that updates
# performed by MMapCache are propagated back to
# self.cached_files.
blocks = cache[k]["blocks"]
blocks.update(c["blocks"])
c["blocks"] = blocks
c["time"] = max(c["time"], cache[k]["time"])
c["uid"] = cache[k]["uid"]
# Files can be added to cache after it was written once
for k, c in cache.items():
if k not in cached_files:
cached_files[k] = c
else:
cached_files = cache
cache = {k: v.copy() for k, v in cached_files.items()}
for c in cache.values():
if isinstance(c["blocks"], set):
c["blocks"] = list(c["blocks"])
self._save(cache, fn)
self.cached_files[-1] = cached_files
def update_file(self, path: str, detail: Detail) -> None:
"""Update metadata for specific file in memory, do not save"""
self.cached_files[-1][path] = detail