File size: 14,230 Bytes
36574ae |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 |
import json
import os
from cdk_config import CDK_PREFIX, VPC_NAME, AWS_REGION, PUBLIC_SUBNETS_TO_USE, PRIVATE_SUBNETS_TO_USE, CODEBUILD_ROLE_NAME, ECS_TASK_ROLE_NAME, ECS_TASK_EXECUTION_ROLE_NAME, S3_LOG_CONFIG_BUCKET_NAME, S3_OUTPUT_BUCKET_NAME, ECR_CDK_REPO_NAME, CODEBUILD_PROJECT_NAME, ALB_NAME, COGNITO_USER_POOL_NAME, COGNITO_USER_POOL_CLIENT_NAME, COGNITO_USER_POOL_CLIENT_SECRET_NAME, WEB_ACL_NAME, CONTEXT_FILE, PUBLIC_SUBNET_CIDR_BLOCKS, PRIVATE_SUBNET_CIDR_BLOCKS, PUBLIC_SUBNET_AVAILABILITY_ZONES, PRIVATE_SUBNET_AVAILABILITY_ZONES, CDK_FOLDER, CDK_CONFIG_PATH # Import necessary config
from cdk_functions import ( # Import your check functions (assuming they use Boto3)
get_vpc_id_by_name,
check_subnet_exists_by_name,
check_for_existing_role,
check_s3_bucket_exists,
check_ecr_repo_exists,
check_codebuild_project_exists,
check_alb_exists,
check_for_existing_user_pool,
check_for_existing_user_pool_client,
check_for_secret,
check_cloudfront_distribution_exists,
check_web_acl_exists,
_get_existing_subnets_in_vpc,
validate_subnet_creation_parameters
# Add other check functions as needed
)
from typing import List, Dict, Any
cdk_folder = CDK_FOLDER #<FULL_PATH_TO_CDK_FOLDER_HERE>
# Full path needed to find config file
os.environ["CDK_CONFIG_PATH"] = cdk_folder + CDK_CONFIG_PATH
# --- Helper to parse environment variables into lists ---
def _get_env_list(env_var_name: str) -> List[str]:
"""Parses a comma-separated environment variable into a list of strings."""
value = env_var_name[1:-1].strip().replace('\"', '').replace("\'","")
if not value:
return []
# Split by comma and filter out any empty strings that might result from extra commas
return [s.strip() for s in value.split(',') if s.strip()]
if PUBLIC_SUBNETS_TO_USE and not isinstance(PUBLIC_SUBNETS_TO_USE, list): PUBLIC_SUBNETS_TO_USE = _get_env_list(PUBLIC_SUBNETS_TO_USE)
if PRIVATE_SUBNETS_TO_USE and not isinstance(PRIVATE_SUBNETS_TO_USE, list): PRIVATE_SUBNETS_TO_USE = _get_env_list(PRIVATE_SUBNETS_TO_USE)
if PUBLIC_SUBNET_CIDR_BLOCKS and not isinstance(PUBLIC_SUBNET_CIDR_BLOCKS, list): PUBLIC_SUBNET_CIDR_BLOCKS = _get_env_list(PUBLIC_SUBNET_CIDR_BLOCKS)
if PUBLIC_SUBNET_AVAILABILITY_ZONES and not isinstance(PUBLIC_SUBNET_AVAILABILITY_ZONES, list): PUBLIC_SUBNET_AVAILABILITY_ZONES = _get_env_list(PUBLIC_SUBNET_AVAILABILITY_ZONES)
if PRIVATE_SUBNET_CIDR_BLOCKS and not isinstance(PRIVATE_SUBNET_CIDR_BLOCKS, list): PRIVATE_SUBNET_CIDR_BLOCKS = _get_env_list(PRIVATE_SUBNET_CIDR_BLOCKS)
if PRIVATE_SUBNET_AVAILABILITY_ZONES and not isinstance(PRIVATE_SUBNET_AVAILABILITY_ZONES, list): PRIVATE_SUBNET_AVAILABILITY_ZONES = _get_env_list(PRIVATE_SUBNET_AVAILABILITY_ZONES)
# Check for the existence of elements in your AWS environment to see if it's necessary to create new versions of the same
def check_and_set_context():
context_data = {}
# --- Find the VPC ID first ---
print("VPC_NAME:", VPC_NAME)
vpc_id, nat_gateways = get_vpc_id_by_name(VPC_NAME)
# If you expect only one, or one per AZ and you're creating one per AZ in CDK:
if nat_gateways:
# For simplicity, let's just check if *any* NAT exists in the VPC
# A more robust check would match by subnet, AZ, or a specific tag.
context_data["exists:NatGateway"] = True
context_data["id:NatGateway"] = nat_gateways[0]['NatGatewayId'] # Store the ID of the first one found
else:
context_data["exists:NatGateway"] = False
context_data["id:NatGateway"] = None
if not vpc_id:
# If the VPC doesn't exist, you might not be able to check/create subnets.
# Decide how to handle this: raise an error, set a flag, etc.
raise RuntimeError(f"Required VPC '{VPC_NAME}' not found. Cannot proceed with subnet checks.")
context_data["vpc_id"] = vpc_id # Store VPC ID in context
# SUBNET CHECKS
context_data: Dict[str, Any] = {}
all_proposed_subnets_data: List[Dict[str, str]] = []
# Flag to indicate if full validation mode (with CIDR/AZs) is active
full_validation_mode = False
# Determine if full validation mode is possible/desired
# It's 'desired' if CIDR/AZs are provided, and their lengths match the name lists.
public_ready_for_full_validation = (
len(PUBLIC_SUBNETS_TO_USE) > 0 and
len(PUBLIC_SUBNET_CIDR_BLOCKS) == len(PUBLIC_SUBNETS_TO_USE) and
len(PUBLIC_SUBNET_AVAILABILITY_ZONES) == len(PUBLIC_SUBNETS_TO_USE)
)
private_ready_for_full_validation = (
len(PRIVATE_SUBNETS_TO_USE) > 0 and
len(PRIVATE_SUBNET_CIDR_BLOCKS) == len(PRIVATE_SUBNETS_TO_USE) and
len(PRIVATE_SUBNET_AVAILABILITY_ZONES) == len(PRIVATE_SUBNETS_TO_USE)
)
# Activate full validation if *any* type of subnet (public or private) has its full details provided.
# You might adjust this logic if you require ALL subnet types to have CIDRs, or NONE.
if public_ready_for_full_validation or private_ready_for_full_validation:
full_validation_mode = True
# If some are ready but others aren't, print a warning or raise an error based on your strictness
if public_ready_for_full_validation and not private_ready_for_full_validation and PRIVATE_SUBNETS_TO_USE:
print("Warning: Public subnets have CIDRs/AZs, but private subnets do not. Only public will be fully validated/created with CIDRs.")
if private_ready_for_full_validation and not public_ready_for_full_validation and PUBLIC_SUBNETS_TO_USE:
print("Warning: Private subnets have CIDRs/AZs, but public subnets do not. Only private will be fully validated/created with CIDRs.")
# Prepare data for validate_subnet_creation_parameters for all subnets that have full details
if public_ready_for_full_validation:
for i, name in enumerate(PUBLIC_SUBNETS_TO_USE):
all_proposed_subnets_data.append({
'name': name,
'cidr': PUBLIC_SUBNET_CIDR_BLOCKS[i],
'az': PUBLIC_SUBNET_AVAILABILITY_ZONES[i]
})
if private_ready_for_full_validation:
for i, name in enumerate(PRIVATE_SUBNETS_TO_USE):
all_proposed_subnets_data.append({
'name': name,
'cidr': PRIVATE_SUBNET_CIDR_BLOCKS[i],
'az': PRIVATE_SUBNET_AVAILABILITY_ZONES[i]
})
print(f"Target VPC ID for Boto3 lookup: {vpc_id}")
# Fetch all existing subnets in the target VPC once to avoid repeated API calls
try:
existing_aws_subnets = _get_existing_subnets_in_vpc(vpc_id)
except Exception as e:
print(f"Failed to fetch existing VPC subnets. Aborting. Error: {e}")
raise SystemExit(1) # Exit immediately if we can't get baseline data
print("\n--- Running Name-Only Subnet Existence Check Mode ---")
# Fallback: check only by name using the existing data
checked_public_subnets = {}
if PUBLIC_SUBNETS_TO_USE:
for subnet_name in PUBLIC_SUBNETS_TO_USE:
print("subnet_name:", subnet_name)
exists, subnet_id = check_subnet_exists_by_name(subnet_name, existing_aws_subnets)
checked_public_subnets[subnet_name] = {"exists": exists, "id": subnet_id}
# If the subnet exists, remove it from the proposed subnets list
if checked_public_subnets[subnet_name]["exists"] == True:
all_proposed_subnets_data = [
subnet for subnet in all_proposed_subnets_data
if subnet['name'] != subnet_name
]
context_data["checked_public_subnets"] = checked_public_subnets
checked_private_subnets = {}
if PRIVATE_SUBNETS_TO_USE:
for subnet_name in PRIVATE_SUBNETS_TO_USE:
print("subnet_name:", subnet_name)
exists, subnet_id = check_subnet_exists_by_name(subnet_name, existing_aws_subnets)
checked_private_subnets[subnet_name] = {"exists": exists, "id": subnet_id}
# If the subnet exists, remove it from the proposed subnets list
if checked_private_subnets[subnet_name]["exists"] == True:
all_proposed_subnets_data = [
subnet for subnet in all_proposed_subnets_data
if subnet['name'] != subnet_name
]
context_data["checked_private_subnets"] = checked_private_subnets
print("\nName-only existence subnet check complete.\n")
if full_validation_mode:
print("\n--- Running in Full Subnet Validation Mode (CIDR/AZs provided) ---")
try:
validate_subnet_creation_parameters(vpc_id, all_proposed_subnets_data, existing_aws_subnets)
print("\nPre-synth validation successful. Proceeding with CDK synth.\n")
# Populate context_data for downstream CDK construct creation
context_data["public_subnets_to_create"] = []
if public_ready_for_full_validation:
for i, name in enumerate(PUBLIC_SUBNETS_TO_USE):
context_data["public_subnets_to_create"].append({
'name': name,
'cidr': PUBLIC_SUBNET_CIDR_BLOCKS[i],
'az': PUBLIC_SUBNET_AVAILABILITY_ZONES[i],
'is_public': True
})
context_data["private_subnets_to_create"] = []
if private_ready_for_full_validation:
for i, name in enumerate(PRIVATE_SUBNETS_TO_USE):
context_data["private_subnets_to_create"].append({
'name': name,
'cidr': PRIVATE_SUBNET_CIDR_BLOCKS[i],
'az': PRIVATE_SUBNET_AVAILABILITY_ZONES[i],
'is_public': False
})
except (ValueError, Exception) as e:
print(f"\nFATAL ERROR: Subnet parameter validation failed: {e}\n")
raise SystemExit(1) # Exit if validation fails
# Example checks and setting context values
# IAM Roles
role_name = CODEBUILD_ROLE_NAME
exists, _, _ = check_for_existing_role(role_name)
context_data[f"exists:{role_name}"] = exists # Use boolean
if exists:
_, role_arn, _ = check_for_existing_role(role_name) # Get ARN if needed
context_data[f"arn:{role_name}"] = role_arn
role_name = ECS_TASK_ROLE_NAME
exists, _, _ = check_for_existing_role(role_name)
context_data[f"exists:{role_name}"] = exists
if exists:
_, role_arn, _ = check_for_existing_role(role_name)
context_data[f"arn:{role_name}"] = role_arn
role_name = ECS_TASK_EXECUTION_ROLE_NAME
exists, _, _ = check_for_existing_role(role_name)
context_data[f"exists:{role_name}"] = exists
if exists:
_, role_arn, _ = check_for_existing_role(role_name)
context_data[f"arn:{role_name}"] = role_arn
# S3 Buckets
bucket_name = S3_LOG_CONFIG_BUCKET_NAME
exists, _ = check_s3_bucket_exists(bucket_name)
context_data[f"exists:{bucket_name}"] = exists
if exists:
# You might not need the ARN if using from_bucket_name
pass
output_bucket_name = S3_OUTPUT_BUCKET_NAME
exists, _ = check_s3_bucket_exists(output_bucket_name)
context_data[f"exists:{output_bucket_name}"] = exists
if exists:
pass
# ECR Repository
repo_name = ECR_CDK_REPO_NAME
exists, _ = check_ecr_repo_exists(repo_name)
context_data[f"exists:{repo_name}"] = exists
if exists:
pass # from_repository_name is sufficient
# CodeBuild Project
project_name = CODEBUILD_PROJECT_NAME
exists, _ = check_codebuild_project_exists(project_name)
context_data[f"exists:{project_name}"] = exists
if exists:
# Need a way to get the ARN from the check function
_, project_arn = check_codebuild_project_exists(project_name) # Assuming it returns ARN
context_data[f"arn:{project_name}"] = project_arn
# ALB (by name lookup)
alb_name = ALB_NAME
exists, _ = check_alb_exists(alb_name, region_name=AWS_REGION)
context_data[f"exists:{alb_name}"] = exists
if exists:
_, alb_object = check_alb_exists(alb_name, region_name=AWS_REGION) # Assuming check returns object
print("alb_object:", alb_object)
context_data[f"arn:{alb_name}"] = alb_object['LoadBalancerArn']
# Cognito User Pool (by name)
user_pool_name = COGNITO_USER_POOL_NAME
exists, user_pool_id, _ = check_for_existing_user_pool(user_pool_name)
context_data[f"exists:{user_pool_name}"] = exists
if exists:
context_data[f"id:{user_pool_name}"] = user_pool_id
# Cognito User Pool Client (by name and pool ID) - requires User Pool ID from check
if user_pool_id:
user_pool_id_for_client_check = user_pool_id #context_data.get(f"id:{user_pool_name}") # Use ID from context
user_pool_client_name = COGNITO_USER_POOL_CLIENT_NAME
if user_pool_id_for_client_check:
exists, client_id, _ = check_for_existing_user_pool_client(user_pool_client_name, user_pool_id_for_client_check)
context_data[f"exists:{user_pool_client_name}"] = exists
if exists:
context_data[f"id:{user_pool_client_name}"] = client_id
# Secrets Manager Secret (by name)
secret_name = COGNITO_USER_POOL_CLIENT_SECRET_NAME
exists, _ = check_for_secret(secret_name)
context_data[f"exists:{secret_name}"] = exists
# You might not need the ARN if using from_secret_name_v2
# WAF Web ACL (by name and scope)
web_acl_name = WEB_ACL_NAME
exists, _ = check_web_acl_exists(web_acl_name, scope="CLOUDFRONT") # Assuming check returns object
context_data[f"exists:{web_acl_name}"] = exists
if exists:
_, existing_web_acl = check_web_acl_exists(web_acl_name, scope="CLOUDFRONT")
context_data[f"arn:{web_acl_name}"] = existing_web_acl.attr_arn
# Write the context data to the file
with open(CONTEXT_FILE, "w") as f:
json.dump(context_data, f, indent=2)
print(f"Context data written to {CONTEXT_FILE}")
|