refactor(vpc_endpoint_connections_trust_boundaries) (#2667)

This commit is contained in:
Pepe Fagoaga
2023-08-03 09:56:09 +02:00
committed by GitHub
parent c335334402
commit 5763bca317
2 changed files with 45 additions and 76 deletions

View File

@@ -24,29 +24,24 @@ class vpc_endpoint_connections_trust_boundaries(Check):
if not access_from_trusted_accounts: if not access_from_trusted_accounts:
break break
if "*" == statement["Principal"]: if "*" == statement["Principal"]:
access_from_trusted_accounts = False
report = Check_Report_AWS(self.metadata()) report = Check_Report_AWS(self.metadata())
report.region = endpoint.region report.region = endpoint.region
report.resource_id = endpoint.id report.resource_id = endpoint.id
report.resource_arn = endpoint.arn report.resource_arn = endpoint.arn
report.resource_tags = endpoint.tags report.resource_tags = endpoint.tags
for account_id in trusted_account_ids: if "Condition" in statement:
if ( for account_id in trusted_account_ids:
"Condition" in statement if is_account_only_allowed_in_condition(
and is_account_only_allowed_in_condition(
statement["Condition"], account_id statement["Condition"], account_id
) ):
): access_from_trusted_accounts = True
access_from_trusted_accounts = True else:
else: access_from_trusted_accounts = False
access_from_trusted_accounts = False break
break
if ( if not access_from_trusted_accounts:
not access_from_trusted_accounts
or len(trusted_account_ids) == 0
):
access_from_trusted_accounts = False
report.status = "FAIL" report.status = "FAIL"
report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} can be accessed from non-trusted accounts." report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} can be accessed from non-trusted accounts."
else: else:
@@ -63,30 +58,25 @@ class vpc_endpoint_connections_trust_boundaries(Check):
else: else:
principals = statement["Principal"]["AWS"] principals = statement["Principal"]["AWS"]
for principal_arn in principals: for principal_arn in principals:
report = Check_Report_AWS(self.metadata())
report.region = endpoint.region
report.resource_id = endpoint.id
report.resource_arn = endpoint.arn
report.resource_tags = endpoint.tags
if principal_arn == "*": if principal_arn == "*":
report = Check_Report_AWS(self.metadata()) access_from_trusted_accounts = False
report.region = endpoint.region if "Condition" in statement:
report.resource_id = endpoint.id for account_id in trusted_account_ids:
report.resource_arn = endpoint.arn if is_account_only_allowed_in_condition(
report.resource_tags = endpoint.tags
for account_id in trusted_account_ids:
if (
"Condition" in statement
and is_account_only_allowed_in_condition(
statement["Condition"], account_id statement["Condition"], account_id
) ):
): access_from_trusted_accounts = True
access_from_trusted_accounts = True else:
else: access_from_trusted_accounts = False
access_from_trusted_accounts = False break
break
if ( if not access_from_trusted_accounts:
not access_from_trusted_accounts
or len(trusted_account_ids) == 0
):
access_from_trusted_accounts = False
report.status = "FAIL" report.status = "FAIL"
report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} can be accessed from non-trusted accounts." report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} can be accessed from non-trusted accounts."
else: else:
@@ -104,50 +94,29 @@ class vpc_endpoint_connections_trust_boundaries(Check):
account_id = principal_arn.split(":")[4] account_id = principal_arn.split(":")[4]
else: else:
account_id = match.string account_id = match.string
if (
account_id in trusted_account_ids
or account_id in vpc_client.audited_account
):
report = Check_Report_AWS(self.metadata())
report.region = endpoint.region
report.status = "PASS"
report.status_extended = f"Found trusted account {account_id} in VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id}."
report.resource_id = endpoint.id
report.resource_arn = endpoint.arn
report.resource_tags = endpoint.tags
findings.append(report)
else:
report = Check_Report_AWS(self.metadata())
report.region = endpoint.region
report.resource_id = endpoint.id
report.resource_arn = endpoint.arn
report.resource_tags = endpoint.tags
if account_id not in trusted_account_ids:
access_from_trusted_accounts = False
if "Condition" in statement:
for account_id in trusted_account_ids: for account_id in trusted_account_ids:
if ( if is_account_only_allowed_in_condition(
"Condition" in statement statement["Condition"], account_id
and is_account_only_allowed_in_condition(
statement["Condition"], account_id
)
): ):
access_from_trusted_accounts = True access_from_trusted_accounts = True
else: else:
access_from_trusted_accounts = False access_from_trusted_accounts = False
break break
if ( if not access_from_trusted_accounts:
not access_from_trusted_accounts report.status = "FAIL"
or len(trusted_account_ids) == 0 report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} can be accessed from non-trusted accounts."
): else:
access_from_trusted_accounts = False report.status = "PASS"
report.status = "FAIL" report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} can only be accessed from trusted accounts."
report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} can be accessed from non-trusted accounts."
else:
report.status = "PASS"
report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} can only be accessed from trusted accounts."
findings.append(report) findings.append(report)
if not access_from_trusted_accounts: if not access_from_trusted_accounts:
break break
return findings return findings

View File

@@ -186,7 +186,7 @@ class Test_vpc_endpoint_connections_trust_boundaries:
assert result[0].status == "PASS" assert result[0].status == "PASS"
assert ( assert (
result[0].status_extended result[0].status_extended
== f"Found trusted account {AWS_ACCOUNT_NUMBER} in VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']}." == f"VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']} can only be accessed from trusted accounts."
) )
assert ( assert (
result[0].resource_id result[0].resource_id
@@ -244,7 +244,7 @@ class Test_vpc_endpoint_connections_trust_boundaries:
assert result[0].status == "PASS" assert result[0].status == "PASS"
assert ( assert (
result[0].status_extended result[0].status_extended
== f"Found trusted account {AWS_ACCOUNT_NUMBER} in VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']}." == f"VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']} can only be accessed from trusted accounts."
) )
assert ( assert (
result[0].resource_id result[0].resource_id
@@ -368,7 +368,7 @@ class Test_vpc_endpoint_connections_trust_boundaries:
assert result[0].status == "PASS" assert result[0].status == "PASS"
assert ( assert (
result[0].status_extended result[0].status_extended
== f"Found trusted account {TRUSTED_AWS_ACCOUNT_NUMBER} in VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']}." == f"VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']} can only be accessed from trusted accounts."
) )
assert ( assert (
result[0].resource_id result[0].resource_id
@@ -430,7 +430,7 @@ class Test_vpc_endpoint_connections_trust_boundaries:
assert result[0].status == "PASS" assert result[0].status == "PASS"
assert ( assert (
result[0].status_extended result[0].status_extended
== f"Found trusted account {TRUSTED_AWS_ACCOUNT_NUMBER} in VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']}." == f"VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']} can only be accessed from trusted accounts."
) )
assert ( assert (
result[0].resource_id result[0].resource_id