AmmarFahmy
adding all files
105b369
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from pydantic import FilePath
from phi.resource.base import ResourceBase
from phi.k8s.api_client import K8sApiClient
from phi.k8s.constants import DEFAULT_K8S_NAMESPACE
from phi.k8s.helm.cli import run_shell_command
from phi.cli.console import print_info
from phi.utils.log import logger
class HelmChart(ResourceBase):
chart: str
set: Optional[Dict[str, Any]] = None
values: Optional[Union[FilePath, List[FilePath]]] = None
flags: Optional[List[str]] = None
namespace: Optional[str] = None
create_namespace: bool = True
repo: Optional[str] = None
update_repo_before_install: bool = True
k8s_client: Optional[K8sApiClient] = None
resource_type: str = "Chart"
def get_resource_name(self) -> str:
return self.name
def get_namespace(self) -> str:
if self.namespace is not None:
return self.namespace
return DEFAULT_K8S_NAMESPACE
def get_k8s_client(self) -> K8sApiClient:
if self.k8s_client is not None:
return self.k8s_client
self.k8s_client = K8sApiClient()
return self.k8s_client
def _read(self, k8s_client: K8sApiClient) -> Any:
try:
logger.info(f"Getting helm chart: {self.name}\n")
get_args = ["helm", "get", "manifest", self.name]
if self.namespace is not None:
get_args.append(f"--namespace={self.namespace}")
get_result = run_shell_command(get_args, display_result=False, display_error=False)
if get_result.stdout:
import yaml
return yaml.safe_load_all(get_result.stdout)
except Exception:
pass
return None
def read(self, k8s_client: K8sApiClient) -> Any:
# Step 1: Use cached value if available
if self.use_cache and self.active_resource is not None:
return self.active_resource
# Step 2: Skip resource creation if skip_read = True
if self.skip_read:
print_info(f"Skipping read: {self.get_resource_name()}")
return True
# Step 3: Read resource
client: K8sApiClient = k8s_client or self.get_k8s_client()
return self._read(client)
def is_active(self, k8s_client: K8sApiClient) -> bool:
"""Returns True if the resource is active on the k8s cluster"""
self.active_resource = self._read(k8s_client=k8s_client)
return True if self.active_resource is not None else False
def _create(self, k8s_client: K8sApiClient) -> bool:
if self.repo:
try:
logger.info(f"Adding helm repo: {self.name} {self.repo}\n")
add_args = ["helm", "repo", "add", self.name, self.repo]
run_shell_command(add_args)
if self.update_repo_before_install:
logger.info(f"Updating helm repo: {self.name}\n")
update_args = ["helm", "repo", "update", self.name]
run_shell_command(update_args)
except Exception as e:
logger.error(f"Failed to add helm repo: {e}")
return False
try:
logger.info(f"Installing helm chart: {self.name}\n")
install_args = ["helm", "install", self.name, self.chart]
if self.set is not None:
for key, value in self.set.items():
install_args.append(f"--set {key}={value}")
if self.flags:
install_args.extend(self.flags)
if self.values:
if isinstance(self.values, Path):
install_args.append(f"--values={str(self.values)}")
elif isinstance(self.values, list):
for value in self.values:
install_args.append(f"--values={str(value)}")
if self.namespace is not None:
install_args.append(f"--namespace={self.namespace}")
if self.create_namespace:
install_args.append("--create-namespace")
run_shell_command(install_args)
return True
except Exception as e:
logger.error(f"Failed to install helm chart: {e}")
return False
def create(self, k8s_client: K8sApiClient) -> bool:
# Step 1: Skip resource creation if skip_create = True
if self.skip_create:
print_info(f"Skipping create: {self.get_resource_name()}")
return True
# Step 2: Check if resource is active and use_cache = True
client: K8sApiClient = k8s_client or self.get_k8s_client()
if self.use_cache and self.is_active(client):
self.resource_created = True
print_info(f"{self.get_resource_type()}: {self.get_resource_name()} already exists")
return True
# Step 3: Create the resource
else:
self.resource_created = self._create(client)
if self.resource_created:
print_info(f"{self.get_resource_type()}: {self.get_resource_name()} created")
# Step 4: Run post create steps
if self.resource_created:
if self.save_output:
self.save_output_file()
logger.debug(f"Running post-create for {self.get_resource_type()}: {self.get_resource_name()}")
return self.post_create(client)
logger.error(f"Failed to create {self.get_resource_type()}: {self.get_resource_name()}")
return self.resource_created
def post_create(self, k8s_client: K8sApiClient) -> bool:
return True
def _update(self, k8s_client: K8sApiClient) -> Any:
try:
logger.info(f"Updating helm chart: {self.name}\n")
update_args = ["helm", "upgrade", self.name, self.chart]
if self.set is not None:
for key, value in self.set.items():
update_args.append(f"--set {key}={value}")
if self.flags:
update_args.extend(self.flags)
if self.values:
if isinstance(self.values, Path):
update_args.append(f"--values={str(self.values)}")
if self.namespace is not None:
update_args.append(f"--namespace={self.namespace}")
run_shell_command(update_args)
return True
except Exception as e:
logger.error(f"Failed to update helm chart: {e}")
return False
def update(self, k8s_client: K8sApiClient) -> bool:
# Step 1: Skip resource update if skip_update = True
if self.skip_update:
print_info(f"Skipping update: {self.get_resource_name()}")
return True
# Step 2: Update the resource
client: K8sApiClient = k8s_client or self.get_k8s_client()
if self.is_active(client):
self.resource_updated = self._update(client)
else:
print_info(f"{self.get_resource_type()}: {self.get_resource_name()} does not exist")
return True
# Step 3: Run post update steps
if self.resource_updated:
print_info(f"{self.get_resource_type()}: {self.get_resource_name()} updated")
if self.save_output:
self.save_output_file()
logger.debug(f"Running post-update for {self.get_resource_type()}: {self.get_resource_name()}")
return self.post_update(client)
logger.error(f"Failed to update {self.get_resource_type()}: {self.get_resource_name()}")
return self.resource_updated
def post_update(self, k8s_client: K8sApiClient) -> bool:
return True
def _delete(self, k8s_client: K8sApiClient) -> Any:
try:
logger.info(f"Deleting helm chart: {self.name}\n")
delete_args = ["helm", "uninstall", self.name]
if self.namespace is not None:
delete_args.append(f"--namespace={self.namespace}")
run_shell_command(delete_args)
return True
except Exception as e:
logger.error(f"Failed to delete helm chart: {e}")
return False
def delete(self, k8s_client: K8sApiClient) -> bool:
# Step 1: Skip resource deletion if skip_delete = True
if self.skip_delete:
print_info(f"Skipping delete: {self.get_resource_name()}")
return True
# Step 2: Delete the resource
client: K8sApiClient = k8s_client or self.get_k8s_client()
if self.is_active(client):
self.resource_deleted = self._delete(client)
else:
print_info(f"{self.get_resource_type()}: {self.get_resource_name()} does not exist")
return True
# Step 3: Run post delete steps
if self.resource_deleted:
print_info(f"{self.get_resource_type()}: {self.get_resource_name()} deleted")
if self.save_output:
self.delete_output_file()
logger.debug(f"Running post-delete for {self.get_resource_type()}: {self.get_resource_name()}.")
return self.post_delete(client)
logger.error(f"Failed to delete {self.get_resource_type()}: {self.get_resource_name()}")
return self.resource_deleted
def post_delete(self, k8s_client: K8sApiClient) -> bool:
return True