From fc53b28997074db26bb8e04e566651599f8f4d28 Mon Sep 17 00:00:00 2001 From: Pepe Fagoaga Date: Mon, 14 Aug 2023 21:48:05 +0200 Subject: [PATCH] test(s3): Mock S3Control when used (#2722) --- .../s3_account_level_public_access_blocks.py | 9 +- ...account_level_public_access_blocks_test.py | 80 ++++----- ...3_bucket_level_public_access_block_test.py | 22 +-- ..._bucket_policy_public_write_access_test.py | 156 ++++++++++++++++-- 4 files changed, 198 insertions(+), 69 deletions(-) diff --git a/prowler/providers/aws/services/s3/s3_account_level_public_access_blocks/s3_account_level_public_access_blocks.py b/prowler/providers/aws/services/s3/s3_account_level_public_access_blocks/s3_account_level_public_access_blocks.py index 73bda630..3bdabb55 100644 --- a/prowler/providers/aws/services/s3/s3_account_level_public_access_blocks/s3_account_level_public_access_blocks.py +++ b/prowler/providers/aws/services/s3/s3_account_level_public_access_blocks/s3_account_level_public_access_blocks.py @@ -1,5 +1,4 @@ from prowler.lib.check.models import Check, Check_Report_AWS -from prowler.providers.aws.services.s3.s3_client import s3_client from prowler.providers.aws.services.s3.s3control_client import s3control_client @@ -8,17 +7,17 @@ class s3_account_level_public_access_blocks(Check): findings = [] report = Check_Report_AWS(self.metadata()) report.status = "FAIL" - report.status_extended = f"Block Public Access is not configured for the account {s3_client.audited_account}." + report.status_extended = f"Block Public Access is not configured for the account {s3control_client.audited_account}." report.region = s3control_client.region - report.resource_id = s3_client.audited_account - report.resource_arn = s3_client.audited_account_arn + report.resource_id = s3control_client.audited_account + report.resource_arn = s3control_client.audited_account_arn if ( s3control_client.account_public_access_block and s3control_client.account_public_access_block.ignore_public_acls and s3control_client.account_public_access_block.restrict_public_buckets ): report.status = "PASS" - report.status_extended = f"Block Public Access is configured for the account {s3_client.audited_account}." + report.status_extended = f"Block Public Access is configured for the account {s3control_client.audited_account}." findings.append(report) diff --git a/tests/providers/aws/services/s3/s3_account_level_public_access_blocks/s3_account_level_public_access_blocks_test.py b/tests/providers/aws/services/s3/s3_account_level_public_access_blocks/s3_account_level_public_access_blocks_test.py index a4e28fe8..d7c5aecb 100644 --- a/tests/providers/aws/services/s3/s3_account_level_public_access_blocks/s3_account_level_public_access_blocks_test.py +++ b/tests/providers/aws/services/s3/s3_account_level_public_access_blocks/s3_account_level_public_access_blocks_test.py @@ -58,7 +58,7 @@ class Test_s3_account_level_public_access_blocks: "RestrictPublicBuckets": True, }, ) - from prowler.providers.aws.services.s3.s3_service import S3, S3Control + from prowler.providers.aws.services.s3.s3_service import S3Control audit_info = self.set_mocked_audit_info() @@ -67,30 +67,26 @@ class Test_s3_account_level_public_access_blocks: new=audit_info, ): with mock.patch( - "prowler.providers.aws.services.s3.s3_account_level_public_access_blocks.s3_account_level_public_access_blocks.s3_client", - new=S3(audit_info), + "prowler.providers.aws.services.s3.s3_account_level_public_access_blocks.s3_account_level_public_access_blocks.s3control_client", + new=S3Control(audit_info), ): - with mock.patch( - "prowler.providers.aws.services.s3.s3_account_level_public_access_blocks.s3_account_level_public_access_blocks.s3control_client", - new=S3Control(audit_info), - ): - # Test Check - from prowler.providers.aws.services.s3.s3_account_level_public_access_blocks.s3_account_level_public_access_blocks import ( - s3_account_level_public_access_blocks, - ) + # Test Check + from prowler.providers.aws.services.s3.s3_account_level_public_access_blocks.s3_account_level_public_access_blocks import ( + s3_account_level_public_access_blocks, + ) - check = s3_account_level_public_access_blocks() - result = check.execute() + check = s3_account_level_public_access_blocks() + result = check.execute() - assert len(result) == 1 - assert result[0].status == "PASS" - assert ( - result[0].status_extended - == f"Block Public Access is configured for the account {AWS_ACCOUNT_NUMBER}." - ) - assert result[0].resource_id == AWS_ACCOUNT_NUMBER - assert result[0].resource_arn == AWS_ACCOUNT_ARN - assert result[0].region == AWS_REGION + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"Block Public Access is configured for the account {AWS_ACCOUNT_NUMBER}." + ) + assert result[0].resource_id == AWS_ACCOUNT_NUMBER + assert result[0].resource_arn == AWS_ACCOUNT_ARN + assert result[0].region == AWS_REGION @mock_s3 @mock_s3control @@ -106,7 +102,7 @@ class Test_s3_account_level_public_access_blocks: "RestrictPublicBuckets": False, }, ) - from prowler.providers.aws.services.s3.s3_service import S3, S3Control + from prowler.providers.aws.services.s3.s3_service import S3Control audit_info = self.set_mocked_audit_info() @@ -115,27 +111,23 @@ class Test_s3_account_level_public_access_blocks: new=audit_info, ): with mock.patch( - "prowler.providers.aws.services.s3.s3_account_level_public_access_blocks.s3_account_level_public_access_blocks.s3_client", - new=S3(audit_info), + "prowler.providers.aws.services.s3.s3_account_level_public_access_blocks.s3_account_level_public_access_blocks.s3control_client", + new=S3Control(audit_info), ): - with mock.patch( - "prowler.providers.aws.services.s3.s3_account_level_public_access_blocks.s3_account_level_public_access_blocks.s3control_client", - new=S3Control(audit_info), - ): - # Test Check - from prowler.providers.aws.services.s3.s3_account_level_public_access_blocks.s3_account_level_public_access_blocks import ( - s3_account_level_public_access_blocks, - ) + # Test Check + from prowler.providers.aws.services.s3.s3_account_level_public_access_blocks.s3_account_level_public_access_blocks import ( + s3_account_level_public_access_blocks, + ) - check = s3_account_level_public_access_blocks() - result = check.execute() + check = s3_account_level_public_access_blocks() + result = check.execute() - assert len(result) == 1 - assert result[0].status == "FAIL" - assert ( - result[0].status_extended - == f"Block Public Access is not configured for the account {AWS_ACCOUNT_NUMBER}." - ) - assert result[0].resource_id == AWS_ACCOUNT_NUMBER - assert result[0].resource_arn == AWS_ACCOUNT_ARN - assert result[0].region == AWS_REGION + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"Block Public Access is not configured for the account {AWS_ACCOUNT_NUMBER}." + ) + assert result[0].resource_id == AWS_ACCOUNT_NUMBER + assert result[0].resource_arn == AWS_ACCOUNT_ARN + assert result[0].region == AWS_REGION diff --git a/tests/providers/aws/services/s3/s3_bucket_level_public_access_block/s3_bucket_level_public_access_block_test.py b/tests/providers/aws/services/s3/s3_bucket_level_public_access_block/s3_bucket_level_public_access_block_test.py index f657b28c..fb976993 100644 --- a/tests/providers/aws/services/s3/s3_bucket_level_public_access_block/s3_bucket_level_public_access_block_test.py +++ b/tests/providers/aws/services/s3/s3_bucket_level_public_access_block/s3_bucket_level_public_access_block_test.py @@ -1,4 +1,3 @@ -from re import search from unittest import mock from boto3 import client, session @@ -46,6 +45,7 @@ class Test_s3_bucket_level_public_access_block: return audit_info @mock_s3 + @mock_s3control def test_no_buckets(self): from prowler.providers.aws.services.s3.s3_service import S3, S3Control @@ -124,9 +124,9 @@ class Test_s3_bucket_level_public_access_block: assert len(result) == 1 assert result[0].status == "FAIL" - assert search( - "Block Public Access is not configured", - result[0].status_extended, + assert ( + result[0].status_extended + == f"Block Public Access is not configured for the S3 Bucket {bucket_name_us}." ) assert result[0].resource_id == bucket_name_us assert ( @@ -186,10 +186,11 @@ class Test_s3_bucket_level_public_access_block: assert len(result) == 1 assert result[0].status == "PASS" - assert search( - "Block Public Access is configured", - result[0].status_extended, + assert ( + result[0].status_extended + == f"Block Public Access is configured for the S3 Bucket {bucket_name_us}." ) + assert result[0].resource_id == bucket_name_us assert ( result[0].resource_arn @@ -248,10 +249,11 @@ class Test_s3_bucket_level_public_access_block: assert len(result) == 1 assert result[0].status == "PASS" - assert search( - f"Block Public Access is configured for the S3 Bucket {bucket_name_us} at account {AWS_ACCOUNT_NUMBER} level.", - result[0].status_extended, + assert ( + result[0].status_extended + == f"Block Public Access is configured for the S3 Bucket {bucket_name_us} at account {AWS_ACCOUNT_NUMBER} level." ) + assert result[0].resource_id == bucket_name_us assert ( result[0].resource_arn diff --git a/tests/providers/aws/services/s3/s3_bucket_policy_public_write_access/s3_bucket_policy_public_write_access_test.py b/tests/providers/aws/services/s3/s3_bucket_policy_public_write_access/s3_bucket_policy_public_write_access_test.py index 6f8ffc41..bb517994 100644 --- a/tests/providers/aws/services/s3/s3_bucket_policy_public_write_access/s3_bucket_policy_public_write_access_test.py +++ b/tests/providers/aws/services/s3/s3_bucket_policy_public_write_access/s3_bucket_policy_public_write_access_test.py @@ -1,4 +1,3 @@ -from re import search from unittest import mock from boto3 import client, session @@ -45,6 +44,7 @@ class Test_s3_bucket_policy_public_write_access: ) return audit_info + @mock_s3control @mock_s3 def test_bucket_no_policy(self): s3_client_us_east_1 = client("s3", region_name="us-east-1") @@ -73,9 +73,9 @@ class Test_s3_bucket_policy_public_write_access: assert len(result) == 1 assert result[0].status == "PASS" - assert search( - "does not have a bucket policy", - result[0].status_extended, + assert ( + result[0].status_extended + == f"S3 Bucket {bucket_name_us} does not have a bucket policy." ) assert result[0].resource_id == bucket_name_us assert ( @@ -84,6 +84,141 @@ class Test_s3_bucket_policy_public_write_access: ) assert result[0].region == "us-east-1" + @mock_s3control + @mock_s3 + def test_bucket_policy_but_account_RestrictPublicBuckets(self): + s3_client_us_east_1 = client("s3", region_name="us-east-1") + bucket_name_us = "bucket_test_us" + s3_client_us_east_1.create_bucket( + Bucket=bucket_name_us, ObjectOwnership="BucketOwnerEnforced" + ) + + encryption_policy = '{"Version": "2012-10-17","Id": "PutObjPolicy","Statement": [{"Sid": "DenyIncorrectEncryptionHeader","Effect": "Deny","Principal": "*","Action": "s3:PutObject","Resource": "arn:aws:s3:::bucket_test_us/*","Condition": {"StringNotEquals": {"s3:x-amz-server-side-encryption": "aws:kms"}}}]}' + s3_client_us_east_1.put_bucket_policy( + Bucket=bucket_name_us, + Policy=encryption_policy, + ) + + s3control_client = client("s3control", region_name=AWS_REGION) + s3control_client.put_public_access_block( + AccountId=AWS_ACCOUNT_NUMBER, + PublicAccessBlockConfiguration={ + "BlockPublicAcls": False, + "IgnorePublicAcls": False, + "BlockPublicPolicy": False, + "RestrictPublicBuckets": True, + }, + ) + + from prowler.providers.aws.services.s3.s3_service import S3, S3Control + + audit_info = self.set_mocked_audit_info() + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, + ): + with mock.patch( + "prowler.providers.aws.services.s3.s3_bucket_policy_public_write_access.s3_bucket_policy_public_write_access.s3_client", + new=S3(audit_info), + ), mock.patch( + "prowler.providers.aws.services.s3.s3_bucket_policy_public_write_access.s3_bucket_policy_public_write_access.s3control_client", + new=S3Control(audit_info), + ): + # Test Check + from prowler.providers.aws.services.s3.s3_bucket_policy_public_write_access.s3_bucket_policy_public_write_access import ( + s3_bucket_policy_public_write_access, + ) + + check = s3_bucket_policy_public_write_access() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == "All S3 public access blocked at account level." + ) + assert result[0].resource_id == bucket_name_us + assert ( + result[0].resource_arn + == f"arn:{audit_info.audited_partition}:s3:::{bucket_name_us}" + ) + assert result[0].region == "us-east-1" + + @mock_s3control + @mock_s3 + def test_bucket_policy_but_bucket_RestrictPublicBuckets(self): + s3_client_us_east_1 = client("s3", region_name="us-east-1") + bucket_name_us = "bucket_test_us" + s3_client_us_east_1.create_bucket( + Bucket=bucket_name_us, ObjectOwnership="BucketOwnerEnforced" + ) + + encryption_policy = '{"Version": "2012-10-17","Id": "PutObjPolicy","Statement": [{"Sid": "DenyIncorrectEncryptionHeader","Effect": "Deny","Principal": "*","Action": "s3:PutObject","Resource": "arn:aws:s3:::bucket_test_us/*","Condition": {"StringNotEquals": {"s3:x-amz-server-side-encryption": "aws:kms"}}}]}' + s3_client_us_east_1.put_bucket_policy( + Bucket=bucket_name_us, + Policy=encryption_policy, + ) + + s3_client_us_east_1.put_public_access_block( + Bucket=bucket_name_us, + PublicAccessBlockConfiguration={ + "BlockPublicAcls": False, + "IgnorePublicAcls": False, + "BlockPublicPolicy": False, + "RestrictPublicBuckets": True, + }, + ) + + s3control_client = client("s3control", region_name=AWS_REGION) + s3control_client.put_public_access_block( + AccountId=AWS_ACCOUNT_NUMBER, + PublicAccessBlockConfiguration={ + "BlockPublicAcls": False, + "IgnorePublicAcls": False, + "BlockPublicPolicy": False, + "RestrictPublicBuckets": False, + }, + ) + + from prowler.providers.aws.services.s3.s3_service import S3, S3Control + + audit_info = self.set_mocked_audit_info() + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, + ): + with mock.patch( + "prowler.providers.aws.services.s3.s3_bucket_policy_public_write_access.s3_bucket_policy_public_write_access.s3_client", + new=S3(audit_info), + ), mock.patch( + "prowler.providers.aws.services.s3.s3_bucket_policy_public_write_access.s3_bucket_policy_public_write_access.s3control_client", + new=S3Control(audit_info), + ): + # Test Check + from prowler.providers.aws.services.s3.s3_bucket_policy_public_write_access.s3_bucket_policy_public_write_access import ( + s3_bucket_policy_public_write_access, + ) + + check = s3_bucket_policy_public_write_access() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"S3 public access blocked at bucket level for {bucket_name_us}." + ) + assert result[0].resource_id == bucket_name_us + assert ( + result[0].resource_arn + == f"arn:{audit_info.audited_partition}:s3:::{bucket_name_us}" + ) + assert result[0].region == "us-east-1" + + @mock_s3control @mock_s3 @mock_s3control def test_bucket_comply_policy(self): @@ -136,9 +271,9 @@ class Test_s3_bucket_policy_public_write_access: assert len(result) == 1 assert result[0].status == "PASS" - assert search( - "does not allow public write access in the bucket policy", - result[0].status_extended, + assert ( + result[0].status_extended + == f"S3 Bucket {bucket_name_us} does not allow public write access in the bucket policy." ) assert result[0].resource_id == bucket_name_us assert ( @@ -147,6 +282,7 @@ class Test_s3_bucket_policy_public_write_access: ) assert result[0].region == "us-east-1" + @mock_s3control @mock_s3 @mock_s3control def test_bucket_public_write_policy(self): @@ -198,9 +334,9 @@ class Test_s3_bucket_policy_public_write_access: assert len(result) == 1 assert result[0].status == "FAIL" - assert search( - "allows public write access in the bucket policy", - result[0].status_extended, + assert ( + result[0].status_extended + == f"S3 Bucket {bucket_name_us} allows public write access in the bucket policy." ) assert result[0].resource_id == bucket_name_us assert (