From 93a8f6e7590f3b00011c5577919c283247e02cab Mon Sep 17 00:00:00 2001 From: Nacho Rivera Date: Mon, 17 Apr 2023 10:06:25 +0200 Subject: [PATCH] fix(rds tests): mocked audit_info object (#2222) --- .../rds_instance_backup_enabled_test.py | 133 +++++++---- .../rds_instance_deletion_protection_test.py | 131 +++++++---- ...stance_enhanced_monitoring_enabled_test.py | 134 +++++++---- ...stance_integration_cloudwatch_logs_test.py | 130 +++++++---- ...ance_minor_version_upgrade_enabled_test.py | 130 +++++++---- .../rds_instance_multi_az_test.py | 129 +++++++---- .../rds_instance_no_public_access_test.py | 130 +++++++---- .../rds_instance_storage_encrypted_test.py | 129 +++++++---- .../rds_instance_transport_encrypted_test.py | 130 +++++++---- .../rds_snapshots_public_access_test.py | 212 +++++++++++------- 10 files changed, 885 insertions(+), 503 deletions(-) diff --git a/tests/providers/aws/services/rds/rds_instance_backup_enabled/rds_instance_backup_enabled_test.py b/tests/providers/aws/services/rds/rds_instance_backup_enabled/rds_instance_backup_enabled_test.py index 9cfd1670..7e44ad20 100644 --- a/tests/providers/aws/services/rds/rds_instance_backup_enabled/rds_instance_backup_enabled_test.py +++ b/tests/providers/aws/services/rds/rds_instance_backup_enabled/rds_instance_backup_enabled_test.py @@ -1,33 +1,63 @@ from re import search from unittest import mock -from boto3 import client +from boto3 import client, session from moto import mock_rds +from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info + +AWS_ACCOUNT_NUMBER = "123456789012" AWS_REGION = "us-east-1" class Test_rds_instance_backup_enabled: + # Mocked Audit Info + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + session_config=None, + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + region_name=AWS_REGION, + ), + audited_account=AWS_ACCOUNT_NUMBER, + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=AWS_REGION, + credentials=None, + assumed_role_info=None, + audited_regions=None, + organizations_metadata=None, + audit_resources=None, + ) + return audit_info + @mock_rds def test_rds_no_instances(self): - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info from prowler.providers.aws.services.rds.rds_service import RDS - current_audit_info.audited_partition = "aws" + audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.rds.rds_instance_backup_enabled.rds_instance_backup_enabled.rds_client", - new=RDS(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, ): - # Test Check - from prowler.providers.aws.services.rds.rds_instance_backup_enabled.rds_instance_backup_enabled import ( - rds_instance_backup_enabled, - ) + with mock.patch( + "prowler.providers.aws.services.rds.rds_instance_backup_enabled.rds_instance_backup_enabled.rds_client", + new=RDS(audit_info), + ): + # Test Check + from prowler.providers.aws.services.rds.rds_instance_backup_enabled.rds_instance_backup_enabled import ( + rds_instance_backup_enabled, + ) - check = rds_instance_backup_enabled() - result = check.execute() + check = rds_instance_backup_enabled() + result = check.execute() - assert len(result) == 0 + assert len(result) == 0 @mock_rds def test_rds_instance_no_backup(self): @@ -39,32 +69,36 @@ class Test_rds_instance_backup_enabled: DBName="staging-postgres", DBInstanceClass="db.m1.small", ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info + from prowler.providers.aws.services.rds.rds_service import RDS - current_audit_info.audited_partition = "aws" + audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.rds.rds_instance_backup_enabled.rds_instance_backup_enabled.rds_client", - new=RDS(current_audit_info), - ) as service_client: - # Test Check - from prowler.providers.aws.services.rds.rds_instance_backup_enabled.rds_instance_backup_enabled import ( - rds_instance_backup_enabled, - ) + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, + ): + with mock.patch( + "prowler.providers.aws.services.rds.rds_instance_backup_enabled.rds_instance_backup_enabled.rds_client", + new=RDS(audit_info), + ) as service_client: + # Test Check + from prowler.providers.aws.services.rds.rds_instance_backup_enabled.rds_instance_backup_enabled import ( + rds_instance_backup_enabled, + ) - service_client.db_instances[0].backup_retention_period = 0 + service_client.db_instances[0].backup_retention_period = 0 - check = rds_instance_backup_enabled() - result = check.execute() + check = rds_instance_backup_enabled() + result = check.execute() - assert len(result) == 1 - assert result[0].status == "FAIL" - assert search( - "has not backup enabled", - result[0].status_extended, - ) - assert result[0].resource_id == "db-master-1" + assert len(result) == 1 + assert result[0].status == "FAIL" + assert search( + "has not backup enabled", + result[0].status_extended, + ) + assert result[0].resource_id == "db-master-1" @mock_rds def test_rds_instance_with_backup(self): @@ -77,27 +111,30 @@ class Test_rds_instance_backup_enabled: DBInstanceClass="db.m1.small", BackupRetentionPeriod=10, ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info from prowler.providers.aws.services.rds.rds_service import RDS - current_audit_info.audited_partition = "aws" + audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.rds.rds_instance_backup_enabled.rds_instance_backup_enabled.rds_client", - new=RDS(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, ): - # Test Check - from prowler.providers.aws.services.rds.rds_instance_backup_enabled.rds_instance_backup_enabled import ( - rds_instance_backup_enabled, - ) + with mock.patch( + "prowler.providers.aws.services.rds.rds_instance_backup_enabled.rds_instance_backup_enabled.rds_client", + new=RDS(audit_info), + ): + # Test Check + from prowler.providers.aws.services.rds.rds_instance_backup_enabled.rds_instance_backup_enabled import ( + rds_instance_backup_enabled, + ) - check = rds_instance_backup_enabled() - result = check.execute() + check = rds_instance_backup_enabled() + result = check.execute() - assert len(result) == 1 - assert result[0].status == "PASS" - assert search( - "has backup enabled", - result[0].status_extended, - ) - assert result[0].resource_id == "db-master-1" + assert len(result) == 1 + assert result[0].status == "PASS" + assert search( + "has backup enabled", + result[0].status_extended, + ) + assert result[0].resource_id == "db-master-1" diff --git a/tests/providers/aws/services/rds/rds_instance_deletion_protection/rds_instance_deletion_protection_test.py b/tests/providers/aws/services/rds/rds_instance_deletion_protection/rds_instance_deletion_protection_test.py index b6361a52..f6d69d22 100644 --- a/tests/providers/aws/services/rds/rds_instance_deletion_protection/rds_instance_deletion_protection_test.py +++ b/tests/providers/aws/services/rds/rds_instance_deletion_protection/rds_instance_deletion_protection_test.py @@ -1,33 +1,63 @@ from re import search from unittest import mock -from boto3 import client +from boto3 import client, session from moto import mock_rds +from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info + +AWS_ACCOUNT_NUMBER = "123456789012" AWS_REGION = "us-east-1" class Test_rds_instance_deletion_protection: + # Mocked Audit Info + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + session_config=None, + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + region_name=AWS_REGION, + ), + audited_account=AWS_ACCOUNT_NUMBER, + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=AWS_REGION, + credentials=None, + assumed_role_info=None, + audited_regions=None, + organizations_metadata=None, + audit_resources=None, + ) + return audit_info + @mock_rds def test_rds_no_instances(self): - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info from prowler.providers.aws.services.rds.rds_service import RDS - current_audit_info.audited_partition = "aws" + audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.rds.rds_instance_deletion_protection.rds_instance_deletion_protection.rds_client", - new=RDS(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, ): - # Test Check - from prowler.providers.aws.services.rds.rds_instance_deletion_protection.rds_instance_deletion_protection import ( - rds_instance_deletion_protection, - ) + with mock.patch( + "prowler.providers.aws.services.rds.rds_instance_deletion_protection.rds_instance_deletion_protection.rds_client", + new=RDS(audit_info), + ): + # Test Check + from prowler.providers.aws.services.rds.rds_instance_deletion_protection.rds_instance_deletion_protection import ( + rds_instance_deletion_protection, + ) - check = rds_instance_deletion_protection() - result = check.execute() + check = rds_instance_deletion_protection() + result = check.execute() - assert len(result) == 0 + assert len(result) == 0 @mock_rds def test_rds_instance_no_deletion_protection(self): @@ -39,30 +69,33 @@ class Test_rds_instance_deletion_protection: DBName="staging-postgres", DBInstanceClass="db.m1.small", ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info + from prowler.providers.aws.services.rds.rds_service import RDS - current_audit_info.audited_partition = "aws" - + audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.rds.rds_instance_deletion_protection.rds_instance_deletion_protection.rds_client", - new=RDS(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, ): - # Test Check - from prowler.providers.aws.services.rds.rds_instance_deletion_protection.rds_instance_deletion_protection import ( - rds_instance_deletion_protection, - ) + with mock.patch( + "prowler.providers.aws.services.rds.rds_instance_deletion_protection.rds_instance_deletion_protection.rds_client", + new=RDS(audit_info), + ): + # Test Check + from prowler.providers.aws.services.rds.rds_instance_deletion_protection.rds_instance_deletion_protection import ( + rds_instance_deletion_protection, + ) - check = rds_instance_deletion_protection() - result = check.execute() + check = rds_instance_deletion_protection() + result = check.execute() - assert len(result) == 1 - assert result[0].status == "FAIL" - assert search( - "deletion protection is not enabled", - result[0].status_extended, - ) - assert result[0].resource_id == "db-master-1" + assert len(result) == 1 + assert result[0].status == "FAIL" + assert search( + "deletion protection is not enabled", + result[0].status_extended, + ) + assert result[0].resource_id == "db-master-1" @mock_rds def test_rds_instance_with_encryption(self): @@ -75,27 +108,31 @@ class Test_rds_instance_deletion_protection: DBInstanceClass="db.m1.small", DeletionProtection=True, ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info + from prowler.providers.aws.services.rds.rds_service import RDS - current_audit_info.audited_partition = "aws" + audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.rds.rds_instance_deletion_protection.rds_instance_deletion_protection.rds_client", - new=RDS(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, ): - # Test Check - from prowler.providers.aws.services.rds.rds_instance_deletion_protection.rds_instance_deletion_protection import ( - rds_instance_deletion_protection, - ) + with mock.patch( + "prowler.providers.aws.services.rds.rds_instance_deletion_protection.rds_instance_deletion_protection.rds_client", + new=RDS(audit_info), + ): + # Test Check + from prowler.providers.aws.services.rds.rds_instance_deletion_protection.rds_instance_deletion_protection import ( + rds_instance_deletion_protection, + ) - check = rds_instance_deletion_protection() - result = check.execute() + check = rds_instance_deletion_protection() + result = check.execute() - assert len(result) == 1 - assert result[0].status == "PASS" - assert search( - "deletion protection is enabled", - result[0].status_extended, - ) - assert result[0].resource_id == "db-master-1" + assert len(result) == 1 + assert result[0].status == "PASS" + assert search( + "deletion protection is enabled", + result[0].status_extended, + ) + assert result[0].resource_id == "db-master-1" diff --git a/tests/providers/aws/services/rds/rds_instance_enhanced_monitoring_enabled/rds_instance_enhanced_monitoring_enabled_test.py b/tests/providers/aws/services/rds/rds_instance_enhanced_monitoring_enabled/rds_instance_enhanced_monitoring_enabled_test.py index ce693b03..17e561d2 100644 --- a/tests/providers/aws/services/rds/rds_instance_enhanced_monitoring_enabled/rds_instance_enhanced_monitoring_enabled_test.py +++ b/tests/providers/aws/services/rds/rds_instance_enhanced_monitoring_enabled/rds_instance_enhanced_monitoring_enabled_test.py @@ -1,33 +1,63 @@ from re import search from unittest import mock -from boto3 import client +from boto3 import client, session from moto import mock_rds +from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info + +AWS_ACCOUNT_NUMBER = "123456789012" AWS_REGION = "us-east-1" class Test_rds_instance_enhanced_monitoring_enabled: + # Mocked Audit Info + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + session_config=None, + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + region_name=AWS_REGION, + ), + audited_account=AWS_ACCOUNT_NUMBER, + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=AWS_REGION, + credentials=None, + assumed_role_info=None, + audited_regions=None, + organizations_metadata=None, + audit_resources=None, + ) + return audit_info + @mock_rds def test_rds_no_instances(self): - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info from prowler.providers.aws.services.rds.rds_service import RDS - current_audit_info.audited_partition = "aws" + audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.rds.rds_instance_enhanced_monitoring_enabled.rds_instance_enhanced_monitoring_enabled.rds_client", - new=RDS(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, ): - # Test Check - from prowler.providers.aws.services.rds.rds_instance_enhanced_monitoring_enabled.rds_instance_enhanced_monitoring_enabled import ( - rds_instance_enhanced_monitoring_enabled, - ) + with mock.patch( + "prowler.providers.aws.services.rds.rds_instance_enhanced_monitoring_enabled.rds_instance_enhanced_monitoring_enabled.rds_client", + new=RDS(audit_info), + ): + # Test Check + from prowler.providers.aws.services.rds.rds_instance_enhanced_monitoring_enabled.rds_instance_enhanced_monitoring_enabled import ( + rds_instance_enhanced_monitoring_enabled, + ) - check = rds_instance_enhanced_monitoring_enabled() - result = check.execute() + check = rds_instance_enhanced_monitoring_enabled() + result = check.execute() - assert len(result) == 0 + assert len(result) == 0 @mock_rds def test_rds_instance_no_monitoring(self): @@ -39,30 +69,34 @@ class Test_rds_instance_enhanced_monitoring_enabled: DBName="staging-postgres", DBInstanceClass="db.m1.small", ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info + from prowler.providers.aws.services.rds.rds_service import RDS - current_audit_info.audited_partition = "aws" + audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.rds.rds_instance_enhanced_monitoring_enabled.rds_instance_enhanced_monitoring_enabled.rds_client", - new=RDS(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, ): - # Test Check - from prowler.providers.aws.services.rds.rds_instance_enhanced_monitoring_enabled.rds_instance_enhanced_monitoring_enabled import ( - rds_instance_enhanced_monitoring_enabled, - ) + with mock.patch( + "prowler.providers.aws.services.rds.rds_instance_enhanced_monitoring_enabled.rds_instance_enhanced_monitoring_enabled.rds_client", + new=RDS(audit_info), + ): + # Test Check + from prowler.providers.aws.services.rds.rds_instance_enhanced_monitoring_enabled.rds_instance_enhanced_monitoring_enabled import ( + rds_instance_enhanced_monitoring_enabled, + ) - check = rds_instance_enhanced_monitoring_enabled() - result = check.execute() + check = rds_instance_enhanced_monitoring_enabled() + result = check.execute() - assert len(result) == 1 - assert result[0].status == "FAIL" - assert search( - "does not have enhanced monitoring enabled", - result[0].status_extended, - ) - assert result[0].resource_id == "db-master-1" + assert len(result) == 1 + assert result[0].status == "FAIL" + assert search( + "does not have enhanced monitoring enabled", + result[0].status_extended, + ) + assert result[0].resource_id == "db-master-1" @mock_rds def test_rds_instance_with_monitoring(self): @@ -74,28 +108,32 @@ class Test_rds_instance_enhanced_monitoring_enabled: DBName="staging-postgres", DBInstanceClass="db.m1.small", ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info + from prowler.providers.aws.services.rds.rds_service import RDS - current_audit_info.audited_partition = "aws" + audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.rds.rds_instance_enhanced_monitoring_enabled.rds_instance_enhanced_monitoring_enabled.rds_client", - new=RDS(current_audit_info), - ) as service_client: - # Test Check - from prowler.providers.aws.services.rds.rds_instance_enhanced_monitoring_enabled.rds_instance_enhanced_monitoring_enabled import ( - rds_instance_enhanced_monitoring_enabled, - ) + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, + ): + with mock.patch( + "prowler.providers.aws.services.rds.rds_instance_enhanced_monitoring_enabled.rds_instance_enhanced_monitoring_enabled.rds_client", + new=RDS(audit_info), + ) as service_client: + # Test Check + from prowler.providers.aws.services.rds.rds_instance_enhanced_monitoring_enabled.rds_instance_enhanced_monitoring_enabled import ( + rds_instance_enhanced_monitoring_enabled, + ) - service_client.db_instances[0].enhanced_monitoring_arn = "log-stream" - check = rds_instance_enhanced_monitoring_enabled() - result = check.execute() + service_client.db_instances[0].enhanced_monitoring_arn = "log-stream" + check = rds_instance_enhanced_monitoring_enabled() + result = check.execute() - assert len(result) == 1 - assert result[0].status == "PASS" - assert search( - "has enhanced monitoring enabled", - result[0].status_extended, - ) - assert result[0].resource_id == "db-master-1" + assert len(result) == 1 + assert result[0].status == "PASS" + assert search( + "has enhanced monitoring enabled", + result[0].status_extended, + ) + assert result[0].resource_id == "db-master-1" diff --git a/tests/providers/aws/services/rds/rds_instance_integration_cloudwatch_logs/rds_instance_integration_cloudwatch_logs_test.py b/tests/providers/aws/services/rds/rds_instance_integration_cloudwatch_logs/rds_instance_integration_cloudwatch_logs_test.py index 7818689a..3077996c 100644 --- a/tests/providers/aws/services/rds/rds_instance_integration_cloudwatch_logs/rds_instance_integration_cloudwatch_logs_test.py +++ b/tests/providers/aws/services/rds/rds_instance_integration_cloudwatch_logs/rds_instance_integration_cloudwatch_logs_test.py @@ -1,33 +1,63 @@ from re import search from unittest import mock -from boto3 import client +from boto3 import client, session from moto import mock_rds +from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info + +AWS_ACCOUNT_NUMBER = "123456789012" AWS_REGION = "us-east-1" class Test_rds_instance_integration_cloudwatch_logs: + # Mocked Audit Info + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + session_config=None, + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + region_name=AWS_REGION, + ), + audited_account=AWS_ACCOUNT_NUMBER, + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=AWS_REGION, + credentials=None, + assumed_role_info=None, + audited_regions=None, + organizations_metadata=None, + audit_resources=None, + ) + return audit_info + @mock_rds def test_rds_no_instances(self): - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info from prowler.providers.aws.services.rds.rds_service import RDS - current_audit_info.audited_partition = "aws" + audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.rds.rds_instance_integration_cloudwatch_logs.rds_instance_integration_cloudwatch_logs.rds_client", - new=RDS(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, ): - # Test Check - from prowler.providers.aws.services.rds.rds_instance_integration_cloudwatch_logs.rds_instance_integration_cloudwatch_logs import ( - rds_instance_integration_cloudwatch_logs, - ) + with mock.patch( + "prowler.providers.aws.services.rds.rds_instance_integration_cloudwatch_logs.rds_instance_integration_cloudwatch_logs.rds_client", + new=RDS(audit_info), + ): + # Test Check + from prowler.providers.aws.services.rds.rds_instance_integration_cloudwatch_logs.rds_instance_integration_cloudwatch_logs import ( + rds_instance_integration_cloudwatch_logs, + ) - check = rds_instance_integration_cloudwatch_logs() - result = check.execute() + check = rds_instance_integration_cloudwatch_logs() + result = check.execute() - assert len(result) == 0 + assert len(result) == 0 @mock_rds def test_rds_instance_no_logs(self): @@ -39,30 +69,34 @@ class Test_rds_instance_integration_cloudwatch_logs: DBName="staging-postgres", DBInstanceClass="db.m1.small", ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info + from prowler.providers.aws.services.rds.rds_service import RDS - current_audit_info.audited_partition = "aws" + audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.rds.rds_instance_integration_cloudwatch_logs.rds_instance_integration_cloudwatch_logs.rds_client", - new=RDS(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, ): - # Test Check - from prowler.providers.aws.services.rds.rds_instance_integration_cloudwatch_logs.rds_instance_integration_cloudwatch_logs import ( - rds_instance_integration_cloudwatch_logs, - ) + with mock.patch( + "prowler.providers.aws.services.rds.rds_instance_integration_cloudwatch_logs.rds_instance_integration_cloudwatch_logs.rds_client", + new=RDS(audit_info), + ): + # Test Check + from prowler.providers.aws.services.rds.rds_instance_integration_cloudwatch_logs.rds_instance_integration_cloudwatch_logs import ( + rds_instance_integration_cloudwatch_logs, + ) - check = rds_instance_integration_cloudwatch_logs() - result = check.execute() + check = rds_instance_integration_cloudwatch_logs() + result = check.execute() - assert len(result) == 1 - assert result[0].status == "FAIL" - assert search( - "does not have CloudWatch Logs enabled", - result[0].status_extended, - ) - assert result[0].resource_id == "db-master-1" + assert len(result) == 1 + assert result[0].status == "FAIL" + assert search( + "does not have CloudWatch Logs enabled", + result[0].status_extended, + ) + assert result[0].resource_id == "db-master-1" @mock_rds def test_rds_instance_with_logs(self): @@ -75,27 +109,31 @@ class Test_rds_instance_integration_cloudwatch_logs: DBInstanceClass="db.m1.small", EnableCloudwatchLogsExports=["audit", "error"], ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info + from prowler.providers.aws.services.rds.rds_service import RDS - current_audit_info.audited_partition = "aws" + audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.rds.rds_instance_integration_cloudwatch_logs.rds_instance_integration_cloudwatch_logs.rds_client", - new=RDS(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, ): - # Test Check - from prowler.providers.aws.services.rds.rds_instance_integration_cloudwatch_logs.rds_instance_integration_cloudwatch_logs import ( - rds_instance_integration_cloudwatch_logs, - ) + with mock.patch( + "prowler.providers.aws.services.rds.rds_instance_integration_cloudwatch_logs.rds_instance_integration_cloudwatch_logs.rds_client", + new=RDS(audit_info), + ): + # Test Check + from prowler.providers.aws.services.rds.rds_instance_integration_cloudwatch_logs.rds_instance_integration_cloudwatch_logs import ( + rds_instance_integration_cloudwatch_logs, + ) - check = rds_instance_integration_cloudwatch_logs() - result = check.execute() + check = rds_instance_integration_cloudwatch_logs() + result = check.execute() - assert len(result) == 1 - assert result[0].status == "PASS" - assert search( - "is shipping audit error to CloudWatch Logs", - result[0].status_extended, - ) - assert result[0].resource_id == "db-master-1" + assert len(result) == 1 + assert result[0].status == "PASS" + assert search( + "is shipping audit error to CloudWatch Logs", + result[0].status_extended, + ) + assert result[0].resource_id == "db-master-1" diff --git a/tests/providers/aws/services/rds/rds_instance_minor_version_upgrade_enabled/rds_instance_minor_version_upgrade_enabled_test.py b/tests/providers/aws/services/rds/rds_instance_minor_version_upgrade_enabled/rds_instance_minor_version_upgrade_enabled_test.py index 5f9d0f59..2154702e 100644 --- a/tests/providers/aws/services/rds/rds_instance_minor_version_upgrade_enabled/rds_instance_minor_version_upgrade_enabled_test.py +++ b/tests/providers/aws/services/rds/rds_instance_minor_version_upgrade_enabled/rds_instance_minor_version_upgrade_enabled_test.py @@ -1,33 +1,63 @@ from re import search from unittest import mock -from boto3 import client +from boto3 import client, session from moto import mock_rds +from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info + +AWS_ACCOUNT_NUMBER = "123456789012" AWS_REGION = "us-east-1" class Test_rds_instance_minor_version_upgrade_enabled: + # Mocked Audit Info + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + session_config=None, + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + region_name=AWS_REGION, + ), + audited_account=AWS_ACCOUNT_NUMBER, + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=AWS_REGION, + credentials=None, + assumed_role_info=None, + audited_regions=None, + organizations_metadata=None, + audit_resources=None, + ) + return audit_info + @mock_rds def test_rds_no_instances(self): - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info from prowler.providers.aws.services.rds.rds_service import RDS - current_audit_info.audited_partition = "aws" + audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.rds.rds_instance_minor_version_upgrade_enabled.rds_instance_minor_version_upgrade_enabled.rds_client", - new=RDS(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, ): - # Test Check - from prowler.providers.aws.services.rds.rds_instance_minor_version_upgrade_enabled.rds_instance_minor_version_upgrade_enabled import ( - rds_instance_minor_version_upgrade_enabled, - ) + with mock.patch( + "prowler.providers.aws.services.rds.rds_instance_minor_version_upgrade_enabled.rds_instance_minor_version_upgrade_enabled.rds_client", + new=RDS(audit_info), + ): + # Test Check + from prowler.providers.aws.services.rds.rds_instance_minor_version_upgrade_enabled.rds_instance_minor_version_upgrade_enabled import ( + rds_instance_minor_version_upgrade_enabled, + ) - check = rds_instance_minor_version_upgrade_enabled() - result = check.execute() + check = rds_instance_minor_version_upgrade_enabled() + result = check.execute() - assert len(result) == 0 + assert len(result) == 0 @mock_rds def test_rds_instance_no_auto_upgrade(self): @@ -39,30 +69,34 @@ class Test_rds_instance_minor_version_upgrade_enabled: DBName="staging-postgres", DBInstanceClass="db.m1.small", ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info + from prowler.providers.aws.services.rds.rds_service import RDS - current_audit_info.audited_partition = "aws" + audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.rds.rds_instance_minor_version_upgrade_enabled.rds_instance_minor_version_upgrade_enabled.rds_client", - new=RDS(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, ): - # Test Check - from prowler.providers.aws.services.rds.rds_instance_minor_version_upgrade_enabled.rds_instance_minor_version_upgrade_enabled import ( - rds_instance_minor_version_upgrade_enabled, - ) + with mock.patch( + "prowler.providers.aws.services.rds.rds_instance_minor_version_upgrade_enabled.rds_instance_minor_version_upgrade_enabled.rds_client", + new=RDS(audit_info), + ): + # Test Check + from prowler.providers.aws.services.rds.rds_instance_minor_version_upgrade_enabled.rds_instance_minor_version_upgrade_enabled import ( + rds_instance_minor_version_upgrade_enabled, + ) - check = rds_instance_minor_version_upgrade_enabled() - result = check.execute() + check = rds_instance_minor_version_upgrade_enabled() + result = check.execute() - assert len(result) == 1 - assert result[0].status == "FAIL" - assert search( - "does not have minor version upgrade enabled", - result[0].status_extended, - ) - assert result[0].resource_id == "db-master-1" + assert len(result) == 1 + assert result[0].status == "FAIL" + assert search( + "does not have minor version upgrade enabled", + result[0].status_extended, + ) + assert result[0].resource_id == "db-master-1" @mock_rds def test_rds_instance_with_auto_upgrade(self): @@ -75,27 +109,31 @@ class Test_rds_instance_minor_version_upgrade_enabled: DBInstanceClass="db.m1.small", AutoMinorVersionUpgrade=True, ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info + from prowler.providers.aws.services.rds.rds_service import RDS - current_audit_info.audited_partition = "aws" + audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.rds.rds_instance_minor_version_upgrade_enabled.rds_instance_minor_version_upgrade_enabled.rds_client", - new=RDS(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, ): - # Test Check - from prowler.providers.aws.services.rds.rds_instance_minor_version_upgrade_enabled.rds_instance_minor_version_upgrade_enabled import ( - rds_instance_minor_version_upgrade_enabled, - ) + with mock.patch( + "prowler.providers.aws.services.rds.rds_instance_minor_version_upgrade_enabled.rds_instance_minor_version_upgrade_enabled.rds_client", + new=RDS(audit_info), + ): + # Test Check + from prowler.providers.aws.services.rds.rds_instance_minor_version_upgrade_enabled.rds_instance_minor_version_upgrade_enabled import ( + rds_instance_minor_version_upgrade_enabled, + ) - check = rds_instance_minor_version_upgrade_enabled() - result = check.execute() + check = rds_instance_minor_version_upgrade_enabled() + result = check.execute() - assert len(result) == 1 - assert result[0].status == "PASS" - assert search( - "has minor version upgrade enabled", - result[0].status_extended, - ) - assert result[0].resource_id == "db-master-1" + assert len(result) == 1 + assert result[0].status == "PASS" + assert search( + "has minor version upgrade enabled", + result[0].status_extended, + ) + assert result[0].resource_id == "db-master-1" diff --git a/tests/providers/aws/services/rds/rds_instance_multi_az/rds_instance_multi_az_test.py b/tests/providers/aws/services/rds/rds_instance_multi_az/rds_instance_multi_az_test.py index ee53e8ae..475d3d0e 100644 --- a/tests/providers/aws/services/rds/rds_instance_multi_az/rds_instance_multi_az_test.py +++ b/tests/providers/aws/services/rds/rds_instance_multi_az/rds_instance_multi_az_test.py @@ -1,33 +1,63 @@ from re import search from unittest import mock -from boto3 import client +from boto3 import client, session from moto import mock_rds +from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info + +AWS_ACCOUNT_NUMBER = "123456789012" AWS_REGION = "us-east-1" class Test_rds_instance_multi_az: + # Mocked Audit Info + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + session_config=None, + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + region_name=AWS_REGION, + ), + audited_account=AWS_ACCOUNT_NUMBER, + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=AWS_REGION, + credentials=None, + assumed_role_info=None, + audited_regions=None, + organizations_metadata=None, + audit_resources=None, + ) + return audit_info + @mock_rds def test_rds_no_instances(self): - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info from prowler.providers.aws.services.rds.rds_service import RDS - current_audit_info.audited_partition = "aws" + audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.rds.rds_instance_multi_az.rds_instance_multi_az.rds_client", - new=RDS(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, ): - # Test Check - from prowler.providers.aws.services.rds.rds_instance_multi_az.rds_instance_multi_az import ( - rds_instance_multi_az, - ) + with mock.patch( + "prowler.providers.aws.services.rds.rds_instance_multi_az.rds_instance_multi_az.rds_client", + new=RDS(audit_info), + ): + # Test Check + from prowler.providers.aws.services.rds.rds_instance_multi_az.rds_instance_multi_az import ( + rds_instance_multi_az, + ) - check = rds_instance_multi_az() - result = check.execute() + check = rds_instance_multi_az() + result = check.execute() - assert len(result) == 0 + assert len(result) == 0 @mock_rds def test_rds_instance_no_multi_az(self): @@ -39,30 +69,33 @@ class Test_rds_instance_multi_az: DBName="staging-postgres", DBInstanceClass="db.m1.small", ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info from prowler.providers.aws.services.rds.rds_service import RDS - current_audit_info.audited_partition = "aws" + audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.rds.rds_instance_multi_az.rds_instance_multi_az.rds_client", - new=RDS(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, ): - # Test Check - from prowler.providers.aws.services.rds.rds_instance_multi_az.rds_instance_multi_az import ( - rds_instance_multi_az, - ) + with mock.patch( + "prowler.providers.aws.services.rds.rds_instance_multi_az.rds_instance_multi_az.rds_client", + new=RDS(audit_info), + ): + # Test Check + from prowler.providers.aws.services.rds.rds_instance_multi_az.rds_instance_multi_az import ( + rds_instance_multi_az, + ) - check = rds_instance_multi_az() - result = check.execute() + check = rds_instance_multi_az() + result = check.execute() - assert len(result) == 1 - assert result[0].status == "FAIL" - assert search( - "does not have multi-AZ enabled", - result[0].status_extended, - ) - assert result[0].resource_id == "db-master-1" + assert len(result) == 1 + assert result[0].status == "FAIL" + assert search( + "does not have multi-AZ enabled", + result[0].status_extended, + ) + assert result[0].resource_id == "db-master-1" @mock_rds def test_rds_instance_multi_az(self): @@ -75,27 +108,31 @@ class Test_rds_instance_multi_az: DBInstanceClass="db.m1.small", MultiAZ=True, ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info + from prowler.providers.aws.services.rds.rds_service import RDS - current_audit_info.audited_partition = "aws" + audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.rds.rds_instance_multi_az.rds_instance_multi_az.rds_client", - new=RDS(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, ): - # Test Check - from prowler.providers.aws.services.rds.rds_instance_multi_az.rds_instance_multi_az import ( - rds_instance_multi_az, - ) + with mock.patch( + "prowler.providers.aws.services.rds.rds_instance_multi_az.rds_instance_multi_az.rds_client", + new=RDS(audit_info), + ): + # Test Check + from prowler.providers.aws.services.rds.rds_instance_multi_az.rds_instance_multi_az import ( + rds_instance_multi_az, + ) - check = rds_instance_multi_az() - result = check.execute() + check = rds_instance_multi_az() + result = check.execute() - assert len(result) == 1 - assert result[0].status == "PASS" - assert search( - "has multi-AZ enabled", - result[0].status_extended, - ) - assert result[0].resource_id == "db-master-1" + assert len(result) == 1 + assert result[0].status == "PASS" + assert search( + "has multi-AZ enabled", + result[0].status_extended, + ) + assert result[0].resource_id == "db-master-1" diff --git a/tests/providers/aws/services/rds/rds_instance_no_public_access/rds_instance_no_public_access_test.py b/tests/providers/aws/services/rds/rds_instance_no_public_access/rds_instance_no_public_access_test.py index bfd857eb..5a0f87b9 100644 --- a/tests/providers/aws/services/rds/rds_instance_no_public_access/rds_instance_no_public_access_test.py +++ b/tests/providers/aws/services/rds/rds_instance_no_public_access/rds_instance_no_public_access_test.py @@ -1,33 +1,63 @@ from re import search from unittest import mock -from boto3 import client +from boto3 import client, session from moto import mock_rds +from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info + +AWS_ACCOUNT_NUMBER = "123456789012" AWS_REGION = "us-east-1" class Test_rds_instance_no_public_access: + # Mocked Audit Info + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + session_config=None, + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + region_name=AWS_REGION, + ), + audited_account=AWS_ACCOUNT_NUMBER, + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=AWS_REGION, + credentials=None, + assumed_role_info=None, + audited_regions=None, + organizations_metadata=None, + audit_resources=None, + ) + return audit_info + @mock_rds def test_rds_no_instances(self): - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info from prowler.providers.aws.services.rds.rds_service import RDS - current_audit_info.audited_partition = "aws" + audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.rds.rds_instance_no_public_access.rds_instance_no_public_access.rds_client", - new=RDS(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, ): - # Test Check - from prowler.providers.aws.services.rds.rds_instance_no_public_access.rds_instance_no_public_access import ( - rds_instance_no_public_access, - ) + with mock.patch( + "prowler.providers.aws.services.rds.rds_instance_no_public_access.rds_instance_no_public_access.rds_client", + new=RDS(audit_info), + ): + # Test Check + from prowler.providers.aws.services.rds.rds_instance_no_public_access.rds_instance_no_public_access import ( + rds_instance_no_public_access, + ) - check = rds_instance_no_public_access() - result = check.execute() + check = rds_instance_no_public_access() + result = check.execute() - assert len(result) == 0 + assert len(result) == 0 @mock_rds def test_rds_instance_private(self): @@ -39,30 +69,34 @@ class Test_rds_instance_no_public_access: DBName="staging-postgres", DBInstanceClass="db.m1.small", ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info + from prowler.providers.aws.services.rds.rds_service import RDS - current_audit_info.audited_partition = "aws" + audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.rds.rds_instance_no_public_access.rds_instance_no_public_access.rds_client", - new=RDS(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, ): - # Test Check - from prowler.providers.aws.services.rds.rds_instance_no_public_access.rds_instance_no_public_access import ( - rds_instance_no_public_access, - ) + with mock.patch( + "prowler.providers.aws.services.rds.rds_instance_no_public_access.rds_instance_no_public_access.rds_client", + new=RDS(audit_info), + ): + # Test Check + from prowler.providers.aws.services.rds.rds_instance_no_public_access.rds_instance_no_public_access import ( + rds_instance_no_public_access, + ) - check = rds_instance_no_public_access() - result = check.execute() + check = rds_instance_no_public_access() + result = check.execute() - assert len(result) == 1 - assert result[0].status == "PASS" - assert search( - "is not Publicly Accessible", - result[0].status_extended, - ) - assert result[0].resource_id == "db-master-1" + assert len(result) == 1 + assert result[0].status == "PASS" + assert search( + "is not Publicly Accessible", + result[0].status_extended, + ) + assert result[0].resource_id == "db-master-1" @mock_rds def test_rds_instance_public(self): @@ -75,27 +109,31 @@ class Test_rds_instance_no_public_access: DBInstanceClass="db.m1.small", PubliclyAccessible=True, ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info + from prowler.providers.aws.services.rds.rds_service import RDS - current_audit_info.audited_partition = "aws" + audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.rds.rds_instance_no_public_access.rds_instance_no_public_access.rds_client", - new=RDS(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, ): - # Test Check - from prowler.providers.aws.services.rds.rds_instance_no_public_access.rds_instance_no_public_access import ( - rds_instance_no_public_access, - ) + with mock.patch( + "prowler.providers.aws.services.rds.rds_instance_no_public_access.rds_instance_no_public_access.rds_client", + new=RDS(audit_info), + ): + # Test Check + from prowler.providers.aws.services.rds.rds_instance_no_public_access.rds_instance_no_public_access import ( + rds_instance_no_public_access, + ) - check = rds_instance_no_public_access() - result = check.execute() + check = rds_instance_no_public_access() + result = check.execute() - assert len(result) == 1 - assert result[0].status == "FAIL" - assert search( - "is set as Publicly Accessible", - result[0].status_extended, - ) - assert result[0].resource_id == "db-master-1" + assert len(result) == 1 + assert result[0].status == "FAIL" + assert search( + "is set as Publicly Accessible", + result[0].status_extended, + ) + assert result[0].resource_id == "db-master-1" diff --git a/tests/providers/aws/services/rds/rds_instance_storage_encrypted/rds_instance_storage_encrypted_test.py b/tests/providers/aws/services/rds/rds_instance_storage_encrypted/rds_instance_storage_encrypted_test.py index ecac6dcd..ad58f67a 100644 --- a/tests/providers/aws/services/rds/rds_instance_storage_encrypted/rds_instance_storage_encrypted_test.py +++ b/tests/providers/aws/services/rds/rds_instance_storage_encrypted/rds_instance_storage_encrypted_test.py @@ -1,33 +1,63 @@ from re import search from unittest import mock -from boto3 import client +from boto3 import client, session from moto import mock_rds +from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info + +AWS_ACCOUNT_NUMBER = "123456789012" AWS_REGION = "us-east-1" class Test_rds_instance_storage_encrypted: + # Mocked Audit Info + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + session_config=None, + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + region_name=AWS_REGION, + ), + audited_account=AWS_ACCOUNT_NUMBER, + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=AWS_REGION, + credentials=None, + assumed_role_info=None, + audited_regions=None, + organizations_metadata=None, + audit_resources=None, + ) + return audit_info + @mock_rds def test_rds_no_instances(self): - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info from prowler.providers.aws.services.rds.rds_service import RDS - current_audit_info.audited_partition = "aws" + audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.rds.rds_instance_storage_encrypted.rds_instance_storage_encrypted.rds_client", - new=RDS(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, ): - # Test Check - from prowler.providers.aws.services.rds.rds_instance_storage_encrypted.rds_instance_storage_encrypted import ( - rds_instance_storage_encrypted, - ) + with mock.patch( + "prowler.providers.aws.services.rds.rds_instance_storage_encrypted.rds_instance_storage_encrypted.rds_client", + new=RDS(audit_info), + ): + # Test Check + from prowler.providers.aws.services.rds.rds_instance_storage_encrypted.rds_instance_storage_encrypted import ( + rds_instance_storage_encrypted, + ) - check = rds_instance_storage_encrypted() - result = check.execute() + check = rds_instance_storage_encrypted() + result = check.execute() - assert len(result) == 0 + assert len(result) == 0 @mock_rds def test_rds_instance_no_encryption(self): @@ -39,30 +69,33 @@ class Test_rds_instance_storage_encrypted: DBName="staging-postgres", DBInstanceClass="db.m1.small", ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info from prowler.providers.aws.services.rds.rds_service import RDS - current_audit_info.audited_partition = "aws" + audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.rds.rds_instance_storage_encrypted.rds_instance_storage_encrypted.rds_client", - new=RDS(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, ): - # Test Check - from prowler.providers.aws.services.rds.rds_instance_storage_encrypted.rds_instance_storage_encrypted import ( - rds_instance_storage_encrypted, - ) + with mock.patch( + "prowler.providers.aws.services.rds.rds_instance_storage_encrypted.rds_instance_storage_encrypted.rds_client", + new=RDS(audit_info), + ): + # Test Check + from prowler.providers.aws.services.rds.rds_instance_storage_encrypted.rds_instance_storage_encrypted import ( + rds_instance_storage_encrypted, + ) - check = rds_instance_storage_encrypted() - result = check.execute() + check = rds_instance_storage_encrypted() + result = check.execute() - assert len(result) == 1 - assert result[0].status == "FAIL" - assert search( - "is not encrypted", - result[0].status_extended, - ) - assert result[0].resource_id == "db-master-1" + assert len(result) == 1 + assert result[0].status == "FAIL" + assert search( + "is not encrypted", + result[0].status_extended, + ) + assert result[0].resource_id == "db-master-1" @mock_rds def test_rds_instance_with_encryption(self): @@ -75,27 +108,31 @@ class Test_rds_instance_storage_encrypted: DBInstanceClass="db.m1.small", StorageEncrypted=True, ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info + from prowler.providers.aws.services.rds.rds_service import RDS - current_audit_info.audited_partition = "aws" + audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.rds.rds_instance_storage_encrypted.rds_instance_storage_encrypted.rds_client", - new=RDS(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, ): - # Test Check - from prowler.providers.aws.services.rds.rds_instance_storage_encrypted.rds_instance_storage_encrypted import ( - rds_instance_storage_encrypted, - ) + with mock.patch( + "prowler.providers.aws.services.rds.rds_instance_storage_encrypted.rds_instance_storage_encrypted.rds_client", + new=RDS(audit_info), + ): + # Test Check + from prowler.providers.aws.services.rds.rds_instance_storage_encrypted.rds_instance_storage_encrypted import ( + rds_instance_storage_encrypted, + ) - check = rds_instance_storage_encrypted() - result = check.execute() + check = rds_instance_storage_encrypted() + result = check.execute() - assert len(result) == 1 - assert result[0].status == "PASS" - assert search( - "is encrypted", - result[0].status_extended, - ) - assert result[0].resource_id == "db-master-1" + assert len(result) == 1 + assert result[0].status == "PASS" + assert search( + "is encrypted", + result[0].status_extended, + ) + assert result[0].resource_id == "db-master-1" diff --git a/tests/providers/aws/services/rds/rds_instance_transport_encrypted/rds_instance_transport_encrypted_test.py b/tests/providers/aws/services/rds/rds_instance_transport_encrypted/rds_instance_transport_encrypted_test.py index 6f29193a..b8d6e5cf 100644 --- a/tests/providers/aws/services/rds/rds_instance_transport_encrypted/rds_instance_transport_encrypted_test.py +++ b/tests/providers/aws/services/rds/rds_instance_transport_encrypted/rds_instance_transport_encrypted_test.py @@ -1,33 +1,63 @@ from re import search from unittest import mock -from boto3 import client +from boto3 import client, session from moto import mock_rds +from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info + +AWS_ACCOUNT_NUMBER = "123456789012" AWS_REGION = "us-east-1" class Test_rds_instance_transport_encrypted: + # Mocked Audit Info + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + session_config=None, + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + region_name=AWS_REGION, + ), + audited_account=AWS_ACCOUNT_NUMBER, + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=AWS_REGION, + credentials=None, + assumed_role_info=None, + audited_regions=None, + organizations_metadata=None, + audit_resources=None, + ) + return audit_info + @mock_rds def test_rds_no_instances(self): - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info from prowler.providers.aws.services.rds.rds_service import RDS - current_audit_info.audited_partition = "aws" + audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.rds.rds_instance_transport_encrypted.rds_instance_transport_encrypted.rds_client", - new=RDS(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, ): - # Test Check - from prowler.providers.aws.services.rds.rds_instance_transport_encrypted.rds_instance_transport_encrypted import ( - rds_instance_transport_encrypted, - ) + with mock.patch( + "prowler.providers.aws.services.rds.rds_instance_transport_encrypted.rds_instance_transport_encrypted.rds_client", + new=RDS(audit_info), + ): + # Test Check + from prowler.providers.aws.services.rds.rds_instance_transport_encrypted.rds_instance_transport_encrypted import ( + rds_instance_transport_encrypted, + ) - check = rds_instance_transport_encrypted() - result = check.execute() + check = rds_instance_transport_encrypted() + result = check.execute() - assert len(result) == 0 + assert len(result) == 0 @mock_rds def test_rds_instance_no_ssl(self): @@ -56,30 +86,34 @@ class Test_rds_instance_transport_encrypted: }, ], ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info + from prowler.providers.aws.services.rds.rds_service import RDS - current_audit_info.audited_partition = "aws" + audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.rds.rds_instance_transport_encrypted.rds_instance_transport_encrypted.rds_client", - new=RDS(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, ): - # Test Check - from prowler.providers.aws.services.rds.rds_instance_transport_encrypted.rds_instance_transport_encrypted import ( - rds_instance_transport_encrypted, - ) + with mock.patch( + "prowler.providers.aws.services.rds.rds_instance_transport_encrypted.rds_instance_transport_encrypted.rds_client", + new=RDS(audit_info), + ): + # Test Check + from prowler.providers.aws.services.rds.rds_instance_transport_encrypted.rds_instance_transport_encrypted import ( + rds_instance_transport_encrypted, + ) - check = rds_instance_transport_encrypted() - result = check.execute() + check = rds_instance_transport_encrypted() + result = check.execute() - assert len(result) == 1 - assert result[0].status == "FAIL" - assert search( - "connections are not encrypted", - result[0].status_extended, - ) - assert result[0].resource_id == "db-master-1" + assert len(result) == 1 + assert result[0].status == "FAIL" + assert search( + "connections are not encrypted", + result[0].status_extended, + ) + assert result[0].resource_id == "db-master-1" @mock_rds def test_rds_instance_with_ssl(self): @@ -108,27 +142,31 @@ class Test_rds_instance_transport_encrypted: }, ], ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info + from prowler.providers.aws.services.rds.rds_service import RDS - current_audit_info.audited_partition = "aws" + audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.rds.rds_instance_transport_encrypted.rds_instance_transport_encrypted.rds_client", - new=RDS(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, ): - # Test Check - from prowler.providers.aws.services.rds.rds_instance_transport_encrypted.rds_instance_transport_encrypted import ( - rds_instance_transport_encrypted, - ) + with mock.patch( + "prowler.providers.aws.services.rds.rds_instance_transport_encrypted.rds_instance_transport_encrypted.rds_client", + new=RDS(audit_info), + ): + # Test Check + from prowler.providers.aws.services.rds.rds_instance_transport_encrypted.rds_instance_transport_encrypted import ( + rds_instance_transport_encrypted, + ) - check = rds_instance_transport_encrypted() - result = check.execute() + check = rds_instance_transport_encrypted() + result = check.execute() - assert len(result) == 1 - assert result[0].status == "PASS" - assert search( - "connections use SSL encryption", - result[0].status_extended, - ) - assert result[0].resource_id == "db-master-1" + assert len(result) == 1 + assert result[0].status == "PASS" + assert search( + "connections use SSL encryption", + result[0].status_extended, + ) + assert result[0].resource_id == "db-master-1" diff --git a/tests/providers/aws/services/rds/rds_snapshots_public_access/rds_snapshots_public_access_test.py b/tests/providers/aws/services/rds/rds_snapshots_public_access/rds_snapshots_public_access_test.py index 0cc4deef..bf5f5627 100644 --- a/tests/providers/aws/services/rds/rds_snapshots_public_access/rds_snapshots_public_access_test.py +++ b/tests/providers/aws/services/rds/rds_snapshots_public_access/rds_snapshots_public_access_test.py @@ -1,33 +1,63 @@ from re import search from unittest import mock -from boto3 import client +from boto3 import client, session from moto import mock_rds +from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info + +AWS_ACCOUNT_NUMBER = "123456789012" AWS_REGION = "us-east-1" class Test_rds_snapshots_public_access: + # Mocked Audit Info + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + session_config=None, + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + region_name=AWS_REGION, + ), + audited_account=AWS_ACCOUNT_NUMBER, + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=AWS_REGION, + credentials=None, + assumed_role_info=None, + audited_regions=None, + organizations_metadata=None, + audit_resources=None, + ) + return audit_info + @mock_rds def test_rds_no_snapshots(self): - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info from prowler.providers.aws.services.rds.rds_service import RDS - current_audit_info.audited_partition = "aws" + audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.rds.rds_snapshots_public_access.rds_snapshots_public_access.rds_client", - new=RDS(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, ): - # Test Check - from prowler.providers.aws.services.rds.rds_snapshots_public_access.rds_snapshots_public_access import ( - rds_snapshots_public_access, - ) + with mock.patch( + "prowler.providers.aws.services.rds.rds_snapshots_public_access.rds_snapshots_public_access.rds_client", + new=RDS(audit_info), + ): + # Test Check + from prowler.providers.aws.services.rds.rds_snapshots_public_access.rds_snapshots_public_access import ( + rds_snapshots_public_access, + ) - check = rds_snapshots_public_access() - result = check.execute() + check = rds_snapshots_public_access() + result = check.execute() - assert len(result) == 0 + assert len(result) == 0 @mock_rds def test_rds_private_snapshot(self): @@ -43,30 +73,34 @@ class Test_rds_snapshots_public_access: conn.create_db_snapshot( DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="snapshot-1" ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info + from prowler.providers.aws.services.rds.rds_service import RDS - current_audit_info.audited_partition = "aws" + audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.rds.rds_snapshots_public_access.rds_snapshots_public_access.rds_client", - new=RDS(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, ): - # Test Check - from prowler.providers.aws.services.rds.rds_snapshots_public_access.rds_snapshots_public_access import ( - rds_snapshots_public_access, - ) + with mock.patch( + "prowler.providers.aws.services.rds.rds_snapshots_public_access.rds_snapshots_public_access.rds_client", + new=RDS(audit_info), + ): + # Test Check + from prowler.providers.aws.services.rds.rds_snapshots_public_access.rds_snapshots_public_access import ( + rds_snapshots_public_access, + ) - check = rds_snapshots_public_access() - result = check.execute() + check = rds_snapshots_public_access() + result = check.execute() - assert len(result) == 1 - assert result[0].status == "PASS" - assert search( - "is not shared", - result[0].status_extended, - ) - assert result[0].resource_id == "snapshot-1" + assert len(result) == 1 + assert result[0].status == "PASS" + assert search( + "is not shared", + result[0].status_extended, + ) + assert result[0].resource_id == "snapshot-1" @mock_rds def test_rds_public_snapshot(self): @@ -82,31 +116,35 @@ class Test_rds_snapshots_public_access: conn.create_db_snapshot( DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="snapshot-1" ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info + from prowler.providers.aws.services.rds.rds_service import RDS - current_audit_info.audited_partition = "aws" + audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.rds.rds_snapshots_public_access.rds_snapshots_public_access.rds_client", - new=RDS(current_audit_info), - ) as service_client: - # Test Check - from prowler.providers.aws.services.rds.rds_snapshots_public_access.rds_snapshots_public_access import ( - rds_snapshots_public_access, - ) + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, + ): + with mock.patch( + "prowler.providers.aws.services.rds.rds_snapshots_public_access.rds_snapshots_public_access.rds_client", + new=RDS(audit_info), + ) as service_client: + # Test Check + from prowler.providers.aws.services.rds.rds_snapshots_public_access.rds_snapshots_public_access import ( + rds_snapshots_public_access, + ) - service_client.db_snapshots[0].public = True - check = rds_snapshots_public_access() - result = check.execute() + service_client.db_snapshots[0].public = True + check = rds_snapshots_public_access() + result = check.execute() - assert len(result) == 1 - assert result[0].status == "FAIL" - assert search( - "is public", - result[0].status_extended, - ) - assert result[0].resource_id == "snapshot-1" + assert len(result) == 1 + assert result[0].status == "FAIL" + assert search( + "is public", + result[0].status_extended, + ) + assert result[0].resource_id == "snapshot-1" @mock_rds def test_rds_cluster_private_snapshot(self): @@ -123,30 +161,33 @@ class Test_rds_snapshots_public_access: conn.create_db_cluster_snapshot( DBClusterIdentifier="db-primary-1", DBClusterSnapshotIdentifier="snapshot-1" ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info from prowler.providers.aws.services.rds.rds_service import RDS - current_audit_info.audited_partition = "aws" + audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.rds.rds_snapshots_public_access.rds_snapshots_public_access.rds_client", - new=RDS(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, ): - # Test Check - from prowler.providers.aws.services.rds.rds_snapshots_public_access.rds_snapshots_public_access import ( - rds_snapshots_public_access, - ) + with mock.patch( + "prowler.providers.aws.services.rds.rds_snapshots_public_access.rds_snapshots_public_access.rds_client", + new=RDS(audit_info), + ): + # Test Check + from prowler.providers.aws.services.rds.rds_snapshots_public_access.rds_snapshots_public_access import ( + rds_snapshots_public_access, + ) - check = rds_snapshots_public_access() - result = check.execute() + check = rds_snapshots_public_access() + result = check.execute() - assert len(result) == 1 - assert result[0].status == "PASS" - assert search( - "is not shared", - result[0].status_extended, - ) - assert result[0].resource_id == "snapshot-1" + assert len(result) == 1 + assert result[0].status == "PASS" + assert search( + "is not shared", + result[0].status_extended, + ) + assert result[0].resource_id == "snapshot-1" @mock_rds def test_rds_cluster_public_snapshot(self): @@ -163,28 +204,31 @@ class Test_rds_snapshots_public_access: conn.create_db_cluster_snapshot( DBClusterIdentifier="db-primary-1", DBClusterSnapshotIdentifier="snapshot-1" ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info from prowler.providers.aws.services.rds.rds_service import RDS - current_audit_info.audited_partition = "aws" + audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.rds.rds_snapshots_public_access.rds_snapshots_public_access.rds_client", - new=RDS(current_audit_info), - ) as service_client: - # Test Check - from prowler.providers.aws.services.rds.rds_snapshots_public_access.rds_snapshots_public_access import ( - rds_snapshots_public_access, - ) + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, + ): + with mock.patch( + "prowler.providers.aws.services.rds.rds_snapshots_public_access.rds_snapshots_public_access.rds_client", + new=RDS(audit_info), + ) as service_client: + # Test Check + from prowler.providers.aws.services.rds.rds_snapshots_public_access.rds_snapshots_public_access import ( + rds_snapshots_public_access, + ) - service_client.db_cluster_snapshots[0].public = True - check = rds_snapshots_public_access() - result = check.execute() + service_client.db_cluster_snapshots[0].public = True + check = rds_snapshots_public_access() + result = check.execute() - assert len(result) == 1 - assert result[0].status == "FAIL" - assert search( - "is public", - result[0].status_extended, - ) - assert result[0].resource_id == "snapshot-1" + assert len(result) == 1 + assert result[0].status == "FAIL" + assert search( + "is public", + result[0].status_extended, + ) + assert result[0].resource_id == "snapshot-1"