| """Kubernetes manifest validator and simulator — deterministic, rule-based.""" |
|
|
| import re |
| from typing import Any, Dict, List, Optional |
|
|
| import yaml |
|
|
| from server.models import FileContent |
|
|
|
|
| |
# Resource kinds the validator accepts; any other `kind` value is reported
# as an unknown kind.
VALID_KINDS = {
    "ClusterRole",
    "ClusterRoleBinding",
    "ConfigMap",
    "CronJob",
    "DaemonSet",
    "Deployment",
    "HorizontalPodAutoscaler",
    "Ingress",
    "Job",
    "Namespace",
    "NetworkPolicy",
    "PersistentVolume",
    "PersistentVolumeClaim",
    "Pod",
    "ReplicaSet",
    "Role",
    "RoleBinding",
    "Secret",
    "Service",
    "ServiceAccount",
    "StatefulSet",
}
|
|
# apiVersion strings the validator accepts; any other value is reported as
# an unknown apiVersion.
VALID_API_VERSIONS = {
    "apps/v1",
    "autoscaling/v1",
    "autoscaling/v2",
    "batch/v1",
    "networking.k8s.io/v1",
    "policy/v1",
    "rbac.authorization.k8s.io/v1",
    "v1",
}
|
|
|
|
| def _parse_memory(mem_str: str) -> int: |
| """Parse K8s memory string to bytes.""" |
| mem_str = str(mem_str).strip() |
| multipliers = { |
| "Ki": 1024, "Mi": 1024**2, "Gi": 1024**3, "Ti": 1024**4, |
| "K": 1000, "M": 1000**2, "G": 1000**3, "T": 1000**4, |
| } |
| for suffix, mult in multipliers.items(): |
| if mem_str.endswith(suffix): |
| return int(mem_str[:-len(suffix)]) * mult |
| if mem_str.isdigit(): |
| return int(mem_str) |
| return 0 |
|
|
|
|
# Kinds whose pods are described by spec.template and can back a Service.
_WORKLOAD_KINDS = ("Deployment", "StatefulSet", "DaemonSet")


def _as_mapping(value: Any) -> Dict[str, Any]:
    """Return *value* when it is a dict, otherwise an empty dict.

    YAML keys written without a value (``labels:``) load as ``None``; this
    lets callers treat "missing", "empty" and "wrong type" the same way.
    """
    return value if isinstance(value, dict) else {}


def _as_list(value: Any) -> List[Any]:
    """Return *value* when it is a list, otherwise an empty list."""
    return value if isinstance(value, list) else []


def _dig(node: Any, *keys: str) -> Any:
    """Follow *keys* through nested dicts; return None once a level is absent or not a dict."""
    for key in keys:
        if not isinstance(node, dict):
            return None
        node = node.get(key)
    return node


def _selector_matches(selector: Dict[str, Any], labels: Dict[str, Any]) -> bool:
    """Return True when every selector key/value pair is present in *labels*."""
    return all(labels.get(key) == value for key, value in selector.items())


class KubernetesSimulator:
    """Simulates kubectl apply / kubectl get output.

    Validates K8s manifests without a real cluster.
    """

    def validate(self, manifests: "Dict[str, FileContent]") -> Dict[str, Any]:
        """Validate all Kubernetes manifests in the file set.

        Files whose ``file_type.value`` is not ``"kubernetes"`` are skipped.
        Every mapping document of a multi-document YAML file is validated.

        Returns dict with keys:
            valid: bool
            errors: list of error strings
            pod_status: simulated pod status
            service_status: simulated service endpoint status
        """
        errors: List[str] = []
        resources: List[Dict[str, Any]] = []

        for path, fc in manifests.items():
            if fc.file_type.value != "kubernetes":
                continue
            try:
                docs = list(yaml.safe_load_all(fc.content))
            except yaml.YAMLError as exc:
                errors.append(f"YAML parse error in {path}: {exc}")
                continue
            # Keep each document separately: a file may hold several resources
            # separated by '---', and all of them must be checked.
            resources.extend(
                {"path": path, "doc": doc}
                for doc in docs
                if doc and isinstance(doc, dict)
            )

        if errors:
            # Unparseable YAML makes any further simulation meaningless.
            return {"valid": False, "errors": errors, "pod_status": "Error", "service_status": "Error"}
        if not resources:
            return {"valid": True, "errors": [], "pod_status": "N/A", "service_status": "N/A"}

        for resource in resources:
            errors.extend(self._validate_resource(resource["path"], resource["doc"]))
        errors.extend(self._validate_cross_resources(resources))

        return {
            "valid": len(errors) == 0,
            "errors": errors,
            "pod_status": self._simulate_pod_status(resources),
            "service_status": self._simulate_service_status(resources),
        }

    def _validate_resource(self, path: str, doc: Dict[str, Any]) -> List[str]:
        """Validate a single K8s resource document.

        Checks kind, apiVersion and metadata.name, then runs the kind-specific
        checks for Deployment, Service and Ingress.  Returns the error strings
        found (empty when the document passes).
        """
        errors: List[str] = []

        kind = doc.get("kind", "")
        api_version = doc.get("apiVersion", "")

        # The isinstance guards keep unhashable values (e.g. a YAML list) from
        # raising TypeError in the set membership tests.
        if not kind:
            errors.append(f"{path}: missing 'kind' field")
        elif not isinstance(kind, str) or kind not in VALID_KINDS:
            errors.append(f"{path}: unknown kind '{kind}'")

        if not api_version:
            errors.append(f"{path}: missing 'apiVersion' field")
        elif not isinstance(api_version, str) or api_version not in VALID_API_VERSIONS:
            errors.append(f"{path}: unknown apiVersion '{api_version}'")

        metadata = doc.get("metadata", {})
        if not isinstance(metadata, dict) or not metadata.get("name"):
            errors.append(f"{path}: metadata.name is required")

        if kind == "Deployment":
            errors.extend(self._validate_deployment(path, doc))
        elif kind == "Service":
            errors.extend(self._validate_service(path, doc))
        elif kind == "Ingress":
            errors.extend(self._validate_ingress(path, doc))

        return errors

    def _validate_deployment(self, path: str, doc: Dict[str, Any]) -> List[str]:
        """Check a Deployment's selector, template labels and containers."""
        errors: List[str] = []
        spec = doc.get("spec", {})
        if not isinstance(spec, dict):
            errors.append(f"{path}: Deployment spec must be a mapping")
            return errors

        match_labels = _as_mapping(_dig(spec, "selector", "matchLabels"))
        if not match_labels:
            errors.append(f"{path}: Deployment must have spec.selector.matchLabels")
            return errors

        template = _as_mapping(spec.get("template"))
        template_labels = _as_mapping(_dig(template, "metadata", "labels"))

        # Every selector label must appear on the pod template with the same value.
        for key, value in match_labels.items():
            if template_labels.get(key) != value:
                errors.append(
                    f"{path}: selector matchLabels ({key}={value}) does not match template labels"
                )

        containers = _as_list(_dig(template, "spec", "containers"))
        if not containers:
            errors.append(f"{path}: Deployment must have at least one container")

        for entry in containers:
            c = _as_mapping(entry)
            if not c.get("image"):
                errors.append(f"{path}: container '{c.get('name', '?')}' missing image")

        return errors

    def _validate_service(self, path: str, doc: Dict[str, Any]) -> List[str]:
        """Check that a Service has a selector and at least one usable port."""
        errors: List[str] = []
        spec = doc.get("spec", {})
        if not isinstance(spec, dict):
            errors.append(f"{path}: Service spec must be a mapping")
            return errors

        if not spec.get("selector"):
            errors.append(f"{path}: Service must have spec.selector")

        ports = _as_list(spec.get("ports"))
        if not ports:
            errors.append(f"{path}: Service must define at least one port")

        for entry in ports:
            if not _as_mapping(entry).get("port"):
                errors.append(f"{path}: Service port entry missing 'port' field")

        return errors

    def _validate_ingress(self, path: str, doc: Dict[str, Any]) -> List[str]:
        """Check that an Ingress spec is a mapping with at least one rule."""
        errors: List[str] = []
        spec = doc.get("spec", {})
        if not isinstance(spec, dict):
            errors.append(f"{path}: Ingress spec must be a mapping")
            return errors

        if not spec.get("rules", []):
            errors.append(f"{path}: Ingress must define at least one rule")
        return errors

    def _validate_cross_resources(self, resources: List[Dict[str, Any]]) -> List[str]:
        """Validate cross-resource dependencies (e.g. Service selector matches Deployment labels).

        A Service is only reported when at least one workload is present in
        the file set and none of the workloads' template labels satisfy its
        selector.
        """
        errors: List[str] = []

        # One entry per workload; a list (rather than a dict keyed by name)
        # keeps same-named workloads from overwriting each other.
        pod_label_sets = [
            _as_mapping(_dig(r["doc"], "spec", "template", "metadata", "labels"))
            for r in resources
            if r["doc"].get("kind", "") in _WORKLOAD_KINDS
        ]

        for r in resources:
            doc = r["doc"]
            if doc.get("kind") != "Service":
                continue
            selector = _as_mapping(_dig(doc, "spec", "selector"))
            if not selector or not pod_label_sets:
                continue

            if not any(_selector_matches(selector, labels) for labels in pod_label_sets):
                svc_name = _as_mapping(doc.get("metadata")).get("name", "?")
                errors.append(
                    f"Service '{svc_name}' selector {selector} does not match any pod labels"
                )

        return errors

    def _simulate_pod_status(self, resources: List[Dict[str, Any]]) -> str:
        """Simulate what pod status would be.

        Walks the containers of every Pod, Deployment, StatefulSet and
        DaemonSet in order and returns the first failure status found, or
        "Running" when no container trips a rule.
        """
        configmap_names = [
            _as_mapping(r["doc"].get("metadata")).get("name")
            for r in resources
            if r["doc"].get("kind") == "ConfigMap"
        ]

        for r in resources:
            doc = r["doc"]
            kind = doc.get("kind", "")
            if kind == "Pod":
                pod_spec = doc.get("spec")
            elif kind in _WORKLOAD_KINDS:
                pod_spec = _dig(doc, "spec", "template", "spec")
            else:
                continue

            for entry in _as_list(_as_mapping(pod_spec).get("containers")):
                status = self._container_status(_as_mapping(entry), configmap_names)
                if status is not None:
                    return status

        return "Running"

    def _container_status(self, container: Dict[str, Any], configmap_names: List[Any]) -> Optional[str]:
        """Return the failure status one container would cause, or None if it looks healthy."""
        # `image:` with no value loads as None; normalise to text first.
        image = str(container.get("image") or "")

        # Misspelled "latest" tags cannot be pulled.
        if ":" in image and image.split(":")[-1] in ("latset", "lates", "latets"):
            return "ImagePullBackOff"

        # Template placeholders that were never filled in.
        if "OWNER/REPO" in image or "TAG" in image:
            return "ImagePullBackOff"

        mem_limit = _as_mapping(_dig(container, "resources", "limits")).get("memory", "")
        if mem_limit:
            try:
                mem_bytes = _parse_memory(str(mem_limit))
            except ValueError:
                # An unparseable quantity cannot be judged; skip the OOM rule
                # rather than abort the whole simulation.
                mem_bytes = 0
            # Limits below 128 MiB are treated as too small to run.
            if 0 < mem_bytes < 128 * 1024 * 1024:
                return "CrashLoopBackOff (OOMKilled)"

        command = container.get("command", [])
        if command and isinstance(command, list):
            lowered = [str(part).lower() for part in command]
            if any("wrong" in part or "typo" in part for part in lowered):
                return "CrashLoopBackOff"

        # Every ConfigMap referenced through envFrom must exist in the file set.
        for source in _as_list(container.get("envFrom")):
            ref_name = _as_mapping(_as_mapping(source).get("configMapRef")).get("name")
            if ref_name and ref_name not in configmap_names:
                return f"CreateContainerConfigError (ConfigMap '{ref_name}' not found)"

        return None

    def _simulate_service_status(self, resources: List[Dict[str, Any]]) -> str:
        """Simulate service endpoint status.

        Returns "N/A" when there are no Services, a diagnostic string for the
        first Service whose selector matches no workload or whose targetPort
        is not exposed by the matched workload, and "Endpoints active"
        otherwise.  Services without a selector are skipped.
        """
        services = [r["doc"] for r in resources if r["doc"].get("kind") == "Service"]
        workloads = [r["doc"] for r in resources if r["doc"].get("kind", "") in _WORKLOAD_KINDS]

        if not services:
            return "N/A"

        for svc in services:
            selector = _as_mapping(_dig(svc, "spec", "selector"))
            if not selector:
                continue

            # The first workload whose template labels satisfy the selector
            # is taken as the backend.
            backend = next(
                (
                    w for w in workloads
                    if _selector_matches(
                        selector,
                        _as_mapping(_dig(w, "spec", "template", "metadata", "labels")),
                    )
                ),
                None,
            )
            if backend is None:
                return f"No endpoints (selector {selector} matches no pods)"

            container_ports: List[Any] = []
            port_names: List[Any] = []
            for entry in _as_list(_dig(backend, "spec", "template", "spec", "containers")):
                for port in _as_list(_as_mapping(entry).get("ports")):
                    port = _as_mapping(port)
                    container_ports.append(port.get("containerPort"))
                    if port.get("name") is not None:
                        port_names.append(port["name"])

            for svc_port in _as_list(_dig(svc, "spec", "ports")):
                tp = _as_mapping(svc_port).get("targetPort")
                # targetPort may be a number or the name of a container port.
                # The check only applies when the containers declare ports.
                if tp and container_ports and tp not in container_ports and tp not in port_names:
                    return f"Service port mismatch: targetPort {tp} not in container ports {container_ports}"

        return "Endpoints active"
|
|