diff --git a/prowler/providers/aws/services/vpc/vpc_endpoint_services_allowed_principals_trust_boundaries/vpc_endpoint_services_allowed_principals_trust_boundaries.py b/prowler/providers/aws/services/vpc/vpc_endpoint_services_allowed_principals_trust_boundaries/vpc_endpoint_services_allowed_principals_trust_boundaries.py index 0b69fa55..9f04cf54 100644 --- a/prowler/providers/aws/services/vpc/vpc_endpoint_services_allowed_principals_trust_boundaries/vpc_endpoint_services_allowed_principals_trust_boundaries.py +++ b/prowler/providers/aws/services/vpc/vpc_endpoint_services_allowed_principals_trust_boundaries/vpc_endpoint_services_allowed_principals_trust_boundaries.py @@ -1,3 +1,5 @@ +from re import compile + from prowler.lib.check.models import Check, Check_Report_AWS from prowler.providers.aws.services.vpc.vpc_client import vpc_client @@ -21,7 +23,14 @@ class vpc_endpoint_services_allowed_principals_trust_boundaries(Check): findings.append(report) else: for principal in service.allowed_principals: - account_id = principal.split(":")[4] + # Account ID can be an ARN or just a 12-digit string + pattern = compile(r"^[0-9]{12}$") + match = pattern.match(principal) + if not match: + account_id = principal.split(":")[4] + else: + account_id = match.string + report = Check_Report_AWS(self.metadata()) report.region = service.region if ( diff --git a/tests/providers/aws/services/vpc/vpc_endpoint_services_allowed_principals_trust_boundaries/vpc_endpoint_services_allowed_principals_trust_boundaries_test.py b/tests/providers/aws/services/vpc/vpc_endpoint_services_allowed_principals_trust_boundaries/vpc_endpoint_services_allowed_principals_trust_boundaries_test.py index 57e5ec48..5695ad9b 100644 --- a/tests/providers/aws/services/vpc/vpc_endpoint_services_allowed_principals_trust_boundaries/vpc_endpoint_services_allowed_principals_trust_boundaries_test.py +++ b/tests/providers/aws/services/vpc/vpc_endpoint_services_allowed_principals_trust_boundaries/vpc_endpoint_services_allowed_principals_trust_boundaries_test.py @@ -1,85 +1,27 @@ from unittest import mock -import botocore -from boto3 import client, session -from mock import patch +from boto3 import client from moto import mock_ec2, mock_elbv2 -from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info -from prowler.providers.common.models import Audit_Metadata +from tests.providers.aws.audit_info_utils import ( + AWS_REGION_US_EAST_1, + set_mocked_aws_audit_info, +) -AWS_REGION = "us-east-1" AWS_ACCOUNT_NUMBER = "123456789012" - -# Mocking VPC Calls -make_api_call = botocore.client.BaseClient._make_api_call +AWS_ACCOUNT_ARN = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root" +AWS_ACCOUNT_NUMBER_2 = "111122223333" +AWS_ACCOUNT_ARN_2 = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER_2}:root" -def mock_make_api_call(self, operation_name, kwarg): - """ - We have to mock every AWS API call using Boto3 - - Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816 - """ - if operation_name == "DescribeVpcEndpointServices": - return { - "ServiceDetails": [ - { - "ServiceId": "vpce-svc-4b919ac5", - "ServiceName": "string", - "Owner": AWS_ACCOUNT_NUMBER, - "StageName": "test-stage", - } - ] - } - return make_api_call(self, operation_name, kwarg) - - -@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call) class Test_vpc_endpoint_services_allowed_principals_trust_boundaries: - def set_mocked_audit_info(self): - audit_info = AWS_Audit_Info( - session_config=None, - original_session=None, - audit_session=session.Session( - profile_name=None, - botocore_session=None, - ), - audited_account=AWS_ACCOUNT_NUMBER, - audited_account_arn=f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root", - audited_user_id=None, - audited_partition="aws", - audited_identity_arn=None, - profile=None, - profile_region=None, - credentials=None, - assumed_role_info=None, - audited_regions=[AWS_REGION], - organizations_metadata=None, - audit_resources=None, - mfa_enabled=False, - audit_metadata=Audit_Metadata( - services_scanned=0, - expected_checks=[], - completed_checks=0, - audit_progress=0, - ), - ) - - return audit_info - @mock_ec2 - def test_vpc_no_endpoint_services(self): - # VPC Endpoint Services - ec2_client = client("ec2", region_name=AWS_REGION) - endpoint_id = ec2_client.describe_vpc_endpoint_services()["ServiceDetails"][0][ - "ServiceId" - ] - endpoint_arn = f"arn:aws:ec2:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:vpc-endpoint-service/{endpoint_id}" - + def test_no_vpc_endpoint_services(self): from prowler.providers.aws.services.vpc.vpc_service import VPC - current_audit_info = self.set_mocked_audit_info() + current_audit_info = set_mocked_aws_audit_info( + audited_regions=[AWS_REGION_US_EAST_1] + ) # Set config variable current_audit_info.audit_config = {"trusted_account_ids": []} @@ -99,23 +41,14 @@ class Test_vpc_endpoint_services_allowed_principals_trust_boundaries: check = vpc_endpoint_services_allowed_principals_trust_boundaries() result = check.execute() - assert len(result) == 1 # one endpoint per region - assert result[0].status == "PASS" - assert ( - result[0].status_extended - == f"VPC Endpoint Service {endpoint_id} has no allowed principals." - ) - assert result[0].resource_id == endpoint_id - assert result[0].resource_arn == endpoint_arn - assert result[0].resource_tags is None - assert result[0].region == AWS_REGION + assert len(result) == 0 @mock_ec2 @mock_elbv2 def test_vpc_endpoint_service_without_allowed_principals(self): # Create VPC Mocked Resources - ec2_client = client("ec2", region_name=AWS_REGION) - elbv2_client = client("elbv2", region_name=AWS_REGION) + ec2_client = client("ec2", region_name=AWS_REGION_US_EAST_1) + elbv2_client = client("elbv2", region_name=AWS_REGION_US_EAST_1) vpc = ec2_client.create_vpc( CidrBlock="172.28.7.0/24", InstanceTenancy="default" @@ -123,7 +56,7 @@ class Test_vpc_endpoint_services_allowed_principals_trust_boundaries: subnet = ec2_client.create_subnet( VpcId=vpc["Vpc"]["VpcId"], CidrBlock="172.28.7.192/26", - AvailabilityZone=f"{AWS_REGION}a", + AvailabilityZone=f"{AWS_REGION_US_EAST_1}a", ) lb_name = "lb_vpce-test" lb_arn = elbv2_client.create_load_balancer( @@ -133,18 +66,17 @@ class Test_vpc_endpoint_services_allowed_principals_trust_boundaries: Type="network", )["LoadBalancers"][0]["LoadBalancerArn"] - _ = ec2_client.create_vpc_endpoint_service_configuration( + endpoint_id = ec2_client.create_vpc_endpoint_service_configuration( NetworkLoadBalancerArns=[lb_arn] - ) + )["ServiceConfiguration"]["ServiceId"] - endpoint_id = ec2_client.describe_vpc_endpoint_services()["ServiceDetails"][0][ - "ServiceId" - ] - endpoint_arn = f"arn:aws:ec2:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:vpc-endpoint-service/{endpoint_id}" + endpoint_arn = f"arn:aws:ec2:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:vpc-endpoint-service/{endpoint_id}" from prowler.providers.aws.services.vpc.vpc_service import VPC - current_audit_info = self.set_mocked_audit_info() + current_audit_info = set_mocked_aws_audit_info( + audited_regions=[AWS_REGION_US_EAST_1] + ) # Set config variable current_audit_info.audit_config = {"trusted_account_ids": []} @@ -168,9 +100,289 @@ class Test_vpc_endpoint_services_allowed_principals_trust_boundaries: assert result[0].status == "PASS" assert ( result[0].status_extended - == f"VPC Endpoint Service {ec2_client.describe_vpc_endpoint_services()['ServiceDetails'][0]['ServiceId']} has no allowed principals." + == f"VPC Endpoint Service {endpoint_id} has no allowed principals." ) assert result[0].resource_id == endpoint_id assert result[0].resource_arn == endpoint_arn - assert result[0].resource_tags is None - assert result[0].region == AWS_REGION + assert result[0].resource_tags == [] + assert result[0].region == AWS_REGION_US_EAST_1 + + @mock_ec2 + @mock_elbv2 + def test_vpc_endpoint_service_with_allowed_principal_account_arn(self): + # Create VPC Mocked Resources + ec2_client = client("ec2", region_name=AWS_REGION_US_EAST_1) + elbv2_client = client("elbv2", region_name=AWS_REGION_US_EAST_1) + + vpc = ec2_client.create_vpc( + CidrBlock="172.28.7.0/24", InstanceTenancy="default" + ) + subnet = ec2_client.create_subnet( + VpcId=vpc["Vpc"]["VpcId"], + CidrBlock="172.28.7.192/26", + AvailabilityZone=f"{AWS_REGION_US_EAST_1}a", + ) + lb_name = "lb_vpce-test" + lb_arn = elbv2_client.create_load_balancer( + Name=lb_name, + Subnets=[subnet["Subnet"]["SubnetId"]], + Scheme="internal", + Type="network", + )["LoadBalancers"][0]["LoadBalancerArn"] + + endpoint_id = ec2_client.create_vpc_endpoint_service_configuration( + NetworkLoadBalancerArns=[lb_arn] + )["ServiceConfiguration"]["ServiceId"] + + # Add allowed principals + ec2_client.modify_vpc_endpoint_service_permissions( + ServiceId=endpoint_id, AddAllowedPrincipals=[AWS_ACCOUNT_ARN] + ) + + endpoint_arn = f"arn:aws:ec2:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:vpc-endpoint-service/{endpoint_id}" + + from prowler.providers.aws.services.vpc.vpc_service import VPC + + current_audit_info = set_mocked_aws_audit_info( + audited_regions=[AWS_REGION_US_EAST_1] + ) + # Set config variable + current_audit_info.audit_config = {"trusted_account_ids": []} + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ): + with mock.patch( + "prowler.providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_client", + new=VPC(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries import ( + vpc_endpoint_services_allowed_principals_trust_boundaries, + ) + + check = vpc_endpoint_services_allowed_principals_trust_boundaries() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"Found trusted account {AWS_ACCOUNT_NUMBER} in VPC Endpoint Service {endpoint_id}." + ) + assert result[0].resource_id == endpoint_id + assert result[0].resource_arn == endpoint_arn + assert result[0].resource_tags == [] + assert result[0].region == AWS_REGION_US_EAST_1 + + @mock_ec2 + @mock_elbv2 + def test_vpc_endpoint_service_with_allowed_principal_account_number(self): + # Create VPC Mocked Resources + ec2_client = client("ec2", region_name=AWS_REGION_US_EAST_1) + elbv2_client = client("elbv2", region_name=AWS_REGION_US_EAST_1) + + vpc = ec2_client.create_vpc( + CidrBlock="172.28.7.0/24", InstanceTenancy="default" + ) + subnet = ec2_client.create_subnet( + VpcId=vpc["Vpc"]["VpcId"], + CidrBlock="172.28.7.192/26", + AvailabilityZone=f"{AWS_REGION_US_EAST_1}a", + ) + lb_name = "lb_vpce-test" + lb_arn = elbv2_client.create_load_balancer( + Name=lb_name, + Subnets=[subnet["Subnet"]["SubnetId"]], + Scheme="internal", + Type="network", + )["LoadBalancers"][0]["LoadBalancerArn"] + + endpoint_id = ec2_client.create_vpc_endpoint_service_configuration( + NetworkLoadBalancerArns=[lb_arn] + )["ServiceConfiguration"]["ServiceId"] + + # Add allowed principals + ec2_client.modify_vpc_endpoint_service_permissions( + ServiceId=endpoint_id, AddAllowedPrincipals=[AWS_ACCOUNT_NUMBER] + ) + + endpoint_arn = f"arn:aws:ec2:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:vpc-endpoint-service/{endpoint_id}" + + from prowler.providers.aws.services.vpc.vpc_service import VPC + + current_audit_info = set_mocked_aws_audit_info( + audited_regions=[AWS_REGION_US_EAST_1] + ) + # Set config variable + current_audit_info.audit_config = {"trusted_account_ids": []} + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ): + with mock.patch( + "prowler.providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_client", + new=VPC(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries import ( + vpc_endpoint_services_allowed_principals_trust_boundaries, + ) + + check = vpc_endpoint_services_allowed_principals_trust_boundaries() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"Found trusted account {AWS_ACCOUNT_NUMBER} in VPC Endpoint Service {endpoint_id}." + ) + assert result[0].resource_id == endpoint_id + assert result[0].resource_arn == endpoint_arn + assert result[0].resource_tags == [] + assert result[0].region == AWS_REGION_US_EAST_1 + + @mock_ec2 + @mock_elbv2 + def test_vpc_endpoint_service_with_principal_not_allowed(self): + # Create VPC Mocked Resources + ec2_client = client("ec2", region_name=AWS_REGION_US_EAST_1) + elbv2_client = client("elbv2", region_name=AWS_REGION_US_EAST_1) + + vpc = ec2_client.create_vpc( + CidrBlock="172.28.7.0/24", InstanceTenancy="default" + ) + subnet = ec2_client.create_subnet( + VpcId=vpc["Vpc"]["VpcId"], + CidrBlock="172.28.7.192/26", + AvailabilityZone=f"{AWS_REGION_US_EAST_1}a", + ) + lb_name = "lb_vpce-test" + lb_arn = elbv2_client.create_load_balancer( + Name=lb_name, + Subnets=[subnet["Subnet"]["SubnetId"]], + Scheme="internal", + Type="network", + )["LoadBalancers"][0]["LoadBalancerArn"] + + endpoint_id = ec2_client.create_vpc_endpoint_service_configuration( + NetworkLoadBalancerArns=[lb_arn] + )["ServiceConfiguration"]["ServiceId"] + + # Add allowed principals + ec2_client.modify_vpc_endpoint_service_permissions( + ServiceId=endpoint_id, AddAllowedPrincipals=[AWS_ACCOUNT_NUMBER_2] + ) + + endpoint_arn = f"arn:aws:ec2:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:vpc-endpoint-service/{endpoint_id}" + + from prowler.providers.aws.services.vpc.vpc_service import VPC + + current_audit_info = set_mocked_aws_audit_info( + audited_regions=[AWS_REGION_US_EAST_1] + ) + # Set config variable + current_audit_info.audit_config = {"trusted_account_ids": []} + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ): + with mock.patch( + "prowler.providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_client", + new=VPC(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries import ( + vpc_endpoint_services_allowed_principals_trust_boundaries, + ) + + check = vpc_endpoint_services_allowed_principals_trust_boundaries() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"Found untrusted account {AWS_ACCOUNT_NUMBER_2} in VPC Endpoint Service {endpoint_id}." + ) + assert result[0].resource_id == endpoint_id + assert result[0].resource_arn == endpoint_arn + assert result[0].resource_tags == [] + assert result[0].region == AWS_REGION_US_EAST_1 + + @mock_ec2 + @mock_elbv2 + def test_vpc_endpoint_service_with_principal_different_than_account_but_allowed_in_config( + self, + ): + # Create VPC Mocked Resources + ec2_client = client("ec2", region_name=AWS_REGION_US_EAST_1) + elbv2_client = client("elbv2", region_name=AWS_REGION_US_EAST_1) + + vpc = ec2_client.create_vpc( + CidrBlock="172.28.7.0/24", InstanceTenancy="default" + ) + subnet = ec2_client.create_subnet( + VpcId=vpc["Vpc"]["VpcId"], + CidrBlock="172.28.7.192/26", + AvailabilityZone=f"{AWS_REGION_US_EAST_1}a", + ) + lb_name = "lb_vpce-test" + lb_arn = elbv2_client.create_load_balancer( + Name=lb_name, + Subnets=[subnet["Subnet"]["SubnetId"]], + Scheme="internal", + Type="network", + )["LoadBalancers"][0]["LoadBalancerArn"] + + endpoint_id = ec2_client.create_vpc_endpoint_service_configuration( + NetworkLoadBalancerArns=[lb_arn] + )["ServiceConfiguration"]["ServiceId"] + + # Add allowed principals + ec2_client.modify_vpc_endpoint_service_permissions( + ServiceId=endpoint_id, AddAllowedPrincipals=[AWS_ACCOUNT_NUMBER_2] + ) + + endpoint_arn = f"arn:aws:ec2:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:vpc-endpoint-service/{endpoint_id}" + + from prowler.providers.aws.services.vpc.vpc_service import VPC + + current_audit_info = set_mocked_aws_audit_info( + audited_regions=[AWS_REGION_US_EAST_1] + ) + # Set config variable + current_audit_info.audit_config = { + "trusted_account_ids": [AWS_ACCOUNT_NUMBER_2] + } + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ): + with mock.patch( + "prowler.providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_client", + new=VPC(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries import ( + vpc_endpoint_services_allowed_principals_trust_boundaries, + ) + + check = vpc_endpoint_services_allowed_principals_trust_boundaries() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"Found trusted account {AWS_ACCOUNT_NUMBER_2} in VPC Endpoint Service {endpoint_id}." + ) + assert result[0].resource_id == endpoint_id + assert result[0].resource_arn == endpoint_arn + assert result[0].resource_tags == [] + assert result[0].region == AWS_REGION_US_EAST_1