File size: 14,230 Bytes
36574ae
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
import json
import os
from cdk_config import CDK_PREFIX, VPC_NAME, AWS_REGION, PUBLIC_SUBNETS_TO_USE, PRIVATE_SUBNETS_TO_USE, CODEBUILD_ROLE_NAME, ECS_TASK_ROLE_NAME, ECS_TASK_EXECUTION_ROLE_NAME, S3_LOG_CONFIG_BUCKET_NAME, S3_OUTPUT_BUCKET_NAME, ECR_CDK_REPO_NAME, CODEBUILD_PROJECT_NAME, ALB_NAME, COGNITO_USER_POOL_NAME, COGNITO_USER_POOL_CLIENT_NAME, COGNITO_USER_POOL_CLIENT_SECRET_NAME, WEB_ACL_NAME, CONTEXT_FILE, PUBLIC_SUBNET_CIDR_BLOCKS, PRIVATE_SUBNET_CIDR_BLOCKS, PUBLIC_SUBNET_AVAILABILITY_ZONES, PRIVATE_SUBNET_AVAILABILITY_ZONES, CDK_FOLDER, CDK_CONFIG_PATH  # Import necessary config
from cdk_functions import ( # Import your check functions (assuming they use Boto3)
    get_vpc_id_by_name,
    check_subnet_exists_by_name,
    check_for_existing_role,
    check_s3_bucket_exists,
    check_ecr_repo_exists,
    check_codebuild_project_exists,
    check_alb_exists,
    check_for_existing_user_pool,
    check_for_existing_user_pool_client,
    check_for_secret,
    check_cloudfront_distribution_exists,
    check_web_acl_exists,
    _get_existing_subnets_in_vpc,
    validate_subnet_creation_parameters
    # Add other check functions as needed
)

from typing import List, Dict, Any

cdk_folder = CDK_FOLDER #<FULL_PATH_TO_CDK_FOLDER_HERE>

# Full path needed to find config file
os.environ["CDK_CONFIG_PATH"] = cdk_folder + CDK_CONFIG_PATH

# --- Helper to parse environment variables into lists ---
def _get_env_list(env_var_name: str) -> List[str]:
    """Parses a comma-separated environment variable into a list of strings."""
    value = env_var_name[1:-1].strip().replace('\"', '').replace("\'","")
    if not value:
        return []
    # Split by comma and filter out any empty strings that might result from extra commas
    return [s.strip() for s in value.split(',') if s.strip()]


if PUBLIC_SUBNETS_TO_USE and not isinstance(PUBLIC_SUBNETS_TO_USE, list): PUBLIC_SUBNETS_TO_USE = _get_env_list(PUBLIC_SUBNETS_TO_USE)
if PRIVATE_SUBNETS_TO_USE and not isinstance(PRIVATE_SUBNETS_TO_USE, list): PRIVATE_SUBNETS_TO_USE = _get_env_list(PRIVATE_SUBNETS_TO_USE)
if PUBLIC_SUBNET_CIDR_BLOCKS and not isinstance(PUBLIC_SUBNET_CIDR_BLOCKS, list): PUBLIC_SUBNET_CIDR_BLOCKS = _get_env_list(PUBLIC_SUBNET_CIDR_BLOCKS)
if PUBLIC_SUBNET_AVAILABILITY_ZONES and not isinstance(PUBLIC_SUBNET_AVAILABILITY_ZONES, list): PUBLIC_SUBNET_AVAILABILITY_ZONES = _get_env_list(PUBLIC_SUBNET_AVAILABILITY_ZONES)
if PRIVATE_SUBNET_CIDR_BLOCKS and not isinstance(PRIVATE_SUBNET_CIDR_BLOCKS, list): PRIVATE_SUBNET_CIDR_BLOCKS = _get_env_list(PRIVATE_SUBNET_CIDR_BLOCKS)
if PRIVATE_SUBNET_AVAILABILITY_ZONES and not isinstance(PRIVATE_SUBNET_AVAILABILITY_ZONES, list): PRIVATE_SUBNET_AVAILABILITY_ZONES = _get_env_list(PRIVATE_SUBNET_AVAILABILITY_ZONES)

# Check for the existence of elements in your AWS environment to see if it's necessary to create new versions of the same

def check_and_set_context():
    context_data = {}

    # --- Find the VPC ID first ---
    print("VPC_NAME:", VPC_NAME)
    vpc_id, nat_gateways = get_vpc_id_by_name(VPC_NAME)

    # If you expect only one, or one per AZ and you're creating one per AZ in CDK:
    if nat_gateways:
        # For simplicity, let's just check if *any* NAT exists in the VPC
        # A more robust check would match by subnet, AZ, or a specific tag.
        context_data["exists:NatGateway"] = True
        context_data["id:NatGateway"] = nat_gateways[0]['NatGatewayId'] # Store the ID of the first one found
    else:
        context_data["exists:NatGateway"] = False
        context_data["id:NatGateway"] = None

    if not vpc_id:
        # If the VPC doesn't exist, you might not be able to check/create subnets.
        # Decide how to handle this: raise an error, set a flag, etc.
        raise RuntimeError(f"Required VPC '{VPC_NAME}' not found. Cannot proceed with subnet checks.")

    context_data["vpc_id"] = vpc_id # Store VPC ID in context

    # SUBNET CHECKS
    context_data: Dict[str, Any] = {}
    all_proposed_subnets_data: List[Dict[str, str]] = []

    # Flag to indicate if full validation mode (with CIDR/AZs) is active
    full_validation_mode = False

    # Determine if full validation mode is possible/desired
    # It's 'desired' if CIDR/AZs are provided, and their lengths match the name lists.
    public_ready_for_full_validation = (
        len(PUBLIC_SUBNETS_TO_USE) > 0 and
        len(PUBLIC_SUBNET_CIDR_BLOCKS) == len(PUBLIC_SUBNETS_TO_USE) and
        len(PUBLIC_SUBNET_AVAILABILITY_ZONES) == len(PUBLIC_SUBNETS_TO_USE)
    )
    private_ready_for_full_validation = (
        len(PRIVATE_SUBNETS_TO_USE) > 0 and
        len(PRIVATE_SUBNET_CIDR_BLOCKS) == len(PRIVATE_SUBNETS_TO_USE) and
        len(PRIVATE_SUBNET_AVAILABILITY_ZONES) == len(PRIVATE_SUBNETS_TO_USE)
    )

    # Activate full validation if *any* type of subnet (public or private) has its full details provided.
    # You might adjust this logic if you require ALL subnet types to have CIDRs, or NONE.
    if public_ready_for_full_validation or private_ready_for_full_validation:
        full_validation_mode = True

        # If some are ready but others aren't, print a warning or raise an error based on your strictness
        if public_ready_for_full_validation and not private_ready_for_full_validation and PRIVATE_SUBNETS_TO_USE:
            print("Warning: Public subnets have CIDRs/AZs, but private subnets do not. Only public will be fully validated/created with CIDRs.")
        if private_ready_for_full_validation and not public_ready_for_full_validation and PUBLIC_SUBNETS_TO_USE:
            print("Warning: Private subnets have CIDRs/AZs, but public subnets do not. Only private will be fully validated/created with CIDRs.")

        # Prepare data for validate_subnet_creation_parameters for all subnets that have full details
        if public_ready_for_full_validation:
            for i, name in enumerate(PUBLIC_SUBNETS_TO_USE):
                all_proposed_subnets_data.append({
                    'name': name,
                    'cidr': PUBLIC_SUBNET_CIDR_BLOCKS[i],
                    'az': PUBLIC_SUBNET_AVAILABILITY_ZONES[i]
                })
        if private_ready_for_full_validation:
            for i, name in enumerate(PRIVATE_SUBNETS_TO_USE):
                all_proposed_subnets_data.append({
                    'name': name,
                    'cidr': PRIVATE_SUBNET_CIDR_BLOCKS[i],
                    'az': PRIVATE_SUBNET_AVAILABILITY_ZONES[i]
                })


    print(f"Target VPC ID for Boto3 lookup: {vpc_id}")

    # Fetch all existing subnets in the target VPC once to avoid repeated API calls
    try:
        existing_aws_subnets = _get_existing_subnets_in_vpc(vpc_id)
    except Exception as e:
        print(f"Failed to fetch existing VPC subnets. Aborting. Error: {e}")
        raise SystemExit(1) # Exit immediately if we can't get baseline data
    
    print("\n--- Running Name-Only Subnet Existence Check Mode ---")
    # Fallback: check only by name using the existing data
    checked_public_subnets = {}
    if PUBLIC_SUBNETS_TO_USE:
        for subnet_name in PUBLIC_SUBNETS_TO_USE:
            print("subnet_name:", subnet_name)
            exists, subnet_id = check_subnet_exists_by_name(subnet_name, existing_aws_subnets)
            checked_public_subnets[subnet_name] = {"exists": exists, "id": subnet_id}

            # If the subnet exists, remove it from the proposed subnets list
            if checked_public_subnets[subnet_name]["exists"] == True:
                all_proposed_subnets_data = [
                    subnet for subnet in all_proposed_subnets_data 
                    if subnet['name'] != subnet_name
                ]

    context_data["checked_public_subnets"] = checked_public_subnets

    checked_private_subnets = {}
    if PRIVATE_SUBNETS_TO_USE:
        for subnet_name in PRIVATE_SUBNETS_TO_USE:
            print("subnet_name:", subnet_name)
            exists, subnet_id = check_subnet_exists_by_name(subnet_name, existing_aws_subnets)
            checked_private_subnets[subnet_name] = {"exists": exists, "id": subnet_id}

            # If the subnet exists, remove it from the proposed subnets list
            if checked_private_subnets[subnet_name]["exists"] == True:
                all_proposed_subnets_data = [
                    subnet for subnet in all_proposed_subnets_data 
                    if subnet['name'] != subnet_name
                ]

    context_data["checked_private_subnets"] = checked_private_subnets



    print("\nName-only existence subnet check complete.\n")

    if full_validation_mode:
        print("\n--- Running in Full Subnet Validation Mode (CIDR/AZs provided) ---")
        try:
            validate_subnet_creation_parameters(vpc_id, all_proposed_subnets_data, existing_aws_subnets)
            print("\nPre-synth validation successful. Proceeding with CDK synth.\n")

            # Populate context_data for downstream CDK construct creation
            context_data["public_subnets_to_create"] = []
            if public_ready_for_full_validation:
                for i, name in enumerate(PUBLIC_SUBNETS_TO_USE):
                    context_data["public_subnets_to_create"].append({
                        'name': name,
                        'cidr': PUBLIC_SUBNET_CIDR_BLOCKS[i],
                        'az': PUBLIC_SUBNET_AVAILABILITY_ZONES[i],
                        'is_public': True
                    })
            context_data["private_subnets_to_create"] = []
            if private_ready_for_full_validation:
                for i, name in enumerate(PRIVATE_SUBNETS_TO_USE):
                    context_data["private_subnets_to_create"].append({
                        'name': name,
                        'cidr': PRIVATE_SUBNET_CIDR_BLOCKS[i],
                        'az': PRIVATE_SUBNET_AVAILABILITY_ZONES[i],
                        'is_public': False
                    })

        except (ValueError, Exception) as e:
            print(f"\nFATAL ERROR: Subnet parameter validation failed: {e}\n")
            raise SystemExit(1) # Exit if validation fails

    # Example checks and setting context values
    # IAM Roles
    role_name = CODEBUILD_ROLE_NAME
    exists, _, _ = check_for_existing_role(role_name)
    context_data[f"exists:{role_name}"] = exists # Use boolean
    if exists:
         _, role_arn, _ = check_for_existing_role(role_name) # Get ARN if needed
         context_data[f"arn:{role_name}"] = role_arn

    role_name = ECS_TASK_ROLE_NAME
    exists, _, _ = check_for_existing_role(role_name)
    context_data[f"exists:{role_name}"] = exists
    if exists:
         _, role_arn, _ = check_for_existing_role(role_name)
         context_data[f"arn:{role_name}"] = role_arn

    role_name = ECS_TASK_EXECUTION_ROLE_NAME
    exists, _, _ = check_for_existing_role(role_name)
    context_data[f"exists:{role_name}"] = exists
    if exists:
         _, role_arn, _ = check_for_existing_role(role_name)
         context_data[f"arn:{role_name}"] = role_arn

    # S3 Buckets
    bucket_name = S3_LOG_CONFIG_BUCKET_NAME
    exists, _ = check_s3_bucket_exists(bucket_name)
    context_data[f"exists:{bucket_name}"] = exists
    if exists:
        # You might not need the ARN if using from_bucket_name
        pass

    output_bucket_name = S3_OUTPUT_BUCKET_NAME
    exists, _ = check_s3_bucket_exists(output_bucket_name)
    context_data[f"exists:{output_bucket_name}"] = exists
    if exists:
         pass

    # ECR Repository
    repo_name = ECR_CDK_REPO_NAME
    exists, _ = check_ecr_repo_exists(repo_name)
    context_data[f"exists:{repo_name}"] = exists
    if exists:
         pass # from_repository_name is sufficient

    # CodeBuild Project
    project_name = CODEBUILD_PROJECT_NAME
    exists, _ = check_codebuild_project_exists(project_name)
    context_data[f"exists:{project_name}"] = exists
    if exists:
         # Need a way to get the ARN from the check function
         _, project_arn = check_codebuild_project_exists(project_name) # Assuming it returns ARN
         context_data[f"arn:{project_name}"] = project_arn

    # ALB (by name lookup)
    alb_name = ALB_NAME
    exists, _ = check_alb_exists(alb_name, region_name=AWS_REGION)
    context_data[f"exists:{alb_name}"] = exists
    if exists:
        _, alb_object = check_alb_exists(alb_name, region_name=AWS_REGION) # Assuming check returns object
        print("alb_object:", alb_object)
        context_data[f"arn:{alb_name}"] = alb_object['LoadBalancerArn']


    # Cognito User Pool (by name)
    user_pool_name = COGNITO_USER_POOL_NAME
    exists, user_pool_id, _ = check_for_existing_user_pool(user_pool_name)
    context_data[f"exists:{user_pool_name}"] = exists
    if exists:
        context_data[f"id:{user_pool_name}"] = user_pool_id

    # Cognito User Pool Client (by name and pool ID) - requires User Pool ID from check
    if user_pool_id:
        user_pool_id_for_client_check = user_pool_id #context_data.get(f"id:{user_pool_name}") # Use ID from context
        user_pool_client_name = COGNITO_USER_POOL_CLIENT_NAME
        if user_pool_id_for_client_check:
            exists, client_id, _ = check_for_existing_user_pool_client(user_pool_client_name, user_pool_id_for_client_check)
            context_data[f"exists:{user_pool_client_name}"] = exists
            if exists:
                context_data[f"id:{user_pool_client_name}"] = client_id

    # Secrets Manager Secret (by name)
    secret_name = COGNITO_USER_POOL_CLIENT_SECRET_NAME
    exists, _ = check_for_secret(secret_name)
    context_data[f"exists:{secret_name}"] = exists
    # You might not need the ARN if using from_secret_name_v2


    # WAF Web ACL (by name and scope)
    web_acl_name = WEB_ACL_NAME
    exists, _ = check_web_acl_exists(web_acl_name, scope="CLOUDFRONT") # Assuming check returns object
    context_data[f"exists:{web_acl_name}"] = exists
    if exists:
        _, existing_web_acl = check_web_acl_exists(web_acl_name, scope="CLOUDFRONT")
        context_data[f"arn:{web_acl_name}"] = existing_web_acl.attr_arn

    # Write the context data to the file
    with open(CONTEXT_FILE, "w") as f:
        json.dump(context_data, f, indent=2)

    print(f"Context data written to {CONTEXT_FILE}")