From 45faa2e9e875850cac2d012b1399f9fb993596c5 Mon Sep 17 00:00:00 2001 From: Nacho Rivera Date: Tue, 5 Dec 2023 11:05:05 +0100 Subject: [PATCH] test(audit_info): refactor sqs (#3130) --- ...sqs_queues_not_publicly_accessible_test.py | 33 +++++---- ...ues_server_side_encryption_enabled_test.py | 17 +++-- .../aws/services/sqs/sqs_service_test.py | 74 ++++++------------- 3 files changed, 49 insertions(+), 75 deletions(-) diff --git a/tests/providers/aws/services/sqs/sqs_queues_not_publicly_accessible/sqs_queues_not_publicly_accessible_test.py b/tests/providers/aws/services/sqs/sqs_queues_not_publicly_accessible/sqs_queues_not_publicly_accessible_test.py index cb679d26..35eb5614 100644 --- a/tests/providers/aws/services/sqs/sqs_queues_not_publicly_accessible/sqs_queues_not_publicly_accessible_test.py +++ b/tests/providers/aws/services/sqs/sqs_queues_not_publicly_accessible/sqs_queues_not_publicly_accessible_test.py @@ -3,15 +3,16 @@ from unittest import mock from uuid import uuid4 from prowler.providers.aws.services.sqs.sqs_service import Queue - -AWS_REGION = "eu-west-1" -AWS_ACCOUNT_NUMBER = "123456789012" +from tests.providers.aws.audit_info_utils import ( + AWS_ACCOUNT_NUMBER, + AWS_REGION_EU_WEST_1, +) test_queue_name = str(uuid4()) -test_queue_url = ( - f"https://sqs.{AWS_REGION}.amazonaws.com/{AWS_ACCOUNT_NUMBER}/{test_queue_name}" +test_queue_url = f"https://sqs.{AWS_REGION_EU_WEST_1}.amazonaws.com/{AWS_ACCOUNT_NUMBER}/{test_queue_name}" +test_queue_arn = ( + f"arn:aws:sqs:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:{test_queue_name}" ) -test_queue_arn = f"arn:aws:sqs:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:{test_queue_name}" test_restricted_policy = { "Version": "2012-10-17", @@ -115,7 +116,7 @@ class Test_sqs_queues_not_publicly_accessible: Queue( id=test_queue_url, name=test_queue_name, - region=AWS_REGION, + region=AWS_REGION_EU_WEST_1, policy=test_restricted_policy, arn=test_queue_arn, ) 
@@ -136,7 +137,7 @@ class Test_sqs_queues_not_publicly_accessible: assert result[0].resource_id == test_queue_url assert result[0].resource_arn == test_queue_arn assert result[0].resource_tags == [] - assert result[0].region == AWS_REGION + assert result[0].region == AWS_REGION_EU_WEST_1 def test_queues_public(self): sqs_client = mock.MagicMock @@ -145,7 +146,7 @@ class Test_sqs_queues_not_publicly_accessible: Queue( id=test_queue_url, name=test_queue_name, - region=AWS_REGION, + region=AWS_REGION_EU_WEST_1, policy=test_public_policy, arn=test_queue_arn, ) @@ -169,7 +170,7 @@ class Test_sqs_queues_not_publicly_accessible: assert result[0].resource_id == test_queue_url assert result[0].resource_arn == test_queue_arn assert result[0].resource_tags == [] - assert result[0].region == AWS_REGION + assert result[0].region == AWS_REGION_EU_WEST_1 def test_queues_public_with_condition_not_valid(self): sqs_client = mock.MagicMock @@ -179,7 +180,7 @@ class Test_sqs_queues_not_publicly_accessible: Queue( id=test_queue_url, name=test_queue_name, - region=AWS_REGION, + region=AWS_REGION_EU_WEST_1, policy=test_public_policy_with_condition_same_account_not_valid, arn=test_queue_arn, ) @@ -203,7 +204,7 @@ class Test_sqs_queues_not_publicly_accessible: assert result[0].resource_id == test_queue_url assert result[0].resource_arn == test_queue_arn assert result[0].resource_tags == [] - assert result[0].region == AWS_REGION + assert result[0].region == AWS_REGION_EU_WEST_1 def test_queues_public_with_condition_valid(self): sqs_client = mock.MagicMock @@ -213,7 +214,7 @@ class Test_sqs_queues_not_publicly_accessible: Queue( id=test_queue_url, name=test_queue_name, - region=AWS_REGION, + region=AWS_REGION_EU_WEST_1, policy=test_public_policy_with_condition_same_account, arn=test_queue_arn, ) 
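Review bot: `sqs_client = mock.MagicMock` in `test_queues_public` binds the MagicMock class rather than an instance, so any attribute the test then sets on it would land on the class and persist into later tests in the same process; the lines that populate it are outside this hunk. For a check that decides whether a queue policy is public, state leaked between tests can let a case pass or fail for the wrong reason. `sqs_client = mock.MagicMock()` keeps each test's queues isolated. The same line is visible in the hunks at -169, -203 and -237 for the `test_queues_public_with_condition_*` tests.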
@@ -237,7 +238,7 @@ class Test_sqs_queues_not_publicly_accessible: assert result[0].resource_id == test_queue_url assert result[0].resource_arn == test_queue_arn assert result[0].resource_tags == [] - assert result[0].region == AWS_REGION + assert result[0].region == AWS_REGION_EU_WEST_1 def test_queues_public_with_condition_invalid_other_account(self): sqs_client = mock.MagicMock @@ -247,7 +248,7 @@ class Test_sqs_queues_not_publicly_accessible: Queue( id=test_queue_url, name=test_queue_name, - region=AWS_REGION, + region=AWS_REGION_EU_WEST_1, policy=test_public_policy_with_condition_diff_account, arn=test_queue_arn, ) @@ -271,4 +272,4 @@ class Test_sqs_queues_not_publicly_accessible: assert result[0].resource_id == test_queue_url assert result[0].resource_arn == test_queue_arn assert result[0].resource_tags == [] - assert result[0].region == AWS_REGION + assert result[0].region == AWS_REGION_EU_WEST_1 diff --git a/tests/providers/aws/services/sqs/sqs_queues_server_side_encryption_enabled/sqs_queues_server_side_encryption_enabled_test.py b/tests/providers/aws/services/sqs/sqs_queues_server_side_encryption_enabled/sqs_queues_server_side_encryption_enabled_test.py index 7afbb48a..e5f005df 100644 --- a/tests/providers/aws/services/sqs/sqs_queues_server_side_encryption_enabled/sqs_queues_server_side_encryption_enabled_test.py +++ b/tests/providers/aws/services/sqs/sqs_queues_server_side_encryption_enabled/sqs_queues_server_side_encryption_enabled_test.py 
@@ -3,16 +3,17 @@ from unittest import mock from uuid import uuid4 from prowler.providers.aws.services.sqs.sqs_service import Queue - -AWS_REGION = "eu-west-1" -AWS_ACCOUNT_NUMBER = "123456789012" +from tests.providers.aws.audit_info_utils import ( + AWS_ACCOUNT_NUMBER, + AWS_REGION_EU_WEST_1, +) test_kms_key_id = str(uuid4()) test_queue_name = str(uuid4()) -test_queue_url = ( - f"https://sqs.{AWS_REGION}.amazonaws.com/{AWS_ACCOUNT_NUMBER}/{test_queue_name}" +test_queue_url = f"https://sqs.{AWS_REGION_EU_WEST_1}.amazonaws.com/{AWS_ACCOUNT_NUMBER}/{test_queue_name}" +test_queue_arn = ( + f"arn:aws:sqs:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:{test_queue_name}" ) -test_queue_arn = f"arn:aws:sqs:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:{test_queue_name}" class Test_sqs_queues_server_side_encryption_enabled: @@ -38,7 +39,7 @@ class Test_sqs_queues_server_side_encryption_enabled: Queue( id=test_queue_url, name=test_queue_name, - region=AWS_REGION, + region=AWS_REGION_EU_WEST_1, kms_key_id=test_kms_key_id, arn=test_queue_arn, ) @@ -66,7 +67,7 @@ class Test_sqs_queues_server_side_encryption_enabled: Queue( id=test_queue_url, name=test_queue_name, - region=AWS_REGION, + region=AWS_REGION_EU_WEST_1, arn=test_queue_arn, ) ) diff --git a/tests/providers/aws/services/sqs/sqs_service_test.py b/tests/providers/aws/services/sqs/sqs_service_test.py index 72779a56..8062ae30 100644 --- a/tests/providers/aws/services/sqs/sqs_service_test.py +++ b/tests/providers/aws/services/sqs/sqs_service_test.py 
@@ -3,19 +3,19 @@ from unittest.mock import patch from uuid import uuid4 import botocore -from boto3 import client, session +from boto3 import client from moto import mock_sqs -from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info from prowler.providers.aws.services.sqs.sqs_service import SQS -from prowler.providers.common.models import Audit_Metadata - -AWS_ACCOUNT_NUMBER = "123456789012" -AWS_REGION = "eu-west-1" +from tests.providers.aws.audit_info_utils import ( + AWS_ACCOUNT_NUMBER, + AWS_REGION_EU_WEST_1, + set_mocked_aws_audit_info, +) test_queue = "test-queue" test_key = str(uuid4()) -test_queue_arn = f"arn:aws:sqs:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:{test_queue}" +test_queue_arn = f"arn:aws:sqs:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:{test_queue}" test_policy = { "Version": "2012-10-17", "Statement": [ @@ -40,9 +40,11 @@ def mock_make_api_call(self, operation_name, kwarg): def mock_generate_regional_clients(service, audit_info, _): - regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION) - regional_client.region = AWS_REGION - return {AWS_REGION: regional_client} + regional_client = audit_info.audit_session.client( + service, region_name=AWS_REGION_EU_WEST_1 + ) + regional_client.region = AWS_REGION_EU_WEST_1 + return {AWS_REGION_EU_WEST_1: regional_client} @patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call) 
@@ -51,62 +53,32 @@ def mock_generate_regional_clients(service, audit_info, _): new=mock_generate_regional_clients, ) class Test_SQS_Service: - # Mocked Audit Info - def set_mocked_audit_info(self): - audit_info = AWS_Audit_Info( - session_config=None, - original_session=None, - audit_session=session.Session( - profile_name=None, - botocore_session=None, - ), - audited_account=AWS_ACCOUNT_NUMBER, - audited_account_arn=f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root", - audited_user_id=None, - audited_partition="aws", - audited_identity_arn=None, - profile=None, - profile_region=None, - credentials=None, - assumed_role_info=None, - audited_regions=None, - organizations_metadata=None, - audit_resources=None, - mfa_enabled=False, - audit_metadata=Audit_Metadata( - services_scanned=0, - expected_checks=[], - completed_checks=0, - audit_progress=0, - ), - ) - return audit_info # Test SQS Service def test_service(self): - audit_info = self.set_mocked_audit_info() + audit_info = set_mocked_aws_audit_info([AWS_REGION_EU_WEST_1]) sqs = SQS(audit_info) assert sqs.service == "sqs" # Test SQS client def test_client(self): - audit_info = self.set_mocked_audit_info() + audit_info = set_mocked_aws_audit_info([AWS_REGION_EU_WEST_1]) sqs = SQS(audit_info) for reg_client in sqs.regional_clients.values(): assert reg_client.__class__.__name__ == "SQS" # Test SQS session def test__get_session__(self): - audit_info = self.set_mocked_audit_info() + audit_info = set_mocked_aws_audit_info([AWS_REGION_EU_WEST_1]) sqs = SQS(audit_info) assert sqs.session.__class__.__name__ == "Session" @mock_sqs # Test SQS list queues def test__list_queues__(self): - sqs_client = client("sqs", region_name=AWS_REGION) + sqs_client = client("sqs", region_name=AWS_REGION_EU_WEST_1) queue = sqs_client.create_queue(QueueName=test_queue, tags={"test": "test"}) - audit_info = self.set_mocked_audit_info() + audit_info = set_mocked_aws_audit_info([AWS_REGION_EU_WEST_1]) sqs = SQS(audit_info) assert len(sqs.queues) == 1 
assert sqs.queues[0].id == queue["QueueUrl"] @@ -114,7 +86,7 @@ class Test_SQS_Service: assert sqs.queues[0].name == sqs.queues[0].arn.split(":")[-1] assert sqs.queues[0].name == sqs.queues[0].id.split("/")[-1] assert sqs.queues[0].arn == test_queue_arn - assert sqs.queues[0].region == AWS_REGION + assert sqs.queues[0].region == AWS_REGION_EU_WEST_1 assert sqs.queues[0].tags == [{"test": "test"}] # moto does not properly mock this and is hardcoded to return 1000 queues @@ -122,24 +94,24 @@ class Test_SQS_Service: # @mock_sqs # # Test SQS list queues for over 1000 queues # def test__list_queues__pagination_over_a_thousand(self): - # sqs_client = client("sqs", region_name=AWS_REGION) + # sqs_client = client("sqs", region_name=AWS_REGION_EU_WEST_1) # for i in range(0,1050): # sqs_client.create_queue(QueueName=f"{test_queue}-{i}", tags={"test": "test"}) - # audit_info = self.set_mocked_audit_info() + # audit_info = set_mocked_aws_audit_info([AWS_REGION_EU_WEST_1]) # sqs = SQS(audit_info) # assert len(sqs.queues) > 1000 @mock_sqs # Test SQS list queues def test__get_queue_attributes__(self): - sqs_client = client("sqs", region_name=AWS_REGION) + sqs_client = client("sqs", region_name=AWS_REGION_EU_WEST_1) queue = sqs_client.create_queue( QueueName=test_queue, ) - audit_info = self.set_mocked_audit_info() + audit_info = set_mocked_aws_audit_info([AWS_REGION_EU_WEST_1]) sqs = SQS(audit_info) assert len(sqs.queues) == 1 assert sqs.queues[0].id == queue["QueueUrl"] - assert sqs.queues[0].region == AWS_REGION + assert sqs.queues[0].region == AWS_REGION_EU_WEST_1 assert sqs.queues[0].policy assert sqs.queues[0].kms_key_id == test_key
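Review bot: `assert sqs.queues[0].policy` in `test__get_queue_attributes__` only checks that the policy is truthy, while the KMS key on the next line is compared against `test_key`. The public-accessibility tests in this patch build `Queue` objects from this same `policy` field, so the service test should pin its content: `assert sqs.queues[0].policy == test_policy`, assuming `mock_make_api_call` (its body is outside these hunks) returns `test_policy` as the queue's Policy attribute. The comment above this test still reads `# Test SQS list queues`; `# Test SQS get queue attributes` would match what it exercises.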