|
import os |
|
import json |
|
from typing import Tuple |
|
from easydict import EasyDict |
|
import yaml |
|
import subprocess |
|
from enum import Enum, unique |
|
from ding.interaction.base import split_http_address |
|
from .default_helper import one_time_warning |
|
|
|
# Fallback identity of the pod and API version of the operator server. They are used by
# ``get_operator_server_kwargs`` when neither the config nor the matching ``KUBERNETES_*``
# environment variable supplies a value.
DEFAULT_NAMESPACE = 'default'
DEFAULT_POD_NAME = 'dijob-example-coordinator'
DEFAULT_API_VERSION = '/v1alpha1'

# Default ports, one constant per component named in the identifier.
# NOTE(review): the coordinator and the aggregator master share port 22273 - confirm this is intentional.
DEFAULT_K8S_COLLECTOR_PORT = 22270
DEFAULT_K8S_LEARNER_PORT = 22271
DEFAULT_K8S_AGGREGATOR_SLAVE_PORT = 22272
DEFAULT_K8S_COORDINATOR_PORT = 22273
DEFAULT_K8S_AGGREGATOR_MASTER_PORT = 22273
|
|
|
|
|
def get_operator_server_kwargs(cfg: EasyDict) -> dict: |
|
""" |
|
Overview: |
|
Get kwarg dict from config file |
|
Arguments: |
|
- cfg (:obj:`EasyDict`) System config |
|
Returns: |
|
- result (:obj:`dict`) Containing ``api_version``, ``namespace``, ``name``, ``port``, ``host``. |
|
""" |
|
|
|
namespace = os.environ.get('KUBERNETES_POD_NAMESPACE', DEFAULT_NAMESPACE) |
|
name = os.environ.get('KUBERNETES_POD_NAME', DEFAULT_POD_NAME) |
|
url = cfg.get('system_addr', None) or os.environ.get('KUBERNETES_SERVER_URL', None) |
|
assert url, 'please set environment variable KUBERNETES_SERVER_URL in Kubenetes platform.' |
|
api_version = cfg.get('api_version', None) \ |
|
or os.environ.get('KUBERNETES_SERVER_API_VERSION', DEFAULT_API_VERSION) |
|
try: |
|
host, port = url.split(":")[0], int(url.split(":")[1]) |
|
except Exception as e: |
|
host, port, _, _ = split_http_address(url) |
|
|
|
return { |
|
'api_version': api_version, |
|
'namespace': namespace, |
|
'name': name, |
|
'host': host, |
|
'port': port, |
|
} |
|
|
|
|
|
def exist_operator_server() -> bool:
    """
    Overview:
        Tell whether the operator server address is exposed through the environment.
    Returns:
        - exist (:obj:`bool`): ``True`` when the ``KUBERNETES_SERVER_URL`` environment variable is set, \
            ``False`` otherwise.
    """
    server_url = os.environ.get('KUBERNETES_SERVER_URL')
    return server_url is not None
|
|
|
|
|
def pod_exec_command(kubeconfig: str, name: str, namespace: str, cmd: str) -> Tuple[int, str]:
    """
    Overview:
        Execute command in pod
    Arguments:
        - kubeconfig (:obj:`str`): The path of kubeconfig file
        - name (:obj:`str`): The name of pod
        - namespace (:obj:`str`): The namespace of pod
        - cmd (:obj:`str`): The command line, run inside the pod through ``/bin/sh -c``
    Returns:
        - code (:obj:`int`): ``-1`` when the pod cannot be read, does not exist or is not in the \
            ``Running`` phase; otherwise the ``code`` field parsed from the command output.
        - message (:obj:`str`): A description of the failure, or the ``message`` field parsed from \
            the command output.
    """
    import sys

    try:
        from kubernetes import config
        from kubernetes.client import CoreV1Api
        from kubernetes.client.rest import ApiException
        from kubernetes.stream import stream
    except ModuleNotFoundError:
        one_time_warning("You have not installed kubernetes package! Please try 'pip install DI-engine[k8s]'.")
        # ``sys.exit`` instead of the ``site`` helper ``exit``, which is meant for interactive use
        # and is not defined when the interpreter runs without the ``site`` module.
        sys.exit(-1)

    config.load_kube_config(config_file=kubeconfig)
    core_v1 = CoreV1Api()

    # A 404 from the API is reported as "does not exist"; any other API error is returned as-is.
    try:
        pod = core_v1.read_namespaced_pod(name=name, namespace=namespace)
    except ApiException as e:
        if e.status != 404:
            return -1, "Unknown error: %s" % e
        pod = None
    if not pod:
        return -1, f"Pod {name} does not exist."
    if pod.status.phase != 'Running':
        return -1, f"Pod {name} is not in Running."

    exec_command = ['/bin/sh', '-c', cmd]
    # Only stdout is requested from the exec stream.
    output = stream(
        core_v1.connect_get_namespaced_pod_exec,
        name,
        namespace,
        command=exec_command,
        stderr=False,
        stdin=False,
        stdout=True,
        tty=False
    )
    # Textual rewrite of the output into JSON: single quotes become double quotes, ``None`` becomes
    # ``null``, booleans become 0/1, and the double quotes around one specific pattern string are escaped.
    # NOTE(review): presumably the command prints a Python-literal style dict - confirm against callers.
    payload = (
        output.replace("\'", "\"")
        .replace('None', 'null')
        .replace(': False', ': 0')
        .replace(': True', ': 1')
        .replace(
            '"^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$"',
            '\\"^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$\\"'
        )
    )
    result = json.loads(payload)
    return result['code'], result['message']
|
|
|
|
|
@unique
class K8sType(Enum):
    """
    Overview:
        Kind of K8s cluster handled by ``K8sLauncher``. ``@unique`` rejects duplicated values.
    """
    # Selected by ``type: local`` in the launcher config; the launcher runs no k3d command for it.
    Local = 1
    # Selected by ``type: k3s`` in the launcher config; the launcher drives it through the ``k3d`` tool.
    K3s = 2
|
|
|
|
|
class K8sLauncher(object):
    """
    Overview:
        object to manage the K8s cluster
    Interfaces:
        ``__init__``, ``_load``, ``create_cluster``, ``_check_k3d_tools``, ``delete_cluster``, ``preload_images``
    """

    def __init__(self, config_path: str) -> None:
        """
        Overview:
            Initialize the K8sLauncher object: set the defaults, override them from the config file, \
            then check that the required tools are installed.
        Arguments:
            - config_path (:obj:`str`): The path of the config file.
        """
        self.name = None
        self.servers = 1
        self.agents = 0
        self.type = K8sType.Local
        self._images = []

        self._load(config_path)
        self._check_k3d_tools()

    @staticmethod
    def _expect_type(key: str, value, expected: type):
        """
        Overview:
            Return ``value`` unchanged when its exact type is ``expected``, otherwise raise.
        Arguments:
            - key (:obj:`str`): Name of the config entry, used in the error message.
            - value (:obj:`object`): The value read from the config.
            - expected (:obj:`type`): The exact type required.
        Raises:
            - TypeError: The type of ``value`` is not exactly ``expected``.
        """
        # Exact type match on purpose: ``bool`` is a subclass of ``int``, so ``isinstance`` would
        # silently accept ``servers: true``.
        if type(value) is not expected:
            raise TypeError(f"{key}' type is expected {expected.__name__}, actual {type(value)}")
        return value

    def _load(self, config_path: str) -> None:
        """
        Overview:
            Load the config file. Recognized keys are ``name``, ``servers``, ``agents``, ``type`` \
            (``k3s`` or ``local``) and ``preload_images``; missing or falsy entries keep the defaults.
        Arguments:
            - config_path (:obj:`str`): The path of the config file.
        Raises:
            - TypeError: ``servers``/``agents`` is not an int, or ``preload_images`` is not a list.
            - ValueError: ``type`` is neither ``k3s`` nor ``local``.
        """
        with open(config_path, 'r') as f:
            # An empty YAML document loads as ``None``; treat it as "no overrides" instead of
            # failing on ``None.get``.
            data = yaml.safe_load(f) or {}

        self.name = data.get('name') or self.name

        servers = data.get('servers')
        if servers:
            self.servers = self._expect_type('servers', servers, int)

        agents = data.get('agents')
        if agents:
            self.agents = self._expect_type('agents', agents, int)

        cluster_type = data.get('type')
        if cluster_type:
            if cluster_type == 'k3s':
                self.type = K8sType.K3s
            elif cluster_type == 'local':
                self.type = K8sType.Local
            else:
                raise ValueError(f"no type found for {cluster_type}")

        images = data.get('preload_images')
        if images:
            self._images = self._expect_type('preload_images', images, list)

    def _check_k3d_tools(self) -> None:
        """
        Overview:
            Check if the k3d tools exist. Nothing is checked unless the cluster type is ``K3s``.
        Raises:
            - FileNotFoundError: No ``k3d`` executable is found on ``PATH``.
        """
        import shutil

        if self.type != K8sType.K3s:
            return
        # ``shutil.which`` performs the PATH lookup in-process, so the check no longer depends on an
        # external ``which`` binary being installed.
        if shutil.which('k3d') is None:
            raise FileNotFoundError(
                "No k3d tools found, please install by executing ./ding/scripts/install-k8s-tools.sh"
            )

    @staticmethod
    def _run_k3d(args: list) -> Tuple[bool, str]:
        """
        Overview:
            Run a k3d command line and tell whether it failed.
        Arguments:
            - args (:obj:`list`): The full command line, program name included.
        Returns:
            - failed (:obj:`bool`): ``True`` when the process exits with a non-zero status, or when it \
                writes to stderr something that contains no ``WARN`` marker.
            - err_str (:obj:`str`): The stripped, UTF-8 decoded stderr output.
        """
        proc = subprocess.Popen(args, stderr=subprocess.PIPE)
        _, err = proc.communicate()
        err_str = err.decode('utf-8').strip()
        # The stderr heuristic alone lets a real failure slip through whenever a warning is printed
        # alongside it, so the exit status is consulted as well.
        failed = proc.returncode != 0 or (err_str != '' and 'WARN' not in err_str)
        return failed, err_str

    def create_cluster(self) -> None:
        """
        Overview:
            Create the k8s cluster, then preload the configured images. Only a ``K3s`` cluster is \
            actually created; an already existing cluster is reused.
        Raises:
            - RuntimeError: k3d fails for a reason other than the cluster already existing.
        """
        print('Creating k8s cluster...')
        if self.type != K8sType.K3s:
            return
        args = ['k3d', 'cluster', 'create', f'{self.name}', f'--servers={self.servers}', f'--agents={self.agents}']
        failed, err_str = self._run_k3d(args)
        if failed:
            if 'already exists' in err_str:
                print('K8s cluster already exists')
            else:
                raise RuntimeError(f'Failed to create cluster {self.name}: {err_str}')

        self.preload_images(self._images)

    def delete_cluster(self) -> None:
        """
        Overview:
            Delete the k8s cluster. Only a ``K3s`` cluster is actually deleted; a ``NotFound`` \
            error reported by k3d is tolerated.
        Raises:
            - RuntimeError: k3d fails for a reason other than ``NotFound``.
        """
        print('Deleting k8s cluster...')
        if self.type != K8sType.K3s:
            return
        args = ['k3d', 'cluster', 'delete', f'{self.name}']
        failed, err_str = self._run_k3d(args)
        if failed and 'NotFound' not in err_str:
            raise RuntimeError(f'Failed to delete cluster {self.name}: {err_str}')

    def preload_images(self, images: list) -> None:
        """
        Overview:
            Preload images into the cluster with ``k3d image import``. Nothing is done for a \
            non-``K3s`` cluster or when no image is given.
        Arguments:
            - images (:obj:`list`): The names of the images to import.
        Raises:
            - RuntimeError: k3d fails to import the images.
        """
        if self.type != K8sType.K3s or not images:
            return
        args = ['k3d', 'image', 'import', f'--cluster={self.name}', *images]
        failed, err_str = self._run_k3d(args)
        if failed:
            raise RuntimeError(f'Failed to preload images: {err_str}')
|
|