From 3e807af2b20ae1ed80d9cce9b44b6b7280e837f2 Mon Sep 17 00:00:00 2001
From: Oleksandr Mykytenko
Date: Wed, 8 Feb 2023 13:13:22 +0200
Subject: [PATCH] fix(checks): added validation for non-existing VPC endpoint policy (#1859)
Co-authored-by: sergargar
---
 ...c_endpoint_connections_trust_boundaries.py | 63 ++++++++++---------
 1 file changed, 32 insertions(+), 31 deletions(-)
diff --git a/prowler/providers/aws/services/vpc/vpc_endpoint_connections_trust_boundaries/vpc_endpoint_connections_trust_boundaries.py b/prowler/providers/aws/services/vpc/vpc_endpoint_connections_trust_boundaries/vpc_endpoint_connections_trust_boundaries.py
index 7eb73700..1c4f3540 100644
--- a/prowler/providers/aws/services/vpc/vpc_endpoint_connections_trust_boundaries/vpc_endpoint_connections_trust_boundaries.py
+++ b/prowler/providers/aws/services/vpc/vpc_endpoint_connections_trust_boundaries/vpc_endpoint_connections_trust_boundaries.py
@@ -10,41 +10,42 @@ class vpc_endpoint_connections_trust_boundaries(Check):
 trusted_account_ids = get_config_var("trusted_account_ids")
 for endpoint in vpc_client.vpc_endpoints:
 # Check VPC endpoint policy
- for statement in endpoint.policy_document["Statement"]:
- if "*" == statement["Principal"]:
- report = Check_Report_AWS(self.metadata())
- report.region = endpoint.region
- report.status = "FAIL"
- report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} has full access."
- report.resource_id = endpoint.id
- findings.append(report)
- break
-
- else:
- if type(statement["Principal"]["AWS"]) == str:
- principals = [statement["Principal"]["AWS"]]
- else:
- principals = statement["Principal"]["AWS"]
- for principal_arn in principals:
+ if endpoint.policy_document:
+ for statement in endpoint.policy_document["Statement"]:
+ if "*" == statement["Principal"]:
 report = Check_Report_AWS(self.metadata())
 report.region = endpoint.region
- if principal_arn == "*":
- report.status = "FAIL"
- report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} has full access."
- report.resource_id = endpoint.id
+ report.status = "FAIL"
+ report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} has full access."
+ report.resource_id = endpoint.id
+ findings.append(report)
+ break
+
+ else:
+ if type(statement["Principal"]["AWS"]) == str:
+ principals = [statement["Principal"]["AWS"]]
 else:
- account_id = principal_arn.split(":")[4]
- if (
- account_id in trusted_account_ids
- or account_id in vpc_client.audited_account
- ):
- report.status = "PASS"
- report.status_extended = f"Found trusted account {account_id} in VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id}."
+ principals = statement["Principal"]["AWS"]
+ for principal_arn in principals:
+ report = Check_Report_AWS(self.metadata())
+ report.region = endpoint.region
+ if principal_arn == "*":
+ report.status = "FAIL"
+ report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} has full access."
 report.resource_id = endpoint.id
 else:
- report.status = "FAIL"
- report.status_extended = f"Found untrusted account {account_id} in VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id}."
- report.resource_id = endpoint.id
- findings.append(report)
+ account_id = principal_arn.split(":")[4]
+ if (
+ account_id in trusted_account_ids
+ or account_id in vpc_client.audited_account
+ ):
+ report.status = "PASS"
+ report.status_extended = f"Found trusted account {account_id} in VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id}."
+ report.resource_id = endpoint.id
+ else:
+ report.status = "FAIL"
+ report.status_extended = f"Found untrusted account {account_id} in VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id}."
+ report.resource_id = endpoint.id
+ findings.append(report)
 return findings
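Review bot: The dict branch under the new `else:` still indexes `statement["Principal"]["AWS"]` unconditionally, so a statement whose Principal is a dict without an `AWS` key (a `Service` or `Federated` principal) raises KeyError and, unless a handler exists outside the hunk, aborts the check for every remaining endpoint instead of producing a finding. Reading it defensively, `aws_principal = statement["Principal"].get("AWS", [])` followed by `principals = [aws_principal] if isinstance(aws_principal, str) else aws_principal`, keeps the scan running and also replaces the `type(...) == str` comparison with `isinstance`. The statement's `Effect` is never consulted, so a `Deny` statement with `"Principal": "*"` is reported as full access, and `principal_arn.split(":")[4]` raises IndexError for a principal given as a bare account ID rather than an ARN. The new `if endpoint.policy_document:` guard means an endpoint with no policy yields no finding at all, which is worth a comment such as `# No policy document retrieved: nothing to evaluate, no finding is emitted` so the silent skip reads as intentional. The enclosing method begins before the hunk, and `findings` is presumably initialised there.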