Spaces:
Runtime error
Runtime error
File size: 10,016 Bytes
105b369 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 |
from typing import List, Optional
from kubernetes.client import AppsV1Api
from kubernetes.client.models.v1_deployment import V1Deployment
from kubernetes.client.models.v1_deployment_list import V1DeploymentList
from kubernetes.client.models.v1_deployment_spec import V1DeploymentSpec
from kubernetes.client.models.v1_status import V1Status
from pydantic import Field
from phi.k8s.api_client import K8sApiClient
from phi.k8s.resource.base import K8sResource, K8sObject
from phi.k8s.resource.apps.v1.deployment_strategy import DeploymentStrategy
from phi.k8s.resource.core.v1.pod_template_spec import PodTemplateSpec
from phi.k8s.resource.meta.v1.label_selector import LabelSelector
from phi.utils.dttm import current_datetime_utc_str
from phi.utils.log import logger
class DeploymentSpec(K8sObject):
"""
Reference:
- https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.23/#deploymentspec-v1-apps
"""
resource_type: str = "DeploymentSpec"
# Minimum number of seconds for which a newly created pod should be ready
# without any of its container crashing, for it to be considered available.
# Defaults to 0 (pod will be considered available as soon as it is ready)
min_ready_seconds: Optional[int] = Field(None, alias="minReadySeconds")
# Indicates that the deployment is paused.
paused: Optional[bool] = None
# The maximum time in seconds for a deployment to make progress before it is considered to be failed.
# The deployment controller will continue to process failed deployments and a condition with a
# ProgressDeadlineExceeded reason will be surfaced in the deployment status.
# Note that progress will not be estimated during the time a deployment is paused.
# Defaults to 600s.
progress_deadline_seconds: Optional[int] = Field(None, alias="progressDeadlineSeconds")
replicas: Optional[int] = None
# The number of old ReplicaSets to retain to allow rollback.
# This is a pointer to distinguish between explicit zero and not specified.
# Defaults to 10.
revision_history_limit: Optional[int] = Field(None, alias="revisionHistoryLimit")
# The selector field defines how the Deployment finds which Pods to manage
selector: LabelSelector
strategy: Optional[DeploymentStrategy] = None
template: PodTemplateSpec
def get_k8s_object(self) -> V1DeploymentSpec:
# Return a V1DeploymentSpec object
# https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_deployment_spec.py
_strategy = self.strategy.get_k8s_object() if self.strategy else None
_v1_deployment_spec = V1DeploymentSpec(
min_ready_seconds=self.min_ready_seconds,
paused=self.paused,
progress_deadline_seconds=self.progress_deadline_seconds,
replicas=self.replicas,
revision_history_limit=self.revision_history_limit,
selector=self.selector.get_k8s_object(),
strategy=_strategy,
template=self.template.get_k8s_object(),
)
return _v1_deployment_spec
class Deployment(K8sResource):
"""
Deployments are used to run containers.
Containers are run in Pods or ReplicaSets, and Deployments manages those Pods or ReplicaSets.
A Deployment provides declarative updates for Pods and ReplicaSets.
References:
- Docs:
https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.23/#deployment-v1-apps
https://kubernetes.io/docs/concepts/workloads/controllers/deployment/
- Type: https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_deployment.py
"""
resource_type: str = "Deployment"
spec: DeploymentSpec
# If True, adds `kubectl.kubernetes.io/restartedAt` annotation on update
# so the deployment is restarted even without any data change
restart_on_update: bool = True
# List of attributes to include in the K8s manifest
fields_for_k8s_manifest: List[str] = ["spec"]
def get_k8s_object(self) -> V1Deployment:
"""Creates a body for this Deployment"""
# Return a V1Deployment object
# https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_deployment.py
_v1_deployment = V1Deployment(
api_version=self.api_version.value,
kind=self.kind.value,
metadata=self.metadata.get_k8s_object(),
spec=self.spec.get_k8s_object(),
)
return _v1_deployment
@staticmethod
def get_from_cluster(
k8s_client: K8sApiClient, namespace: Optional[str] = None, **kwargs
) -> Optional[List[V1Deployment]]:
"""Reads Deployments from K8s cluster.
Args:
k8s_client: The K8sApiClient for the current Cluster
namespace: Namespace to use.
"""
apps_v1_api: AppsV1Api = k8s_client.apps_v1_api
deploy_list: Optional[V1DeploymentList] = None
if namespace:
# logger.debug(f"Getting deploys for ns: {namespace}")
deploy_list = apps_v1_api.list_namespaced_deployment(namespace=namespace, **kwargs)
else:
# logger.debug("Getting deploys for all namespaces")
deploy_list = apps_v1_api.list_deployment_for_all_namespaces(**kwargs)
deploys: Optional[List[V1Deployment]] = None
if deploy_list:
deploys = deploy_list.items
# logger.debug(f"deploys: {deploys}")
# logger.debug(f"deploys type: {type(deploys)}")
return deploys
def _create(self, k8s_client: K8sApiClient) -> bool:
apps_v1_api: AppsV1Api = k8s_client.apps_v1_api
k8s_object: V1Deployment = self.get_k8s_object()
namespace = self.get_namespace()
logger.debug("Creating: {}".format(self.get_resource_name()))
v1_deployment: V1Deployment = apps_v1_api.create_namespaced_deployment(
namespace=namespace,
body=k8s_object,
async_req=self.async_req,
pretty=self.pretty,
)
# logger.debug("Created: {}".format(v1_deployment))
if v1_deployment.metadata.creation_timestamp is not None:
logger.debug("Deployment Created")
self.active_resource = v1_deployment
return True
logger.error("Deployment could not be created")
return False
def _read(self, k8s_client: K8sApiClient) -> Optional[V1Deployment]:
"""Returns the "Active" Deployment from the cluster"""
namespace = self.get_namespace()
active_resource: Optional[V1Deployment] = None
active_resources: Optional[List[V1Deployment]] = self.get_from_cluster(
k8s_client=k8s_client,
namespace=namespace,
)
# logger.debug(f"Active Resources: {active_resources}")
if active_resources is None:
return None
active_resources_dict = {_deploy.metadata.name: _deploy for _deploy in active_resources}
deploy_name = self.get_resource_name()
if deploy_name in active_resources_dict:
active_resource = active_resources_dict[deploy_name]
self.active_resource = active_resource
logger.debug(f"Found active {deploy_name}")
return active_resource
def _update(self, k8s_client: K8sApiClient) -> bool:
if self.recreate_on_update:
logger.info("Recreating Deployment")
resource_deleted = self._delete(k8s_client=k8s_client)
if not resource_deleted:
logger.error("Could not delete resource, please delete manually")
return False
return self._create(k8s_client=k8s_client)
# update `spec.template.metadata` section
# to add `kubectl.kubernetes.io/restartedAt` annotation
# https://github.com/kubernetes-client/python/issues/1378#issuecomment-779323573
if self.restart_on_update:
if self.spec.template.metadata.annotations is None:
self.spec.template.metadata.annotations = {}
self.spec.template.metadata.annotations["kubectl.kubernetes.io/restartedAt"] = current_datetime_utc_str()
logger.debug(f"annotations: {self.spec.template.metadata.annotations}")
apps_v1_api: AppsV1Api = k8s_client.apps_v1_api
deploy_name = self.get_resource_name()
k8s_object: V1Deployment = self.get_k8s_object()
namespace = self.get_namespace()
logger.debug("Updating: {}".format(deploy_name))
v1_deployment: V1Deployment = apps_v1_api.patch_namespaced_deployment(
name=deploy_name,
namespace=namespace,
body=k8s_object,
async_req=self.async_req,
pretty=self.pretty,
)
# logger.debug("Updated: {}".format(v1_deployment))
if v1_deployment.metadata.creation_timestamp is not None:
logger.debug("Deployment Updated")
self.active_resource = v1_deployment
return True
logger.error("Deployment could not be updated")
return False
def _delete(self, k8s_client: K8sApiClient) -> bool:
apps_v1_api: AppsV1Api = k8s_client.apps_v1_api
deploy_name = self.get_resource_name()
namespace = self.get_namespace()
logger.debug("Deleting: {}".format(deploy_name))
self.active_resource = None
# https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_status.py
delete_status: V1Status = apps_v1_api.delete_namespaced_deployment(
name=deploy_name,
namespace=namespace,
async_req=self.async_req,
pretty=self.pretty,
)
logger.debug("delete_status: {}".format(delete_status.status))
if delete_status.status == "Success":
logger.debug("Deployment Deleted")
return True
logger.error("Deployment could not be deleted")
return False
|