Spaces:
Paused
Paused
| #!/usr/bin/python | |
| # -*- coding: utf-8 -*- | |
| # Hive Appier Framework | |
| # Copyright (c) 2008-2024 Hive Solutions Lda. | |
| # | |
| # This file is part of Hive Appier Framework. | |
| # | |
| # Hive Appier Framework is free software: you can redistribute it and/or modify | |
| # it under the terms of the Apache License as published by the Apache | |
| # Foundation, either version 2.0 of the License, or (at your option) any | |
| # later version. | |
| # | |
| # Hive Appier Framework is distributed in the hope that it will be useful, | |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
| # Apache License for more details. | |
| # | |
| # You should have received a copy of the Apache License along with | |
| # Hive Appier Framework. If not, see <http://www.apache.org/licenses/>. | |
| __author__ = "João Magalhães <joamag@hive.pt>" | |
| """ The author(s) of the module """ | |
| __copyright__ = "Copyright (c) 2008-2024 Hive Solutions Lda." | |
| """ The copyright for the module """ | |
| __license__ = "Apache License, Version 2.0" | |
| """ The license for the module """ | |
| from . import util | |
| from . import common | |
| from . import legacy | |
| from . import exceptions | |
| class Git(object): | |
| def is_git(cls, path=None): | |
| path = path or common.base().get_base_path() | |
| try: | |
| result = util.execute(["git", "status"], path=path) | |
| except OSError: | |
| return False | |
| code = result["code"] | |
| return code == 0 | |
| def clone(cls, url, path=None, raise_e=True): | |
| path = path or common.base().get_base_path() | |
| result = util.execute(["git", "clone", url], path=path) | |
| if cls._wrap_error(result, raise_e=raise_e): | |
| return None | |
| message = result.get("stdout", "") | |
| return message | |
| def fetch(cls, flags=[], path=None, raise_e=True): | |
| path = path or common.base().get_base_path() | |
| result = util.execute(["git", "fetch"] + flags, path=path) | |
| if cls._wrap_error(result, raise_e=raise_e): | |
| return None | |
| message = result.get("stdout", "") | |
| return message | |
| def pull(cls, flags=[], path=None, raise_e=True): | |
| path = path or common.base().get_base_path() | |
| result = util.execute(["git", "pull"] + flags, path=path) | |
| if cls._wrap_error(result, raise_e=raise_e): | |
| return None | |
| message = result.get("stdout", "") | |
| return message | |
| def push(cls, flags=[], path=None, raise_e=True): | |
| path = path or common.base().get_base_path() | |
| result = util.execute(["git", "push"] + flags, path=path) | |
| if cls._wrap_error(result, raise_e=raise_e): | |
| return None | |
| message = result.get("stdout", "") | |
| return message | |
| def commit(cls, message="Update", flags=[], path=None, raise_e=True): | |
| path = path or common.base().get_base_path() | |
| result = util.execute(["git", "commit", "-m", message] + flags, path=path) | |
| if cls._wrap_error(result, raise_e=raise_e): | |
| return None | |
| message = result.get("stdout", "") | |
| return message | |
| def checkout(cls, branch="master", flags=[], path=None, raise_e=True): | |
| path = path or common.base().get_base_path() | |
| result = util.execute(["git", "checkout", branch] + flags, path=path) | |
| if cls._wrap_error(result, raise_e=raise_e): | |
| return None | |
| message = result.get("stdout", "") | |
| return message | |
| def add(cls, target="*", flags=[], path=None, raise_e=True): | |
| path = path or common.base().get_base_path() | |
| result = util.execute(["git", "add", target] + flags, path=path) | |
| if cls._wrap_error(result, raise_e=raise_e): | |
| return None | |
| message = result.get("stdout", "") | |
| return message | |
| def add_upstream(cls, url, name="upstream", path=None, raise_e=True): | |
| path = path or common.base().get_base_path() | |
| result = util.execute(["git", "remote", "add", name, url], path=path) | |
| if cls._wrap_error(result, raise_e=raise_e): | |
| return None | |
| message = result.get("stdout", "") | |
| return message | |
| def remove_upstream(cls, name="upstream", path=None, raise_e=True): | |
| path = path or common.base().get_base_path() | |
| result = util.execute(["git", "remote", "remove", name], path=path) | |
| if cls._wrap_error(result, raise_e=raise_e): | |
| return None | |
| message = result.get("stdout", "") | |
| return message | |
| def config(cls, key, value, _global=True, path=None, raise_e=True): | |
| path = path or common.base().get_base_path() | |
| result = util.execute( | |
| ["git", "config", "--global" if _global else "", key, value], path=path | |
| ) | |
| if cls._wrap_error(result, raise_e=raise_e): | |
| return None | |
| message = result.get("stdout", "") | |
| return message | |
| def get_config(cls, key, _global=True, path=None, raise_e=False): | |
| path = path or common.base().get_base_path() | |
| result = util.execute( | |
| ["git", "config", "--global" if _global else "", "--get", key], path=path | |
| ) | |
| if cls._wrap_error(result, raise_e=raise_e): | |
| return None | |
| message = result.get("stdout", "") | |
| value = message.strip() | |
| return value | |
| def get_branches(cls, names=False, path=None, raise_e=False): | |
| path = path or common.base().get_base_path() | |
| result = util.execute(["git", "branch"], path=path) | |
| if cls._wrap_error(result, raise_e=raise_e): | |
| return None | |
| message = result.get("stdout", "") | |
| branches = message.strip() | |
| branches = branches.split("\n") | |
| branches = [(value.lstrip("* "), value.startswith("*")) for value in branches] | |
| if names: | |
| branches = [branch[0] for branch in branches] | |
| return branches | |
| def get_branch(cls, path=None, raise_e=False): | |
| path = path or common.base().get_base_path() | |
| branches = cls.get_branches(path=path, raise_e=raise_e) | |
| for branch, selected in branches: | |
| if not selected: | |
| continue | |
| return branch | |
| return None | |
| def get_commit(cls, path=None, raise_e=False): | |
| path = path or common.base().get_base_path() | |
| result = util.execute(["git", "rev-parse", "HEAD"], path=path) | |
| if cls._wrap_error(result, raise_e=raise_e): | |
| return None | |
| message = result.get("stdout", "") | |
| commit = message.strip() | |
| return commit | |
| def get_origin(cls, path=None, raise_e=False): | |
| path = path or common.base().get_base_path() | |
| result = util.execute( | |
| ["git", "config", "--get", "remote.origin.url"], path=path | |
| ) | |
| if cls._wrap_error(result, raise_e=raise_e): | |
| return None | |
| message = result.get("stdout", "") | |
| origin = message.strip() | |
| origin = cls.safe_origin(origin) | |
| return origin | |
| def get_repo_path(cls, path=None, raise_e=False): | |
| path = path or common.base().get_base_path() | |
| result = util.execute(["git", "rev-parse", "--show-toplevel"], path=path) | |
| if cls._wrap_error(result, raise_e=raise_e): | |
| return None | |
| message = result.get("stdout", "") | |
| repo_path = message.strip() | |
| return repo_path | |
| def safe_origin(cls, origin, display_l=3): | |
| parse = legacy.urlparse(origin) | |
| safe_l = [] | |
| if parse.scheme: | |
| safe_l.append(parse.scheme + "://") | |
| if parse.username: | |
| safe_l.append(parse.username) | |
| if parse.password: | |
| obfuscated = util.obfuscate(parse.password) | |
| safe_l.append(":" + obfuscated) | |
| if parse.username: | |
| safe_l.append("@") | |
| if parse.hostname: | |
| safe_l.append(parse.hostname) | |
| if parse.path: | |
| safe_l.append(parse.path) | |
| return "".join(safe_l) | |
| def norm_origin(cls, origin, prefix="https://"): | |
| if origin.startswith(("http://", "https://")): | |
| return origin | |
| if origin.endswith(".git"): | |
| origin = origin[:-4] | |
| origin = origin.replace(":", "/") | |
| origin = prefix + origin | |
| return origin | |
| def parse_origin(cls, origin, safe=True): | |
| parse = legacy.urlparse(origin) | |
| if safe and not parse.scheme: | |
| origin = cls.norm_origin(origin) | |
| return cls.parse_origin(origin, safe=False) | |
| scheme = parse.scheme if parse.scheme else "ssh" | |
| username = parse.username if parse.username else None | |
| password = parse.password if parse.password else None | |
| hostname = parse.hostname if parse.hostname else None | |
| path = parse.path if parse.path else None | |
| return dict( | |
| scheme=scheme, | |
| username=username, | |
| password=password, | |
| hostname=hostname, | |
| path=path, | |
| ) | |
| def _wrap_error(cls, result, raise_e=False): | |
| code = result["code"] | |
| if code == 0: | |
| return False | |
| if raise_e: | |
| raise exceptions.OperationalError( | |
| message=result.get("stderr", "") or result.get("stdout", "") | |
| ) | |
| return True | |