From 3962c9d81610fa167596ac935098e9559078cc32 Mon Sep 17 00:00:00 2001
From: Pepe Fagoaga
Date: Tue, 5 Dec 2023 15:09:14 +0100
Subject: [PATCH] test(audit_info): refactor acm, account and access analyzer (#3097)
---
 tests/providers/aws/audit_info_utils.py | 2 +-
 .../aws/lib/allowlist/allowlist_test.py | 42 ++--------
 .../providers/aws/lib/service/service_test.py | 72 ++++++------------
 .../accessanalyzer_service_test.py | 76 +++++++------------
 .../services/account/account_service_test.py | 52 +++---------
 .../aws/services/acm/acm_service_test.py | 71 +++++------------
 6 files changed, 83 insertions(+), 232 deletions(-)
diff --git a/tests/providers/aws/audit_info_utils.py b/tests/providers/aws/audit_info_utils.py
index d6a6e843..7a49d1da 100644
--- a/tests/providers/aws/audit_info_utils.py
+++ b/tests/providers/aws/audit_info_utils.py
@@ -46,7 +46,7 @@ def set_mocked_aws_audit_info(
 assumed_role_info=None,
 audited_regions=audited_regions,
 organizations_metadata=None,
- audit_resources=None,
+ audit_resources=[],
 mfa_enabled=False,
 audit_metadata=Audit_Metadata(
 services_scanned=0,
diff --git a/tests/providers/aws/lib/allowlist/allowlist_test.py b/tests/providers/aws/lib/allowlist/allowlist_test.py
index d54c19c8..ff881fc0 100644
--- a/tests/providers/aws/lib/allowlist/allowlist_test.py
+++ b/tests/providers/aws/lib/allowlist/allowlist_test.py
@@ -1,5 +1,5 @@
 import yaml
-from boto3 import resource, session
+from boto3 import resource
 from mock import MagicMock
 from moto import mock_dynamodb, mock_s3
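Review bot: Switching the shared fixture from `audit_resources=None` to `audit_resources=[]` on the `+` line changes what every caller of `set_mocked_aws_audit_info` receives, and the one consumer in this patch that visibly depends on the new value is `Test_AWSService.test_AWSService_init`, which asserts `service.audit_resources == []`. Because the literal is a constructor keyword inside the function body rather than a parameter default, each call builds a fresh list, so there is no shared-mutable-default hazard. A one-line comment would stop the next reader from "restoring" `None`, e.g. `# [] rather than None: service_test asserts AWSService.audit_resources == []`. `set_mocked_aws_audit_info` starts and ends outside this hunk.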
@@ -13,51 +13,19 @@ from prowler.providers.aws.lib.allowlist.allowlist import (
 is_excepted,
 parse_allowlist_file,
 )
-from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
-from prowler.providers.common.models import Audit_Metadata
 from tests.providers.aws.audit_info_utils import (
 AWS_ACCOUNT_NUMBER,
 AWS_REGION_EU_WEST_1,
 AWS_REGION_US_EAST_1,
+ set_mocked_aws_audit_info,
 )
 class Test_Allowlist:
- # Mocked Audit Info
- def set_mocked_audit_info(self):
- audit_info = AWS_Audit_Info(
- session_config=None,
- original_session=None,
- audit_session=session.Session(
- profile_name=None,
- botocore_session=None,
- ),
- audited_account=AWS_ACCOUNT_NUMBER,
- audited_account_arn=f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root",
- audited_user_id=None,
- audited_partition="aws",
- audited_identity_arn=None,
- profile=None,
- profile_region=None,
- credentials=None,
- assumed_role_info=None,
- audited_regions=None,
- organizations_metadata=None,
- audit_resources=None,
- mfa_enabled=False,
- audit_metadata=Audit_Metadata(
- services_scanned=0,
- expected_checks=[],
- completed_checks=0,
- audit_progress=0,
- ),
- )
- return audit_info
-
 # Test S3 allowlist
 @mock_s3
 def test_s3_allowlist(self):
- audit_info = self.set_mocked_audit_info()
+ audit_info = set_mocked_aws_audit_info()
 # Create bucket and upload allowlist yaml
 s3_resource = resource("s3", region_name=AWS_REGION_US_EAST_1)
 s3_resource.create_bucket(Bucket="test-allowlist")
@@ -76,7 +44,7 @@ class Test_Allowlist:
 # Test DynamoDB allowlist
 @mock_dynamodb
 def test_dynamo_allowlist(self):
- audit_info = self.set_mocked_audit_info()
+ audit_info = set_mocked_aws_audit_info()
 # Create table and put item
 dynamodb_resource = resource("dynamodb", region_name=AWS_REGION_US_EAST_1)
 table_name = "test-allowlist"
@@ -120,7 +88,7 @@ class Test_Allowlist:
 @mock_dynamodb
 def test_dynamo_allowlist_with_tags(self):
- audit_info = self.set_mocked_audit_info()
+ audit_info = set_mocked_aws_audit_info()
 # Create table and put item
 dynamodb_resource = resource("dynamodb", region_name=AWS_REGION_US_EAST_1)
 table_name = "test-allowlist"
diff --git a/tests/providers/aws/lib/service/service_test.py b/tests/providers/aws/lib/service/service_test.py
index f9f304ea..a3e5f99d 100644
--- a/tests/providers/aws/lib/service/service_test.py
+++ b/tests/providers/aws/lib/service/service_test.py
@@ -1,20 +1,21 @@
-from boto3 import session
 from mock import patch
-from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
 from prowler.providers.aws.lib.service.service import AWSService
-from prowler.providers.common.models import Audit_Metadata
-
-AWS_ACCOUNT_NUMBER = "123456789012"
-AWS_ACCOUNT_ARN = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
-AWS_PARTITION = "aws"
-AWS_REGION = "us-east-1"
+from tests.providers.aws.audit_info_utils import (
+ AWS_ACCOUNT_ARN,
+ AWS_ACCOUNT_NUMBER,
+ AWS_COMMERCIAL_PARTITION,
+ AWS_REGION_US_EAST_1,
+ set_mocked_aws_audit_info,
+)
 def mock_generate_regional_clients(service, audit_info, _):
- regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION)
- regional_client.region = AWS_REGION
- return {AWS_REGION: regional_client}
+ regional_client = audit_info.audit_session.client(
+ service, region_name=AWS_REGION_US_EAST_1
+ )
+ regional_client.region = AWS_REGION_US_EAST_1
+ return {AWS_REGION_US_EAST_1: regional_client}
 @patch(
@@ -22,50 +23,23 @@ def mock_generate_regional_clients(service, audit_info, _):
 new=mock_generate_regional_clients,
 )
 class Test_AWSService:
- # Mocked Audit Info
- def set_mocked_audit_info(self):
- audit_info = AWS_Audit_Info(
- session_config=None,
- original_session=None,
- audit_session=session.Session(
- profile_name=None,
- botocore_session=None,
- ),
- audited_account=AWS_ACCOUNT_NUMBER,
- audited_account_arn=AWS_ACCOUNT_ARN,
- audited_user_id=None,
- audited_partition=AWS_PARTITION,
- audited_identity_arn=None,
- profile=None,
- profile_region=None,
- credentials=None,
- assumed_role_info=None,
- audited_regions=None,
- organizations_metadata=None,
- audit_resources=[],
- mfa_enabled=False,
- audit_metadata=Audit_Metadata(
- services_scanned=0,
- expected_checks=[],
- completed_checks=0,
- audit_progress=0,
- ),
- )
- return audit_info
-
 def test_AWSService_init(self):
- audit_info = self.set_mocked_audit_info()
- service = AWSService("s3", audit_info)
+ service_name = "s3"
+ audit_info = set_mocked_aws_audit_info()
+ service = AWSService(service_name, audit_info)
 assert service.audit_info == audit_info
 assert service.audited_account == AWS_ACCOUNT_NUMBER
 assert service.audited_account_arn == AWS_ACCOUNT_ARN
- assert service.audited_partition == AWS_PARTITION
+ assert service.audited_partition == AWS_COMMERCIAL_PARTITION
 assert service.audit_resources == []
 assert service.audited_checks == []
 assert service.session == audit_info.audit_session
- assert service.service == "s3"
+ assert service.service == service_name
 assert len(service.regional_clients) == 1
- assert service.regional_clients[AWS_REGION].__class__.__name__ == "S3"
- assert service.region == AWS_REGION
- assert service.client.__class__.__name__ == "S3"
+ assert (
+ service.regional_clients[AWS_REGION_US_EAST_1].__class__.__name__
+ == service_name.upper()
+ )
+ assert service.region == AWS_REGION_US_EAST_1
+ assert service.client.__class__.__name__ == service_name.upper()
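Review bot: Deriving the expected boto3 client class name with `service_name.upper()` in `test_AWSService_init` ties the two new assertions to a naming coincidence that holds for `s3` but not in general — the accessanalyzer test in this same patch expects `"AccessAnalyzer"` for the `accessanalyzer` service — so anyone copying this test for another service gets a false failure rather than a clear expectation. Keep `service_name` for the constructor call and the `service.service` check, but compare the class names against an explicit literal, e.g. `assert service.regional_clients[AWS_REGION_US_EAST_1].__class__.__name__ == "S3"` and `assert service.client.__class__.__name__ == "S3"`. Note also that `assert service.audit_resources == []` now depends on the `audit_resources=[]` value set in `audit_info_utils.py` by this patch rather than on a value the test itself controls, which is worth a short comment on that assert.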
diff --git a/tests/providers/aws/services/accessanalyzer/accessanalyzer_service_test.py b/tests/providers/aws/services/accessanalyzer/accessanalyzer_service_test.py
index fe5f5926..5ae746e6 100644
--- a/tests/providers/aws/services/accessanalyzer/accessanalyzer_service_test.py
+++ b/tests/providers/aws/services/accessanalyzer/accessanalyzer_service_test.py
@@ -1,19 +1,15 @@
 from unittest.mock import patch
 import botocore
-from boto3 import session
-from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
 from prowler.providers.aws.services.accessanalyzer.accessanalyzer_service import (
 AccessAnalyzer,
 )
-from prowler.providers.common.models import Audit_Metadata
-
-# Mock Test Region
-AWS_REGION = "eu-west-1"
-
-AWS_ACCOUNT_NUMBER = "123456789012"
-
+from tests.providers.aws.audit_info_utils import (
+ AWS_REGION_EU_WEST_1,
+ AWS_REGION_US_EAST_1,
+ set_mocked_aws_audit_info,
+)
 # Mocking Access Analyzer Calls
 make_api_call = botocore.client.BaseClient._make_api_call
@@ -59,9 +55,11 @@ def mock_make_api_call(self, operation_name, kwarg):
 def mock_generate_regional_clients(service, audit_info, _):
- regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION)
- regional_client.region = AWS_REGION
- return {AWS_REGION: regional_client}
+ regional_client = audit_info.audit_session.client(
+ service, region_name=AWS_REGION_EU_WEST_1
+ )
+ regional_client.region = AWS_REGION_EU_WEST_1
+ return {AWS_REGION_EU_WEST_1: regional_client}
 # Patch every AWS call using Boto3 and generate_regional_clients to have 1 client
@@ -71,66 +69,46 @@ def mock_generate_regional_clients(service, audit_info, _):
 new=mock_generate_regional_clients,
 )
 class Test_AccessAnalyzer_Service:
- def set_mocked_audit_info(self):
- audit_info = AWS_Audit_Info(
- session_config=None,
- original_session=None,
- audit_session=session.Session(
- profile_name=None,
- botocore_session=None,
- ),
- audited_account=AWS_ACCOUNT_NUMBER,
- audited_account_arn=f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root",
- audited_user_id=None,
- audited_partition="aws",
- audited_identity_arn=None,
- profile=None,
- profile_region=None,
- credentials=None,
- assumed_role_info=None,
- audited_regions=["us-east-1", "eu-west-1"],
- organizations_metadata=None,
- audit_resources=None,
- mfa_enabled=False,
- audit_metadata=Audit_Metadata(
- services_scanned=0,
- expected_checks=[],
- completed_checks=0,
- audit_progress=0,
- ),
- )
- return audit_info
-
 # Test AccessAnalyzer Client
 def test__get_client__(self):
- access_analyzer = AccessAnalyzer(self.set_mocked_audit_info())
+ access_analyzer = AccessAnalyzer(
+ set_mocked_aws_audit_info([AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1])
+ )
 assert (
- access_analyzer.regional_clients[AWS_REGION].__class__.__name__
+ access_analyzer.regional_clients[AWS_REGION_EU_WEST_1].__class__.__name__
 == "AccessAnalyzer"
 )
 # Test AccessAnalyzer Session
 def test__get_session__(self):
- access_analyzer = AccessAnalyzer(self.set_mocked_audit_info())
+ access_analyzer = AccessAnalyzer(
+ set_mocked_aws_audit_info([AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1])
+ )
 assert access_analyzer.session.__class__.__name__ == "Session"
 # Test AccessAnalyzer Service
 def test__get_service__(self):
- access_analyzer = AccessAnalyzer(self.set_mocked_audit_info())
+ access_analyzer = AccessAnalyzer(
+ set_mocked_aws_audit_info([AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1])
+ )
 assert access_analyzer.service == "accessanalyzer"
 def test__list_analyzers__(self):
- access_analyzer = AccessAnalyzer(self.set_mocked_audit_info())
+ access_analyzer = AccessAnalyzer(
+ set_mocked_aws_audit_info([AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1])
+ )
 assert len(access_analyzer.analyzers) == 1
 assert access_analyzer.analyzers[0].arn == "ARN"
 assert access_analyzer.analyzers[0].name == "Test Analyzer"
 assert access_analyzer.analyzers[0].status == "ACTIVE"
 assert access_analyzer.analyzers[0].tags == [{"test": "test"}]
 assert access_analyzer.analyzers[0].type == "ACCOUNT"
- assert access_analyzer.analyzers[0].region == AWS_REGION
+ assert access_analyzer.analyzers[0].region == AWS_REGION_EU_WEST_1
 def test__list_findings__(self):
- access_analyzer = AccessAnalyzer(self.set_mocked_audit_info())
+ access_analyzer = AccessAnalyzer(
+ set_mocked_aws_audit_info([AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1])
+ )
 assert len(access_analyzer.analyzers) == 1
 assert len(access_analyzer.analyzers[0].findings) == 1
 assert access_analyzer.analyzers[0].findings[0].status == "ARCHIVED"
diff --git a/tests/providers/aws/services/account/account_service_test.py b/tests/providers/aws/services/account/account_service_test.py
index 61226912..a7a7018c 100644
--- a/tests/providers/aws/services/account/account_service_test.py
+++ b/tests/providers/aws/services/account/account_service_test.py
@@ -1,14 +1,11 @@
 import botocore
-from boto3 import session
 from mock import patch
-from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
 from prowler.providers.aws.services.account.account_service import Account, Contact
-from prowler.providers.common.models import Audit_Metadata
-
-AWS_ACCOUNT_NUMBER = "123456789012"
-AWS_ACCOUNT_ARN = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
-AWS_REGION = "us-east-1"
+from tests.providers.aws.audit_info_utils import (
+ AWS_ACCOUNT_NUMBER,
+ set_mocked_aws_audit_info,
+)
 # Mocking Access Analyzer Calls
 make_api_call = botocore.client.BaseClient._make_api_call
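Review bot: The region list `[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1]` is repeated verbatim in all five tests of `Test_AccessAnalyzer_Service`, so a future region change is a five-place edit; a module-level constant next to the imports, e.g. `AUDITED_REGIONS = [AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1]`, with `set_mocked_aws_audit_info(AUDITED_REGIONS)` in each test, keeps it to one. A comment on that constant should also record that the class is patched with `mock_generate_regional_clients`, which (per the earlier hunk in this file) always returns a single eu-west-1 client and never reads `audit_info.audited_regions`, so the second region only affects code that consumes `audited_regions` directly — presumably the service under test, which is not visible here. The `@patch` target and the class header sit at the top of the hunk; the test bodies are shown in full.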
@@ -56,65 +53,34 @@ def mock_make_api_call(self, operation_name, kwargs):
 # Patch every AWS call using Boto3
 @patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
 class Test_Account_Service:
- # Mocked Audit Info
- def set_mocked_audit_info(self):
- audit_info = AWS_Audit_Info(
- session_config=None,
- original_session=None,
- audit_session=session.Session(
- profile_name=None,
- botocore_session=None,
- ),
- audited_account=AWS_ACCOUNT_NUMBER,
- audited_account_arn=AWS_ACCOUNT_ARN,
- audited_user_id=None,
- audited_partition="aws",
- audited_identity_arn=None,
- profile=None,
- profile_region=None,
- credentials=None,
- assumed_role_info=None,
- audited_regions=None,
- organizations_metadata=None,
- audit_resources=None,
- mfa_enabled=False,
- audit_metadata=Audit_Metadata(
- services_scanned=0,
- expected_checks=[],
- completed_checks=0,
- audit_progress=0,
- ),
- )
- return audit_info
-
 # Test Account Service
 def test_service(self):
- audit_info = self.set_mocked_audit_info()
+ audit_info = set_mocked_aws_audit_info()
 account = Account(audit_info)
 assert account.service == "account"
 # Test Account Client
 def test_client(self):
- audit_info = self.set_mocked_audit_info()
+ audit_info = set_mocked_aws_audit_info()
 account = Account(audit_info)
 assert account.client.__class__.__name__ == "Account"
 # Test Account Session
 def test__get_session__(self):
- audit_info = self.set_mocked_audit_info()
+ audit_info = set_mocked_aws_audit_info()
 account = Account(audit_info)
 assert account.session.__class__.__name__ == "Session"
 # Test Account Session
 def test_audited_account(self):
- audit_info = self.set_mocked_audit_info()
+ audit_info = set_mocked_aws_audit_info()
 account = Account(audit_info)
 assert account.audited_account == AWS_ACCOUNT_NUMBER
 # Test Account Get Account Contacts
 def test_get_account_contacts(self):
 # Account client for this test class
- audit_info = self.set_mocked_audit_info()
+ audit_info = set_mocked_aws_audit_info()
 account = Account(audit_info)
 assert account.number_of_contacts == 4
 assert account.contact_base == Contact(
diff --git a/tests/providers/aws/services/acm/acm_service_test.py b/tests/providers/aws/services/acm/acm_service_test.py
index d4119793..98aaec7d 100644
--- a/tests/providers/aws/services/acm/acm_service_test.py
+++ b/tests/providers/aws/services/acm/acm_service_test.py
@@ -2,26 +2,20 @@ import uuid
 from datetime import datetime
 import botocore
-from boto3 import session
 from freezegun import freeze_time
 from mock import patch
-from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
 from prowler.providers.aws.services.acm.acm_service import ACM
-from prowler.providers.common.models import Audit_Metadata
-
-# from moto import mock_acm
-
-
-AWS_ACCOUNT_NUMBER = "123456789012"
-AWS_REGION = "us-east-1"
+from tests.providers.aws.audit_info_utils import (
+ AWS_ACCOUNT_NUMBER,
+ AWS_REGION_US_EAST_1,
+ set_mocked_aws_audit_info,
+)
 # Mocking Access Analyzer Calls
 make_api_call = botocore.client.BaseClient._make_api_call
-certificate_arn = (
- f"arn:aws:acm:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:certificate/{str(uuid.uuid4())}"
-)
+certificate_arn = f"arn:aws:acm:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:certificate/{str(uuid.uuid4())}"
 certificate_name = "test-certificate.com"
 certificate_type = "AMAZON_ISSUED"
@@ -81,9 +75,11 @@ def mock_make_api_call(self, operation_name, kwargs):
 # Mock generate_regional_clients()
 def mock_generate_regional_clients(service, audit_info, _):
- regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION)
- regional_client.region = AWS_REGION
- return {AWS_REGION: regional_client}
+ regional_client = audit_info.audit_session.client(
+ service, region_name=AWS_REGION_US_EAST_1
+ )
+ regional_client.region = AWS_REGION_US_EAST_1
+ return {AWS_REGION_US_EAST_1: regional_client}
 # Patch every AWS call using Boto3 and generate_regional_clients to have 1 client
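Review bot: The rewritten `certificate_arn` line in `acm_service_test.py` wraps `uuid.uuid4()` in a redundant `str()` inside the f-string (f-string interpolation already calls `str()` on the value) and has become a single line far past the configured width, which the formatter will not split because it is one string literal; `{uuid.uuid4()}` is the equivalent and shorter form, e.g. `certificate_arn = f"arn:aws:acm:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:certificate/{uuid.uuid4()}"`. The context comment `# Mocking Access Analyzer Calls` immediately above it (and the same line in `account_service_test.py` earlier in this patch) looks copied from the accessanalyzer test and describes the wrong service; since the surrounding lines are already being touched, renaming it to `# Mocking ACM Calls` (and `# Mocking Account Calls`) is a cheap clean-up.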
@@ -96,42 +92,11 @@ def mock_generate_regional_clients(service, audit_info, _):
 @freeze_time("2023-01-01")
 # FIXME: Pending Moto PR to update ACM responses
 class Test_ACM_Service:
- # Mocked Audit Info
- def set_mocked_audit_info(self):
- audit_info = AWS_Audit_Info(
- session_config=None,
- original_session=None,
- audit_session=session.Session(
- profile_name=None,
- botocore_session=None,
- ),
- audited_account=AWS_ACCOUNT_NUMBER,
- audited_account_arn=f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root",
- audited_user_id=None,
- audited_partition="aws",
- audited_identity_arn=None,
- profile=None,
- profile_region=None,
- credentials=None,
- assumed_role_info=None,
- audited_regions=None,
- organizations_metadata=None,
- audit_resources=None,
- mfa_enabled=False,
- audit_metadata=Audit_Metadata(
- services_scanned=0,
- expected_checks=[],
- completed_checks=0,
- audit_progress=0,
- ),
- )
- return audit_info
-
 # Test ACM Service
 # @mock_acm
 def test_service(self):
 # ACM client for this test class
- audit_info = self.set_mocked_audit_info()
+ audit_info = set_mocked_aws_audit_info()
 acm = ACM(audit_info)
 assert acm.service == "acm"
@@ -139,7 +104,7 @@ class Test_ACM_Service:
 # @mock_acm
 def test_client(self):
 # ACM client for this test class
- audit_info = self.set_mocked_audit_info()
+ audit_info = set_mocked_aws_audit_info()
 acm = ACM(audit_info)
 for regional_client in acm.regional_clients.values():
 assert regional_client.__class__.__name__ == "ACM"
@@ -148,7 +113,7 @@ class Test_ACM_Service:
 # @mock_acm
 def test__get_session__(self):
 # ACM client for this test class
- audit_info = self.set_mocked_audit_info()
+ audit_info = set_mocked_aws_audit_info()
 acm = ACM(audit_info)
 assert acm.session.__class__.__name__ == "Session"
@@ -156,7 +121,7 @@ class Test_ACM_Service:
 # @mock_acm
 def test_audited_account(self):
 # ACM client for this test class
- audit_info = self.set_mocked_audit_info()
+ audit_info = set_mocked_aws_audit_info()
 acm = ACM(audit_info)
 assert acm.audited_account == AWS_ACCOUNT_NUMBER
@@ -171,7 +136,7 @@ class Test_ACM_Service:
 # )
 # ACM client for this test class
- audit_info = self.set_mocked_audit_info()
+ audit_info = set_mocked_aws_audit_info()
 acm = ACM(audit_info)
 assert len(acm.certificates) == 1
 assert acm.certificates[0].arn == certificate_arn
@@ -179,7 +144,7 @@ class Test_ACM_Service:
 assert acm.certificates[0].type == certificate_type
 assert acm.certificates[0].expiration_days == 365
 assert acm.certificates[0].transparency_logging is False
- assert acm.certificates[0].region == AWS_REGION
+ assert acm.certificates[0].region == AWS_REGION_US_EAST_1
 # Test ACM List Tags
 # @mock_acm
@@ -192,7 +157,7 @@ class Test_ACM_Service:
 # )
 # ACM client for this test class
- audit_info = self.set_mocked_audit_info()
+ audit_info = set_mocked_aws_audit_info()
 acm = ACM(audit_info)
 assert len(acm.certificates) == 1
 assert acm.certificates[0].tags == [