| | from __future__ import annotations |
| |
|
| | import os |
| | import pickle |
| | import time |
| | from typing import TYPE_CHECKING |
| |
|
| | from fsspec.utils import atomic_write |
| |
|
| | try: |
| | import ujson as json |
| | except ImportError: |
| | if not TYPE_CHECKING: |
| | import json |
| |
|
| | if TYPE_CHECKING: |
| | from collections.abc import Iterator |
| | from typing import Any, Literal, TypeAlias |
| |
|
| | from .cached import CachingFileSystem |
| |
|
| | Detail: TypeAlias = dict[str, Any] |
| |
|
| |
|
class CacheMetadata:
    """Cache metadata.

    All reading and writing of cache metadata is performed by this class,
    accessing the cached files and blocks is not.

    Metadata is stored in a single file per storage directory in JSON format.
    For backward compatibility, also reads metadata stored in pickle format
    which is converted to JSON when next saved.
    """

    def __init__(self, storage: list[str]):
        """

        Parameters
        ----------
        storage: list[str]
            Directories containing cached files, must be at least one. Metadata
            is stored in the last of these directories by convention.
        """
        if not storage:
            raise ValueError("CacheMetadata expects at least one storage location")

        self._storage = storage
        # One metadata dict per storage location, in the same order as
        # self._storage; only the last entry (the writable location) is mutated.
        self.cached_files: list[Detail] = [{}]

        # If True, save metadata in pickle format instead of JSON. Intended
        # for testing the backward-compatible pickle-reading path in _load.
        self._force_save_pickle = False

    def _load(self, fn: str) -> Detail:
        """Low-level function to load metadata from specific file.

        Tries JSON first; falls back to pickle for metadata written by older
        versions (json.load raises ValueError on non-JSON input).
        NOTE(review): the pickle fallback assumes the cache directory is
        trusted — pickle.load on attacker-controlled data executes code.
        """
        try:
            with open(fn, "r") as f:
                loaded = json.load(f)
        except ValueError:
            with open(fn, "rb") as f:
                loaded = pickle.load(f)
        # JSON cannot represent sets, so "blocks" is stored as a list and
        # converted back to a set here ("blocks" may also be the literal True,
        # meaning the whole file is cached).
        for c in loaded.values():
            if isinstance(c.get("blocks"), list):
                c["blocks"] = set(c["blocks"])
        return loaded

    def _save(self, metadata_to_save: Detail, fn: str) -> None:
        """Low-level function to save metadata to specific file.

        Uses atomic_write so readers never observe a partially-written file.
        """
        if self._force_save_pickle:
            with atomic_write(fn) as f:
                pickle.dump(metadata_to_save, f)
        else:
            with atomic_write(fn, mode="w") as f:
                json.dump(metadata_to_save, f)

    def _scan_locations(
        self, writable_only: bool = False
    ) -> Iterator[tuple[str, str, bool]]:
        """Yield locations (filenames) where metadata is stored, and whether
        writable or not.

        By convention only the last storage location is writable.

        Parameters
        ----------
        writable_only: bool
            Set to True to only yield writable locations.

        Returns
        -------
        Yields (metadata filename, storage directory, writable flag)
        """
        n = len(self._storage)
        for i, storage in enumerate(self._storage):
            writable = i == n - 1
            if writable_only and not writable:
                continue
            yield os.path.join(storage, "cache"), storage, writable

    def check_file(
        self, path: str, cfs: CachingFileSystem | None
    ) -> Literal[False] | tuple[Detail, str]:
        """If path is in cache return its details, otherwise return ``False``.

        If the optional CachingFileSystem is specified then it is used to
        perform extra checks to reject possible matches, such as if they are
        too old.

        Returns a (detail dict copy, local filename) tuple on a hit.
        """
        # _scan_locations and cached_files are kept in the same order, so zip
        # pairs each storage directory with its metadata dict.
        for (fn, base, _), cache in zip(self._scan_locations(), self.cached_files):
            if path not in cache:
                continue
            detail = cache[path].copy()

            if cfs is not None:
                if cfs.check_files and detail["uid"] != cfs.fs.ukey(path):
                    # Remote file has changed since it was cached; reject.
                    continue
                if cfs.expiry and time.time() - detail["time"] > cfs.expiry:
                    # Cached entry is older than the expiry window; reject.
                    continue

            # The metadata may reference a data file that has been deleted
            # out-of-band; only report a hit if it still exists.
            fn = os.path.join(base, detail["fn"])
            if os.path.exists(fn):
                return detail, fn
        return False

    def clear_expired(self, expiry_time: int) -> tuple[list[str], bool]:
        """Remove expired metadata from the cache.

        Only the writable (last) cache location is considered. Returns names
        of files corresponding to expired metadata and a boolean flag
        indicating whether the writable cache is empty. Caller is responsible
        for deleting the expired files.
        """
        expired_files = []
        # Iterate over a copy because entries are popped during iteration.
        for path, detail in self.cached_files[-1].copy().items():
            if time.time() - detail["time"] > expiry_time:
                fn = detail.get("fn", "")
                if not fn:
                    raise RuntimeError(
                        f"Cache metadata does not contain 'fn' for {path}"
                    )
                fn = os.path.join(self._storage[-1], fn)
                expired_files.append(fn)
                self.cached_files[-1].pop(path)

        # Persist the pruned metadata; skipped when the cache emptied out
        # (the caller uses the returned flag to clear the whole directory).
        if self.cached_files[-1]:
            cache_path = os.path.join(self._storage[-1], "cache")
            self._save(self.cached_files[-1], cache_path)

        writable_cache_empty = not self.cached_files[-1]
        return expired_files, writable_cache_empty

    def load(self) -> None:
        """Load all metadata from disk and store in ``self.cached_files``"""
        cached_files = []
        for fn, _, _ in self._scan_locations():
            if os.path.exists(fn):
                # TODO: consider reusing this in check_file (mtime-based reload)
                cached_files.append(self._load(fn))
            else:
                # Keep list aligned with storage locations even when a
                # location has no metadata file yet.
                cached_files.append({})
        self.cached_files = cached_files or [{}]

    def on_close_cached_file(self, f: Any, path: str) -> None:
        """Perform side-effect actions on closing a cached file.

        The actual closing of the file is the responsibility of the caller.
        """
        # If all blocks of the file have been fetched, mark the entry as
        # fully cached by setting "blocks" to True (instead of a block set).
        c = self.cached_files[-1][path]
        if c["blocks"] is not True and len(c["blocks"]) * f.blocksize >= f.size:
            c["blocks"] = True

    def pop_file(self, path: str) -> str | None:
        """Remove metadata of cached file.

        If path is in the cache, return the filename of the cached file,
        otherwise return ``None``. Caller is responsible for deleting the
        cached file.

        Raises PermissionError if the match lives in a read-only (non-last)
        storage location.
        """
        details = self.check_file(path, None)
        if not details:
            return None
        _, fn = details
        if fn.startswith(self._storage[-1]):
            self.cached_files[-1].pop(path)
            self.save()
        else:
            raise PermissionError(
                "Can only delete cached file in last, writable cache location"
            )
        return fn

    def save(self) -> None:
        """Save metadata to disk.

        Merges the in-memory writable metadata with whatever is currently on
        disk (another process may have written since we loaded), preferring
        the freshest information for each entry.
        """
        for (fn, _, writable), cache in zip(self._scan_locations(), self.cached_files):
            if not writable:
                continue

            if os.path.exists(fn):
                cached_files = self._load(fn)
                for k, c in cached_files.items():
                    if k in cache:
                        if c["blocks"] is True or cache[k]["blocks"] is True:
                            # Either side has the complete file; keep True.
                            c["blocks"] = True
                        else:
                            # Merge block sets from disk and memory. Update
                            # the in-memory set in place, then point the
                            # on-disk entry at the merged result so both
                            # views agree.
                            blocks = cache[k]["blocks"]
                            blocks.update(c["blocks"])
                            c["blocks"] = blocks
                        # Keep the most recent timestamp and the in-memory
                        # uid (reflects the latest remote-file check).
                        c["time"] = max(c["time"], cache[k]["time"])
                        c["uid"] = cache[k]["uid"]

                # Add in-memory entries unknown to the on-disk metadata.
                for k, c in cache.items():
                    if k not in cached_files:
                        cached_files[k] = c
            else:
                cached_files = cache
            # Serialize a copy with "blocks" sets converted to lists, since
            # JSON cannot encode sets; the in-memory merge result keeps sets.
            cache = {k: v.copy() for k, v in cached_files.items()}
            for c in cache.values():
                if isinstance(c["blocks"], set):
                    c["blocks"] = list(c["blocks"])
            self._save(cache, fn)
            self.cached_files[-1] = cached_files

    def update_file(self, path: str, detail: Detail) -> None:
        """Update metadata for specific file in memory, do not save"""
        self.cached_files[-1][path] = detail
| |
|