|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import logging |
|
import os |
|
import shutil |
|
from typing import List, Optional |
|
|
|
|
|
# Module-level logger. Named after the module (not the file path) so that it
# takes part in the package's logger hierarchy and inherits its configuration.
logger = logging.getLogger(__name__)


# ``IOPathManager`` is iopath's global path manager when iopath can be
# imported, and ``None`` otherwise. The rest of this module tests it for
# truthiness to decide between iopath and plain builtin/os/shutil calls.
try:
    from iopath.common.file_io import g_pathmgr as IOPathManager

    try:
        # Optional S3 handler shipped in a sibling module; per the log message
        # below it may be absent (fb-only files) or unusable (boto3 missing).
        from .fb_pathhandlers import S3PathHandler

        IOPathManager.register_handler(S3PathHandler())
    except KeyError:
        # Treated as "handler already registered"; nothing more to do.
        logger.warning("S3PathHandler already registered.")
    except ImportError:
        logger.debug(
            "S3PathHandler couldn't be imported. Either missing fb-only files, or boto3 module."
        )

except ImportError:
    # iopath is not installed: fall back to builtin I/O everywhere.
    IOPathManager = None
|
|
|
|
|
class PathManager:
    """
    Wrapper for insulating OSS I/O (using Python builtin operations) from
    iopath's PathManager abstraction (for transparently handling various
    internal backends).

    All methods are static. When the module-level ``IOPathManager`` is set,
    calls are forwarded to it; otherwise a local-filesystem fallback built on
    ``open`` / ``os`` / ``shutil`` is used.
    """

    @staticmethod
    def open(
        path: str,
        mode: str = "r",
        buffering: int = -1,
        encoding: Optional[str] = None,
        errors: Optional[str] = None,
        newline: Optional[str] = None,
    ):
        """Open ``path`` and return a file object.

        The arguments are forwarded unchanged to ``IOPathManager.open`` when
        iopath is available, and to the builtin ``open`` otherwise.
        """
        if IOPathManager:
            return IOPathManager.open(
                path=path,
                mode=mode,
                buffering=buffering,
                encoding=encoding,
                errors=errors,
                newline=newline,
            )
        return open(
            path,
            mode=mode,
            buffering=buffering,
            encoding=encoding,
            errors=errors,
            newline=newline,
        )

    @staticmethod
    def _copy_local_file(src_path: str, dst_path: str, overwrite: bool) -> bool:
        """Copy a local file, refusing to clobber ``dst_path`` unless asked.

        Returns:
            True if the file was copied, False if ``dst_path`` already exists
            and ``overwrite`` is false. Errors raised by ``shutil.copyfile``
            propagate to the caller.
        """
        if not overwrite and os.path.exists(dst_path):
            logger.error("Destination file %s already exists.", dst_path)
            return False
        shutil.copyfile(src_path, dst_path)
        return True

    @staticmethod
    def copy(src_path: str, dst_path: str, overwrite: bool = False) -> bool:
        """Copy ``src_path`` to ``dst_path``.

        Args:
            src_path: file to copy.
            dst_path: destination file.
            overwrite: whether an existing ``dst_path`` may be replaced.

        Returns:
            The result of ``IOPathManager.copy`` when iopath is available.
            In the fallback, True on success and False when ``dst_path``
            exists and ``overwrite`` is false.
        """
        if IOPathManager:
            return IOPathManager.copy(
                src_path=src_path, dst_path=dst_path, overwrite=overwrite
            )
        return PathManager._copy_local_file(src_path, dst_path, overwrite)

    @staticmethod
    def get_local_path(path: str, **kwargs) -> str:
        """Return a local path for ``path``.

        Delegates to ``IOPathManager.get_local_path`` when available; the
        fallback returns ``path`` unchanged and ignores ``kwargs``.
        """
        if IOPathManager:
            return IOPathManager.get_local_path(path, **kwargs)
        return path

    @staticmethod
    def exists(path: str) -> bool:
        """Return whether ``path`` exists."""
        if IOPathManager:
            return IOPathManager.exists(path)
        return os.path.exists(path)

    @staticmethod
    def isfile(path: str) -> bool:
        """Return whether ``path`` is a regular file."""
        if IOPathManager:
            return IOPathManager.isfile(path)
        return os.path.isfile(path)

    @staticmethod
    def ls(path: str) -> List[str]:
        """List the entries of directory ``path``."""
        if IOPathManager:
            return IOPathManager.ls(path)
        return os.listdir(path)

    @staticmethod
    def mkdirs(path: str) -> None:
        """Create ``path`` and any missing parents.

        The fallback does not fail if the directory already exists.
        """
        if IOPathManager:
            return IOPathManager.mkdirs(path)
        os.makedirs(path, exist_ok=True)

    @staticmethod
    def rm(path: str) -> None:
        """Remove the file at ``path``."""
        if IOPathManager:
            return IOPathManager.rm(path)
        os.remove(path)

    @staticmethod
    def chmod(path: str, mode: int) -> None:
        """Change permissions of ``path``.

        Only applied to paths that do not need iopath (see
        ``path_requires_pathmanager``); for any other path this is a no-op.
        """
        if not PathManager.path_requires_pathmanager(path):
            os.chmod(path, mode)

    @staticmethod
    def register_handler(handler) -> None:
        """Register ``handler`` with iopath; does nothing without iopath."""
        if IOPathManager:
            return IOPathManager.register_handler(handler=handler)

    @staticmethod
    def copy_from_local(
        local_path: str, dst_path: str, overwrite: bool = False, **kwargs
    ) -> None:
        """Copy the local file ``local_path`` to ``dst_path``.

        With iopath the call (including ``kwargs``) is forwarded to
        ``IOPathManager.copy_from_local`` and its result is returned. The
        fallback copies on the local filesystem, ignores ``kwargs``, leaves
        an existing ``dst_path`` untouched unless ``overwrite`` is true, and
        returns None.
        """
        if IOPathManager:
            return IOPathManager.copy_from_local(
                local_path=local_path, dst_path=dst_path, overwrite=overwrite, **kwargs
            )
        PathManager._copy_local_file(local_path, dst_path, overwrite)

    @staticmethod
    def path_requires_pathmanager(path: str) -> bool:
        """Do we require PathManager to access given path?

        True when iopath is available and ``path`` starts with one of the
        prefixes in its handler registry; False otherwise.
        """
        if not IOPathManager:
            return False
        # NOTE: this reads iopath's private ``_path_handlers`` registry, whose
        # keys are compared against the start of ``path``.
        return any(path.startswith(prefix) for prefix in IOPathManager._path_handlers)

    @staticmethod
    def supports_rename(path: str) -> bool:
        """Return whether ``rename`` can be used on ``path``.

        Renaming goes through ``os.rename``, so it is only reported as
        supported for paths that do not require iopath.
        """
        return not PathManager.path_requires_pathmanager(path)

    @staticmethod
    def rename(src: str, dst: str):
        """Rename ``src`` to ``dst`` with ``os.rename``.

        Always uses the local filesystem; check ``supports_rename`` first.
        """
        os.rename(src, dst)

    # ioPath async PathManager methods:

    @staticmethod
    def opena(
        path: str,
        mode: str = "r",
        buffering: int = -1,
        encoding: Optional[str] = None,
        errors: Optional[str] = None,
        newline: Optional[str] = None,
    ):
        """
        Return file descriptor with asynchronous write operations.

        If no iopath manager is set yet, one is created here and stored in
        the module-level ``IOPathManager``. There is no builtin fallback.

        Raises:
            Exception: whatever importing or constructing iopath's
                PathManager raised (typically ImportError when iopath is not
                installed); the failure is logged and then re-raised.
        """
        global IOPathManager
        if not IOPathManager:
            logger.info("ioPath is initializing PathManager.")
            try:
                from iopath.common.file_io import PathManager as IOPathManagerCls

                IOPathManager = IOPathManagerCls()
            except Exception:
                logger.exception("Failed to initialize ioPath PathManager object.")
                # Re-raise the real cause; continuing would only fail below
                # with an unrelated AttributeError on None.
                raise
        return IOPathManager.opena(
            path=path,
            mode=mode,
            buffering=buffering,
            encoding=encoding,
            errors=errors,
            newline=newline,
        )

    @staticmethod
    def async_close() -> bool:
        """
        Wait for files to be written and clean up asynchronous PathManager.
        NOTE: `PathManager.async_close()` must be called at the end of any
        script that uses `PathManager.opena(...)`.

        Returns the result of ``IOPathManager.async_close()``, or False when
        no iopath manager is set.
        """
        if IOPathManager:
            return IOPathManager.async_close()
        return False
|
|