| |
|
|
| |
| |
| |
|
|
| |
|
|
| |
| |
| |
| |
| |
|
|
| import logging |
| import zipfile |
| import tempfile |
| import sys |
| import os |
| import stat |
| import time |
| from typing import Any, Callable |
|
|
|
|
# Retry budget used by rename_folder_with_retry(): how many additional
# attempts are made after a failed rename, and how many seconds to sleep
# between two attempts.
RENAME_RETRY_COUNT = 100
RENAME_RETRY_DELAY = 0.1

# Emit bare messages (no level/name prefix); only warnings and above are shown.
logging.basicConfig(format="%(message)s", level=logging.WARNING)
logger = logging.getLogger("install_package")
|
|
|
|
def remove_directory_item(path):
    """Delete ``path``, whether it is a file, a symlink or a directory tree.

    Real directories are emptied recursively and then removed. Everything
    else - regular files, symlinks (which are never followed) and special
    files such as FIFOs or sockets - is unlinked directly.

    Args:
        path: File system path of the item to delete.

    Raises:
        OSError: If the item (or something below it) cannot be removed,
            including ``FileNotFoundError`` when ``path`` does not exist.
    """
    # Only a genuine directory needs the recursive treatment. A symlink that
    # points at a directory must be unlinked, not descended into, and special
    # files cannot be handled by rmdir()/listdir() at all.
    if os.path.islink(path) or not os.path.isdir(path):
        try:
            os.remove(path)
        except PermissionError:
            # Typically a read-only entry: grant the owner full access and
            # try once more.
            os.chmod(path, stat.S_IRWXU)
            os.remove(path)
        return

    try:
        # Make sure the directory is writable/searchable, then try the cheap
        # case first: an empty directory can be removed right away.
        os.chmod(path, stat.S_IRWXU)
        os.rmdir(path)
    except OSError:
        # Not empty (or not removable yet): clean it out below. The recursion
        # deliberately happens outside this handler so that failures further
        # down are not reported as chained to this expected error.
        pass
    else:
        return

    for name in os.listdir(path):
        remove_directory_item(os.path.join(path, name))
    os.rmdir(path)
|
|
|
|
class StagingDirectory:
    """Context manager that owns a scratch folder inside a staging directory.

    Constructing the object creates ``staging_path`` if needed. Entering the
    context creates a uniquely named ``ver-*`` folder below it. The caller
    fills that folder and calls :meth:`promote_and_rename` to move it to its
    final name. On exit, the scratch folder is deleted if it is still there.
    """

    def __init__(self, staging_path):
        """Remember ``staging_path`` and make sure the directory exists."""
        os.makedirs(staging_path, exist_ok=True)
        self.staging_path = staging_path
        # Set by __enter__; stays None until the context is entered.
        self.temp_folder_path = None

    def __enter__(self):
        """Create the scratch folder and return this object."""
        scratch = tempfile.mkdtemp(dir=self.staging_path, prefix="ver-")
        self.temp_folder_path = scratch
        return self

    def get_temp_folder_path(self):
        """Return the scratch folder path (``None`` before entering)."""
        return self.temp_folder_path

    def promote_and_rename(self, folder_name):
        """Rename the scratch folder to ``folder_name`` inside the staging path.

        Raises:
            OSError: If the rename fails.
        """
        final_path = os.path.join(self.staging_path, folder_name)
        os.rename(self.temp_folder_path, final_path)

    def __exit__(self, type, value, traceback):
        """Delete the scratch folder unless it has already been moved away."""
        if not os.path.isdir(self.temp_folder_path):
            # Promoted (or otherwise gone): nothing left to clean up.
            return
        remove_directory_item(self.temp_folder_path)
|
|
|
|
def rename_folder(staging_dir: StagingDirectory, folder_name: str):
    """Promote the staged folder to ``folder_name``, tolerating an existing target.

    If the rename fails but the destination already exists, the failure is
    downgraded to a warning and treated as "already installed" (presumably
    another process finished the same installation first - confirm against
    the callers' expectations).

    Args:
        staging_dir: Staging directory whose scratch folder is promoted.
        folder_name: Final folder name inside ``staging_dir.staging_path``.

    Raises:
        OSError: If the rename fails and the destination does not exist.
    """
    try:
        staging_dir.promote_and_rename(folder_name)
    except OSError:
        abs_dst_folder_name = os.path.join(staging_dir.staging_path, folder_name)
        if not os.path.exists(abs_dst_folder_name):
            # Genuine failure: let the caller (and its retry logic) see it.
            raise
        logger.warning(
            "Directory %s already present, package installation already completed",
            abs_dst_folder_name,
        )
|
|
|
|
def call_with_retry(
    op_name: str, func: Callable, retry_count: int = 3, retry_delay: float = 20
) -> Any:
    """Call ``func`` and retry it when it fails with an ``OSError``.

    Args:
        op_name: Human-readable description of the operation, used in logs.
        func: Zero-argument callable to execute.
        retry_count: Number of additional attempts after the first failure.
            Zero or a negative value means "do not retry".
        retry_delay: Seconds to sleep before each retry.

    Returns:
        Whatever ``func`` returns on its first successful call.

    Raises:
        OSError: The last error raised by ``func`` once all retries are used
            up. Any other exception type propagates immediately.
    """
    retries_left = retry_count
    while True:
        try:
            return func()
        except OSError as exc:  # IOError is an alias of OSError
            logger.warning("Failure while executing %s [%s]", op_name, exc)
            # "<= 0" rather than a truthiness test: a negative budget must
            # not turn into an endless retry loop.
            if retries_left <= 0:
                logger.error("Maximum retries exceeded, giving up")
                raise
            retry_str = "retry" if retries_left == 1 else "retries"
            logger.warning(
                "Retrying after %s seconds (%s %s left) ...",
                retry_delay,
                retries_left,
                retry_str,
            )
            time.sleep(retry_delay)
        retries_left -= 1
|
|
|
|
def rename_folder_with_retry(staging_dir: StagingDirectory, folder_name):
    """Run :func:`rename_folder` under the module's rename retry policy.

    The attempt is repeated up to ``RENAME_RETRY_COUNT`` more times, with
    ``RENAME_RETRY_DELAY`` seconds between attempts.

    Args:
        staging_dir: Staging directory whose scratch folder is promoted.
        folder_name: Final folder name inside the staging path.
    """
    source = staging_dir.get_temp_folder_path()
    target = os.path.join(staging_dir.staging_path, folder_name)

    def attempt_rename():
        return rename_folder(staging_dir, folder_name)

    call_with_retry(
        f"rename {source} -> {target}",
        attempt_rename,
        retry_count=RENAME_RETRY_COUNT,
        retry_delay=RENAME_RETRY_DELAY,
    )
|
|
|
|
def install_package(package_path, install_path):
    """Extract a zip package and move it into place at ``install_path``.

    The archive is unpacked into a scratch folder next to the final location
    and then renamed to the last component of ``install_path``.

    Args:
        package_path: Path of the zip archive to install.
        install_path: Destination folder; its parent is used as the staging
            directory and its last component as the installed folder name.

    Raises:
        ValueError: If ``install_path`` has no final folder component
            (for example a file system root).
        OSError: If extraction or the final rename fails.
        zipfile.BadZipFile: If ``package_path`` is not a valid zip archive.
    """
    # Normalise first: a trailing separator would otherwise yield an empty
    # folder name, and a bare name an empty staging path.
    staging_path, version = os.path.split(os.path.abspath(install_path))
    if not version:
        raise ValueError(f"Install path {install_path!r} does not name a folder")

    with StagingDirectory(staging_path) as staging_dir:
        output_folder = staging_dir.get_temp_folder_path()
        with zipfile.ZipFile(package_path, allowZip64=True) as zip_file:
            zip_file.extractall(output_folder)

        # The archive is closed by now; promote the scratch folder while the
        # staging context is still active.
        rename_folder_with_retry(staging_dir, version)

    print(f"Package successfully installed to {install_path}")
|
|
|
|
if __name__ == "__main__":
    # Command line: <script> <package archive> <install path>
    package_arg = sys.argv[1]
    install_arg = sys.argv[2]
    install_package(package_arg, install_arg)
|
|