#!/usr/bin/env python3

# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

import logging
import os
import shutil
from typing import List, Optional


logger = logging.getLogger(__file__)


# iopath is an optional dependency. When it cannot be imported,
# ``IOPathManager`` is set to None and every wrapper below falls back to the
# Python standard library.
try:
    from iopath.common.file_io import g_pathmgr as IOPathManager

    try:
        # [FB only - for now] AWS PathHandler for PathManager
        from .fb_pathhandlers import S3PathHandler

        IOPathManager.register_handler(S3PathHandler())
    except KeyError:
        logging.warning("S3PathHandler already registered.")
    except ImportError:
        logging.debug(
            "S3PathHandler couldn't be imported. Either missing fb-only files, or boto3 module."
        )

except ImportError:
    IOPathManager = None


class PathManager:
    """
    Wrapper for insulating OSS I/O (using Python builtin operations) from
    iopath's PathManager abstraction (for transparently handling various
    internal backends).

    Every method is a ``@staticmethod``. Most of them delegate to the
    module-level ``IOPathManager`` when it is available and otherwise use the
    equivalent ``os`` / ``shutil`` / builtin call.
    """

    @staticmethod
    def open(
        path: str,
        mode: str = "r",
        buffering: int = -1,
        encoding: Optional[str] = None,
        errors: Optional[str] = None,
        newline: Optional[str] = None,
    ):
        """Open ``path`` and return the resulting file object.

        The arguments mirror those of the builtin ``open`` and are forwarded
        unchanged to ``IOPathManager.open`` when iopath is available, or to
        the builtin ``open`` otherwise.
        """
        if IOPathManager:
            return IOPathManager.open(
                path=path,
                mode=mode,
                buffering=buffering,
                encoding=encoding,
                errors=errors,
                newline=newline,
            )
        return open(
            path,
            mode=mode,
            buffering=buffering,
            encoding=encoding,
            errors=errors,
            newline=newline,
        )

    @staticmethod
    def copy(src_path: str, dst_path: str, overwrite: bool = False) -> bool:
        """Copy ``src_path`` to ``dst_path``.

        With iopath available, the call (including ``overwrite``) is forwarded
        to ``IOPathManager.copy`` and its result is returned.

        Without iopath, the copy is done with ``shutil.copyfile``.
        ``overwrite`` is not consulted on that path, and ``True`` is returned
        once ``shutil.copyfile`` completes; any error it raises propagates.
        """
        if IOPathManager:
            return IOPathManager.copy(
                src_path=src_path, dst_path=dst_path, overwrite=overwrite
            )
        # shutil.copyfile returns the destination path, not a bool; report
        # success explicitly so the return value matches the annotation.
        shutil.copyfile(src_path, dst_path)
        return True

    @staticmethod
    def get_local_path(path: str, **kwargs) -> str:
        """Return a local path for ``path``.

        Delegates to ``IOPathManager.get_local_path`` (forwarding ``kwargs``)
        when iopath is available; otherwise returns ``path`` unchanged.
        """
        if IOPathManager:
            return IOPathManager.get_local_path(path, **kwargs)
        return path

    @staticmethod
    def exists(path: str) -> bool:
        """Return whether ``path`` exists (iopath or ``os.path.exists``)."""
        if IOPathManager:
            return IOPathManager.exists(path)
        return os.path.exists(path)

    @staticmethod
    def isfile(path: str) -> bool:
        """Return whether ``path`` is a file (iopath or ``os.path.isfile``)."""
        if IOPathManager:
            return IOPathManager.isfile(path)
        return os.path.isfile(path)

    @staticmethod
    def ls(path: str) -> List[str]:
        """List the entries of ``path`` (iopath or ``os.listdir``)."""
        if IOPathManager:
            return IOPathManager.ls(path)
        return os.listdir(path)

    @staticmethod
    def mkdirs(path: str) -> None:
        """Create ``path`` and any missing parents.

        The builtin fallback uses ``os.makedirs(..., exist_ok=True)``, so an
        already existing directory is not an error there.
        """
        if IOPathManager:
            return IOPathManager.mkdirs(path)
        os.makedirs(path, exist_ok=True)

    @staticmethod
    def rm(path: str) -> None:
        """Remove the file at ``path`` (iopath or ``os.remove``)."""
        if IOPathManager:
            return IOPathManager.rm(path)
        os.remove(path)

    @staticmethod
    def chmod(path: str, mode: int) -> None:
        """Apply ``os.chmod(path, mode)`` to paths that do not need iopath.

        Paths for which ``path_requires_pathmanager`` is true are left
        untouched; the call is then a no-op.
        """
        if not PathManager.path_requires_pathmanager(path):
            os.chmod(path, mode)

    @staticmethod
    def register_handler(handler) -> None:
        """Register ``handler`` with iopath; a no-op when iopath is absent."""
        if IOPathManager:
            return IOPathManager.register_handler(handler=handler)

    @staticmethod
    def copy_from_local(
        local_path: str, dst_path: str, overwrite: bool = False, **kwargs
    ) -> None:
        """Copy the local file ``local_path`` to ``dst_path``.

        With iopath available, all arguments are forwarded to
        ``IOPathManager.copy_from_local``. Otherwise ``shutil.copyfile`` is
        used, and ``overwrite`` and ``kwargs`` are not consulted.
        """
        if IOPathManager:
            return IOPathManager.copy_from_local(
                local_path=local_path, dst_path=dst_path, overwrite=overwrite, **kwargs
            )
        return shutil.copyfile(local_path, dst_path)

    @staticmethod
    def path_requires_pathmanager(path: str) -> bool:
        """Do we require PathManager to access given path?

        True when iopath is available and ``path`` starts with one of the
        keys of ``IOPathManager._path_handlers``; False otherwise.
        """
        if not IOPathManager:
            return False
        return any(path.startswith(prefix) for prefix in IOPathManager._path_handlers)

    @staticmethod
    def supports_rename(path: str) -> bool:
        """Return whether ``rename`` can be used on ``path``."""
        # PathManager doesn't yet support renames
        return not PathManager.path_requires_pathmanager(path)

    @staticmethod
    def rename(src: str, dst: str) -> None:
        """Rename ``src`` to ``dst`` with ``os.rename`` (never via iopath)."""
        os.rename(src, dst)

    # ioPath async PathManager methods:

    @staticmethod
    def opena(
        path: str,
        mode: str = "r",
        buffering: int = -1,
        encoding: Optional[str] = None,
        errors: Optional[str] = None,
        newline: Optional[str] = None,
    ):
        """
        Return file descriptor with asynchronous write operations.

        If the module-level ``IOPathManager`` is not set, an iopath
        ``PathManager`` instance is created and stored in it first. When that
        initialization fails, the failure is logged and re-raised.
        """
        global IOPathManager
        if not IOPathManager:
            logging.info("ioPath is initializing PathManager.")
            try:
                # Aliased so the import does not shadow this wrapper class.
                from iopath.common.file_io import PathManager as _IOPathManagerCls

                IOPathManager = _IOPathManagerCls()
            except Exception:
                logging.exception("Failed to initialize ioPath PathManager object.")
                # Without a manager the call below could only fail with an
                # unrelated AttributeError on None; surface the real cause.
                raise
        return IOPathManager.opena(
            path=path,
            mode=mode,
            buffering=buffering,
            encoding=encoding,
            errors=errors,
            newline=newline,
        )

    @staticmethod
    def async_close() -> bool:
        """
        Wait for files to be written and clean up asynchronous PathManager.
        NOTE: `PathManager.async_close()` must be called at the end of any
        script that uses `PathManager.opena(...)`.

        Returns the result of ``IOPathManager.async_close()``, or ``False``
        when no ``IOPathManager`` is set.
        """
        if IOPathManager:
            return IOPathManager.async_close()
        return False