From 5bf3f7071766c923dcf64d96342644ac7f566a02 Mon Sep 17 00:00:00 2001 From: Pepe Fagoaga Date: Thu, 3 Aug 2023 09:16:58 +0200 Subject: [PATCH] fix(vpc_endpoint_connections_trust_boundaries): Handle AWS Account ID as Principal (#2611) --- .../policy_condition_parser.py | 60 +-- ...c_endpoint_connections_trust_boundaries.py | 116 ++++- .../policy_condition_parser_test.py | 268 +++++++++++- ...point_connections_trust_boundaries_test.py | 397 +++++++++++++++++- 4 files changed, 782 insertions(+), 59 deletions(-) diff --git a/prowler/providers/aws/lib/policy_condition_parser/policy_condition_parser.py b/prowler/providers/aws/lib/policy_condition_parser/policy_condition_parser.py index 9c9f9cc0..6c4d1956 100644 --- a/prowler/providers/aws/lib/policy_condition_parser/policy_condition_parser.py +++ b/prowler/providers/aws/lib/policy_condition_parser/policy_condition_parser.py @@ -1,39 +1,45 @@ +# lista de cuentas y te devuelva las vĂ¡lidas def is_account_only_allowed_in_condition( condition_statement: dict, source_account: str ): is_condition_valid = False valid_condition_options = { - "StringEquals": "aws:SourceAccount", - "ArnLike": "aws:SourceArn", - "ArnEquals": "aws:SourceArn", + "StringEquals": [ + "aws:SourceAccount", + "s3:ResourceAccount", + "aws:PrincipalAccount", + ], + "StringLike": ["aws:SourceArn", "aws:PrincipalArn"], + "ArnLike": ["aws:SourceArn", "aws:PrincipalArn"], + "ArnEquals": ["aws:SourceArn", "aws:PrincipalArn"], } + for condition_operator, condition_operator_key in valid_condition_options.items(): if condition_operator in condition_statement: - if condition_operator_key in condition_statement[condition_operator]: - # values are a list - if isinstance( - condition_statement[condition_operator][condition_operator_key], - list, - ): - # if there is an arn/account without the source account -> we do not consider it safe - # here by default we assume is true and look for false entries - is_condition_valid = True - for item in condition_statement[condition_operator][ - condition_operator_key - ]: - if source_account not in item: - is_condition_valid = False - break - # value is a string - elif isinstance( - condition_statement[condition_operator][condition_operator_key], str - ): - if ( - source_account - in condition_statement[condition_operator][ - condition_operator_key - ] + for value in condition_operator_key: + if value in condition_statement[condition_operator]: + # values are a list + if isinstance( + condition_statement[condition_operator][value], + list, ): + # if there is an arn/account without the source account -> we do not consider it safe + # here by default we assume is true and look for false entries is_condition_valid = True + for item in condition_statement[condition_operator][value]: + if source_account not in item: + is_condition_valid = False + break + + # value is a string + elif isinstance( + condition_statement[condition_operator][value], + str, + ): + if ( + source_account + in condition_statement[condition_operator][value] + ): + is_condition_valid = True return is_condition_valid diff --git a/prowler/providers/aws/services/vpc/vpc_endpoint_connections_trust_boundaries/vpc_endpoint_connections_trust_boundaries.py b/prowler/providers/aws/services/vpc/vpc_endpoint_connections_trust_boundaries/vpc_endpoint_connections_trust_boundaries.py index de38a9ae..6d282282 100644 --- a/prowler/providers/aws/services/vpc/vpc_endpoint_connections_trust_boundaries/vpc_endpoint_connections_trust_boundaries.py +++ b/prowler/providers/aws/services/vpc/vpc_endpoint_connections_trust_boundaries/vpc_endpoint_connections_trust_boundaries.py @@ -1,5 +1,10 @@ +from re import compile + from prowler.config.config import get_config_var from prowler.lib.check.models import Check, Check_Report_AWS +from prowler.providers.aws.lib.policy_condition_parser.policy_condition_parser import ( + is_account_only_allowed_in_condition, +) from prowler.providers.aws.services.vpc.vpc_client import vpc_client @@ -8,20 +13,49 @@ class vpc_endpoint_connections_trust_boundaries(Check): findings = [] # Get trusted account_ids from prowler.config.yaml trusted_account_ids = get_config_var("trusted_account_ids") + # Always include the same account as trusted + trusted_account_ids.append(vpc_client.audited_account) for endpoint in vpc_client.vpc_endpoints: # Check VPC endpoint policy if endpoint.policy_document: + access_from_trusted_accounts = True for statement in endpoint.policy_document["Statement"]: + # If one policy allows access from a non-trusted account + if not access_from_trusted_accounts: + break if "*" == statement["Principal"]: report = Check_Report_AWS(self.metadata()) report.region = endpoint.region - report.status = "FAIL" - report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} has full access." report.resource_id = endpoint.id report.resource_arn = endpoint.arn report.resource_tags = endpoint.tags + + for account_id in trusted_account_ids: + if ( + "Condition" in statement + and is_account_only_allowed_in_condition( + statement["Condition"], account_id + ) + ): + access_from_trusted_accounts = True + else: + access_from_trusted_accounts = False + break + + if ( + not access_from_trusted_accounts + or len(trusted_account_ids) == 0 + ): + access_from_trusted_accounts = False + report.status = "FAIL" + report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} can be accessed from non-trusted accounts." + else: + report.status = "PASS" + report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} can only be accessed from trusted accounts." + findings.append(report) - break + if not access_from_trusted_accounts: + break else: if isinstance(statement["Principal"]["AWS"], str): @@ -29,31 +63,91 @@ class vpc_endpoint_connections_trust_boundaries(Check): else: principals = statement["Principal"]["AWS"] for principal_arn in principals: - report = Check_Report_AWS(self.metadata()) - report.region = endpoint.region if principal_arn == "*": - report.status = "FAIL" - report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} has full access." + report = Check_Report_AWS(self.metadata()) + report.region = endpoint.region report.resource_id = endpoint.id report.resource_arn = endpoint.arn report.resource_tags = endpoint.tags + + for account_id in trusted_account_ids: + if ( + "Condition" in statement + and is_account_only_allowed_in_condition( + statement["Condition"], account_id + ) + ): + access_from_trusted_accounts = True + else: + access_from_trusted_accounts = False + break + + if ( + not access_from_trusted_accounts + or len(trusted_account_ids) == 0 + ): + access_from_trusted_accounts = False + report.status = "FAIL" + report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} can be accessed from non-trusted accounts." + else: + report.status = "PASS" + report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} can only be accessed from trusted accounts." + + findings.append(report) + if not access_from_trusted_accounts: + break else: - account_id = principal_arn.split(":")[4] + # Account ID can be an ARN or just a 12-digit string + pattern = compile(r"^[0-9]{12}$") + match = pattern.match(principal_arn) + if not match: + account_id = principal_arn.split(":")[4] + else: + account_id = match.string if ( account_id in trusted_account_ids or account_id in vpc_client.audited_account ): + report = Check_Report_AWS(self.metadata()) + report.region = endpoint.region report.status = "PASS" report.status_extended = f"Found trusted account {account_id} in VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id}." report.resource_id = endpoint.id report.resource_arn = endpoint.arn report.resource_tags = endpoint.tags + findings.append(report) else: - report.status = "FAIL" - report.status_extended = f"Found untrusted account {account_id} in VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id}." + report = Check_Report_AWS(self.metadata()) + report.region = endpoint.region report.resource_id = endpoint.id report.resource_arn = endpoint.arn report.resource_tags = endpoint.tags - findings.append(report) + + for account_id in trusted_account_ids: + if ( + "Condition" in statement + and is_account_only_allowed_in_condition( + statement["Condition"], account_id + ) + ): + access_from_trusted_accounts = True + else: + access_from_trusted_accounts = False + break + + if ( + not access_from_trusted_accounts + or len(trusted_account_ids) == 0 + ): + access_from_trusted_accounts = False + report.status = "FAIL" + report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} can be accessed from non-trusted accounts." + else: + report.status = "PASS" + report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} can only be accessed from trusted accounts." + + findings.append(report) + if not access_from_trusted_accounts: + break return findings diff --git a/tests/providers/aws/lib/policy_condition_parser/policy_condition_parser_test.py b/tests/providers/aws/lib/policy_condition_parser/policy_condition_parser_test.py index 60f20b46..7a289b83 100644 --- a/tests/providers/aws/lib/policy_condition_parser/policy_condition_parser_test.py +++ b/tests/providers/aws/lib/policy_condition_parser/policy_condition_parser_test.py @@ -6,19 +6,19 @@ AWS_ACCOUNT_NUMBER = "123456789012" class Test_policy_condition_parser: - def test_condition_parser_string_equals_list(self): + def test_condition_parser_string_equals_aws_SourceAccount_list(self): condition_statement = {"StringEquals": {"aws:SourceAccount": ["123456789012"]}} assert is_account_only_allowed_in_condition( condition_statement, AWS_ACCOUNT_NUMBER ) - def test_condition_parser_string_equals_str(self): + def test_condition_parser_string_equals_aws_SourceAccount_str(self): condition_statement = {"StringEquals": {"aws:SourceAccount": "123456789012"}} assert is_account_only_allowed_in_condition( condition_statement, AWS_ACCOUNT_NUMBER ) - def test_condition_parser_string_equals_list_not_valid(self): + def test_condition_parser_string_equals_aws_SourceAccount_list_not_valid(self): condition_statement = { "StringEquals": {"aws:SourceAccount": ["123456789012", "111222333444"]} } @@ -26,13 +26,67 @@ class Test_policy_condition_parser: condition_statement, AWS_ACCOUNT_NUMBER ) - def test_condition_parser_string_equals_str_not_valid(self): + def test_condition_parser_string_equals_aws_SourceAccount_str_not_valid(self): condition_statement = {"StringEquals": {"aws:SourceAccount": "111222333444"}} assert not is_account_only_allowed_in_condition( condition_statement, AWS_ACCOUNT_NUMBER ) - def test_condition_parser_arnlike_list(self): + def test_condition_parser_string_equals_s3_ResourceAccount_list(self): + condition_statement = {"StringEquals": {"s3:ResourceAccount": ["123456789012"]}} + assert is_account_only_allowed_in_condition( + condition_statement, AWS_ACCOUNT_NUMBER + ) + + def test_condition_parser_string_equals_s3_ResourceAccount_str(self): + condition_statement = {"StringEquals": {"s3:ResourceAccount": "123456789012"}} + assert is_account_only_allowed_in_condition( + condition_statement, AWS_ACCOUNT_NUMBER + ) + + def test_condition_parser_string_equals_s3_ResourceAccount_list_not_valid(self): + condition_statement = { + "StringEquals": {"s3:ResourceAccount": ["123456789012", "111222333444"]} + } + assert not is_account_only_allowed_in_condition( + condition_statement, AWS_ACCOUNT_NUMBER + ) + + def test_condition_parser_string_equals_s3_ResourceAccount_str_not_valid(self): + condition_statement = {"StringEquals": {"s3:ResourceAccount": "111222333444"}} + assert not is_account_only_allowed_in_condition( + condition_statement, AWS_ACCOUNT_NUMBER + ) + + def test_condition_parser_string_equals_aws_PrincipalAccount_list(self): + condition_statement = { + "StringEquals": {"aws:PrincipalAccount": ["123456789012"]} + } + assert is_account_only_allowed_in_condition( + condition_statement, AWS_ACCOUNT_NUMBER + ) + + def test_condition_parser_string_equals_aws_PrincipalAccount_str(self): + condition_statement = {"StringEquals": {"aws:PrincipalAccount": "123456789012"}} + assert is_account_only_allowed_in_condition( + condition_statement, AWS_ACCOUNT_NUMBER + ) + + def test_condition_parser_string_equals_aws_PrincipalAccount_list_not_valid(self): + condition_statement = { + "StringEquals": {"aws:PrincipalAccount": ["123456789012", "111222333444"]} + } + assert not is_account_only_allowed_in_condition( + condition_statement, AWS_ACCOUNT_NUMBER + ) + + def test_condition_parser_string_equals_aws_PrincipalAccount_str_not_valid(self): + condition_statement = {"StringEquals": {"aws:PrincipalAccount": "111222333444"}} + assert not is_account_only_allowed_in_condition( + condition_statement, AWS_ACCOUNT_NUMBER + ) + + def test_condition_parser_arn_like_aws_SourceArn_list(self): condition_statement = { "ArnLike": {"aws:SourceArn": ["arn:aws:cloudtrail:*:123456789012:trail/*"]} } @@ -41,7 +95,7 @@ class Test_policy_condition_parser: condition_statement, AWS_ACCOUNT_NUMBER ) - def test_condition_parser_arnlike_list_not_valid(self): + def test_condition_parser_arn_like_aws_SourceArn_list_not_valid(self): condition_statement = { "ArnLike": { "aws:SourceArn": [ @@ -55,7 +109,7 @@ class Test_policy_condition_parser: condition_statement, AWS_ACCOUNT_NUMBER ) - def test_condition_parser_arnlike_str(self): + def test_condition_parser_arn_like_aws_SourceArn_str(self): condition_statement = { "ArnLike": {"aws:SourceArn": "arn:aws:cloudtrail:*:123456789012:trail/*"} } @@ -64,7 +118,7 @@ class Test_policy_condition_parser: condition_statement, AWS_ACCOUNT_NUMBER ) - def test_condition_parser_arnlike_str_not_valid(self): + def test_condition_parser_arn_like_aws_SourceArn_str_not_valid(self): condition_statement = { "ArnLike": {"aws:SourceArn": "arn:aws:cloudtrail:*:111222333444:trail/*"} } @@ -73,7 +127,50 @@ class Test_policy_condition_parser: condition_statement, AWS_ACCOUNT_NUMBER ) - def test_condition_parser_arnequals_list(self): + def test_condition_parser_arn_like_aws_PrincipalArn_list(self): + condition_statement = { + "ArnLike": { + "aws:PrincipalArn": ["arn:aws:cloudtrail:*:123456789012:trail/*"] + } + } + + assert is_account_only_allowed_in_condition( + condition_statement, AWS_ACCOUNT_NUMBER + ) + + def test_condition_parser_arn_like_aws_PrincipalArn_list_not_valid(self): + condition_statement = { + "ArnLike": { + "aws:PrincipalArn": [ + "arn:aws:cloudtrail:*:123456789012:trail/*", + "arn:aws:cloudtrail:*:111222333444:trail/*", + ] + } + } + + assert not is_account_only_allowed_in_condition( + condition_statement, AWS_ACCOUNT_NUMBER + ) + + def test_condition_parser_arn_like_aws_PrincipalArn_str(self): + condition_statement = { + "ArnLike": {"aws:PrincipalArn": "arn:aws:cloudtrail:*:123456789012:trail/*"} + } + + assert is_account_only_allowed_in_condition( + condition_statement, AWS_ACCOUNT_NUMBER + ) + + def test_condition_parser_arn_like_aws_PrincipalArn_str_not_valid(self): + condition_statement = { + "ArnLike": {"aws:PrincipalArn": "arn:aws:cloudtrail:*:111222333444:trail/*"} + } + + assert not is_account_only_allowed_in_condition( + condition_statement, AWS_ACCOUNT_NUMBER + ) + + def test_condition_parser_arn_equals_aws_SourceArn_list(self): condition_statement = { "ArnEquals": { "aws:SourceArn": [ @@ -86,7 +183,7 @@ class Test_policy_condition_parser: condition_statement, AWS_ACCOUNT_NUMBER ) - def test_condition_parser_arnequals_list_not_valid(self): + def test_condition_parser_arn_equals_aws_SourceArn_list_not_valid(self): condition_statement = { "ArnEquals": { "aws:SourceArn": [ @@ -100,7 +197,7 @@ class Test_policy_condition_parser: condition_statement, AWS_ACCOUNT_NUMBER ) - def test_condition_parser_arnequals_str(self): + def test_condition_parser_arn_equals_aws_SourceArn_str(self): condition_statement = { "ArnEquals": { "aws:SourceArn": "arn:aws:cloudtrail:eu-west-1:123456789012:trail/test" @@ -111,7 +208,7 @@ class Test_policy_condition_parser: condition_statement, AWS_ACCOUNT_NUMBER ) - def test_condition_parser_arnequals_str_not_valid(self): + def test_condition_parser_arn_equals_aws_SourceArn_str_not_valid(self): condition_statement = { "ArnEquals": { "aws:SourceArn": "arn:aws:cloudtrail:eu-west-1:111222333444:trail/test" @@ -121,3 +218,150 @@ class Test_policy_condition_parser: assert not is_account_only_allowed_in_condition( condition_statement, AWS_ACCOUNT_NUMBER ) + + def test_condition_parser_arn_equals_aws_PrincipalArn_list(self): + condition_statement = { + "ArnEquals": { + "aws:PrincipalArn": [ + "arn:aws:cloudtrail:eu-west-1:123456789012:trail/test" + ] + } + } + + assert is_account_only_allowed_in_condition( + condition_statement, AWS_ACCOUNT_NUMBER + ) + + def test_condition_parser_arn_equals_aws_PrincipalArn_list_not_valid(self): + condition_statement = { + "ArnEquals": { + "aws:PrincipalArn": [ + "arn:aws:cloudtrail:eu-west-1:123456789012:trail/test", + "arn:aws:cloudtrail:eu-west-1:111222333444:trail/test", + ] + } + } + + assert not is_account_only_allowed_in_condition( + condition_statement, AWS_ACCOUNT_NUMBER + ) + + def test_condition_parser_arn_equals_aws_PrincipalArn_str(self): + condition_statement = { + "ArnEquals": { + "aws:PrincipalArn": "arn:aws:cloudtrail:eu-west-1:123456789012:trail/test" + } + } + + assert is_account_only_allowed_in_condition( + condition_statement, AWS_ACCOUNT_NUMBER + ) + + def test_condition_parser_arn_equals_aws_PrincipalArn_str_not_valid(self): + condition_statement = { + "ArnEquals": { + "aws:PrincipalArn": "arn:aws:cloudtrail:eu-west-1:111222333444:trail/test" + } + } + + assert not is_account_only_allowed_in_condition( + condition_statement, AWS_ACCOUNT_NUMBER + ) + + def test_condition_parser_string_like_aws_SourceArn_list(self): + condition_statement = { + "StringLike": { + "aws:SourceArn": [ + "arn:aws:cloudtrail:eu-west-1:123456789012:trail/test" + ] + } + } + + assert is_account_only_allowed_in_condition( + condition_statement, AWS_ACCOUNT_NUMBER + ) + + def test_condition_parser_string_like_aws_SourceArn_list_not_valid(self): + condition_statement = { + "StringLike": { + "aws:SourceArn": [ + "arn:aws:cloudtrail:eu-west-1:123456789012:trail/test", + "arn:aws:cloudtrail:eu-west-1:111222333444:trail/test", + ] + } + } + + assert not is_account_only_allowed_in_condition( + condition_statement, AWS_ACCOUNT_NUMBER + ) + + def test_condition_parser_string_like_aws_SourceArn_str(self): + condition_statement = { + "StringLike": { + "aws:SourceArn": "arn:aws:cloudtrail:eu-west-1:123456789012:trail/test" + } + } + + assert is_account_only_allowed_in_condition( + condition_statement, AWS_ACCOUNT_NUMBER + ) + + def test_condition_parser_string_like_aws_SourceArn_str_not_valid(self): + condition_statement = { + "StringLike": { + "aws:SourceArn": "arn:aws:cloudtrail:eu-west-1:111222333444:trail/test" + } + } + + assert not is_account_only_allowed_in_condition( + condition_statement, AWS_ACCOUNT_NUMBER + ) + + def test_condition_parser_string_like_aws_PrincipalArn_list(self): + condition_statement = { + "StringLike": { + "aws:PrincipalArn": [ + "arn:aws:cloudtrail:eu-west-1:123456789012:trail/test" + ] + } + } + + assert is_account_only_allowed_in_condition( + condition_statement, AWS_ACCOUNT_NUMBER + ) + + def test_condition_parser_string_like_aws_PrincipalArn_list_not_valid(self): + condition_statement = { + "StringLike": { + "aws:PrincipalArn": [ + "arn:aws:cloudtrail:eu-west-1:123456789012:trail/test", + "arn:aws:cloudtrail:eu-west-1:111222333444:trail/test", + ] + } + } + + assert not is_account_only_allowed_in_condition( + condition_statement, AWS_ACCOUNT_NUMBER + ) + + def test_condition_parser_string_like_aws_PrincipalArn_str(self): + condition_statement = { + "StringLike": { + "aws:PrincipalArn": "arn:aws:cloudtrail:eu-west-1:123456789012:trail/test" + } + } + + assert is_account_only_allowed_in_condition( + condition_statement, AWS_ACCOUNT_NUMBER + ) + + def test_condition_parser_string_like_aws_PrincipalArn_str_not_valid(self): + condition_statement = { + "StringLike": { + "aws:PrincipalArn": "arn:aws:cloudtrail:eu-west-1:111222333444:trail/test" + } + } + + assert not is_account_only_allowed_in_condition( + condition_statement, AWS_ACCOUNT_NUMBER + ) diff --git a/tests/providers/aws/services/vpc/vpc_endpoint_connections_trust_boundaries/vpc_endpoint_connections_trust_boundaries_test.py b/tests/providers/aws/services/vpc/vpc_endpoint_connections_trust_boundaries/vpc_endpoint_connections_trust_boundaries_test.py index 176e7204..a56de713 100644 --- a/tests/providers/aws/services/vpc/vpc_endpoint_connections_trust_boundaries/vpc_endpoint_connections_trust_boundaries_test.py +++ b/tests/providers/aws/services/vpc/vpc_endpoint_connections_trust_boundaries/vpc_endpoint_connections_trust_boundaries_test.py @@ -9,11 +9,13 @@ from prowler.providers.common.models import Audit_Metadata AWS_REGION = "us-east-1" AWS_ACCOUNT_NUMBER = "123456789012" +TRUSTED_AWS_ACCOUNT_NUMBER = "111122223333" +NON_TRUSTED_AWS_ACCOUNT_NUMBER = "000011112222" def mock_get_config_var(config_var): if config_var == "trusted_account_ids": - return ["123456789010"] + return [TRUSTED_AWS_ACCOUNT_NUMBER] return [] @@ -124,7 +126,7 @@ class Test_vpc_endpoint_connections_trust_boundaries: assert result[0].status == "FAIL" assert ( result[0].status_extended - == f"VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']} has full access." + == f"VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']} can be accessed from non-trusted accounts." ) assert ( result[0].resource_id @@ -133,7 +135,7 @@ class Test_vpc_endpoint_connections_trust_boundaries: assert result[0].region == AWS_REGION @mock_ec2 - def test_vpc_endpoint_with_trusted_account(self): + def test_vpc_endpoint_with_trusted_account_arn(self): # Create VPC Mocked Resources ec2_client = client("ec2", region_name=AWS_REGION) @@ -150,7 +152,67 @@ class Test_vpc_endpoint_connections_trust_boundaries: "Statement": [ { "Effect": "Allow", - "Principal": {"AWS": "arn:aws:iam::123456789012:root"}, + "Principal": { + "AWS": f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root" + }, + "Action": "*", + "Resource": "*", + } + ] + } + ), + ) + from prowler.providers.aws.services.vpc.vpc_service import VPC + + current_audit_info = self.set_mocked_audit_info() + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ): + with mock.patch( + "prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client", + new=VPC(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import ( + vpc_endpoint_connections_trust_boundaries, + ) + + check = vpc_endpoint_connections_trust_boundaries() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"Found trusted account {AWS_ACCOUNT_NUMBER} in VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']}." + ) + assert ( + result[0].resource_id + == vpc_endpoint["VpcEndpoint"]["VpcEndpointId"] + ) + assert result[0].region == AWS_REGION + + @mock_ec2 + def test_vpc_endpoint_with_trusted_account_id(self): + # Create VPC Mocked Resources + ec2_client = client("ec2", region_name=AWS_REGION) + + vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"] + + route_table = ec2_client.create_route_table(VpcId=vpc["VpcId"])["RouteTable"] + vpc_endpoint = ec2_client.create_vpc_endpoint( + VpcId=vpc["VpcId"], + ServiceName="com.amazonaws.us-east-1.s3", + RouteTableIds=[route_table["RouteTableId"]], + VpcEndpointType="Gateway", + PolicyDocument=json.dumps( + { + "Statement": [ + { + "Effect": "Allow", + "Principal": {"AWS": AWS_ACCOUNT_NUMBER}, "Action": "*", "Resource": "*", } @@ -208,7 +270,9 @@ class Test_vpc_endpoint_connections_trust_boundaries: "Statement": [ { "Effect": "Allow", - "Principal": {"AWS": "arn:aws:iam::123456789010:root"}, + "Principal": { + "AWS": f"arn:aws:iam::{NON_TRUSTED_AWS_ACCOUNT_NUMBER}:root" + }, "Action": "*", "Resource": "*", } @@ -241,7 +305,7 @@ class Test_vpc_endpoint_connections_trust_boundaries: assert result[0].status == "FAIL" assert ( result[0].status_extended - == f"Found untrusted account 123456789010 in VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']}." + == f"VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']} can be accessed from non-trusted accounts." ) assert ( result[0].resource_id @@ -249,7 +313,7 @@ class Test_vpc_endpoint_connections_trust_boundaries: ) @mock_ec2 - def test_vpc_endpoint_with_config_trusted_account(self): + def test_vpc_endpoint_with_config_trusted_account_with_arn(self): # Create VPC Mocked Resources ec2_client = client("ec2", region_name=AWS_REGION) @@ -266,7 +330,9 @@ class Test_vpc_endpoint_connections_trust_boundaries: "Statement": [ { "Effect": "Allow", - "Principal": {"AWS": "arn:aws:iam::123456789010:root"}, + "Principal": { + "AWS": f"arn:aws:iam::{TRUSTED_AWS_ACCOUNT_NUMBER}:root" + }, "Action": "*", "Resource": "*", } @@ -302,10 +368,323 @@ class Test_vpc_endpoint_connections_trust_boundaries: assert result[0].status == "PASS" assert ( result[0].status_extended - == f"Found trusted account 123456789010 in VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']}." + == f"Found trusted account {TRUSTED_AWS_ACCOUNT_NUMBER} in VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']}." ) assert ( result[0].resource_id == vpc_endpoint["VpcEndpoint"]["VpcEndpointId"] ) assert result[0].region == AWS_REGION + + @mock_ec2 + def test_vpc_endpoint_with_config_trusted_account(self): + # Create VPC Mocked Resources + ec2_client = client("ec2", region_name=AWS_REGION) + + vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"] + + route_table = ec2_client.create_route_table(VpcId=vpc["VpcId"])["RouteTable"] + vpc_endpoint = ec2_client.create_vpc_endpoint( + VpcId=vpc["VpcId"], + ServiceName="com.amazonaws.us-east-1.s3", + RouteTableIds=[route_table["RouteTableId"]], + VpcEndpointType="Gateway", + PolicyDocument=json.dumps( + { + "Statement": [ + { + "Effect": "Allow", + "Principal": {"AWS": [TRUSTED_AWS_ACCOUNT_NUMBER]}, + "Action": "*", + "Resource": "*", + } + ] + } + ), + ) + from prowler.providers.aws.services.vpc.vpc_service import VPC + + current_audit_info = self.set_mocked_audit_info() + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ): + with mock.patch( + "prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client", + new=VPC(current_audit_info), + ): + with mock.patch( + "prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.get_config_var", + new=mock_get_config_var, + ): + # Test Check + from prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import ( + vpc_endpoint_connections_trust_boundaries, + ) + + check = vpc_endpoint_connections_trust_boundaries() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"Found trusted account {TRUSTED_AWS_ACCOUNT_NUMBER} in VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']}." + ) + assert ( + result[0].resource_id + == vpc_endpoint["VpcEndpoint"]["VpcEndpointId"] + ) + assert result[0].region == AWS_REGION + + @mock_ec2 + def test_vpc_endpoint_with_two_account_ids_one_trusted_one_not(self): + # Create VPC Mocked Resources + ec2_client = client("ec2", region_name=AWS_REGION) + + vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"] + + route_table = ec2_client.create_route_table(VpcId=vpc["VpcId"])["RouteTable"] + vpc_endpoint = ec2_client.create_vpc_endpoint( + VpcId=vpc["VpcId"], + ServiceName="com.amazonaws.us-east-1.s3", + RouteTableIds=[route_table["RouteTableId"]], + VpcEndpointType="Gateway", + PolicyDocument=json.dumps( + { + "Statement": [ + { + "Effect": "Allow", + "Principal": { + "AWS": [ + NON_TRUSTED_AWS_ACCOUNT_NUMBER, + TRUSTED_AWS_ACCOUNT_NUMBER, + ] + }, + "Action": "*", + "Resource": "*", + } + ] + } + ), + ) + from prowler.providers.aws.services.vpc.vpc_service import VPC + + current_audit_info = self.set_mocked_audit_info() + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ): + with mock.patch( + "prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client", + new=VPC(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import ( + vpc_endpoint_connections_trust_boundaries, + ) + + check = vpc_endpoint_connections_trust_boundaries() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']} can be accessed from non-trusted accounts." + ) + assert ( + result[0].resource_id + == vpc_endpoint["VpcEndpoint"]["VpcEndpointId"] + ) + assert result[0].region == AWS_REGION + + @mock_ec2 + def test_vpc_endpoint_with_aws_principal_all(self): + # Create VPC Mocked Resources + ec2_client = client("ec2", region_name=AWS_REGION) + + vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"] + + route_table = ec2_client.create_route_table(VpcId=vpc["VpcId"])["RouteTable"] + vpc_endpoint = ec2_client.create_vpc_endpoint( + VpcId=vpc["VpcId"], + ServiceName="com.amazonaws.us-east-1.s3", + RouteTableIds=[route_table["RouteTableId"]], + VpcEndpointType="Gateway", + PolicyDocument=json.dumps( + { + "Statement": [ + { + "Effect": "Allow", + "Principal": {"AWS": "*"}, + "Action": "*", + "Resource": "*", + } + ] + } + ), + ) + from prowler.providers.aws.services.vpc.vpc_service import VPC + + current_audit_info = self.set_mocked_audit_info() + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ): + with mock.patch( + "prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client", + new=VPC(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import ( + vpc_endpoint_connections_trust_boundaries, + ) + + check = vpc_endpoint_connections_trust_boundaries() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']} can be accessed from non-trusted accounts." + ) + assert ( + result[0].resource_id + == vpc_endpoint["VpcEndpoint"]["VpcEndpointId"] + ) + assert result[0].region == AWS_REGION + + @mock_ec2 + def test_vpc_endpoint_with_aws_principal_all_but_restricted_condition_with_SourceAccount( + self, + ): + # Create VPC Mocked Resources + ec2_client = client("ec2", region_name=AWS_REGION) + + vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"] + + route_table = ec2_client.create_route_table(VpcId=vpc["VpcId"])["RouteTable"] + vpc_endpoint = ec2_client.create_vpc_endpoint( + VpcId=vpc["VpcId"], + ServiceName="com.amazonaws.us-east-1.s3", + RouteTableIds=[route_table["RouteTableId"]], + VpcEndpointType="Gateway", + PolicyDocument=json.dumps( + { + "Statement": [ + { + "Action": "*", + "Effect": "Allow", + "Principal": "*", + "Resource": "*", + "Condition": { + "StringEquals": { + "aws:SourceAccount": AWS_ACCOUNT_NUMBER + } + }, + } + ] + } + ), + ) + from prowler.providers.aws.services.vpc.vpc_service import VPC + + current_audit_info = self.set_mocked_audit_info() + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ): + with mock.patch( + "prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client", + new=VPC(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import ( + vpc_endpoint_connections_trust_boundaries, + ) + + check = vpc_endpoint_connections_trust_boundaries() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']} can only be accessed from trusted accounts." + ) + assert ( + result[0].resource_id + == vpc_endpoint["VpcEndpoint"]["VpcEndpointId"] + ) + assert result[0].region == AWS_REGION + + @mock_ec2 + def test_vpc_endpoint_with_aws_principal_all_but_restricted_condition_with_PrincipalAccount( + self, + ): + # Create VPC Mocked Resources + ec2_client = client("ec2", region_name=AWS_REGION) + + vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"] + + route_table = ec2_client.create_route_table(VpcId=vpc["VpcId"])["RouteTable"] + vpc_endpoint = ec2_client.create_vpc_endpoint( + VpcId=vpc["VpcId"], + ServiceName="com.amazonaws.us-east-1.s3", + RouteTableIds=[route_table["RouteTableId"]], + VpcEndpointType="Gateway", + PolicyDocument=json.dumps( + { + "Statement": [ + { + "Action": "*", + "Effect": "Allow", + "Principal": "*", + "Resource": "*", + "Condition": { + "StringEquals": { + "aws:PrincipalAccount": AWS_ACCOUNT_NUMBER + } + }, + } + ] + } + ), + ) + from prowler.providers.aws.services.vpc.vpc_service import VPC + + current_audit_info = self.set_mocked_audit_info() + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ): + with mock.patch( + "prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client", + new=VPC(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import ( + vpc_endpoint_connections_trust_boundaries, + ) + + check = vpc_endpoint_connections_trust_boundaries() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']} can only be accessed from trusted accounts." + ) + assert ( + result[0].resource_id + == vpc_endpoint["VpcEndpoint"]["VpcEndpointId"] + ) + assert result[0].region == AWS_REGION