File size: 59,192 Bytes
105b369
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
from collections import OrderedDict
from enum import Enum
from pathlib import Path
from typing import Optional, Dict, Any, Union, List, TYPE_CHECKING
from typing_extensions import Literal

from pydantic import field_validator, Field, model_validator
from pydantic_core.core_schema import FieldValidationInfo

from phi.app.base import AppBase
from phi.app.context import ContainerContext
from phi.k8s.app.context import K8sBuildContext
from phi.k8s.enums.restart_policy import RestartPolicy
from phi.k8s.enums.image_pull_policy import ImagePullPolicy
from phi.k8s.enums.service_type import ServiceType
from phi.utils.log import logger

if TYPE_CHECKING:
    from phi.k8s.resource.base import K8sResource


class K8sWorkspaceVolumeType(str, Enum):
    """How the workspace is mounted into the container: from the host machine or an empty scratch volume."""

    HostPath = "HostPath"
    EmptyDir = "EmptyDir"


class AppVolumeType(str, Enum):
    """Backing store for the app data volume (host path, scratch, AWS EBS/EFS, or a PersistentVolume)."""

    HostPath = "HostPath"
    EmptyDir = "EmptyDir"
    AwsEbs = "AwsEbs"
    AwsEfs = "AwsEfs"
    PersistentVolume = "PersistentVolume"


class LoadBalancerProvider(str, Enum):
    """Cloud provider that fulfils LoadBalancer Services (only AWS is supported here)."""

    AWS = "AWS"


class K8sApp(AppBase):
    """Base class for Kubernetes Apps.

    Declares the configuration used to build the K8s resources for an App:
    workspace mounting (host-path, empty-dir, or git-sync), an optional app data
    volume (host-path / empty-dir / AWS EBS / AWS EFS / PersistentVolume),
    container/pod/deployment settings, Service and (AWS) LoadBalancer options,
    Ingress, Namespace, RBAC objects, and hooks for attaching extra resources.
    """

    # -*- Workspace Configuration
    # Path to the workspace directory inside the container
    # NOTE: if workspace_parent_dir_container_path is provided
    #   workspace_dir_container_path is ignored and
    #   derived using {workspace_parent_dir_container_path}/{workspace_name}
    workspace_dir_container_path: str = "/usr/local/app"
    # Path to the parent directory of the workspace inside the container
    # When using git-sync, the git repo is cloned inside this directory
    #   i.e. this is the parent directory of the workspace
    workspace_parent_dir_container_path: Optional[str] = None

    # Mount the workspace directory inside the container
    mount_workspace: bool = False
    # -*- If workspace_volume_type is None or K8sWorkspaceVolumeType.EmptyDir
    #   Create an empty volume with the name workspace_volume_name
    #   which is mounted to workspace_parent_dir_container_path
    # -*- If workspace_volume_type is K8sWorkspaceVolumeType.HostPath
    #   Mount the workspace_root to workspace_dir_container_path
    #   i.e. {workspace_parent_dir_container_path}/{workspace_name}
    workspace_volume_type: Optional[K8sWorkspaceVolumeType] = None
    workspace_volume_name: Optional[str] = None
    # Load the workspace from git using a git-sync sidecar
    enable_gitsync: bool = False
    # Use an init-container to create an initial copy of the workspace
    create_gitsync_init_container: bool = True
    gitsync_image_name: str = "registry.k8s.io/git-sync/git-sync"
    gitsync_image_tag: str = "v4.0.0"
    # Repository to sync
    gitsync_repo: Optional[str] = None
    # Branch to sync
    gitsync_ref: Optional[str] = None
    gitsync_period: Optional[str] = None
    # Add configuration using env vars to the gitsync container
    gitsync_env: Optional[Dict[str, str]] = None

    # -*- App Volume
    # Create a volume for container storage
    # Used for mounting app data like database, notebooks, models, etc.
    create_volume: bool = False
    volume_name: Optional[str] = None
    volume_type: AppVolumeType = AppVolumeType.EmptyDir
    # Path to mount the app volume inside the container
    volume_container_path: str = "/mnt/app"
    # -*- If volume_type is HostPath
    volume_host_path: Optional[str] = None
    # -*- If volume_type is AwsEbs
    # Provide Ebs Volume-id manually
    ebs_volume_id: Optional[str] = None
    # OR derive the volume_id, region, and az from an EbsVolume resource
    ebs_volume: Optional[Any] = None
    ebs_volume_region: Optional[str] = None
    ebs_volume_az: Optional[str] = None
    # Add NodeSelectors to Pods, so they are scheduled in the same region and zone as the ebs_volume
    schedule_pods_in_ebs_topology: bool = True
    # -*- If volume_type=AppVolumeType.AwsEfs
    # Provide Efs Volume-id manually
    efs_volume_id: Optional[str] = None
    # OR derive the volume_id from an EfsVolume resource
    efs_volume: Optional[Any] = None
    # -*- If volume_type=AppVolumeType.PersistentVolume
    # AccessModes is a list of ways the volume can be mounted.
    # More info: https://kubernetes.io/docs/concepts/storage/persistent-volumes#access-modes
    # Type: phidata.infra.k8s.enums.pv.PVAccessMode
    pv_access_modes: Optional[List[Any]] = None
    pv_requests_storage: Optional[str] = None
    # A list of mount options, e.g. ["ro", "soft"]. Not validated - mount will simply fail if one is invalid.
    # More info: https://kubernetes.io/docs/concepts/storage/persistent-volumes/#mount-options
    pv_mount_options: Optional[List[str]] = None
    # What happens to a persistent volume when released from its claim.
    #   The default policy is Retain.
    # Literal["Delete", "Recycle", "Retain"]
    pv_reclaim_policy: Optional[str] = None
    pv_storage_class: str = ""
    pv_labels: Optional[Dict[str, str]] = None

    # -*- Container Configuration
    container_name: Optional[str] = None
    container_labels: Optional[Dict[str, str]] = None

    # -*- Pod Configuration
    pod_name: Optional[str] = None
    pod_annotations: Optional[Dict[str, str]] = None
    pod_node_selector: Optional[Dict[str, str]] = None

    # -*- Secret Configuration
    secret_name: Optional[str] = None

    # -*- Configmap Configuration
    configmap_name: Optional[str] = None

    # -*- Deployment Configuration
    replicas: int = 1
    deploy_name: Optional[str] = None
    image_pull_policy: Optional[ImagePullPolicy] = None
    restart_policy: Optional[RestartPolicy] = None
    deploy_labels: Optional[Dict[str, Any]] = None
    termination_grace_period_seconds: Optional[int] = None
    # Key to spread the pods across a topology
    topology_spread_key: str = "kubernetes.io/hostname"
    # The degree to which pods may be unevenly distributed
    topology_spread_max_skew: int = 2
    # How to deal with a pod if it doesn't satisfy the spread constraint.
    topology_spread_when_unsatisfiable: Literal["DoNotSchedule", "ScheduleAnyway"] = "ScheduleAnyway"

    # -*- Service Configuration
    create_service: bool = False
    service_name: Optional[str] = None
    service_type: Optional[ServiceType] = None
    # -*- Enable HTTPS on the Service if service_type = ServiceType.LOAD_BALANCER
    # Must provide an ACM Certificate ARN or ACM Certificate Summary File to work
    enable_https: bool = False
    # The port exposed by the service
    # Preferred over port_number if both are set
    service_port: Optional[int] = Field(None, validate_default=True)
    # The node_port exposed by the service if service_type = ServiceType.NODE_PORT
    service_node_port: Optional[int] = None
    # The target_port is the port to access on the pods targeted by the service.
    # It can be the port number or port name on the pod.
    service_target_port: Optional[Union[str, int]] = None
    # Extra ports exposed by the service. Type: List[CreatePort]
    service_ports: Optional[List[Any]] = None
    # Labels to add to the service
    service_labels: Optional[Dict[str, Any]] = None
    # Annotations to add to the service
    service_annotations: Optional[Dict[str, str]] = None

    # -*- LoadBalancer configuration
    health_check_node_port: Optional[int] = None
    internal_traffic_policy: Optional[str] = None
    load_balancer_ip: Optional[str] = None
    # https://kubernetes-sigs.github.io/aws-load-balancer-controller/v2.5/guide/service/nlb/
    load_balancer_class: Optional[str] = None
    # Limit the IPs that can access this endpoint
    # You can provide the load_balancer_source_ranges as a list here
    # or as LOAD_BALANCER_SOURCE_RANGES in the secrets_file
    # Using the secrets_file is recommended
    load_balancer_source_ranges: Optional[List[str]] = None
    allocate_load_balancer_node_ports: Optional[bool] = None

    # -*- AWS LoadBalancer configuration
    # If ServiceType == ServiceType.LoadBalancer, the load balancer is created using the AWS LoadBalancer Controller
    # and the following configuration are added as annotations to the service
    use_nlb: bool = True
    # Specifies the target type to configure for NLB. You can choose between instance and ip.
    # `instance` mode will route traffic to all EC2 instances within cluster on the NodePort opened for your service.
    #       service must be of type NodePort or LoadBalancer for instance targets
    #       for k8s 1.22 and later if spec.allocateLoadBalancerNodePorts is set to false,
    #       NodePort must be allocated manually
    # `ip` mode will route traffic directly to the pod IP.
    #       network plugin must use native AWS VPC networking configuration for pod IP,
    #       for example Amazon VPC CNI plugin.
    nlb_target_type: Literal["instance", "ip"] = "ip"
    # If None, default is "internet-facing"
    load_balancer_scheme: Literal["internal", "internet-facing"] = "internet-facing"
    # Write Access Logs to s3
    write_access_logs_to_s3: bool = False
    # The name of the aws S3 bucket where the access logs are stored
    access_logs_s3_bucket: Optional[str] = None
    # The logical hierarchy you created for your aws S3 bucket, for example `my-bucket-prefix/prod`
    access_logs_s3_bucket_prefix: Optional[str] = None
    acm_certificate_arn: Optional[str] = None
    acm_certificate_summary_file: Optional[Path] = None
    # Enable proxy protocol for NLB
    enable_load_balancer_proxy_protocol: bool = True
    # Enable cross-zone load balancing
    enable_cross_zone_load_balancing: bool = True
    # Manually specify the subnets to use for the load balancer
    load_balancer_subnets: Optional[List[str]] = None

    # -*- Ingress Configuration
    create_ingress: bool = False
    ingress_name: Optional[str] = None
    ingress_class_name: Literal["alb", "nlb"] = "alb"
    ingress_annotations: Optional[Dict[str, str]] = None

    # -*- Namespace Configuration
    create_namespace: bool = False
    # Create a Namespace with name ns_name & default values
    ns_name: Optional[str] = None
    # or Provide the full Namespace definition
    # Type: CreateNamespace
    namespace: Optional[Any] = None

    # -*- RBAC Configuration
    # If create_rbac = True, create a ServiceAccount, ClusterRole, and ClusterRoleBinding
    create_rbac: bool = False
    # -*- ServiceAccount Configuration
    create_service_account: Optional[bool] = Field(None, validate_default=True)
    # Create a ServiceAccount with name sa_name & default values
    sa_name: Optional[str] = None
    # or Provide the full ServiceAccount definition
    # Type: CreateServiceAccount
    service_account: Optional[Any] = None
    # -*- ClusterRole Configuration
    create_cluster_role: Optional[bool] = Field(None, validate_default=True)
    # Create a ClusterRole with name cr_name & default values
    cr_name: Optional[str] = None
    # or Provide the full ClusterRole definition
    # Type: CreateClusterRole
    cluster_role: Optional[Any] = None
    # -*- ClusterRoleBinding Configuration
    create_cluster_role_binding: Optional[bool] = Field(None, validate_default=True)
    # Create a ClusterRoleBinding with name crb_name & default values
    crb_name: Optional[str] = None
    # or Provide the full ClusterRoleBinding definition
    # Type: CreateClusterRoleBinding
    cluster_role_binding: Optional[Any] = None

    # -*- Add additional Kubernetes resources to the App
    # Type: CreateSecret
    add_secrets: Optional[List[Any]] = None
    # Type: CreateConfigMap
    add_configmaps: Optional[List[Any]] = None
    # Type: CreateService
    add_services: Optional[List[Any]] = None
    # Type: CreateDeployment
    add_deployments: Optional[List[Any]] = None
    # Type: CreateContainer
    add_containers: Optional[List[Any]] = None
    # Type: CreateContainer
    add_init_containers: Optional[List[Any]] = None
    # Type: CreatePort
    add_ports: Optional[List[Any]] = None
    # Type: CreateVolume
    add_volumes: Optional[List[Any]] = None
    # Type: K8sResource or CreateK8sResource
    add_resources: Optional[List[Any]] = None

    # -*- Add additional YAML resources to the App
    # Type: YamlResource
    yaml_resources: Optional[List[Any]] = None

    @field_validator("service_port", mode="before")
    def set_service_port(cls, v, info: FieldValidationInfo):
        """Default the service port when unset: 443/80 for LoadBalancer Services
        (depending on enable_https), otherwise fall back to port_number."""
        if v is not None:
            return v

        service_type: Optional[ServiceType] = info.data.get("service_type")
        if service_type == ServiceType.LOAD_BALANCER:
            return 443 if info.data.get("enable_https") else 80

        port_number = info.data.get("port_number")
        return port_number if port_number is not None else v

    @field_validator("create_service_account", mode="before")
    def set_create_service_account(cls, v, info: FieldValidationInfo):
        """Default create_service_account to the create_rbac flag when not explicitly set."""
        if v is not None:
            return v
        create_rbac = info.data.get("create_rbac")
        return create_rbac if create_rbac else v

    @field_validator("create_cluster_role", mode="before")
    def set_create_cluster_role(cls, v, info: FieldValidationInfo):
        """Default create_cluster_role to the create_rbac flag when not explicitly set."""
        if v is not None:
            return v
        create_rbac = info.data.get("create_rbac")
        return create_rbac if create_rbac else v

    @field_validator("create_cluster_role_binding", mode="before")
    def set_create_cluster_role_binding(cls, v, info: FieldValidationInfo):
        """Default create_cluster_role_binding to the create_rbac flag when not explicitly set."""
        if v is not None:
            return v
        create_rbac = info.data.get("create_rbac")
        return create_rbac if create_rbac else v

    @model_validator(mode="after")
    def validate_model(self) -> "K8sApp":
        """Reject configurations that enable HTTPS without any ACM certificate source."""
        https_without_certificate = (
            self.enable_https
            and self.acm_certificate_arn is None
            and self.acm_certificate_summary_file is None
        )
        if https_without_certificate:
            raise ValueError(
                "Must provide an ACM Certificate ARN or ACM Certificate Summary File if enable_https=True"
            )
        return self

    def get_cr_name(self) -> str:
        """Return the ClusterRole name, deriving the default from the app name if unset."""
        from phi.utils.defaults import get_default_cr_name

        if self.cr_name:
            return self.cr_name
        return get_default_cr_name(self.name)

    def get_crb_name(self) -> str:
        """Return the ClusterRoleBinding name, deriving the default from the app name if unset."""
        from phi.utils.defaults import get_default_crb_name

        if self.crb_name:
            return self.crb_name
        return get_default_crb_name(self.name)

    def get_configmap_name(self) -> str:
        """Return the ConfigMap name, deriving the default from the app name if unset."""
        from phi.utils.defaults import get_default_configmap_name

        if self.configmap_name:
            return self.configmap_name
        return get_default_configmap_name(self.name)

    def get_secret_name(self) -> str:
        """Return the Secret name, deriving the default from the app name if unset."""
        from phi.utils.defaults import get_default_secret_name

        if self.secret_name:
            return self.secret_name
        return get_default_secret_name(self.name)

    def get_container_name(self) -> str:
        """Return the container name, deriving the default from the app name if unset."""
        from phi.utils.defaults import get_default_container_name

        if self.container_name:
            return self.container_name
        return get_default_container_name(self.name)

    def get_deploy_name(self) -> str:
        """Return the Deployment name, deriving the default from the app name if unset."""
        from phi.utils.defaults import get_default_deploy_name

        if self.deploy_name:
            return self.deploy_name
        return get_default_deploy_name(self.name)

    def get_pod_name(self) -> str:
        """Return the Pod name, deriving the default from the app name if unset."""
        from phi.utils.defaults import get_default_pod_name

        if self.pod_name:
            return self.pod_name
        return get_default_pod_name(self.name)

    def get_service_name(self) -> str:
        """Return the Service name, deriving the default from the app name if unset."""
        from phi.utils.defaults import get_default_service_name

        if self.service_name:
            return self.service_name
        return get_default_service_name(self.name)

    def get_service_port(self) -> Optional[int]:
        """Return the Service port (already defaulted by the set_service_port validator)."""
        return self.service_port

    def get_service_annotations(self) -> Optional[Dict[str, str]]:
        """Return the Service annotations, augmented with AWS LoadBalancer Controller
        annotations when service_type is LOAD_BALANCER.

        Returns the user-provided service_annotations unchanged (possibly None) for
        non-LoadBalancer services. NOTE(review): for LoadBalancer services this mutates
        the user-provided dict in place when one was given.
        """
        service_annotations = self.service_annotations

        # Add annotations to create an AWS LoadBalancer
        # https://kubernetes-sigs.github.io/aws-load-balancer-controller/v2.5/guide/service/nlb/
        if self.service_type == ServiceType.LOAD_BALANCER:
            if service_annotations is None:
                service_annotations = OrderedDict()
            if self.use_nlb:
                service_annotations["service.beta.kubernetes.io/aws-load-balancer-type"] = "nlb"
                service_annotations["service.beta.kubernetes.io/aws-load-balancer-nlb-target-type"] = (
                    self.nlb_target_type
                )

            if self.load_balancer_scheme is not None:
                service_annotations["service.beta.kubernetes.io/aws-load-balancer-scheme"] = self.load_balancer_scheme
                if self.load_balancer_scheme == "internal":
                    service_annotations["service.beta.kubernetes.io/aws-load-balancer-internal"] = "true"

            # https://kubernetes-sigs.github.io/aws-load-balancer-controller/v2.4/guide/service/annotations/#load-balancer-attributes
            # Deprecated docs: # https://kubernetes.io/docs/concepts/services-networking/service/#elb-access-logs-on-aws
            if self.write_access_logs_to_s3:
                service_annotations["service.beta.kubernetes.io/aws-load-balancer-access-log-enabled"] = "true"
                # Accumulate comma-separated load-balancer attributes alongside the individual annotations
                lb_attributes = "access_logs.s3.enabled=true"
                if self.access_logs_s3_bucket is not None:
                    service_annotations["service.beta.kubernetes.io/aws-load-balancer-access-log-s3-bucket-name"] = (
                        self.access_logs_s3_bucket
                    )
                    lb_attributes += f",access_logs.s3.bucket={self.access_logs_s3_bucket}"
                if self.access_logs_s3_bucket_prefix is not None:
                    service_annotations["service.beta.kubernetes.io/aws-load-balancer-access-log-s3-bucket-prefix"] = (
                        self.access_logs_s3_bucket_prefix
                    )
                    lb_attributes += f",access_logs.s3.prefix={self.access_logs_s3_bucket_prefix}"
                service_annotations["service.beta.kubernetes.io/aws-load-balancer-attributes"] = lb_attributes

            # https://kubernetes.io/docs/concepts/services-networking/service/#ssl-support-on-aws
            if self.enable_https:
                service_annotations["service.beta.kubernetes.io/aws-load-balancer-ssl-ports"] = str(
                    self.get_service_port()
                )

                # https://kubernetes-sigs.github.io/aws-load-balancer-controller/v2.4/guide/service/annotations/#ssl-cert
                if self.acm_certificate_arn is not None:
                    service_annotations["service.beta.kubernetes.io/aws-load-balancer-ssl-cert"] = (
                        self.acm_certificate_arn
                    )
                # if acm_certificate_summary_file is provided, use that
                # NOTE: this overrides acm_certificate_arn when both are set and the file exists
                if self.acm_certificate_summary_file is not None and isinstance(
                    self.acm_certificate_summary_file, Path
                ):
                    if self.acm_certificate_summary_file.exists() and self.acm_certificate_summary_file.is_file():
                        from phi.aws.resource.acm.certificate import CertificateSummary

                        file_contents = self.acm_certificate_summary_file.read_text()
                        cert_summary = CertificateSummary.model_validate(file_contents)
                        certificate_arn = cert_summary.CertificateArn
                        logger.debug(f"CertificateArn: {certificate_arn}")
                        service_annotations["service.beta.kubernetes.io/aws-load-balancer-ssl-cert"] = certificate_arn
                    else:
                        logger.warning(f"Does not exist: {self.acm_certificate_summary_file}")

            # Enable proxy protocol for NLB
            if self.enable_load_balancer_proxy_protocol:
                service_annotations["service.beta.kubernetes.io/aws-load-balancer-proxy-protocol"] = "*"

            # Enable cross-zone load balancing
            if self.enable_cross_zone_load_balancing:
                service_annotations[
                    "service.beta.kubernetes.io/aws-load-balancer-cross-zone-load-balancing-enabled"
                ] = "true"

            # Add subnets to NLB
            if self.load_balancer_subnets is not None and isinstance(self.load_balancer_subnets, list):
                service_annotations["service.beta.kubernetes.io/aws-load-balancer-subnets"] = ", ".join(
                    self.load_balancer_subnets
                )

        return service_annotations

    def get_ingress_name(self) -> str:
        """Return the Ingress name, deriving the default from the app name if unset."""
        from phi.utils.defaults import get_default_ingress_name

        if self.ingress_name:
            return self.ingress_name
        return get_default_ingress_name(self.name)

    def get_ingress_annotations(self) -> Optional[Dict[str, str]]:
        """Build the ALB Ingress annotations; user-provided annotations are merged last and win."""
        scheme = "internal" if self.load_balancer_scheme == "internal" else "internet-facing"
        annotations: Dict[str, str] = {
            "alb.ingress.kubernetes.io/load-balancer-name": self.get_ingress_name(),
            "alb.ingress.kubernetes.io/scheme": scheme,
        }

        if self.load_balancer_subnets is not None and isinstance(self.load_balancer_subnets, list):
            annotations["alb.ingress.kubernetes.io/subnets"] = ", ".join(self.load_balancer_subnets)

        if self.ingress_annotations is not None:
            annotations.update(self.ingress_annotations)

        return annotations

    def get_ingress_rules(self) -> List[Any]:
        """Return a single catch-all ingress rule routing "/" (Prefix) to this app's Service."""
        from kubernetes.client.models.v1_http_ingress_path import V1HTTPIngressPath
        from kubernetes.client.models.v1_http_ingress_rule_value import V1HTTPIngressRuleValue
        from kubernetes.client.models.v1_ingress_backend import V1IngressBackend
        from kubernetes.client.models.v1_ingress_rule import V1IngressRule
        from kubernetes.client.models.v1_ingress_service_backend import V1IngressServiceBackend
        from kubernetes.client.models.v1_service_port import V1ServicePort

        # Backend: this app's Service on its (possibly named) service port
        service_backend = V1IngressServiceBackend(
            name=self.get_service_name(),
            port=V1ServicePort(
                name=self.container_port_name,
                port=self.get_service_port(),
            ),
        )
        root_path = V1HTTPIngressPath(
            path="/",
            path_type="Prefix",
            backend=V1IngressBackend(service=service_backend),
        )
        return [V1IngressRule(http=V1HTTPIngressRuleValue(paths=[root_path]))]

    def get_load_balancer_source_ranges(self) -> Optional[List[str]]:
        """Return allowed source CIDRs, falling back to the LOAD_BALANCER_SOURCE_RANGES secret."""
        explicit_ranges = self.load_balancer_source_ranges
        if explicit_ranges is not None:
            return explicit_ranges

        secret_ranges = self.get_secret_from_file("LOAD_BALANCER_SOURCE_RANGES")
        # A single string from the secrets file is wrapped into a one-element list
        if isinstance(secret_ranges, str):
            secret_ranges = [secret_ranges]
        return secret_ranges

    def get_cr_policy_rules(self) -> List[Any]:
        """Return the default ClusterRole policy rules for this App.

        Grants full CRUD on pods/secrets/configmaps, read access to pod logs,
        and exec access to pods (all in the core "" API group).
        """
        from phi.k8s.create.rbac_authorization_k8s_io.v1.cluster_role import (
            PolicyRule,
        )

        return [
            # Full CRUD on the core workloads/config objects the App manages
            PolicyRule(
                api_groups=[""],
                resources=["pods", "secrets", "configmaps"],
                verbs=["get", "list", "watch", "create", "update", "patch", "delete"],
            ),
            # Read-only access to pod logs
            PolicyRule(
                api_groups=[""],
                resources=["pods/logs"],
                verbs=["get", "list", "watch"],
            ),
            # Allow exec-ing into pods
            PolicyRule(
                api_groups=[""],
                resources=["pods/exec"],
                verbs=["get", "create", "watch", "delete"],
            ),
        ]

    def get_container_context(self) -> Optional[ContainerContext]:
        """Build, cache and return the ContainerContext for this App.

        Raises:
            Exception: if the workspace name or the workspace root inside
                the container cannot be determined.
        """
        logger.debug("Building ContainerContext")

        # Reuse the cached context when it has already been built
        if self.container_context is not None:
            return self.container_context

        ws_name = self.workspace_name
        if ws_name is None:
            raise Exception("Could not determine workspace_name")

        # Resolve the workspace root inside the container.
        # A provided workspace_parent_dir_container_path takes precedence:
        # the root is then derived as <parent>/<workspace_name>
        ws_parent: Optional[str] = self.workspace_parent_dir_container_path
        if ws_parent is not None:
            ws_root = f"{self.workspace_parent_dir_container_path}/{ws_name}"
        else:
            ws_root = self.workspace_dir_container_path

        if ws_root is None:
            raise Exception("Could not determine workspace_root in container")

        # Derive the parent directory from the root when it was not provided
        if ws_parent is None:
            ws_parent = "/".join(ws_root.split("/")[0:-1])

        self.container_context = ContainerContext(
            workspace_name=ws_name,
            workspace_root=ws_root,
            workspace_parent=ws_parent,
        )

        # Resolve workspace-relative directories from the workspace settings
        ws_settings = self.workspace_settings
        if ws_settings is not None:
            if ws_settings.scripts_dir is not None:
                self.container_context.scripts_dir = f"{ws_root}/{ws_settings.scripts_dir}"
            if ws_settings.storage_dir is not None:
                self.container_context.storage_dir = f"{ws_root}/{ws_settings.storage_dir}"
            if ws_settings.workflows_dir is not None:
                self.container_context.workflows_dir = f"{ws_root}/{ws_settings.workflows_dir}"
            if ws_settings.workspace_dir is not None:
                self.container_context.workspace_dir = f"{ws_root}/{ws_settings.workspace_dir}"
            if ws_settings.ws_schema is not None:
                self.container_context.workspace_schema = ws_settings.ws_schema

        if self.requirements_file is not None:
            self.container_context.requirements_file = f"{ws_root}/{self.requirements_file}"

        return self.container_context

    def get_container_env(self, container_context: ContainerContext) -> Dict[str, str]:
        """Build the environment variable dict for the App container.

        Precedence, lowest to highest: App flags and container_context paths,
        AWS env vars, env_file data, then user-provided env_vars.

        Args:
            container_context: Workspace paths/schema inside the container.

        Returns:
            Mapping of env var name -> value for the container.
        """
        from phi.constants import (
            PHI_RUNTIME_ENV_VAR,
            PYTHONPATH_ENV_VAR,
            REQUIREMENTS_FILE_PATH_ENV_VAR,
            SCRIPTS_DIR_ENV_VAR,
            STORAGE_DIR_ENV_VAR,
            WORKFLOWS_DIR_ENV_VAR,
            WORKSPACE_DIR_ENV_VAR,
            WORKSPACE_HASH_ENV_VAR,
            WORKSPACE_ID_ENV_VAR,
            WORKSPACE_ROOT_ENV_VAR,
        )

        # Container Environment
        # Copy self.container_env so repeated builds do not mutate the
        # user-provided dict in place
        container_env: Dict[str, str] = dict(self.container_env) if self.container_env else {}
        container_env.update(
            {
                "INSTALL_REQUIREMENTS": str(self.install_requirements),
                "MOUNT_WORKSPACE": str(self.mount_workspace),
                "PRINT_ENV_ON_LOAD": str(self.print_env_on_load),
                PHI_RUNTIME_ENV_VAR: "kubernetes",
                REQUIREMENTS_FILE_PATH_ENV_VAR: container_context.requirements_file or "",
                SCRIPTS_DIR_ENV_VAR: container_context.scripts_dir or "",
                STORAGE_DIR_ENV_VAR: container_context.storage_dir or "",
                WORKFLOWS_DIR_ENV_VAR: container_context.workflows_dir or "",
                WORKSPACE_DIR_ENV_VAR: container_context.workspace_dir or "",
                WORKSPACE_ROOT_ENV_VAR: container_context.workspace_root or "",
            }
        )

        # Best-effort: add workspace id/hash when a workspace schema is set
        try:
            if container_context.workspace_schema is not None:
                if container_context.workspace_schema.id_workspace is not None:
                    container_env[WORKSPACE_ID_ENV_VAR] = str(container_context.workspace_schema.id_workspace) or ""
                if container_context.workspace_schema.ws_hash is not None:
                    container_env[WORKSPACE_HASH_ENV_VAR] = container_context.workspace_schema.ws_hash
        except Exception:
            # Workspace schema is optional metadata; ignore lookup errors
            pass

        if self.set_python_path:
            python_path = self.python_path
            if python_path is None:
                # Default PYTHONPATH to the workspace root, plus any extra paths
                python_path = container_context.workspace_root
                if self.add_python_paths is not None:
                    python_path = "{}:{}".format(python_path, ":".join(self.add_python_paths))
            if python_path is not None:
                container_env[PYTHONPATH_ENV_VAR] = python_path

        # Set aws region and profile
        self.set_aws_env_vars(env_dict=container_env)

        # Update the container env using env_file
        env_data_from_file = self.get_env_file_data()
        if env_data_from_file is not None:
            container_env.update({k: str(v) for k, v in env_data_from_file.items() if v is not None})

        # Update the container env with user provided env_vars
        # this overwrites any existing variables with the same key
        if self.env_vars is not None and isinstance(self.env_vars, dict):
            container_env.update({k: str(v) for k, v in self.env_vars.items() if v is not None})

        return container_env

    def get_container_args(self) -> Optional[List[str]]:
        """Return container args; a string command is split on single spaces."""
        command = self.command
        if isinstance(command, str):
            return command.strip().split(" ")
        return command

    def get_container_labels(self, common_labels: Optional[Dict[str, str]]) -> Dict[str, str]:
        """Return Container labels: common_labels merged with self.container_labels.

        Works on a copy of common_labels so the shared dict passed by the
        caller (the same build_context.labels is handed to every label
        builder) is not mutated in place.
        """
        labels: Dict[str, str] = dict(common_labels) if common_labels else {}
        if isinstance(self.container_labels, dict):
            labels.update(self.container_labels)
        return labels

    def get_deployment_labels(self, common_labels: Optional[Dict[str, str]]) -> Dict[str, str]:
        """Return Deployment labels: common_labels merged with self.container_labels.

        Works on a copy of common_labels so the shared dict passed by the
        caller is not mutated in place.
        """
        labels: Dict[str, str] = dict(common_labels) if common_labels else {}
        if isinstance(self.container_labels, dict):
            labels.update(self.container_labels)
        return labels

    def get_service_labels(self, common_labels: Optional[Dict[str, str]]) -> Dict[str, str]:
        """Return Service labels: common_labels merged with self.container_labels.

        Works on a copy of common_labels so the shared dict passed by the
        caller is not mutated in place.
        """
        labels: Dict[str, str] = dict(common_labels) if common_labels else {}
        if isinstance(self.container_labels, dict):
            labels.update(self.container_labels)
        return labels

    def get_secrets(self) -> List[Any]:
        """Return user-provided Secrets as a copy, so callers that append
        generated secrets do not mutate self.add_secrets across builds."""
        return list(self.add_secrets) if self.add_secrets else []

    def get_configmaps(self) -> List[Any]:
        """Return user-provided ConfigMaps as a copy, so callers that append
        generated configmaps do not mutate self.add_configmaps across builds."""
        return list(self.add_configmaps) if self.add_configmaps else []

    def get_services(self) -> List[Any]:
        """Return user-provided Services as a copy, so callers that append
        generated services do not mutate self.add_services across builds."""
        return list(self.add_services) if self.add_services else []

    def get_deployments(self) -> List[Any]:
        """Return user-provided Deployments as a copy, so callers that append
        generated deployments do not mutate self.add_deployments across builds."""
        return list(self.add_deployments) if self.add_deployments else []

    def get_containers(self) -> List[Any]:
        """Return user-provided Containers as a copy, so callers that insert
        the main container do not mutate self.add_containers across builds."""
        return list(self.add_containers) if self.add_containers else []

    def get_volumes(self) -> List[Any]:
        """Return user-provided Volumes as a copy, so callers that append
        generated volumes do not mutate self.add_volumes across builds."""
        return list(self.add_volumes) if self.add_volumes else []

    def get_ports(self) -> List[Any]:
        """Return user-provided Ports as a copy, so callers that append
        the container port do not mutate self.add_ports across builds."""
        return list(self.add_ports) if self.add_ports else []

    def get_init_containers(self) -> List[Any]:
        """Return user-provided init Containers as a copy, so callers that
        append gitsync init containers do not mutate self.add_init_containers."""
        return list(self.add_init_containers) if self.add_init_containers else []

    def add_app_resources(self, namespace: str, service_account_name: Optional[str]) -> List[Any]:
        """Hook for extra App resources; subclasses may override.

        Args:
            namespace: Namespace the resources will be created in.
            service_account_name: Service Account for the resources, if any.

        Returns:
            A copy of self.add_resources, so callers cannot mutate user config.
        """
        return list(self.add_resources) if self.add_resources else []

    def build_resources(self, build_context: K8sBuildContext) -> List["K8sResource"]:
        from phi.k8s.create.apps.v1.deployment import CreateDeployment
        from phi.k8s.create.base import CreateK8sResource
        from phi.k8s.create.common.port import CreatePort
        from phi.k8s.create.core.v1.config_map import CreateConfigMap
        from phi.k8s.create.core.v1.container import CreateContainer
        from phi.k8s.create.core.v1.namespace import CreateNamespace
        from phi.k8s.create.core.v1.secret import CreateSecret
        from phi.k8s.create.core.v1.service import CreateService
        from phi.k8s.create.core.v1.service_account import CreateServiceAccount
        from phi.k8s.create.core.v1.volume import (
            CreateVolume,
            HostPathVolumeSource,
            AwsElasticBlockStoreVolumeSource,
            VolumeType,
        )
        from phi.k8s.create.networking_k8s_io.v1.ingress import CreateIngress
        from phi.k8s.create.rbac_authorization_k8s_io.v1.cluste_role_binding import CreateClusterRoleBinding
        from phi.k8s.create.rbac_authorization_k8s_io.v1.cluster_role import CreateClusterRole
        from phi.k8s.resource.base import K8sResource
        from phi.k8s.resource.yaml import YamlResource
        from phi.utils.defaults import get_default_volume_name, get_default_sa_name

        logger.debug(f"------------ Building {self.get_app_name()} ------------")
        # -*- Initialize K8s resources
        ns: Optional[CreateNamespace] = self.namespace
        sa: Optional[CreateServiceAccount] = self.service_account
        cr: Optional[CreateClusterRole] = self.cluster_role
        crb: Optional[CreateClusterRoleBinding] = self.cluster_role_binding
        secrets: List[CreateSecret] = self.get_secrets()
        config_maps: List[CreateConfigMap] = self.get_configmaps()
        services: List[CreateService] = self.get_services()
        deployments: List[CreateDeployment] = self.get_deployments()
        containers: List[CreateContainer] = self.get_containers()
        init_containers: List[CreateContainer] = self.get_init_containers()
        ports: List[CreatePort] = self.get_ports()
        volumes: List[CreateVolume] = self.get_volumes()

        # -*- Namespace name for this App
        # Use the Namespace name provided by the App or the default from the build_context
        # If self.create_rbac is True, the Namespace is created by the App if self.namespace is None
        ns_name: str = self.ns_name or build_context.namespace

        # -*- Service Account name for this App
        # Use the Service Account provided by the App or the default from the build_context
        sa_name: Optional[str] = self.sa_name or build_context.service_account_name

        # Use the labels from the build_context as common labels for all resources
        common_labels: Optional[Dict[str, str]] = build_context.labels

        # -*- Create Namespace
        if self.create_namespace:
            if ns is None:
                ns = CreateNamespace(
                    ns=ns_name,
                    app_name=self.get_app_name(),
                    labels=common_labels,
                )
            ns_name = ns.ns

        # -*- Create Service Account
        if self.create_service_account:
            if sa is None:
                sa = CreateServiceAccount(
                    sa_name=sa_name or get_default_sa_name(self.get_app_name()),
                    app_name=self.get_app_name(),
                    namespace=ns_name,
                )
            sa_name = sa.sa_name

        # -*- Create Cluster Role
        if self.create_cluster_role:
            if cr is None:
                cr = CreateClusterRole(
                    cr_name=self.get_cr_name(),
                    rules=self.get_cr_policy_rules(),
                    app_name=self.get_app_name(),
                    labels=common_labels,
                )

        # -*- Create ClusterRoleBinding
        if self.create_cluster_role_binding:
            if crb is None:
                if cr is None:
                    logger.error(
                        "ClusterRoleBinding requires a ClusterRole. "
                        "Please set create_cluster_role = True or provide a ClusterRole"
                    )
                    return []
                if sa is None:
                    logger.error(
                        "ClusterRoleBinding requires a ServiceAccount. "
                        "Please set create_service_account = True or provide a ServiceAccount"
                    )
                    return []
                crb = CreateClusterRoleBinding(
                    crb_name=self.get_crb_name(),
                    cr_name=cr.cr_name,
                    service_account_name=sa.sa_name,
                    app_name=self.get_app_name(),
                    namespace=ns_name,
                    labels=common_labels,
                )

        # -*- Get Container Context
        container_context: Optional[ContainerContext] = self.get_container_context()
        if container_context is None:
            raise Exception("Could not build ContainerContext")
        logger.debug(f"ContainerContext: {container_context.model_dump_json(indent=2)}")

        # -*- Get Container Environment
        container_env: Dict[str, str] = self.get_container_env(container_context=container_context)

        # -*- Get ConfigMaps
        container_env_cm = CreateConfigMap(
            cm_name=self.get_configmap_name(),
            app_name=self.get_app_name(),
            namespace=ns_name,
            data=container_env,
            labels=common_labels,
        )
        config_maps.append(container_env_cm)

        # -*- Get Secrets
        secret_data_from_file = self.get_secret_file_data()
        if secret_data_from_file is not None:
            container_env_secret = CreateSecret(
                secret_name=self.get_secret_name(),
                app_name=self.get_app_name(),
                string_data=secret_data_from_file,
                namespace=ns_name,
                labels=common_labels,
            )
            secrets.append(container_env_secret)

        # -*- Get Container Volumes
        if self.mount_workspace:
            # Build workspace_volume_name
            workspace_volume_name = self.workspace_volume_name
            if workspace_volume_name is None:
                workspace_volume_name = get_default_volume_name(
                    f"{self.get_app_name()}-{container_context.workspace_name}-ws"
                )

            # If workspace_volume_type is None or EmptyDir
            if self.workspace_volume_type is None or self.workspace_volume_type == K8sWorkspaceVolumeType.EmptyDir:
                logger.debug("Creating EmptyDir")
                logger.debug(f"    at: {container_context.workspace_parent}")
                workspace_volume = CreateVolume(
                    volume_name=workspace_volume_name,
                    app_name=self.get_app_name(),
                    mount_path=container_context.workspace_parent,
                    volume_type=VolumeType.EMPTY_DIR,
                )
                volumes.append(workspace_volume)

                if self.enable_gitsync:
                    if self.gitsync_repo is not None:
                        git_sync_env: Dict[str, str] = {
                            "GITSYNC_REPO": self.gitsync_repo,
                            "GITSYNC_ROOT": container_context.workspace_parent,
                            "GITSYNC_LINK": container_context.workspace_name,
                        }
                        if self.gitsync_ref is not None:
                            git_sync_env["GITSYNC_REF"] = self.gitsync_ref
                        if self.gitsync_period is not None:
                            git_sync_env["GITSYNC_PERIOD"] = self.gitsync_period
                        if self.gitsync_env is not None:
                            git_sync_env.update(self.gitsync_env)
                        gitsync_container = CreateContainer(
                            container_name="git-sync",
                            app_name=self.get_app_name(),
                            image_name=self.gitsync_image_name,
                            image_tag=self.gitsync_image_tag,
                            env_vars=git_sync_env,
                            envs_from_configmap=[cm.cm_name for cm in config_maps] if len(config_maps) > 0 else None,
                            envs_from_secret=[secret.secret_name for secret in secrets] if len(secrets) > 0 else None,
                            volumes=[workspace_volume],
                        )
                        containers.append(gitsync_container)

                        if self.create_gitsync_init_container:
                            git_sync_init_env: Dict[str, str] = {"GITSYNC_ONE_TIME": "True"}
                            git_sync_init_env.update(git_sync_env)
                            _git_sync_init_container = CreateContainer(
                                container_name="git-sync-init",
                                app_name=gitsync_container.app_name,
                                image_name=gitsync_container.image_name,
                                image_tag=gitsync_container.image_tag,
                                env_vars=git_sync_init_env,
                                envs_from_configmap=gitsync_container.envs_from_configmap,
                                envs_from_secret=gitsync_container.envs_from_secret,
                                volumes=gitsync_container.volumes,
                            )
                            init_containers.append(_git_sync_init_container)
                    else:
                        logger.error("GITSYNC_REPO invalid")

            # If workspace_volume_type is HostPath
            elif self.workspace_volume_type == K8sWorkspaceVolumeType.HostPath:
                workspace_root_in_container = container_context.workspace_root
                workspace_root_on_host = str(self.workspace_root)
                logger.debug(f"Mounting: {workspace_root_on_host}")
                logger.debug(f"      to: {workspace_root_in_container}")
                workspace_volume = CreateVolume(
                    volume_name=workspace_volume_name,
                    app_name=self.get_app_name(),
                    mount_path=workspace_root_in_container,
                    volume_type=VolumeType.HOST_PATH,
                    host_path=HostPathVolumeSource(
                        path=workspace_root_on_host,
                    ),
                )
                volumes.append(workspace_volume)

        # NodeSelectors for Pods for creating az sensitive volumes
        pod_node_selector: Optional[Dict[str, str]] = self.pod_node_selector
        if self.create_volume:
            # Build volume_name
            volume_name = self.volume_name
            if volume_name is None:
                volume_name = get_default_volume_name(f"{self.get_app_name()}-{container_context.workspace_name}")

            # If volume_type is AwsEbs
            if self.volume_type == AppVolumeType.AwsEbs:
                if self.ebs_volume_id is not None or self.ebs_volume is not None:
                    # To use EbsVolume as the volume_type we:
                    # 1. Need the volume_id
                    # 2. Need to make sure pods are scheduled in the
                    #       same region/az as the volume

                    # For the volume_id we can either:
                    # 1. Use self.ebs_volume_id
                    # 2. OR get it from self.ebs_volume
                    ebs_volume_id = self.ebs_volume_id
                    # Derive ebs_volume_id from self.ebs_volume if needed
                    if ebs_volume_id is None and self.ebs_volume is not None:
                        from phi.aws.resource.ec2.volume import EbsVolume

                        # Validate self.ebs_volume is of type EbsVolume
                        if not isinstance(self.ebs_volume, EbsVolume):
                            raise ValueError(f"ebs_volume must be of type EbsVolume, found {type(self.ebs_volume)}")

                        ebs_volume_id = self.ebs_volume.get_volume_id()

                    logger.debug(f"ebs_volume_id: {ebs_volume_id}")
                    if ebs_volume_id is None:
                        logger.error(f"{self.get_app_name()}: ebs_volume_id not available, skipping app")
                        return []

                    logger.debug(f"Mounting: {volume_name}")
                    logger.debug(f"      to: {self.volume_container_path}")
                    ebs_volume = CreateVolume(
                        volume_name=volume_name,
                        app_name=self.get_app_name(),
                        mount_path=self.volume_container_path,
                        volume_type=VolumeType.AWS_EBS,
                        aws_ebs=AwsElasticBlockStoreVolumeSource(
                            volume_id=ebs_volume_id,
                        ),
                    )
                    volumes.append(ebs_volume)

                    # For the aws_region/az we can either:
                    # 1. Use self.ebs_volume_region
                    # 2. OR get it from self.ebs_volume
                    ebs_volume_region = self.ebs_volume_region
                    ebs_volume_az = self.ebs_volume_az
                    # Derive the aws_region from self.ebs_volume if needed
                    if ebs_volume_region is None and self.ebs_volume is not None:
                        from phi.aws.resource.ec2.volume import EbsVolume

                        # Validate self.ebs_volume is of type EbsVolume
                        if not isinstance(self.ebs_volume, EbsVolume):
                            raise ValueError(f"ebs_volume must be of type EbsVolume, found {type(self.ebs_volume)}")

                        _aws_region_from_ebs_volume = self.ebs_volume.get_aws_region()
                        if _aws_region_from_ebs_volume is not None:
                            ebs_volume_region = _aws_region_from_ebs_volume
                    # Derive the aws_region from this App if needed

                    # Derive the availability_zone from self.ebs_volume if needed
                    if ebs_volume_az is None and self.ebs_volume is not None:
                        from phi.aws.resource.ec2.volume import EbsVolume

                        # Validate self.ebs_volume is of type EbsVolume
                        if not isinstance(self.ebs_volume, EbsVolume):
                            raise ValueError(f"ebs_volume must be of type EbsVolume, found {type(self.ebs_volume)}")

                        ebs_volume_az = self.ebs_volume.availability_zone

                    logger.debug(f"ebs_volume_region: {ebs_volume_region}")
                    logger.debug(f"ebs_volume_az: {ebs_volume_az}")

                    # VERY IMPORTANT: pods should be scheduled in the same region/az as the volume
                    # To do this, we add NodeSelectors to Pods
                    if self.schedule_pods_in_ebs_topology:
                        if pod_node_selector is None:
                            pod_node_selector = {}

                        # Add NodeSelectors to Pods, so they are scheduled in the same
                        # region and zone as the ebs_volume
                        # https://kubernetes.io/docs/reference/labels-annotations-taints/#topologykubernetesiozone
                        if ebs_volume_region is not None:
                            pod_node_selector["topology.kubernetes.io/region"] = ebs_volume_region
                        else:
                            raise ValueError(
                                f"{self.get_app_name()}: ebs_volume_region not provided "
                                f"but needed for scheduling pods in the same region as the ebs_volume"
                            )

                        if ebs_volume_az is not None:
                            pod_node_selector["topology.kubernetes.io/zone"] = ebs_volume_az
                        else:
                            raise ValueError(
                                f"{self.get_app_name()}: ebs_volume_az not provided "
                                f"but needed for scheduling pods in the same zone as the ebs_volume"
                            )
                else:
                    raise ValueError(f"{self.get_app_name()}: ebs_volume_id not provided")

            # If volume_type is EmptyDir
            elif self.volume_type == AppVolumeType.EmptyDir:
                empty_dir_volume = CreateVolume(
                    volume_name=volume_name,
                    app_name=self.get_app_name(),
                    mount_path=self.volume_container_path,
                    volume_type=VolumeType.EMPTY_DIR,
                )
                volumes.append(empty_dir_volume)

            # If volume_type is HostPath
            elif self.volume_type == AppVolumeType.HostPath:
                if self.volume_host_path is not None:
                    volume_host_path_str = str(self.volume_host_path)
                    logger.debug(f"Mounting: {volume_host_path_str}")
                    logger.debug(f"      to: {self.volume_container_path}")
                    host_path_volume = CreateVolume(
                        volume_name=volume_name,
                        app_name=self.get_app_name(),
                        mount_path=self.volume_container_path,
                        volume_type=VolumeType.HOST_PATH,
                        host_path=HostPathVolumeSource(
                            path=volume_host_path_str,
                        ),
                    )
                    volumes.append(host_path_volume)
                else:
                    raise ValueError(f"{self.get_app_name()}: volume_host_path not provided")
            else:
                raise ValueError(f"{self.get_app_name()}: volume_type: {self.volume_type} not supported")

        # -*- Get Container Ports
        if self.open_port:
            container_port = CreatePort(
                name=self.container_port_name,
                container_port=self.container_port,
                service_port=self.service_port,
                target_port=self.service_target_port or self.container_port_name,
            )
            ports.append(container_port)

            # Validate NODE_PORT before adding it to the container_port
            # If ServiceType == NODE_PORT then validate self.service_node_port is available
            if self.service_type == ServiceType.NODE_PORT:
                if self.service_node_port is None or self.service_node_port < 30000 or self.service_node_port > 32767:
                    raise ValueError(f"NodePort: {self.service_node_port} invalid for ServiceType: {self.service_type}")
                else:
                    container_port.node_port = self.service_node_port
            # If ServiceType == LOAD_BALANCER then validate self.service_node_port only IF available
            elif self.service_type == ServiceType.LOAD_BALANCER:
                if self.service_node_port is not None:
                    if self.service_node_port < 30000 or self.service_node_port > 32767:
                        logger.warning(
                            f"NodePort: {self.service_node_port} invalid for ServiceType: {self.service_type}"
                        )
                        logger.warning("NodePort value will be ignored")
                        self.service_node_port = None
                    else:
                        container_port.node_port = self.service_node_port
            # else validate self.service_node_port is NOT available
            elif self.service_node_port is not None:
                logger.warning(
                    f"NodePort: {self.service_node_port} provided without specifying "
                    f"ServiceType as NODE_PORT or LOAD_BALANCER"
                )
                logger.warning("NodePort value will be ignored")
                self.service_node_port = None

        # -*- Get Container Labels
        container_labels: Dict[str, str] = self.get_container_labels(common_labels)

        # -*- Get Container Args: Equivalent to docker CMD
        container_args: Optional[List[str]] = self.get_container_args()
        if container_args:
            logger.debug("Command: {}".format(" ".join(container_args)))

        # -*- Build the Container
        container = CreateContainer(
            container_name=self.get_container_name(),
            app_name=self.get_app_name(),
            image_name=self.image_name,
            image_tag=self.image_tag,
            # Equivalent to docker images CMD
            args=container_args,
            # Equivalent to docker images ENTRYPOINT
            command=[self.entrypoint] if isinstance(self.entrypoint, str) else self.entrypoint,
            image_pull_policy=self.image_pull_policy or ImagePullPolicy.IF_NOT_PRESENT,
            envs_from_configmap=[cm.cm_name for cm in config_maps] if len(config_maps) > 0 else None,
            envs_from_secret=[secret.secret_name for secret in secrets] if len(secrets) > 0 else None,
            ports=ports if len(ports) > 0 else None,
            volumes=volumes if len(volumes) > 0 else None,
            labels=container_labels,
        )
        containers.insert(0, container)

        # Set default container for kubectl commands
        # https://kubernetes.io/docs/reference/labels-annotations-taints/#kubectl-kubernetes-io-default-container
        pod_annotations = {"kubectl.kubernetes.io/default-container": container.container_name}

        # -*- Add pod annotations
        if self.pod_annotations is not None and isinstance(self.pod_annotations, dict):
            pod_annotations.update(self.pod_annotations)

        # -*- Get Deployment Labels
        deploy_labels: Dict[str, str] = self.get_deployment_labels(common_labels)

        # If using EbsVolume, restart the deployment on update
        recreate_deployment_on_update = (
            True if (self.create_volume and self.volume_type == AppVolumeType.AwsEbs) else False
        )

        # -*- Create the Deployment
        deployment = CreateDeployment(
            deploy_name=self.get_deploy_name(),
            pod_name=self.get_pod_name(),
            app_name=self.get_app_name(),
            namespace=ns_name,
            service_account_name=sa_name,
            replicas=self.replicas,
            containers=containers,
            init_containers=init_containers if len(init_containers) > 0 else None,
            pod_node_selector=pod_node_selector,
            restart_policy=self.restart_policy or RestartPolicy.ALWAYS,
            termination_grace_period_seconds=self.termination_grace_period_seconds,
            volumes=volumes if len(volumes) > 0 else None,
            labels=deploy_labels,
            pod_annotations=pod_annotations,
            # Pod topology spread constraints (key / maxSkew / whenUnsatisfiable)
            topology_spread_key=self.topology_spread_key,
            topology_spread_max_skew=self.topology_spread_max_skew,
            topology_spread_when_unsatisfiable=self.topology_spread_when_unsatisfiable,
            # When True, the Deployment is deleted and recreated instead of patched on update
            recreate_on_update=recreate_deployment_on_update,
        )
        deployments.append(deployment)

        # -*- Create the Service
        # Only built when create_service is enabled; the Service targets the
        # Deployment created above and exposes the same ports (None if no ports).
        if self.create_service:
            service_labels = self.get_service_labels(common_labels)
            service_annotations = self.get_service_annotations()
            service = CreateService(
                service_name=self.get_service_name(),
                app_name=self.get_app_name(),
                namespace=ns_name,
                service_account_name=sa_name,
                service_type=self.service_type,
                deployment=deployment,
                # ports is built earlier in this method — assumed to be a list;
                # an empty list is normalized to None for CreateService.
                ports=ports if len(ports) > 0 else None,
                labels=service_labels,
                annotations=service_annotations,
                # If ServiceType == ServiceType.LoadBalancer
                health_check_node_port=self.health_check_node_port,
                internal_traffic_policy=self.internal_traffic_policy,
                load_balancer_class=self.load_balancer_class,
                load_balancer_ip=self.load_balancer_ip,
                load_balancer_source_ranges=self.get_load_balancer_source_ranges(),
                allocate_load_balancer_node_ports=self.allocate_load_balancer_node_ports,
                # Scheme used by the Service, driven by the HTTPS flag
                protocol="https" if self.enable_https else "http",
            )
            services.append(service)

        # -*- Create the Ingress
        # Optional: only built when create_ingress is enabled. Rules and
        # annotations come from the app's getter hooks.
        ingress: Optional[CreateIngress] = None
        if self.create_ingress:
            ingress_annotations = self.get_ingress_annotations()
            ingress_rules = self.get_ingress_rules()
            ingress = CreateIngress(
                ingress_name=self.get_ingress_name(),
                app_name=self.get_app_name(),
                namespace=ns_name,
                service_account_name=sa_name,
                annotations=ingress_annotations,
                ingress_class_name=self.ingress_class_name,
                rules=ingress_rules,
            )

        # -*- List of K8sResources created by this App
        # Resources are appended in dependency-friendly order:
        # Namespace -> ServiceAccount -> ClusterRole -> ClusterRoleBinding ->
        # Secrets -> ConfigMaps -> Services -> Deployments -> Ingress,
        # then user-supplied extras. Presumably this ordering is relied on as
        # the apply order downstream — verify before reordering.
        app_resources: List[K8sResource] = []
        if ns:
            app_resources.append(ns.create())
        if sa:
            app_resources.append(sa.create())
        if cr:
            app_resources.append(cr.create())
        if crb:
            app_resources.append(crb.create())
        if len(secrets) > 0:
            app_resources.extend([secret.create() for secret in secrets])
        if len(config_maps) > 0:
            app_resources.extend([cm.create() for cm in config_maps])
        if len(services) > 0:
            app_resources.extend([service.create() for service in services])
        if len(deployments) > 0:
            app_resources.extend([deployment.create() for deployment in deployments])
        if ingress is not None:
            app_resources.append(ingress.create())
        # User-supplied resources: CreateK8sResource builders are materialized
        # via .create(); already-built K8sResource objects pass through as-is;
        # anything else is logged as an error and skipped.
        if self.add_resources is not None and isinstance(self.add_resources, list):
            logger.debug(f"Adding {len(self.add_resources)} Resources")
            for resource in self.add_resources:
                if isinstance(resource, CreateK8sResource):
                    app_resources.append(resource.create())
                elif isinstance(resource, K8sResource):
                    app_resources.append(resource)
                else:
                    logger.error(f"Resource not of type K8sResource or CreateK8sResource: {resource}")
        # NOTE(review): this local shadows the method self.add_app_resources
        # it was just produced by — consider renaming the local (e.g. extra_app_resources).
        add_app_resources = self.add_app_resources(namespace=ns_name, service_account_name=sa_name)
        if len(add_app_resources) > 0:
            logger.debug(f"Adding {len(add_app_resources)} App Resources")
            for r in add_app_resources:
                if isinstance(r, CreateK8sResource):
                    app_resources.append(r.create())
                elif isinstance(r, K8sResource):
                    app_resources.append(r)
                else:
                    logger.error(f"Resource not of type K8sResource or CreateK8sResource: {r}")
        if self.yaml_resources is not None and len(self.yaml_resources) > 0:
            logger.debug(f"Adding {len(self.yaml_resources)} YAML Resources")
            for yaml_resource in self.yaml_resources:
                # NOTE(review): unlike the two loops above, entries that are not
                # YamlResource are dropped silently here — no error is logged.
                # Confirm whether that asymmetry is intentional.
                if isinstance(yaml_resource, YamlResource):
                    app_resources.append(yaml_resource)

        logger.debug(f"------------ {self.get_app_name()} Built ------------")
        return app_resources