fix(checks): added validation for non-existing VPC endpoint policy (#1859)

Co-authored-by: sergargar <sergargar@users.noreply.github.com>
This commit is contained in:
Oleksandr Mykytenko
2023-02-08 13:13:22 +02:00
committed by GitHub
parent 4c64dc7885
commit 3e807af2b2

View File

@@ -10,41 +10,42 @@ class vpc_endpoint_connections_trust_boundaries(Check):
trusted_account_ids = get_config_var("trusted_account_ids")
for endpoint in vpc_client.vpc_endpoints:
# Check VPC endpoint policy
for statement in endpoint.policy_document["Statement"]:
if "*" == statement["Principal"]:
report = Check_Report_AWS(self.metadata())
report.region = endpoint.region
report.status = "FAIL"
report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} has full access."
report.resource_id = endpoint.id
findings.append(report)
break
else:
if type(statement["Principal"]["AWS"]) == str:
principals = [statement["Principal"]["AWS"]]
else:
principals = statement["Principal"]["AWS"]
for principal_arn in principals:
if endpoint.policy_document:
for statement in endpoint.policy_document["Statement"]:
if "*" == statement["Principal"]:
report = Check_Report_AWS(self.metadata())
report.region = endpoint.region
if principal_arn == "*":
report.status = "FAIL"
report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} has full access."
report.resource_id = endpoint.id
report.status = "FAIL"
report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} has full access."
report.resource_id = endpoint.id
findings.append(report)
break
else:
if type(statement["Principal"]["AWS"]) == str:
principals = [statement["Principal"]["AWS"]]
else:
account_id = principal_arn.split(":")[4]
if (
account_id in trusted_account_ids
or account_id in vpc_client.audited_account
):
report.status = "PASS"
report.status_extended = f"Found trusted account {account_id} in VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id}."
principals = statement["Principal"]["AWS"]
for principal_arn in principals:
report = Check_Report_AWS(self.metadata())
report.region = endpoint.region
if principal_arn == "*":
report.status = "FAIL"
report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} has full access."
report.resource_id = endpoint.id
else:
report.status = "FAIL"
report.status_extended = f"Found untrusted account {account_id} in VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id}."
report.resource_id = endpoint.id
findings.append(report)
account_id = principal_arn.split(":")[4]
if (
account_id in trusted_account_ids
or account_id in vpc_client.audited_account
):
report.status = "PASS"
report.status_extended = f"Found trusted account {account_id} in VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id}."
report.resource_id = endpoint.id
else:
report.status = "FAIL"
report.status_extended = f"Found untrusted account {account_id} in VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id}."
report.resource_id = endpoint.id
findings.append(report)
return findings