diff --git a/prowler/providers/aws/services/awslambda/awslambda_function_invoke_api_operations_cloudtrail_logging_enabled/awslambda_function_invoke_api_operations_cloudtrail_logging_enabled.py b/prowler/providers/aws/services/awslambda/awslambda_function_invoke_api_operations_cloudtrail_logging_enabled/awslambda_function_invoke_api_operations_cloudtrail_logging_enabled.py
index 436b72b7..0c46d18d 100644
--- a/prowler/providers/aws/services/awslambda/awslambda_function_invoke_api_operations_cloudtrail_logging_enabled/awslambda_function_invoke_api_operations_cloudtrail_logging_enabled.py
+++ b/prowler/providers/aws/services/awslambda/awslambda_function_invoke_api_operations_cloudtrail_logging_enabled/awslambda_function_invoke_api_operations_cloudtrail_logging_enabled.py
@@ -21,7 +21,8 @@ class awslambda_function_invoke_api_operations_cloudtrail_logging_enabled(Check):
                 lambda_recorded_cloudtrail = False
                 for trail in cloudtrail_client.trails:
                     for data_event in trail.data_events:
-                        for resource in data_event["DataResources"]:
+                        # advanced event selectors carry no DataResources key, so default to an empty list
+                        for resource in data_event.event_selector.get("DataResources", []):
                             if (
                                 resource["Type"] == "AWS::Lambda::Function"
                                 and function.arn in resource["Values"]
diff --git a/prowler/providers/aws/services/cloudtrail/cloudtrail_s3_dataevents_read_enabled/cloudtrail_s3_dataevents_read_enabled.py b/prowler/providers/aws/services/cloudtrail/cloudtrail_s3_dataevents_read_enabled/cloudtrail_s3_dataevents_read_enabled.py
index fe34579a..025d0fb0 100644
--- a/prowler/providers/aws/services/cloudtrail/cloudtrail_s3_dataevents_read_enabled/cloudtrail_s3_dataevents_read_enabled.py
+++ b/prowler/providers/aws/services/cloudtrail/cloudtrail_s3_dataevents_read_enabled/cloudtrail_s3_dataevents_read_enabled.py
@@ -15,23 +15,37 @@ class cloudtrail_s3_dataevents_read_enabled(Check):
         report.status_extended = "No CloudTrail trails have a data event to record all S3 object-level API operations."
         for trail in cloudtrail_client.trails:
             for data_event in trail.data_events:
-                # Check if trail has a data event for all S3 Buckets for read
-                if (
-                    data_event["ReadWriteType"] == "ReadOnly"
-                    or data_event["ReadWriteType"] == "All"
-                ):
-                    for resource in data_event["DataResources"]:
-                        if "AWS::S3::Object" == resource["Type"] and (
-                            f"arn:{cloudtrail_client.audited_partition}:s3"
-                            in resource["Values"]
-                            or f"arn:{cloudtrail_client.audited_partition}:s3:::*/*"
-                            in resource["Values"]
+                # classic event selectors
+                if not data_event.is_advanced:
+                    # Check if trail has a data event for all S3 Buckets for read
+                    if (
+                        data_event.event_selector["ReadWriteType"] == "ReadOnly"
+                        or data_event.event_selector["ReadWriteType"] == "All"
+                    ):
+                        for resource in data_event.event_selector["DataResources"]:
+                            if "AWS::S3::Object" == resource["Type"] and (
+                                f"arn:{cloudtrail_client.audited_partition}:s3"
+                                in resource["Values"]
+                                or f"arn:{cloudtrail_client.audited_partition}:s3:::*/*"
+                                in resource["Values"]
+                            ):
+                                report.region = trail.region
+                                report.resource_id = trail.name
+                                report.resource_arn = trail.arn
+                                report.status = "PASS"
+                                report.status_extended = f"Trail {trail.name} has a classic data event selector to record all S3 object-level API operations."
+ # advanced event selectors + elif data_event.is_advanced: + for field_selector in data_event.event_selector["FieldSelectors"]: + if ( + field_selector["Field"] == "resources.type" + and field_selector["Equals"][0] == "AWS::S3::Object" ): report.region = trail.region report.resource_id = trail.name report.resource_arn = trail.arn report.status = "PASS" - report.status_extended = f"Trail {trail.name} have a data event to record all S3 object-level API operations." + report.status_extended = f"Trail {trail.name} has an advanced data event selector to record all S3 object-level API operations." findings.append(report) return findings diff --git a/prowler/providers/aws/services/cloudtrail/cloudtrail_s3_dataevents_write_enabled/cloudtrail_s3_dataevents_write_enabled.py b/prowler/providers/aws/services/cloudtrail/cloudtrail_s3_dataevents_write_enabled/cloudtrail_s3_dataevents_write_enabled.py index 5809a3fe..dbf714bf 100644 --- a/prowler/providers/aws/services/cloudtrail/cloudtrail_s3_dataevents_write_enabled/cloudtrail_s3_dataevents_write_enabled.py +++ b/prowler/providers/aws/services/cloudtrail/cloudtrail_s3_dataevents_write_enabled/cloudtrail_s3_dataevents_write_enabled.py @@ -15,23 +15,36 @@ class cloudtrail_s3_dataevents_write_enabled(Check): report.status_extended = "No CloudTrail trails have a data event to record all S3 object-level API operations." for trail in cloudtrail_client.trails: for data_event in trail.data_events: - # Check if trail has a data event for all S3 Buckets for write - if ( - data_event["ReadWriteType"] == "All" - or data_event["ReadWriteType"] == "WriteOnly" - ): - for resource in data_event["DataResources"]: - if "AWS::S3::Object" == resource["Type"] and ( - f"arn:{cloudtrail_client.audited_partition}:s3" - in resource["Values"] - or f"arn:{cloudtrail_client.audited_partition}:s3:::*/*" - in resource["Values"] + # classic event selectors + if not data_event.is_advanced: + # Check if trail has a data event for all S3 Buckets for write + if ( + data_event.event_selector["ReadWriteType"] == "All" + or data_event.event_selector["ReadWriteType"] == "WriteOnly" + ): + for resource in data_event.event_selector["DataResources"]: + if "AWS::S3::Object" == resource["Type"] and ( + f"arn:{cloudtrail_client.audited_partition}:s3" + in resource["Values"] + or f"arn:{cloudtrail_client.audited_partition}:s3:::*/*" + in resource["Values"] + ): + report.region = trail.region + report.resource_id = trail.name + report.resource_arn = trail.arn + report.status = "PASS" + report.status_extended = f"Trail {trail.name} has a classic data event selector to record all S3 object-level API operations." + # advanced event selectors + elif data_event.is_advanced: + for field_selector in data_event.event_selector["FieldSelectors"]: + if ( + field_selector["Field"] == "resources.type" + and field_selector["Equals"][0] == "AWS::S3::Object" ): report.region = trail.region report.resource_id = trail.name report.resource_arn = trail.arn report.status = "PASS" - report.status_extended = f"Trail {trail.name} have a data event to record all S3 object-level API operations." - + report.status_extended = f"Trail {trail.name} has an advanced data event selector to record all S3 object-level API operations." 
findings.append(report) return findings diff --git a/prowler/providers/aws/services/cloudtrail/cloudtrail_service.py b/prowler/providers/aws/services/cloudtrail/cloudtrail_service.py index f58c7032..3139916d 100644 --- a/prowler/providers/aws/services/cloudtrail/cloudtrail_service.py +++ b/prowler/providers/aws/services/cloudtrail/cloudtrail_service.py @@ -1,6 +1,7 @@ -import datetime import threading -from dataclasses import dataclass +from datetime import datetime + +from pydantic import BaseModel from prowler.lib.logger import logger from prowler.lib.scan_filters.scan_filters import is_resource_filtered @@ -71,18 +72,7 @@ class Cloudtrail: if trails_count == 0: self.trails.append( Trail( - name=None, - is_multiregion=None, - home_region=None, - arn=None, region=regional_client.region, - is_logging=None, - log_file_validation_enabled=None, - latest_cloudwatch_delivery_time=None, - s3_bucket=None, - kms_key=None, - log_group_arn=None, - data_events=[], ) ) @@ -116,54 +106,48 @@ class Cloudtrail: for region, client in self.regional_clients.items(): if trail.region == region and trail.name: data_events = client.get_event_selectors(TrailName=trail.arn) - if "EventSelectors" in data_events: + # check if key exists and array associated to that key is not empty + if ( + "EventSelectors" in data_events + and data_events["EventSelectors"] + ): for event in data_events["EventSelectors"]: - trail.data_events.append(event) + event_selector = Event_Selector( + is_advanced=False, event_selector=event + ) + trail.data_events.append(event_selector) + # check if key exists and array associated to that key is not empty + elif ( + "AdvancedEventSelectors" in data_events + and data_events["AdvancedEventSelectors"] + ): + for event in data_events["AdvancedEventSelectors"]: + event_selector = Event_Selector( + is_advanced=True, event_selector=event + ) + trail.data_events.append(event_selector) + except Exception as error: logger.error( f"{client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" ) -@dataclass -class Trail: - name: str - is_multiregion: bool - home_region: str - arn: str - region: str - is_logging: bool - log_file_validation_enabled: bool - latest_cloudwatch_delivery_time: datetime - s3_bucket: str - kms_key: str - log_group_arn: str - data_events: list +class Event_Selector(BaseModel): + is_advanced: bool + event_selector: dict - def __init__( - self, - name, - is_multiregion, - home_region, - arn, - region, - is_logging, - log_file_validation_enabled, - latest_cloudwatch_delivery_time, - s3_bucket, - kms_key, - log_group_arn, - data_events, - ): - self.name = name - self.is_multiregion = is_multiregion - self.home_region = home_region - self.arn = arn - self.region = region - self.is_logging = is_logging - self.log_file_validation_enabled = log_file_validation_enabled - self.latest_cloudwatch_delivery_time = latest_cloudwatch_delivery_time - self.s3_bucket = s3_bucket - self.kms_key = kms_key - self.log_group_arn = log_group_arn - self.data_events = data_events + +class Trail(BaseModel): + name: str = None + is_multiregion: bool = None + home_region: str = None + arn: str = None + region: str + is_logging: bool = None + log_file_validation_enabled: bool = None + latest_cloudwatch_delivery_time: datetime = None + s3_bucket: str = None + kms_key: str = None + log_group_arn: str = None + data_events: list[Event_Selector] = [] diff --git a/tests/providers/aws/services/cloudtrail/cloudtrail_multi_region_enabled/cloudtrail_multi_region_enabled_test.py 
b/tests/providers/aws/services/cloudtrail/cloudtrail_multi_region_enabled/cloudtrail_multi_region_enabled_test.py index 8f7a40ca..ca6dfe27 100644 --- a/tests/providers/aws/services/cloudtrail/cloudtrail_multi_region_enabled/cloudtrail_multi_region_enabled_test.py +++ b/tests/providers/aws/services/cloudtrail/cloudtrail_multi_region_enabled/cloudtrail_multi_region_enabled_test.py @@ -1,44 +1,69 @@ from re import search from unittest import mock -from boto3 import client +from boto3 import client, session from moto import mock_cloudtrail, mock_s3 +from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info + +AWS_ACCOUNT_NUMBER = 123456789012 + class Test_cloudtrail_multi_region_enabled: + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + ), + audited_account=AWS_ACCOUNT_NUMBER, + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=None, + credentials=None, + assumed_role_info=None, + audited_regions=["us-east-1", "eu-west-1"], + organizations_metadata=None, + audit_resources=None, + ) + return audit_info + @mock_cloudtrail def test_no_trails(self): - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info from prowler.providers.aws.services.cloudtrail.cloudtrail_service import ( Cloudtrail, ) - current_audit_info.audited_partition = "aws" + current_audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled.cloudtrail_client", - new=Cloudtrail(current_audit_info), - ) as service_client: - # Test Check - from prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled import ( - cloudtrail_multi_region_enabled, - ) + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ): - regions = [] - for region in service_client.regional_clients.keys(): - regions.append(region) - - check = cloudtrail_multi_region_enabled() - result = check.execute() - assert len(result) == len(regions) - for report in result: - assert report.status == "FAIL" - assert search( - "No CloudTrail trails enabled and logging were found", - report.status_extended, + with mock.patch( + "prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled.cloudtrail_client", + new=Cloudtrail(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled import ( + cloudtrail_multi_region_enabled, ) - assert report.resource_id == "No trails" - assert report.resource_arn == "No trails" + + check = cloudtrail_multi_region_enabled() + result = check.execute() + assert len(result) == len(current_audit_info.audited_regions) + for report in result: + assert report.status == "FAIL" + assert search( + "No CloudTrail trails enabled and logging were found", + report.status_extended, + ) + assert report.resource_id == "No trails" + assert report.resource_arn == "No trails" @mock_cloudtrail @mock_s3 @@ -63,37 +88,37 @@ class Test_cloudtrail_multi_region_enabled: Name=trail_name_eu, S3BucketName=bucket_name_eu, IsMultiRegionTrail=False ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info from prowler.providers.aws.services.cloudtrail.cloudtrail_service import ( Cloudtrail, ) - 
current_audit_info.audited_partition = "aws" + current_audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled.cloudtrail_client", - new=Cloudtrail(current_audit_info), - ) as service_client: - # Test Check - from prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled import ( - cloudtrail_multi_region_enabled, - ) + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ): - regions = [] - for region in service_client.regional_clients.keys(): - regions.append(region) - - check = cloudtrail_multi_region_enabled() - result = check.execute() - assert len(result) == len(regions) - for report in result: - assert report.status == "FAIL" - assert search( - "No CloudTrail trails enabled and logging were found", - report.status_extended, + with mock.patch( + "prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled.cloudtrail_client", + new=Cloudtrail(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled import ( + cloudtrail_multi_region_enabled, ) - assert report.resource_id == "No trails" - assert report.resource_arn == "No trails" + + check = cloudtrail_multi_region_enabled() + result = check.execute() + assert len(result) == len(current_audit_info.audited_regions) + for report in result: + assert report.status == "FAIL" + assert search( + "No CloudTrail trails enabled and logging were found", + report.status_extended, + ) + assert report.resource_id == "No trails" + assert report.resource_arn == "No trails" @mock_cloudtrail @mock_s3 @@ -120,42 +145,42 @@ class Test_cloudtrail_multi_region_enabled: _ = cloudtrail_client_us_east_1.start_logging(Name=trail_name_us) _ = cloudtrail_client_us_east_1.get_trail_status(Name=trail_name_us) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info from prowler.providers.aws.services.cloudtrail.cloudtrail_service import ( Cloudtrail, ) - current_audit_info.audited_partition = "aws" + current_audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled.cloudtrail_client", - new=Cloudtrail(current_audit_info), - ) as service_client: - # Test Check - from prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled import ( - cloudtrail_multi_region_enabled, - ) + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ): + with mock.patch( + "prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled.cloudtrail_client", + new=Cloudtrail(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled import ( + cloudtrail_multi_region_enabled, + ) - regions = [] - for region in service_client.regional_clients.keys(): - regions.append(region) - - check = cloudtrail_multi_region_enabled() - result = check.execute() - assert len(result) == len(regions) - for report in result: - if report.resource_id == trail_name_us: - assert report.status == "PASS" - assert search( - "is not multiregion and it is logging", report.status_extended - ) - assert report.resource_id == 
trail_name_us - assert report.resource_arn == trail_us["TrailARN"] - else: - assert report.status == "FAIL" - assert search( - "No CloudTrail trails enabled and logging were found", - report.status_extended, - ) - assert report.resource_id == "No trails" - assert report.resource_arn == "No trails" + check = cloudtrail_multi_region_enabled() + result = check.execute() + assert len(result) == len(current_audit_info.audited_regions) + for report in result: + if report.resource_id == trail_name_us: + assert report.status == "PASS" + assert search( + "is not multiregion and it is logging", + report.status_extended, + ) + assert report.resource_id == trail_name_us + assert report.resource_arn == trail_us["TrailARN"] + else: + assert report.status == "FAIL" + assert search( + "No CloudTrail trails enabled and logging were found", + report.status_extended, + ) + assert report.resource_id == "No trails" + assert report.resource_arn == "No trails" diff --git a/tests/providers/aws/services/cloudtrail/cloudtrail_s3_dataevents_read_enabled/cloudtrail_s3_dataevents_read_enabled_test.py b/tests/providers/aws/services/cloudtrail/cloudtrail_s3_dataevents_read_enabled/cloudtrail_s3_dataevents_read_enabled_test.py index 0c465e71..fd382901 100644 --- a/tests/providers/aws/services/cloudtrail/cloudtrail_s3_dataevents_read_enabled/cloudtrail_s3_dataevents_read_enabled_test.py +++ b/tests/providers/aws/services/cloudtrail/cloudtrail_s3_dataevents_read_enabled/cloudtrail_s3_dataevents_read_enabled_test.py @@ -1,11 +1,36 @@ from re import search from unittest import mock -from boto3 import client +from boto3 import client, session from moto import mock_cloudtrail, mock_s3 +from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info + +AWS_ACCOUNT_NUMBER = 123456789012 + class Test_cloudtrail_s3_dataevents_read_enabled: + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + ), + audited_account=AWS_ACCOUNT_NUMBER, + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=None, + credentials=None, + assumed_role_info=None, + audited_regions=["us-east-1"], + organizations_metadata=None, + audit_resources=None, + ) + return audit_info + @mock_cloudtrail @mock_s3 def test_trail_without_data_events(self): @@ -18,33 +43,37 @@ class Test_cloudtrail_s3_dataevents_read_enabled: Name=trail_name_us, S3BucketName=bucket_name_us, IsMultiRegionTrail=False ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info from prowler.providers.aws.services.cloudtrail.cloudtrail_service import ( Cloudtrail, ) - current_audit_info.audited_partition = "aws" + current_audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled.cloudtrail_client", - new=Cloudtrail(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, ): - # Test Check - from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled import ( - cloudtrail_s3_dataevents_read_enabled, - ) - check = cloudtrail_s3_dataevents_read_enabled() - result = check.execute() + with mock.patch( + "prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled.cloudtrail_client", + 
new=Cloudtrail(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled import ( + cloudtrail_s3_dataevents_read_enabled, + ) - assert len(result) == 1 - assert result[0].status == "FAIL" - assert search( - "No CloudTrail trails have a data event to record all S3 object-level API operations.", - result[0].status_extended, - ) - assert result[0].resource_id == "No trails" - assert result[0].resource_arn == "No trails" + check = cloudtrail_s3_dataevents_read_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert search( + "No CloudTrail trails have a data event to record all S3 object-level API operations.", + result[0].status_extended, + ) + assert result[0].resource_id == "No trails" + assert result[0].resource_arn == "No trails" @mock_cloudtrail @mock_s3 @@ -69,37 +98,42 @@ class Test_cloudtrail_s3_dataevents_read_enabled: } ], )["EventSelectors"] - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info + from prowler.providers.aws.services.cloudtrail.cloudtrail_service import ( Cloudtrail, ) - current_audit_info.audited_partition = "aws" + current_audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled.cloudtrail_client", - new=Cloudtrail(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, ): - # Test Check - from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled import ( - cloudtrail_s3_dataevents_read_enabled, - ) - check = cloudtrail_s3_dataevents_read_enabled() - result = check.execute() + with mock.patch( + "prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled.cloudtrail_client", + new=Cloudtrail(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled import ( + cloudtrail_s3_dataevents_read_enabled, + ) - assert len(result) == 1 - assert result[0].status == "FAIL" - assert search( - "No CloudTrail trails have a data event to record all S3 object-level API operations.", - result[0].status_extended, - ) - assert result[0].resource_id == "No trails" - assert result[0].resource_arn == "No trails" + check = cloudtrail_s3_dataevents_read_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert search( + "No CloudTrail trails have a data event to record all S3 object-level API operations.", + result[0].status_extended, + ) + assert result[0].resource_id == "No trails" + assert result[0].resource_arn == "No trails" @mock_cloudtrail @mock_s3 - def test_trail_with_s3_data_events(self): + def test_trail_with_s3_classic_data_events(self): cloudtrail_client_us_east_1 = client("cloudtrail", region_name="us-east-1") s3_client_us_east_1 = client("s3", region_name="us-east-1") trail_name_us = "trail_test_us" @@ -120,30 +154,91 @@ class Test_cloudtrail_s3_dataevents_read_enabled: } ], )["EventSelectors"] - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info + from prowler.providers.aws.services.cloudtrail.cloudtrail_service import ( Cloudtrail, ) - current_audit_info.audited_partition = "aws" + 
current_audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled.cloudtrail_client", - new=Cloudtrail(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, ): - # Test Check - from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled import ( - cloudtrail_s3_dataevents_read_enabled, - ) - check = cloudtrail_s3_dataevents_read_enabled() - result = check.execute() + with mock.patch( + "prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled.cloudtrail_client", + new=Cloudtrail(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled import ( + cloudtrail_s3_dataevents_read_enabled, + ) - assert len(result) == 1 - assert result[0].status == "PASS" - assert search( - "have a data event to record all S3 object-level API operations.", - result[0].status_extended, - ) - assert result[0].resource_id == trail_name_us - assert result[0].resource_arn == trail_us["TrailARN"] + check = cloudtrail_s3_dataevents_read_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert search( + "has a classic data event selector to record all S3 object-level API operations.", + result[0].status_extended, + ) + assert result[0].resource_id == trail_name_us + assert result[0].resource_arn == trail_us["TrailARN"] + + @mock_cloudtrail + @mock_s3 + def test_trail_with_s3_advanced_data_events(self): + cloudtrail_client_us_east_1 = client("cloudtrail", region_name="us-east-1") + s3_client_us_east_1 = client("s3", region_name="us-east-1") + trail_name_us = "trail_test_us" + bucket_name_us = "bucket_test_us" + s3_client_us_east_1.create_bucket(Bucket=bucket_name_us) + trail_us = cloudtrail_client_us_east_1.create_trail( + Name=trail_name_us, S3BucketName=bucket_name_us, IsMultiRegionTrail=False + ) + _ = cloudtrail_client_us_east_1.put_event_selectors( + TrailName=trail_name_us, + AdvancedEventSelectors=[ + { + "Name": "test", + "FieldSelectors": [ + {"Field": "eventCategory", "Equals": ["Data"]}, + {"Field": "resources.type", "Equals": ["AWS::S3::Object"]}, + ], + }, + ], + )["AdvancedEventSelectors"] + + from prowler.providers.aws.services.cloudtrail.cloudtrail_service import ( + Cloudtrail, + ) + + current_audit_info = self.set_mocked_audit_info() + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ): + + with mock.patch( + "prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled.cloudtrail_client", + new=Cloudtrail(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled import ( + cloudtrail_s3_dataevents_read_enabled, + ) + + check = cloudtrail_s3_dataevents_read_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert search( + "has an advanced data event selector to record all S3 object-level API operations.", + result[0].status_extended, + ) + assert result[0].resource_id == trail_name_us + assert result[0].resource_arn == trail_us["TrailARN"] diff --git 
a/tests/providers/aws/services/cloudtrail/cloudtrail_s3_dataevents_write_enabled/cloudtrail_s3_dataevents_write_enabled_test.py b/tests/providers/aws/services/cloudtrail/cloudtrail_s3_dataevents_write_enabled/cloudtrail_s3_dataevents_write_enabled_test.py index 0d43a76b..b54dcb01 100644 --- a/tests/providers/aws/services/cloudtrail/cloudtrail_s3_dataevents_write_enabled/cloudtrail_s3_dataevents_write_enabled_test.py +++ b/tests/providers/aws/services/cloudtrail/cloudtrail_s3_dataevents_write_enabled/cloudtrail_s3_dataevents_write_enabled_test.py @@ -1,11 +1,36 @@ from re import search from unittest import mock -from boto3 import client +from boto3 import client, session from moto import mock_cloudtrail, mock_s3 +from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info + +AWS_ACCOUNT_NUMBER = 123456789012 + class Test_cloudtrail_s3_dataevents_write_enabled: + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + ), + audited_account=AWS_ACCOUNT_NUMBER, + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=None, + credentials=None, + assumed_role_info=None, + audited_regions=["us-east-1"], + organizations_metadata=None, + audit_resources=None, + ) + return audit_info + @mock_cloudtrail @mock_s3 def test_trail_without_data_events(self): @@ -18,33 +43,37 @@ class Test_cloudtrail_s3_dataevents_write_enabled: Name=trail_name_us, S3BucketName=bucket_name_us, IsMultiRegionTrail=False ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info from prowler.providers.aws.services.cloudtrail.cloudtrail_service import ( Cloudtrail, ) - current_audit_info.audited_partition = "aws" + current_audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled.cloudtrail_client", - new=Cloudtrail(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, ): - # Test Check - from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled import ( - cloudtrail_s3_dataevents_write_enabled, - ) - check = cloudtrail_s3_dataevents_write_enabled() - result = check.execute() + with mock.patch( + "prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled.cloudtrail_client", + new=Cloudtrail(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled import ( + cloudtrail_s3_dataevents_write_enabled, + ) - assert len(result) == 1 - assert result[0].status == "FAIL" - assert search( - "No CloudTrail trails have a data event to record all S3 object-level API operations.", - result[0].status_extended, - ) - assert result[0].resource_id == "No trails" - assert result[0].resource_arn == "No trails" + check = cloudtrail_s3_dataevents_write_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert search( + "No CloudTrail trails have a data event to record all S3 object-level API operations.", + result[0].status_extended, + ) + assert result[0].resource_id == "No trails" + assert result[0].resource_arn == "No trails" @mock_cloudtrail @mock_s3 @@ -69,33 +98,37 @@ 
class Test_cloudtrail_s3_dataevents_write_enabled: } ], )["EventSelectors"] - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info from prowler.providers.aws.services.cloudtrail.cloudtrail_service import ( Cloudtrail, ) - current_audit_info.audited_partition = "aws" + current_audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled.cloudtrail_client", - new=Cloudtrail(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, ): - # Test Check - from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled import ( - cloudtrail_s3_dataevents_write_enabled, - ) - check = cloudtrail_s3_dataevents_write_enabled() - result = check.execute() + with mock.patch( + "prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled.cloudtrail_client", + new=Cloudtrail(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled import ( + cloudtrail_s3_dataevents_write_enabled, + ) - assert len(result) == 1 - assert result[0].status == "FAIL" - assert search( - "No CloudTrail trails have a data event to record all S3 object-level API operations.", - result[0].status_extended, - ) - assert result[0].resource_id == "No trails" - assert result[0].resource_arn == "No trails" + check = cloudtrail_s3_dataevents_write_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert search( + "No CloudTrail trails have a data event to record all S3 object-level API operations.", + result[0].status_extended, + ) + assert result[0].resource_id == "No trails" + assert result[0].resource_arn == "No trails" @mock_cloudtrail @mock_s3 @@ -120,30 +153,90 @@ class Test_cloudtrail_s3_dataevents_write_enabled: } ], )["EventSelectors"] - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info + from prowler.providers.aws.services.cloudtrail.cloudtrail_service import ( Cloudtrail, ) - current_audit_info.audited_partition = "aws" + current_audit_info = self.set_mocked_audit_info() with mock.patch( - "prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled.cloudtrail_client", - new=Cloudtrail(current_audit_info), + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, ): - # Test Check - from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled import ( - cloudtrail_s3_dataevents_write_enabled, - ) - check = cloudtrail_s3_dataevents_write_enabled() - result = check.execute() + with mock.patch( + "prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled.cloudtrail_client", + new=Cloudtrail(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled import ( + cloudtrail_s3_dataevents_write_enabled, + ) - assert len(result) == 1 - assert result[0].status == "PASS" - assert search( - "have a data event to record all S3 object-level API operations.", - result[0].status_extended, - ) - assert 
result[0].resource_id == trail_name_us
-            assert result[0].resource_arn == trail_us["TrailARN"]
+            check = cloudtrail_s3_dataevents_write_enabled()
+            result = check.execute()
+
+            assert len(result) == 1
+            assert result[0].status == "PASS"
+            assert search(
+                "has a classic data event selector to record all S3 object-level API operations.",
+                result[0].status_extended,
+            )
+            assert result[0].resource_id == trail_name_us
+            assert result[0].resource_arn == trail_us["TrailARN"]
+
+    @mock_cloudtrail
+    @mock_s3
+    def test_trail_with_s3_advanced_data_events(self):
+        cloudtrail_client_us_east_1 = client("cloudtrail", region_name="us-east-1")
+        s3_client_us_east_1 = client("s3", region_name="us-east-1")
+        trail_name_us = "trail_test_us"
+        bucket_name_us = "bucket_test_us"
+        s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
+        trail_us = cloudtrail_client_us_east_1.create_trail(
+            Name=trail_name_us, S3BucketName=bucket_name_us, IsMultiRegionTrail=False
+        )
+        _ = cloudtrail_client_us_east_1.put_event_selectors(
+            TrailName=trail_name_us,
+            AdvancedEventSelectors=[
+                {
+                    "Name": "test",
+                    "FieldSelectors": [
+                        {"Field": "eventCategory", "Equals": ["Data"]},
+                        {"Field": "resources.type", "Equals": ["AWS::S3::Object"]},
+                    ],
+                },
+            ],
+        )["AdvancedEventSelectors"]
+        from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
+            Cloudtrail,
+        )
+
+        current_audit_info = self.set_mocked_audit_info()
+
+        with mock.patch(
+            "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
+            new=current_audit_info,
+        ):
+
+            with mock.patch(
+                "prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled.cloudtrail_client",
+                new=Cloudtrail(current_audit_info),
+            ):
+                # Test Check
+                from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled import (
+                    cloudtrail_s3_dataevents_write_enabled,
+                )
+
+                check = cloudtrail_s3_dataevents_write_enabled()
+                result = check.execute()
+
+                assert len(result) == 1
+                assert result[0].status == "PASS"
+                assert search(
+                    "has an advanced data event selector to record all S3 object-level API operations.",
+                    result[0].status_extended,
+                )
+                assert result[0].resource_id == trail_name_us
+                assert result[0].resource_arn == trail_us["TrailARN"]
diff --git a/tests/providers/aws/services/cloudtrail/cloudtrail_service_test.py b/tests/providers/aws/services/cloudtrail/cloudtrail_service_test.py
index bc1461fd..2cee4286 100644
--- a/tests/providers/aws/services/cloudtrail/cloudtrail_service_test.py
+++ b/tests/providers/aws/services/cloudtrail/cloudtrail_service_test.py
@@ -129,7 +129,7 @@ class Test_Cloudtrail_Service:
         )
         audit_info = self.set_mocked_audit_info()
         cloudtrail = Cloudtrail(audit_info)
-        assert len(cloudtrail.trails) == 2
+        assert len(cloudtrail.trails) == len(audit_info.audited_regions)
         for trail in cloudtrail.trails:
             if trail.name:
                 if trail.name == trail_name_us:
@@ -143,7 +143,7 @@ class Test_Cloudtrail_Service:
 
     @mock_cloudtrail
     @mock_s3
-    def test_get_event_selectors(self):
+    def test_get_classic_event_selectors(self):
         cloudtrail_client_us_east_1 = client("cloudtrail", region_name="us-east-1")
         s3_client_us_east_1 = client("s3", region_name="us-east-1")
         trail_name_us = "trail_test_us"
@@ -170,7 +170,7 @@ class Test_Cloudtrail_Service:
         )["EventSelectors"]
         audit_info = self.set_mocked_audit_info()
         cloudtrail = Cloudtrail(audit_info)
-        assert len(cloudtrail.trails) == 2
+        assert len(cloudtrail.trails) == len(audit_info.audited_regions)
for trail in cloudtrail.trails: if trail.name: if trail.name == trail_name_us: @@ -181,4 +181,52 @@ class Test_Cloudtrail_Service: assert trail.log_file_validation_enabled assert not trail.latest_cloudwatch_delivery_time assert trail.s3_bucket == bucket_name_us - assert trail.data_events == data_events_response + assert ( + trail.data_events[0].event_selector == data_events_response[0] + ) + assert not trail.data_events[0].is_advanced + + @mock_cloudtrail + @mock_s3 + def test_get_advanced_event_selectors(self): + cloudtrail_client_us_east_1 = client("cloudtrail", region_name="us-east-1") + s3_client_us_east_1 = client("s3", region_name="us-east-1") + trail_name_us = "trail_test_us" + bucket_name_us = "bucket_test_us" + s3_client_us_east_1.create_bucket(Bucket=bucket_name_us) + cloudtrail_client_us_east_1.create_trail( + Name=trail_name_us, + S3BucketName=bucket_name_us, + IsMultiRegionTrail=False, + EnableLogFileValidation=True, + ) + cloudtrail_client_us_east_1.start_logging(Name=trail_name_us) + data_events_response = cloudtrail_client_us_east_1.put_event_selectors( + TrailName=trail_name_us, + AdvancedEventSelectors=[ + { + "Name": "test", + "FieldSelectors": [ + {"Field": "eventCategory", "Equals": ["Data"]}, + {"Field": "resources.type", "Equals": ["AWS::S3::Object"]}, + ], + }, + ], + )["AdvancedEventSelectors"] + audit_info = self.set_mocked_audit_info() + cloudtrail = Cloudtrail(audit_info) + assert len(cloudtrail.trails) == len(audit_info.audited_regions) + for trail in cloudtrail.trails: + if trail.name: + if trail.name == trail_name_us: + assert not trail.is_multiregion + assert trail.home_region == "us-east-1" + assert trail.region == "us-east-1" + assert trail.is_logging + assert trail.log_file_validation_enabled + assert not trail.latest_cloudwatch_delivery_time + assert trail.s3_bucket == bucket_name_us + assert ( + trail.data_events[0].event_selector == data_events_response[0] + ) + assert trail.data_events[0].is_advanced
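
A note on the selector handling in this patch: get_event_selectors() returns classic selectors under "EventSelectors" and advanced selectors under "AdvancedEventSelectors", and the new Event_Selector model tags each entry with is_advanced so the checks can branch on the shape. The standalone sketch below (not part of the patch) shows the same idea in miniature; normalize_selectors and records_s3_objects are illustrative helper names, the sample responses are hand-written stand-ins for boto3 output, and for brevity the sketch collects both keys in one pass and skips the ReadWriteType and partition-qualified ARN checks that the real checks perform.

from pydantic import BaseModel


class Event_Selector(BaseModel):
    # mirrors the model added in cloudtrail_service.py: keep the raw selector dict
    # and a flag describing which shape it is
    is_advanced: bool
    event_selector: dict


def normalize_selectors(response: dict) -> list[Event_Selector]:
    selectors = []
    # classic event selectors live under "EventSelectors"
    for event in response.get("EventSelectors", []):
        selectors.append(Event_Selector(is_advanced=False, event_selector=event))
    # advanced event selectors live under "AdvancedEventSelectors"
    for event in response.get("AdvancedEventSelectors", []):
        selectors.append(Event_Selector(is_advanced=True, event_selector=event))
    return selectors


def records_s3_objects(selector: Event_Selector) -> bool:
    if not selector.is_advanced:
        # classic selector: look for an AWS::S3::Object data resource
        return any(
            resource["Type"] == "AWS::S3::Object"
            for resource in selector.event_selector.get("DataResources", [])
        )
    # advanced selector: look for a resources.type field selector equal to AWS::S3::Object
    return any(
        field["Field"] == "resources.type" and "AWS::S3::Object" in field["Equals"]
        for field in selector.event_selector.get("FieldSelectors", [])
    )


classic_response = {
    "EventSelectors": [
        {
            "ReadWriteType": "All",
            "IncludeManagementEvents": True,
            "DataResources": [
                {"Type": "AWS::S3::Object", "Values": ["arn:aws:s3:::*/*"]}
            ],
        }
    ]
}
advanced_response = {
    "AdvancedEventSelectors": [
        {
            "Name": "test",
            "FieldSelectors": [
                {"Field": "eventCategory", "Equals": ["Data"]},
                {"Field": "resources.type", "Equals": ["AWS::S3::Object"]},
            ],
        }
    ]
}

for response in (classic_response, advanced_response):
    for selector in normalize_selectors(response):
        print(selector.is_advanced, records_s3_objects(selector))

Keeping the raw selector dict on the model, rather than flattening its fields, is what lets a single Trail.data_events list carry both shapes without a schema change; each check then decides how to interpret the dict based on is_advanced.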