diff --git a/prowler/providers/aws/services/s3/s3_bucket_object_lock/__init__.py b/prowler/providers/aws/services/s3/s3_bucket_object_lock/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/prowler/providers/aws/services/s3/s3_bucket_object_lock/s3_bucket_object_lock.metadata.json b/prowler/providers/aws/services/s3/s3_bucket_object_lock/s3_bucket_object_lock.metadata.json new file mode 100644 index 00000000..ec917a24 --- /dev/null +++ b/prowler/providers/aws/services/s3/s3_bucket_object_lock/s3_bucket_object_lock.metadata.json @@ -0,0 +1,32 @@ +{ + "Provider": "aws", + "CheckID": "s3_bucket_object_lock", + "CheckTitle": "Check if S3 buckets have object lock enabled", + "CheckType": [ + "Data Protection" + ], + "ServiceName": "s3", + "SubServiceName": "", + "ResourceIdTemplate": "arn:partition:s3:::bucket-name", + "Severity": "low", + "ResourceType": "AwsS3Bucket", + "Description": "Check if S3 buckets have object lock enabled", + "Risk": "Store objects using a write-once-read-many (WORM) model to help you prevent objects from being deleted or overwritten for a fixed amount of time or indefinitely. That helps to prevent ransomware attacks.", + "RelatedUrl": "", + "Remediation": { + "Code": { + "CLI": "https://docs.bridgecrew.io/docs/ensure-that-s3-bucket-has-lock-configuration-enabled-by-default#cli-command", + "NativeIaC": "", + "Other": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/aws/S3/object-lock.html", + "Terraform": "https://docs.bridgecrew.io/docs/ensure-that-s3-bucket-has-lock-configuration-enabled-by-default#terraform" + }, + "Recommendation": { + "Text": "Ensure that your Amazon S3 buckets have Object Lock feature enabled in order to prevent the objects they store from being deleted.", + "Url": "https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-lock-overview.html" + } + }, + "Categories": [], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/aws/services/s3/s3_bucket_object_lock/s3_bucket_object_lock.py b/prowler/providers/aws/services/s3/s3_bucket_object_lock/s3_bucket_object_lock.py new file mode 100644 index 00000000..2bfde6a8 --- /dev/null +++ b/prowler/providers/aws/services/s3/s3_bucket_object_lock/s3_bucket_object_lock.py @@ -0,0 +1,26 @@ +from prowler.lib.check.models import Check, Check_Report_AWS +from prowler.providers.aws.services.s3.s3_client import s3_client + + +class s3_bucket_object_lock(Check): + def execute(self): + findings = [] + for bucket in s3_client.buckets: + report = Check_Report_AWS(self.metadata()) + report.region = bucket.region + report.resource_id = bucket.name + report.resource_arn = bucket.arn + report.resource_tags = bucket.tags + if bucket.object_lock: + report.status = "PASS" + report.status_extended = ( + f"S3 Bucket {bucket.name} has Object Lock enabled." + ) + else: + report.status = "FAIL" + report.status_extended = ( + f"S3 Bucket {bucket.name} has Object Lock disabled." + ) + findings.append(report) + + return findings diff --git a/prowler/providers/aws/services/s3/s3_service.py b/prowler/providers/aws/services/s3/s3_service.py index a0b678b6..d69d9b28 100644 --- a/prowler/providers/aws/services/s3/s3_service.py +++ b/prowler/providers/aws/services/s3/s3_service.py @@ -28,6 +28,7 @@ class S3: self.__threading_call__(self.__get_public_access_block__) self.__threading_call__(self.__get_bucket_encryption__) self.__threading_call__(self.__get_bucket_ownership_controls__) + self.__threading_call__(self.__get_object_lock_configuration__) self.__threading_call__(self.__get_bucket_tagging__) def __get_session__(self): @@ -254,6 +255,25 @@ class S3: f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" ) + def __get_object_lock_configuration__(self, bucket): + logger.info("S3 - Get buckets ownership controls...") + try: + regional_client = self.regional_clients[bucket.region] + regional_client.get_object_lock_configuration(Bucket=bucket.name) + bucket.object_lock = True + except Exception as error: + if "ObjectLockConfigurationNotFoundError" in str(error): + bucket.object_lock = False + else: + if regional_client: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + else: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + def __get_bucket_tagging__(self, bucket): logger.info("S3 - Get buckets logging...") try: @@ -349,5 +369,6 @@ class Bucket(BaseModel): region: str logging_target_bucket: Optional[str] ownership: Optional[str] + object_lock: bool = False mfa_delete: bool = False tags: Optional[list] = [] diff --git a/tests/providers/aws/services/s3/s3_bucket_object_lock/s3_bucket_object_lock_test.py b/tests/providers/aws/services/s3/s3_bucket_object_lock/s3_bucket_object_lock_test.py new file mode 100644 index 00000000..23529fc5 --- /dev/null +++ b/tests/providers/aws/services/s3/s3_bucket_object_lock/s3_bucket_object_lock_test.py @@ -0,0 +1,144 @@ +from re import search +from unittest import mock + +from boto3 import client, session +from moto import mock_s3 + +from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info + +AWS_ACCOUNT_NUMBER = "123456789012" +AWS_REGION = "us-east-1" + + +class Test_s3_bucket_object_lock: + # Mocked Audit Info + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + session_config=None, + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + region_name=AWS_REGION, + ), + audited_account=AWS_ACCOUNT_NUMBER, + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=AWS_REGION, + credentials=None, + assumed_role_info=None, + audited_regions=None, + organizations_metadata=None, + audit_resources=None, + ) + return audit_info + + @mock_s3 + def test_no_buckets(self): + from prowler.providers.aws.services.s3.s3_service import S3 + + audit_info = self.set_mocked_audit_info() + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, + ): + with mock.patch( + "prowler.providers.aws.services.s3.s3_bucket_object_lock.s3_bucket_object_lock.s3_client", + new=S3(audit_info), + ): + # Test Check + from prowler.providers.aws.services.s3.s3_bucket_object_lock.s3_bucket_object_lock import ( + s3_bucket_object_lock, + ) + + check = s3_bucket_object_lock() + result = check.execute() + + assert len(result) == 0 + + @mock_s3 + def test_bucket_no_object_lock(self): + s3_client_us_east_1 = client("s3", region_name="us-east-1") + bucket_name_us = "bucket_test_us" + s3_client_us_east_1.create_bucket(Bucket=bucket_name_us) + + from prowler.providers.aws.services.s3.s3_service import S3 + + audit_info = self.set_mocked_audit_info() + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, + ): + with mock.patch( + "prowler.providers.aws.services.s3.s3_bucket_object_lock.s3_bucket_object_lock.s3_client", + new=S3(audit_info), + ): + # Test Check + from prowler.providers.aws.services.s3.s3_bucket_object_lock.s3_bucket_object_lock import ( + s3_bucket_object_lock, + ) + + check = s3_bucket_object_lock() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert search( + "Object Lock disabled", + result[0].status_extended, + ) + assert result[0].resource_id == bucket_name_us + assert ( + result[0].resource_arn + == f"arn:{audit_info.audited_partition}:s3:::{bucket_name_us}" + ) + assert result[0].region == "us-east-1" + assert result[0].resource_tags == [] + + @mock_s3 + def test_bucket_object_lock_enabled(self): + s3_client_us_east_1 = client("s3", region_name="us-east-1") + bucket_name_us = "bucket_test_us" + s3_client_us_east_1.create_bucket( + Bucket=bucket_name_us, + ObjectOwnership="BucketOwnerEnforced", + ObjectLockEnabledForBucket=True, + ) + + from prowler.providers.aws.services.s3.s3_service import S3 + + audit_info = self.set_mocked_audit_info() + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, + ): + with mock.patch( + "prowler.providers.aws.services.s3.s3_bucket_object_lock.s3_bucket_object_lock.s3_client", + new=S3(audit_info), + ): + # Test Check + from prowler.providers.aws.services.s3.s3_bucket_object_lock.s3_bucket_object_lock import ( + s3_bucket_object_lock, + ) + + check = s3_bucket_object_lock() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert search( + "Object Lock enabled", + result[0].status_extended, + ) + assert result[0].resource_id == bucket_name_us + assert ( + result[0].resource_arn + == f"arn:{audit_info.audited_partition}:s3:::{bucket_name_us}" + ) + assert result[0].region == "us-east-1" + assert result[0].resource_tags == [] diff --git a/tests/providers/aws/services/s3/s3_service_test.py b/tests/providers/aws/services/s3/s3_service_test.py index 9a5de43e..199dd115 100644 --- a/tests/providers/aws/services/s3/s3_service_test.py +++ b/tests/providers/aws/services/s3/s3_service_test.py @@ -86,6 +86,7 @@ class Test_S3_Service: s3.buckets[0].arn == f"arn:{audit_info.audited_partition}:s3:::{bucket_name}" ) + assert not s3.buckets[0].object_lock # Test S3 Get Bucket Versioning @mock_s3 @@ -379,3 +380,27 @@ class Test_S3_Service: assert s3control.account_public_access_block.ignore_public_acls assert s3control.account_public_access_block.block_public_policy assert s3control.account_public_access_block.restrict_public_buckets + + # Test S3 Get Bucket Object Lock + @mock_s3 + def test__get_object_lock_configuration__(self): + # Generate S3 Client + s3_client = client("s3") + # Create S3 Bucket + bucket_name = "test-bucket" + s3_client.create_bucket( + Bucket=bucket_name, + ObjectOwnership="BucketOwnerEnforced", + ObjectLockEnabledForBucket=True, + ) + + # S3 client for this test class + audit_info = self.set_mocked_audit_info() + s3 = S3(audit_info) + assert len(s3.buckets) == 1 + assert s3.buckets[0].name == bucket_name + assert ( + s3.buckets[0].arn + == f"arn:{audit_info.audited_partition}:s3:::{bucket_name}" + ) + assert s3.buckets[0].object_lock