From f9e82abadc40132473f9d3344e1a0699a84b9257 Mon Sep 17 00:00:00 2001 From: Nacho Rivera Date: Fri, 14 Apr 2023 14:31:34 +0200 Subject: [PATCH] fix(vpc tests): mock current_audit_info (#2214) --- ...point_connections_trust_boundaries_test.py | 237 +++++++++++------- ...llowed_principals_trust_boundaries_test.py | 105 +++++--- .../vpc_flow_logs_enabled_test.py | 80 +++--- ...outing_tables_with_least_privilege_test.py | 175 +++++++------ 4 files changed, 363 insertions(+), 234 deletions(-) diff --git a/tests/providers/aws/services/vpc/vpc_endpoint_connections_trust_boundaries/vpc_endpoint_connections_trust_boundaries_test.py b/tests/providers/aws/services/vpc/vpc_endpoint_connections_trust_boundaries/vpc_endpoint_connections_trust_boundaries_test.py index 10ef098a..4dab57ac 100644 --- a/tests/providers/aws/services/vpc/vpc_endpoint_connections_trust_boundaries/vpc_endpoint_connections_trust_boundaries_test.py +++ b/tests/providers/aws/services/vpc/vpc_endpoint_connections_trust_boundaries/vpc_endpoint_connections_trust_boundaries_test.py @@ -1,11 +1,13 @@ import json from unittest import mock -from boto3 import client +from boto3 import client, session from moto import mock_ec2 +from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info + AWS_REGION = "us-east-1" -ACCOUNT_ID = "123456789012" +AWS_ACCOUNT_NUMBER = "123456789012" def mock_get_config_var(config_var): 
@@ -15,27 +17,52 @@ def mock_get_config_var(config_var): class Test_vpc_endpoint_connections_trust_boundaries: + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + session_config=None, + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + ), + audited_account=AWS_ACCOUNT_NUMBER, + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=None, + credentials=None, + assumed_role_info=None, + audited_regions=["us-east-1", "eu-west-1"], + organizations_metadata=None, + audit_resources=None, + ) + + return audit_info + @mock_ec2 def test_vpc_no_endpoints(self): - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info from prowler.providers.aws.services.vpc.vpc_service import VPC - current_audit_info.audited_partition = "aws" - current_audit_info.audited_regions = ["eu-west-1", "us-east-1"] + current_audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client", - new=VPC(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, ): - # Test Check - from prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import ( - vpc_endpoint_connections_trust_boundaries, - ) + with mock.patch( + "prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client", + new=VPC(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import ( + vpc_endpoint_connections_trust_boundaries, + ) - check = vpc_endpoint_connections_trust_boundaries() - result = check.execute() + check = vpc_endpoint_connections_trust_boundaries() 
+ result = check.execute() - assert len(result) == 0 + assert len(result) == 0 @mock_ec2 def test_vpc_endpoint_with_full_access(self): 
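Review bot: `set_mocked_audit_info` added at the top of Test_vpc_endpoint_connections_trust_boundaries is copied verbatim into the other three test classes of this patch (the `@@ -34,27 +36,52 @@` hunk of vpc_endpoint_services_allowed_principals_trust_boundaries_test.py, `@@ -1,34 +1,62 @@` of vpc_flow_logs_enabled_test.py and `@@ -1,34 +1,61 @@` of vpc_peering_routing_tables_with_least_privilege_test.py), so the mocked account, partition and region list can silently drift between files; one shared helper or pytest fixture under tests/ would keep them identical. The name says `set_` but the method mutates nothing and returns a fresh object, so `build_mocked_audit_info` (or a `mocked_audit_info` fixture) describes it better. It also has no docstring; I would add `"""Return an AWS_Audit_Info with a dummy account, the "aws" partition and two audited regions, to be patched in as current_audit_info."""`. Note that the region list order changes from the old `["eu-west-1", "us-east-1"]` to `["us-east-1", "eu-west-1"]`; presumably harmless, but worth a glance at any assertion that indexes `result[0]` by region.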
@@ -63,32 +90,38 @@ class Test_vpc_endpoint_connections_trust_boundaries: } ), ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info + from prowler.providers.aws.services.vpc.vpc_service import VPC - current_audit_info.audited_partition = "aws" - current_audit_info.audited_regions = ["eu-west-1", "us-east-1"] + current_audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client", - new=VPC(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, ): - # Test Check - from prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import ( - vpc_endpoint_connections_trust_boundaries, - ) + with mock.patch( + "prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client", + new=VPC(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import ( + vpc_endpoint_connections_trust_boundaries, + ) - check = vpc_endpoint_connections_trust_boundaries() - result = check.execute() + check = vpc_endpoint_connections_trust_boundaries() + result = check.execute() - assert len(result) == 1 - assert result[0].status == "FAIL" - assert ( - result[0].status_extended - == f"VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']} has full access." - ) - assert result[0].resource_id == vpc_endpoint["VpcEndpoint"]["VpcEndpointId"] - assert result[0].region == AWS_REGION + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']} has full access." 
+ ) + assert ( + result[0].resource_id + == vpc_endpoint["VpcEndpoint"]["VpcEndpointId"] + ) + assert result[0].region == AWS_REGION @mock_ec2 def test_vpc_endpoint_with_trusted_account(self): 
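Review bot: The new code in test_vpc_endpoint_with_full_access wraps the existing `vpc_client` patch in a second nested `with mock.patch(...)` block, which re-indents every following line down to the assertions and turns most of this hunk (and the same pattern in every other hunk of the patch) into whitespace churn. A single statement, `with mock.patch("prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", new=current_audit_info), mock.patch("prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client", new=VPC(current_audit_info)):`, applies both patches at one indentation level and leaves the assertion lines untouched. Because a multi-item `with` is defined as the nested form, the second context expression is still evaluated after the first patch is active, so behaviour is unchanged. The enclosing function starts before this hunk.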
@@ -116,33 +149,37 @@ class Test_vpc_endpoint_connections_trust_boundaries: } ), ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info from prowler.providers.aws.services.vpc.vpc_service import VPC - current_audit_info.audited_partition = "aws" - current_audit_info.audited_regions = ["eu-west-1", "us-east-1"] - current_audit_info.audited_account = ACCOUNT_ID + current_audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client", - new=VPC(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, ): - # Test Check - from prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import ( - vpc_endpoint_connections_trust_boundaries, - ) + with mock.patch( + "prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client", + new=VPC(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import ( + vpc_endpoint_connections_trust_boundaries, + ) - check = vpc_endpoint_connections_trust_boundaries() - result = check.execute() + check = vpc_endpoint_connections_trust_boundaries() + result = check.execute() - assert len(result) == 1 - assert result[0].status == "PASS" - assert ( - result[0].status_extended - == f"Found trusted account {ACCOUNT_ID} in VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']}." 
- ) - assert result[0].resource_id == vpc_endpoint["VpcEndpoint"]["VpcEndpointId"] - assert result[0].region == AWS_REGION + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"Found trusted account {AWS_ACCOUNT_NUMBER} in VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']}." + ) + assert ( + result[0].resource_id + == vpc_endpoint["VpcEndpoint"]["VpcEndpointId"] + ) + assert result[0].region == AWS_REGION @mock_ec2 def test_vpc_endpoint_with_untrusted_account(self): 
@@ -170,32 +207,37 @@ class Test_vpc_endpoint_connections_trust_boundaries: } ), ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info + from prowler.providers.aws.services.vpc.vpc_service import VPC - current_audit_info.audited_partition = "aws" - current_audit_info.audited_regions = ["eu-west-1", "us-east-1"] - current_audit_info.audited_account = ACCOUNT_ID + current_audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client", - new=VPC(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, ): - # Test Check - from prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import ( - vpc_endpoint_connections_trust_boundaries, - ) + with mock.patch( + "prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client", + new=VPC(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import ( + vpc_endpoint_connections_trust_boundaries, + ) - check = vpc_endpoint_connections_trust_boundaries() - result = check.execute() + check = vpc_endpoint_connections_trust_boundaries() + result = check.execute() - assert len(result) == 1 - assert result[0].status == "FAIL" - assert ( - result[0].status_extended - == f"Found untrusted account 123456789010 in VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']}." 
- ) - assert result[0].resource_id == vpc_endpoint["VpcEndpoint"]["VpcEndpointId"] + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"Found untrusted account 123456789010 in VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']}." + ) + assert ( + result[0].resource_id + == vpc_endpoint["VpcEndpoint"]["VpcEndpointId"] + ) @mock_ec2 def test_vpc_endpoint_with_config_trusted_account(self): 
@@ -223,37 +265,38 @@ class Test_vpc_endpoint_connections_trust_boundaries: } ), ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info from prowler.providers.aws.services.vpc.vpc_service import VPC - current_audit_info.audited_partition = "aws" - current_audit_info.audited_regions = ["eu-west-1", "us-east-1"] - current_audit_info.audited_account = ACCOUNT_ID + current_audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client", - new=VPC(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, ): with mock.patch( - "prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.get_config_var", - new=mock_get_config_var, + "prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client", + new=VPC(current_audit_info), ): - # Test Check - from prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import ( - vpc_endpoint_connections_trust_boundaries, - ) + with mock.patch( + "prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.get_config_var", + new=mock_get_config_var, + ): + # Test Check + from prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import ( + vpc_endpoint_connections_trust_boundaries, + ) - check = vpc_endpoint_connections_trust_boundaries() - result = check.execute() + check = vpc_endpoint_connections_trust_boundaries() + result = check.execute() - assert len(result) == 1 - assert result[0].status == "PASS" - assert ( - result[0].status_extended - == f"Found trusted account 123456789010 in VPC Endpoint 
{vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']}." - ) - assert ( - result[0].resource_id - == vpc_endpoint["VpcEndpoint"]["VpcEndpointId"] - ) - assert result[0].region == AWS_REGION + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"Found trusted account 123456789010 in VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']}." + ) + assert ( + result[0].resource_id + == vpc_endpoint["VpcEndpoint"]["VpcEndpointId"] + ) + assert result[0].region == AWS_REGION diff --git a/tests/providers/aws/services/vpc/vpc_endpoint_services_allowed_principals_trust_boundaries/vpc_endpoint_services_allowed_principals_trust_boundaries_test.py b/tests/providers/aws/services/vpc/vpc_endpoint_services_allowed_principals_trust_boundaries/vpc_endpoint_services_allowed_principals_trust_boundaries_test.py index ec053b83..5bc7c1b8 100644 --- a/tests/providers/aws/services/vpc/vpc_endpoint_services_allowed_principals_trust_boundaries/vpc_endpoint_services_allowed_principals_trust_boundaries_test.py +++ b/tests/providers/aws/services/vpc/vpc_endpoint_services_allowed_principals_trust_boundaries/vpc_endpoint_services_allowed_principals_trust_boundaries_test.py @@ -1,12 +1,14 @@ from unittest import mock import botocore -from boto3 import client +from boto3 import client, session from mock import patch from moto import mock_ec2, mock_elbv2 +from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info + AWS_REGION = "us-east-1" -ACCOUNT_ID = "123456789012" +AWS_ACCOUNT_NUMBER = "123456789012" # Mocking VPC Calls make_api_call = botocore.client.BaseClient._make_api_call @@ -24,7 +26,7 @@ def mock_make_api_call(self, operation_name, kwarg): { "ServiceId": "vpce-svc-4b919ac5", "ServiceName": "string", - "Owner": ACCOUNT_ID, + "Owner": AWS_ACCOUNT_NUMBER, "StageName": "test-stage", } ] 
@@ -34,27 +36,52 @@ def mock_make_api_call(self, operation_name, kwarg): @patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call) class Test_vpc_endpoint_services_allowed_principals_trust_boundaries: + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + session_config=None, + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + ), + audited_account=AWS_ACCOUNT_NUMBER, + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=None, + credentials=None, + assumed_role_info=None, + audited_regions=["us-east-1", "eu-west-1"], + organizations_metadata=None, + audit_resources=None, + ) + + return audit_info + @mock_ec2 def test_vpc_no_endpoint_services(self): - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info from prowler.providers.aws.services.vpc.vpc_service import VPC - current_audit_info.audited_partition = "aws" - current_audit_info.audited_regions = ["eu-west-1", "us-east-1"] + current_audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_client", - new=VPC(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, ): - # Test Check - from prowler.providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries import ( - vpc_endpoint_services_allowed_principals_trust_boundaries, - ) + with mock.patch( + "prowler.providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_client", + new=VPC(current_audit_info), + ): + # Test Check + from 
prowler.providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries import ( + vpc_endpoint_services_allowed_principals_trust_boundaries, + ) - check = vpc_endpoint_services_allowed_principals_trust_boundaries() - result = check.execute() + check = vpc_endpoint_services_allowed_principals_trust_boundaries() + result = check.execute() - assert len(result) == 2 # one endpoint per region + assert len(result) == 2 # one endpoint per region @mock_ec2 @mock_elbv2 
@@ -84,33 +111,35 @@ class Test_vpc_endpoint_services_allowed_principals_trust_boundaries: # NetworkLoadBalancerArns=[lb_arn] # ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info from prowler.providers.aws.services.vpc.vpc_service import VPC - current_audit_info.audited_partition = "aws" - current_audit_info.audited_regions = ["eu-west-1", "us-east-1"] + current_audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_client", - new=VPC(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, ): - # Test Check - from prowler.providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries import ( - vpc_endpoint_services_allowed_principals_trust_boundaries, - ) + with mock.patch( + "prowler.providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_client", + new=VPC(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries import ( + vpc_endpoint_services_allowed_principals_trust_boundaries, + ) - check = vpc_endpoint_services_allowed_principals_trust_boundaries() - result = check.execute() + check = vpc_endpoint_services_allowed_principals_trust_boundaries() + result = check.execute() - assert len(result) == 2 # one per region - assert result[0].status == "PASS" - assert ( - result[0].status_extended - == f"VPC Endpoint Service {ec2_client.describe_vpc_endpoint_services()['ServiceDetails'][0]['ServiceId']} has no allowed principals." 
- ) - assert ( - result[0].resource_id - == ec2_client.describe_vpc_endpoint_services()["ServiceDetails"][0][ - "ServiceId" - ] - ) + assert len(result) == 2 # one per region + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"VPC Endpoint Service {ec2_client.describe_vpc_endpoint_services()['ServiceDetails'][0]['ServiceId']} has no allowed principals." + ) + assert ( + result[0].resource_id + == ec2_client.describe_vpc_endpoint_services()["ServiceDetails"][0][ + "ServiceId" + ] + ) diff --git a/tests/providers/aws/services/vpc/vpc_flow_logs_enabled/vpc_flow_logs_enabled_test.py b/tests/providers/aws/services/vpc/vpc_flow_logs_enabled/vpc_flow_logs_enabled_test.py index 889a9926..1079acf1 100644 --- a/tests/providers/aws/services/vpc/vpc_flow_logs_enabled/vpc_flow_logs_enabled_test.py +++ b/tests/providers/aws/services/vpc/vpc_flow_logs_enabled/vpc_flow_logs_enabled_test.py 
@@ -1,34 +1,62 @@ from unittest import mock -from boto3 import client +from boto3 import client, session from moto import mock_ec2 +from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info + AWS_REGION = "us-east-1" -ACCOUNT_ID = "123456789012" +AWS_ACCOUNT_NUMBER = "123456789012" class Test_vpc_flow_logs_enabled: + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + session_config=None, + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + ), + audited_account=AWS_ACCOUNT_NUMBER, + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=None, + credentials=None, + assumed_role_info=None, + audited_regions=["us-east-1", "eu-west-1"], + organizations_metadata=None, + audit_resources=None, + ) + return audit_info + @mock_ec2 def test_vpc_only_default_vpcs(self): - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info from prowler.providers.aws.services.vpc.vpc_service import VPC - current_audit_info.audited_partition = "aws" - current_audit_info.audited_regions = ["eu-west-1", "us-east-1"] + current_audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled.vpc_client", - new=VPC(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, ): - # Test Check - from prowler.providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled import ( - vpc_flow_logs_enabled, - ) + with mock.patch( + "prowler.providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled.vpc_client", + new=VPC(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled import ( + vpc_flow_logs_enabled, + ) - check = vpc_flow_logs_enabled() - result = check.execute() + check = vpc_flow_logs_enabled() + result = 
check.execute() - assert len(result) == 2 # Number of AWS regions, one default VPC per region + assert ( + len(result) == 2 + ) # Number of AWS regions, one default VPC per region @mock_ec2 def test_vpc_with_flow_logs(self): @@ -43,18 +71,16 @@ class Test_vpc_flow_logs_enabled: TrafficType="ALL", LogDestinationType="cloud-watch-logs", LogGroupName="test_logs", - DeliverLogsPermissionArn="arn:aws:iam::" + ACCOUNT_ID + ":role/test-role", + DeliverLogsPermissionArn="arn:aws:iam::" + + AWS_ACCOUNT_NUMBER + + ":role/test-role", ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info - from prowler.providers.aws.services.vpc.vpc_service import VPC - - current_audit_info.audited_partition = "aws" - current_audit_info.audited_regions = ["eu-west-1", "us-east-1"] + current_audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled.vpc_client", - new=VPC(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, ): # Test Check from prowler.providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled import ( @@ -81,15 +107,11 @@ class Test_vpc_flow_logs_enabled: vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"] - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info - from prowler.providers.aws.services.vpc.vpc_service import VPC - - current_audit_info.audited_partition = "aws" - current_audit_info.audited_regions = ["eu-west-1", "us-east-1"] + current_audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled.vpc_client", - new=VPC(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, ): # Test Check from prowler.providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled import ( 
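Review bot: The `@@ -43,18 +71,16 @@` and `@@ -81,15 +107,11 @@` hunks drop the `vpc_client` patch (`new=VPC(current_audit_info)`) together with the `VPC` import, while test_vpc_only_default_vpcs in this file and all hunks of the other three files keep it. Those two tests now run the check against whatever `vpc_client` the check module bound when it was first imported; if an earlier test in the same process already imported `prowler.providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled`, the import inside the `with` block is served from `sys.modules`, the patched `current_audit_info` is never read, and the VPC inventory comes from a previous `@mock_ec2` context, so the flow-log assertions (outside these hunks) would depend on test order. I would restore `from prowler.providers.aws.services.vpc.vpc_service import VPC` and the inner `with mock.patch("prowler.providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled.vpc_client", new=VPC(current_audit_info)):` in both tests so they match the first one. Both enclosing test functions start and end outside these hunks.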
diff --git a/tests/providers/aws/services/vpc/vpc_peering_routing_tables_with_least_privilege/vpc_peering_routing_tables_with_least_privilege_test.py b/tests/providers/aws/services/vpc/vpc_peering_routing_tables_with_least_privilege/vpc_peering_routing_tables_with_least_privilege_test.py
index 25d827ad..a25bffdc 100644
--- a/tests/providers/aws/services/vpc/vpc_peering_routing_tables_with_least_privilege/vpc_peering_routing_tables_with_least_privilege_test.py
+++ b/tests/providers/aws/services/vpc/vpc_peering_routing_tables_with_least_privilege/vpc_peering_routing_tables_with_least_privilege_test.py
@@ -1,34 +1,61 @@ from unittest import mock -from boto3 import client, resource +from boto3 import client, resource, session from moto import mock_ec2 +from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info + AWS_REGION = "us-east-1" -ACCOUNT_ID = "123456789012" +AWS_ACCOUNT_NUMBER = "123456789012" class Test_vpc_peering_routing_tables_with_least_privilege: + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + session_config=None, + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + ), + audited_account=AWS_ACCOUNT_NUMBER, + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=None, + credentials=None, + assumed_role_info=None, + audited_regions=["us-east-1", "eu-west-1"], + organizations_metadata=None, + audit_resources=None, + ) + + return audit_info + @mock_ec2 def test_vpc_no_peering_connections(self): - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info from prowler.providers.aws.services.vpc.vpc_service import VPC - current_audit_info.audited_partition = "aws" - current_audit_info.audited_regions = ["eu-west-1", "us-east-1"] + current_audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege.vpc_client", - new=VPC(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, ): - # Test Check - from prowler.providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege import ( - vpc_peering_routing_tables_with_least_privilege, - ) + with mock.patch( + "prowler.providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege.vpc_client", + new=VPC(current_audit_info), + ): + # Test Check + 
from prowler.providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege import ( + vpc_peering_routing_tables_with_least_privilege, + ) - check = vpc_peering_routing_tables_with_least_privilege() - result = check.execute() + check = vpc_peering_routing_tables_with_least_privilege() + result = check.execute() - assert len(result) == 0 + assert len(result) == 0 @mock_ec2 def test_vpc_comply_peering_connection_(self): 
@@ -58,40 +85,44 @@ class Test_vpc_peering_routing_tables_with_least_privilege: DestinationCidrBlock="10.0.0.4/24", VpcPeeringConnectionId=vpc_pcx_id ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info from prowler.providers.aws.services.vpc.vpc_service import VPC, Route - current_audit_info.audited_partition = "aws" - current_audit_info.audited_regions = ["eu-west-1", "us-east-1"] + current_audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege.vpc_client", - new=VPC(current_audit_info), - ) as service_client: - # Test Check - from prowler.providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege import ( - vpc_peering_routing_tables_with_least_privilege, - ) - - service_client.vpc_peering_connections[0].route_tables = [ - Route( - id=main_route_table_id, - destination_cidrs=["10.12.23.44/32"], + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ): + with mock.patch( + "prowler.providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege.vpc_client", + new=VPC(current_audit_info), + ) as service_client: + # Test Check + from prowler.providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege import ( + vpc_peering_routing_tables_with_least_privilege, ) - ] - check = vpc_peering_routing_tables_with_least_privilege() - result = check.execute() - assert len(result) == len( - ec2_client.describe_vpc_peering_connections()["VpcPeeringConnections"] - ) - assert result[0].status == "PASS" - assert ( - result[0].status_extended - == f"VPC Peering Connection {vpc_pcx_id} comply with least privilege access." 
- ) - assert result[0].resource_id == vpc_pcx_id - assert result[0].region == AWS_REGION + service_client.vpc_peering_connections[0].route_tables = [ + Route( + id=main_route_table_id, + destination_cidrs=["10.12.23.44/32"], + ) + ] + check = vpc_peering_routing_tables_with_least_privilege() + result = check.execute() + + assert len(result) == len( + ec2_client.describe_vpc_peering_connections()[ + "VpcPeeringConnections" + ] + ) + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"VPC Peering Connection {vpc_pcx_id} comply with least privilege access." + ) + assert result[0].resource_id == vpc_pcx_id + assert result[0].region == AWS_REGION @mock_ec2 def test_vpc_not_comply_peering_connection_(self): 
@@ -121,37 +152,41 @@ class Test_vpc_peering_routing_tables_with_least_privilege: DestinationCidrBlock="10.0.0.0/16", VpcPeeringConnectionId=vpc_pcx_id ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info from prowler.providers.aws.services.vpc.vpc_service import VPC, Route - current_audit_info.audited_partition = "aws" - current_audit_info.audited_regions = ["eu-west-1", "us-east-1"] + current_audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege.vpc_client", - new=VPC(current_audit_info), - ) as service_client: - # Test Check - from prowler.providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege import ( - vpc_peering_routing_tables_with_least_privilege, - ) - - service_client.vpc_peering_connections[0].route_tables = [ - Route( - id=main_route_table_id, - destination_cidrs=["10.0.0.0/16"], + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ): + with mock.patch( + "prowler.providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege.vpc_client", + new=VPC(current_audit_info), + ) as service_client: + # Test Check + from prowler.providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege import ( + vpc_peering_routing_tables_with_least_privilege, ) - ] - check = vpc_peering_routing_tables_with_least_privilege() - result = check.execute() - assert len(result) == len( - ec2_client.describe_vpc_peering_connections()["VpcPeeringConnections"] - ) - assert result[0].status == "FAIL" - assert ( - result[0].status_extended - == f"VPC Peering Connection {vpc_pcx_id} does not comply with least privilege access since it accepts whole VPCs CIDR in its route tables." 
- ) - assert result[0].resource_id == vpc_pcx_id - assert result[0].region == AWS_REGION + service_client.vpc_peering_connections[0].route_tables = [ + Route( + id=main_route_table_id, + destination_cidrs=["10.0.0.0/16"], + ) + ] + check = vpc_peering_routing_tables_with_least_privilege() + result = check.execute() + + assert len(result) == len( + ec2_client.describe_vpc_peering_connections()[ + "VpcPeeringConnections" + ] + ) + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"VPC Peering Connection {vpc_pcx_id} does not comply with least privilege access since it accepts whole VPCs CIDR in its route tables." + ) + assert result[0].resource_id == vpc_pcx_id + assert result[0].region == AWS_REGION