test / modules /files_cache.py
bilegentile's picture
Upload folder using huggingface_hub
c19ca42 verified
import itertools
import os
from collections import UserDict
from dataclasses import dataclass, field
from typing import Callable, Dict, Iterator, List, Optional, Union
from installer import log
class Directory:  # forward declaration
    # Empty placeholder: lets the type aliases that follow name `Directory`
    # before the real dataclass of the same name is defined.
    pass
# Type aliases shared by the functions in this module.
# File and directory paths are carried around as plain strings.
FilePathList = List[str]
FilePathIterator = Iterator[str]
DirectoryPathList = List[str]
DirectoryPathIterator = Iterator[str]
DirectoryList = List[Directory]
DirectoryIterator = Iterator[Directory]
# Mapping of directory path -> Directory; also used as a base of DirectoryCache.
DirectoryCollection = Dict[str, Directory]
# Predicate returned by `extension_filter` (takes a file path).
ExtensionFilter = Callable
# Spelled with `typing.List` like every other alias here (the builtin generic
# `list[str]` is only subscriptable at runtime on Python 3.9+).
ExtensionList = List[str]
# Either a plain on/off flag, or a callable that is handed a directory path
# and decides whether to descend into it.
RecursiveType = Union[bool,Callable]
def real_path(directory_path:str) -> Union[str, None]:
    """Expand a leading `~` and return the absolute form of `directory_path`.

    Symlinks are not resolved (this uses `os.path.abspath`, not `realpath`).
    Any failure while processing the value (for example a non-path argument
    such as None) yields None instead of raising.
    """
    try:
        expanded = os.path.expanduser(directory_path)
        absolute = os.path.abspath(expanded)
    except Exception:
        return None
    return absolute
@dataclass(frozen=True)
class Directory(Directory): # pylint: disable=E0102
    """Cached snapshot of one directory listing.

    The dataclass is frozen, so attributes are only ever rebound through
    `object.__setattr__`; the `files` and `directories` lists are mutated in
    place so that existing references to them stay valid.
    """
    # Path of the directory this snapshot describes.
    path: str = field(default_factory=str)
    # Modification time recorded when the snapshot was taken (see __post_init__).
    mtime: float = field(default_factory=float, init=False)
    # Paths of the non-directory entries found in `path`.
    files: FilePathList = field(default_factory=list)
    # Paths of the sub-directories found in `path`.
    directories: DirectoryPathList = field(default_factory=list)

    def __post_init__(self):
        # Stamp the snapshot with the directory's current on-disk mtime.
        object.__setattr__(self, 'mtime', self.live_mtime)

    @classmethod
    def from_dict(cls, dict_object: dict) -> Directory:
        """Build a Directory from a plain dict without touching the filesystem.

        `__init__`/`__post_init__` are bypassed, so `mtime` is taken from the
        dict as-is. Missing keys become None.
        """
        directory = cls.__new__(cls)
        for attribute in ('path', 'mtime', 'files', 'directories'):
            object.__setattr__(directory, attribute, dict_object.get(attribute))
        return directory

    def clear(self) -> None:
        """Empty this snapshot in place (no files, no directories, mtime 0)."""
        # A source with a falsy path passes the path check in `_update`.
        self._update(Directory.from_dict({
            'path': None,
            'mtime': float(),
            'files': [],
            'directories': []
        }))

    def update(self, source_directory: Directory) -> Directory:
        """Copy the listing of `source_directory` into this object and return self."""
        if source_directory is not self:
            self._update(source_directory)
        return self

    def _update(self, source:Directory) -> None:
        """Replace this listing with `source`'s, evicting vanished children from the cache."""
        assert not source.path or source.path == self.path, f'When updating a directory, the paths must match. Attemped to update Directory `{self.path}` with `{source.path}`'
        # Sub-directories that are no longer listed must not linger in the cache.
        surviving = set(source.directories)
        for dead_path in self.directories:
            if dead_path not in surviving:
                delete_cached_directory(dead_path)
        # Slice-assign so the existing list objects are reused.
        self.directories[:] = source.directories
        self.files[:] = source.files
        object.__setattr__(self, 'mtime', source.mtime)

    @property
    def exists(self) -> bool:
        """True when `path` is non-empty and present on disk."""
        # bool(): a bare `self.path and ...` would leak None/'' to callers.
        return bool(self.path) and os.path.exists(self.path)

    @property
    def is_directory(self) -> bool:
        """True when `path` exists and is a directory."""
        return self.exists and os.path.isdir(self.path)

    @property
    def live_mtime(self) -> float:
        """Current on-disk modification time, or 0 when `path` is not a directory."""
        return os.path.getmtime(self.path) if self.is_directory else 0

    @property
    def is_stale(self) -> bool:
        """True when the directory is gone or its mtime differs from the recorded one."""
        return not self.is_directory or self.mtime != self.live_mtime
class DirectoryCache(UserDict, DirectoryCollection):
    """Dictionary of cached `Directory` objects keyed by directory path."""

    def __delattr__(self, directory_path: str) -> None:
        """Drop `directory_path` and its cached children from the cache.

        NOTE(review): Python calls `__delattr__` for `del cache.name`, not for
        `del cache[key]`. `delete_cached_directory` uses the item form, which
        goes through `UserDict.__delitem__`, so this hook does not run for
        those evictions - confirm whether `__delitem__` was intended.

        Raises:
            KeyError: if `directory_path` is not a key of the underlying dict.
        """
        directory: Directory = get_directory(directory_path, fetch=False)
        if directory:
            # `map()` is lazy; a bare `map(...)` statement never calls the
            # function, so the children are evicted with an explicit loop.
            for child_path in list(directory.directories):
                delete_cached_directory(child_path)
            directory.clear()
        del self.data[directory_path]
def clean_directory(directory: Directory, /, recursive: RecursiveType=False) -> bool:
    """Re-synchronise a cached directory with the filesystem.

    - If `directory` no longer is a directory, it is evicted from the cache.
    - If it is stale (mtime changed), its listing is re-fetched in place.
    - Otherwise its sub-directories are checked; with `recursive` they are
      cleaned too, and children that cannot be resolved are purged from the
      parent's listing.

    Args:
        directory: the cached entry to validate.
        recursive: flag, or callable receiving this directory's path, deciding
            whether children are fetched and cleaned as well.

    Returns:
        True only if nothing had to be evicted, refreshed or purged.
    """
    if not directory.is_directory:
        delete_cached_directory(directory.path)
        return False

    if directory.is_stale:
        refreshed = fetch_directory(directory.path)
        if refreshed is None:
            # The scan failed even though the directory was there a moment ago
            # (removed or unreadable in between): evict instead of passing
            # None on to `update()`, which would raise AttributeError.
            delete_cached_directory(directory.path)
        else:
            directory.update(refreshed)
        return False

    is_clean = True
    # Iterate over a copy: entries may be removed from the live list below.
    for directory_path in directory.directories[:]:
        try:
            recurse = recursive and (not callable(recursive) or recursive(directory.path))
            # Keep the child in its own name; rebinding `directory` here would
            # make later iterations (and the purge below) act on the child
            # instead of on the parent being cleaned.
            child = get_directory(directory_path, fetch=recurse)
            if child:
                if child.is_directory:
                    if recurse:
                        # NOTE(review): the evaluated flag is passed down, so a
                        # callable `recursive` is consulted at this level only -
                        # confirm that is intended.
                        is_clean = clean_directory(child, recursive=recurse) and is_clean
                    continue
                delete_cached_directory(directory_path)
            # If we had intended to fetch this directory, but didn't, that means it doesn't exist. Purge.
            if recurse:
                directory.directories.remove(directory_path)
                is_clean = False
        except Exception:
            # Best effort: a failure on one child must not abort the others.
            pass
    return is_clean
def get_directory(directory_or_path: Union[str, Directory], /, fetch:bool=True) -> Union[Directory, None]:
    """Return the cached `Directory` for a path, optionally scanning it first.

    Args:
        directory_or_path: a path, or a `Directory`. A `Directory` that still
            is a directory on disk is returned unchanged; otherwise its path
            is looked up like any other path.
        fetch: when the path is not cached yet, scan it and cache the result.

    Returns:
        The cached `Directory`, or None when the path cannot be resolved, is
        not cached (and `fetch` is false), or could not be scanned.
    """
    if isinstance(directory_or_path, Directory):
        if directory_or_path.is_directory:
            return directory_or_path
        directory_or_path = directory_or_path.path

    directory_path = real_path(directory_or_path)
    if directory_path is None:
        # Nothing to look up (e.g. None, or a cleared Directory). Without this
        # guard the fetch below would call os.scandir(None), which lists the
        # current working directory.
        return None

    if cache_folders.get(directory_path, None):
        # Already cached: re-validate against the filesystem (may evict it).
        clean_directory(cache_folders[directory_path])
    elif fetch:
        directory = fetch_directory(directory_path=directory_path)
        if directory:
            cache_folders[directory_path] = directory
    return cache_folders[directory_path] if directory_path in cache_folders else None
def fetch_directory(directory_path: str) -> Union[Directory, None]:
    """Scan `directory_path` (uncached, non-recursive) and return its listing.

    Returns None when the walk yields nothing, which happens when the
    directory cannot be scanned.
    """
    # The walker is a generator; only its first item - the listing of
    # `directory_path` itself - is wanted.
    return next(_walk(directory_path, recurse=False), None)
def _walk(top, recurse:RecursiveType=True) -> Iterator[Directory]:
    """Yield a fresh `Directory` for `top` and, optionally, for its descendants.

    An `os.walk()`-style traversal built on `os.scandir` that bypasses the cache.

    Args:
        top: directory to scan. If it cannot be opened, nothing is yielded.
        recurse: falsy to scan `top` only; truthy to descend into every
            sub-directory; a callable is handed each sub-directory path and
            that sub-tree is skipped when it returns a falsy value.
    """
    nondirs = []
    walk_dirs = []
    try:
        scandir_it = os.scandir(top)
    except OSError:
        return
    with scandir_it:
        for entry in scandir_it:
            try:
                is_dir = entry.is_dir()
            except OSError:
                # Same fallback as os.walk(): an entry that cannot be
                # inspected is treated as a non-directory rather than
                # aborting the whole listing.
                is_dir = False
            if not is_dir:
                nondirs.append(entry.path)
            elif entry.is_symlink() and not os.path.exists(entry.path):
                log.error(f'Files broken symlink: {entry.path}')
            else:
                walk_dirs.append(entry.path)
    # The scandir handle is closed before yielding, so none stays open while
    # the consumer works on the result.
    yield Directory(top, nondirs, walk_dirs)
    if recurse:
        for new_path in walk_dirs:
            if callable(recurse) and not recurse(new_path):
                continue
            yield from _walk(new_path, recurse=recurse)
def _cached_walk(top, recurse:RecursiveType=True) -> Directory:
    """Generator: yield the cached `Directory` for `top`, then walk its children.

    Nothing is yielded when `top` cannot be resolved. Children whose base
    name starts with `models--` are never descended into (presumably
    Hugging Face hub cache folders - TODO confirm), and a callable `recurse`
    can veto individual children by returning a falsy value for their path.
    """
    root = get_directory(top)
    if not root:
        return
    yield root
    if not recurse:
        return
    has_predicate = callable(recurse)
    for child_path in root.directories:
        excluded_by_name = os.path.basename(child_path).startswith('models--')
        if excluded_by_name or (has_predicate and not recurse(child_path)):
            continue
        yield from _cached_walk(child_path, recurse=recurse)
def walk(top, recurse:RecursiveType=True, cached=True) -> Directory:
    """Generator over `Directory` objects starting at `top`.

    With `cached` the listing comes from (and populates) the module cache;
    otherwise the filesystem is scanned directly on every call.
    """
    walker = _cached_walk if cached else _walk
    yield from walker(top, recurse=recurse)
def delete_cached_directory(directory_path:str) -> bool:
    """Evict `directory_path` from the module-level cache.

    Returns:
        True when an entry was removed, False when the path was not cached.
    """
    # No `global` statement needed: the cache is mutated, never rebound.
    if directory_path in cache_folders:
        del cache_folders[directory_path]
        return True
    return False
def is_directory(dir_path:str) -> bool:
    """Return True when `dir_path` is non-empty and names an existing directory.

    Empty or None input yields False (a real bool, not the falsy input).
    """
    # os.path.isdir() is already False for paths that do not exist, so no
    # separate existence check is required.
    return bool(dir_path) and os.path.isdir(dir_path)
def directory_mtime(directory_path:str, /, recursive:RecursiveType=True) -> float:
    """Return the newest recorded mtime among the directories resolved for `directory_path`.

    Returns 0.0 when no directory could be resolved.
    """
    mtimes = (directory.mtime for directory in get_directories(directory_path, recursive=recursive))
    # `default=0` covers the empty case: spreading an empty list into
    # `max(0, *values)` leaves `max(0)`, which raises TypeError.
    return float(max(0, max(mtimes, default=0)))
def unique_directories(directories:DirectoryPathList, /, recursive:RecursiveType=True) -> DirectoryPathIterator:
    """Yield each distinct, non-empty directory path once, in sorted order.

    Paths are normalised and de-duplicated through `unique_paths`. When
    `recursive` is truthy, a path located beneath an already yielded path is
    skipped, because a recursive walk of the parent covers it.

    NOTE(review): a callable `recursive` is only tested for truthiness here;
    it is not consulted to keep an explicitly listed child that the callable
    would refuse to descend into - confirm whether that is wanted.
    """
    yielded_roots = ()
    # Ascending order guarantees a parent is seen before any of its children.
    for directory in sorted(unique_paths(directories)):
        # str.startswith accepts a tuple of prefixes (an empty tuple never
        # matches). Checking against every yielded root, rather than only the
        # previous one, also catches children separated from their parent by
        # a sibling such as `/a-b` sorting between `/a` and `/a/b`.
        if recursive and directory.startswith(yielded_roots):
            continue
        yield directory
        # The trailing separator keeps `/a` from matching `/ab`.
        yielded_roots += (os.path.join(directory, ''),)
def unique_paths(directory_paths:DirectoryPathList) -> DirectoryPathIterator:
    """Normalise the given paths and drop empties and duplicates.

    Falsy inputs and paths that `real_path` cannot resolve are discarded.
    The result is a dict keys view, so first-seen order is preserved.
    """
    resolved_paths = map(real_path, filter(None, directory_paths))
    return dict.fromkeys(filter(None, resolved_paths), True).keys()
def get_directories(*directory_paths: DirectoryPathList, fetch:bool=True, recursive:RecursiveType=True) -> DirectoryCollection:
    """Resolve the given paths to cached `Directory` objects.

    The paths are de-duplicated first (dropping redundant children when
    `recursive` is truthy); paths that do not resolve are left out.
    Despite the annotation, the value returned is a lazy iterator of
    `Directory` objects, not a mapping.
    """
    candidate_paths = unique_directories(directory_paths, recursive=recursive)
    resolved = map(lambda candidate: get_directory(candidate, fetch=fetch), candidate_paths)
    return filter(None, resolved)
def directory_files(*directories_or_paths: Union[DirectoryPathList, DirectoryList], recursive: RecursiveType=True) -> FilePathIterator:
    """Lazily yield the file paths of the given directories.

    Each argument is resolved through `get_directory`; unresolved ones are
    skipped. For every resolved directory its own files come first, then -
    when `recursive` is truthy - the files of its sub-directories, depth
    first. A callable `recursive` is used as a predicate on each
    sub-directory path; otherwise every non-empty sub-directory path is taken.
    """
    descend_into = recursive if callable(recursive) else bool
    for candidate in directories_or_paths:
        directory_object = get_directory(candidate)
        if not directory_object:
            continue
        yield from directory_object.files
        if not recursive:
            continue
        for child_path in directory_object.directories:
            if not descend_into(child_path):
                continue
            child = get_directory(child_path)
            if child:
                yield from directory_files(child, recursive=recursive)
def extension_filter(ext_filter: Optional[ExtensionList]=None, ext_blacklist: Optional[ExtensionList]=None) -> ExtensionFilter:
    """Build a predicate that accepts or rejects file paths by their ending.

    A path passes when it ends with one of `ext_filter` (if any were given)
    and with none of `ext_blacklist` (if any were given). Matching is
    case-insensitive. With neither list, every path passes.
    """
    allowed = tuple(map(str.upper, ext_filter)) if ext_filter else ()
    blocked = tuple(map(str.upper, ext_blacklist)) if ext_blacklist else ()

    def _accepts(fp:str):
        if not allowed and not blocked:
            return True
        upper_path = fp.upper()
        if allowed and not upper_path.endswith(allowed):
            return False
        return not (blocked and upper_path.endswith(blocked))

    return _accepts
def not_hidden(filepath: str) -> bool:
    """Return True unless the final component of `filepath` begins with a dot."""
    _, filename = os.path.split(filepath)
    is_hidden = filename.startswith('.')
    return not is_hidden
def filter_files(file_paths: FilePathList, ext_filter: Optional[ExtensionList]=None, ext_blacklist: Optional[ExtensionList]=None) -> FilePathIterator:
    """Lazily keep only the paths accepted by `extension_filter(ext_filter, ext_blacklist)`."""
    accepts = extension_filter(ext_filter, ext_blacklist)
    return filter(accepts, file_paths)
def list_files(*directory_paths:DirectoryPathList, ext_filter: Optional[ExtensionList]=None, ext_blacklist: Optional[ExtensionList]=None, recursive:RecursiveType=True) -> FilePathIterator:
    """Lazily list the files under the given directories, filtered by extension.

    Directories are resolved through the cache (`get_directories`), their
    files gathered with `directory_files`, and the combined stream is passed
    through `filter_files`.
    """
    resolved_directories = get_directories(*directory_paths, recursive=recursive)
    all_files = itertools.chain.from_iterable(
        directory_files(directory, recursive=recursive)
        for directory in resolved_directories
    )
    return filter_files(all_files, ext_filter, ext_blacklist)
# Module-wide cache of directory listings, keyed by directory path; starts empty.
cache_folders: DirectoryCache = DirectoryCache()