mirror of
https://github.com/ghndrx/prowler.git
synced 2026-02-10 14:55:00 +00:00
fix(cloudtrail): included advanced data events selectors (#1814)
This commit is contained in:
@@ -1,44 +1,69 @@
|
||||
from re import search
|
||||
from unittest import mock
|
||||
|
||||
from boto3 import client
|
||||
from boto3 import client, session
|
||||
from moto import mock_cloudtrail, mock_s3
|
||||
|
||||
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
|
||||
|
||||
AWS_ACCOUNT_NUMBER = 123456789012
|
||||
|
||||
|
||||
class Test_cloudtrail_multi_region_enabled:
|
||||
def set_mocked_audit_info(self):
|
||||
audit_info = AWS_Audit_Info(
|
||||
original_session=None,
|
||||
audit_session=session.Session(
|
||||
profile_name=None,
|
||||
botocore_session=None,
|
||||
),
|
||||
audited_account=AWS_ACCOUNT_NUMBER,
|
||||
audited_user_id=None,
|
||||
audited_partition="aws",
|
||||
audited_identity_arn=None,
|
||||
profile=None,
|
||||
profile_region=None,
|
||||
credentials=None,
|
||||
assumed_role_info=None,
|
||||
audited_regions=["us-east-1", "eu-west-1"],
|
||||
organizations_metadata=None,
|
||||
audit_resources=None,
|
||||
)
|
||||
return audit_info
|
||||
|
||||
@mock_cloudtrail
|
||||
def test_no_trails(self):
|
||||
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
|
||||
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
|
||||
Cloudtrail,
|
||||
)
|
||||
|
||||
current_audit_info.audited_partition = "aws"
|
||||
current_audit_info = self.set_mocked_audit_info()
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled.cloudtrail_client",
|
||||
new=Cloudtrail(current_audit_info),
|
||||
) as service_client:
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled import (
|
||||
cloudtrail_multi_region_enabled,
|
||||
)
|
||||
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
|
||||
new=current_audit_info,
|
||||
):
|
||||
|
||||
regions = []
|
||||
for region in service_client.regional_clients.keys():
|
||||
regions.append(region)
|
||||
|
||||
check = cloudtrail_multi_region_enabled()
|
||||
result = check.execute()
|
||||
assert len(result) == len(regions)
|
||||
for report in result:
|
||||
assert report.status == "FAIL"
|
||||
assert search(
|
||||
"No CloudTrail trails enabled and logging were found",
|
||||
report.status_extended,
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled.cloudtrail_client",
|
||||
new=Cloudtrail(current_audit_info),
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled import (
|
||||
cloudtrail_multi_region_enabled,
|
||||
)
|
||||
assert report.resource_id == "No trails"
|
||||
assert report.resource_arn == "No trails"
|
||||
|
||||
check = cloudtrail_multi_region_enabled()
|
||||
result = check.execute()
|
||||
assert len(result) == len(current_audit_info.audited_regions)
|
||||
for report in result:
|
||||
assert report.status == "FAIL"
|
||||
assert search(
|
||||
"No CloudTrail trails enabled and logging were found",
|
||||
report.status_extended,
|
||||
)
|
||||
assert report.resource_id == "No trails"
|
||||
assert report.resource_arn == "No trails"
|
||||
|
||||
@mock_cloudtrail
|
||||
@mock_s3
|
||||
@@ -63,37 +88,37 @@ class Test_cloudtrail_multi_region_enabled:
|
||||
Name=trail_name_eu, S3BucketName=bucket_name_eu, IsMultiRegionTrail=False
|
||||
)
|
||||
|
||||
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
|
||||
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
|
||||
Cloudtrail,
|
||||
)
|
||||
|
||||
current_audit_info.audited_partition = "aws"
|
||||
current_audit_info = self.set_mocked_audit_info()
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled.cloudtrail_client",
|
||||
new=Cloudtrail(current_audit_info),
|
||||
) as service_client:
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled import (
|
||||
cloudtrail_multi_region_enabled,
|
||||
)
|
||||
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
|
||||
new=current_audit_info,
|
||||
):
|
||||
|
||||
regions = []
|
||||
for region in service_client.regional_clients.keys():
|
||||
regions.append(region)
|
||||
|
||||
check = cloudtrail_multi_region_enabled()
|
||||
result = check.execute()
|
||||
assert len(result) == len(regions)
|
||||
for report in result:
|
||||
assert report.status == "FAIL"
|
||||
assert search(
|
||||
"No CloudTrail trails enabled and logging were found",
|
||||
report.status_extended,
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled.cloudtrail_client",
|
||||
new=Cloudtrail(current_audit_info),
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled import (
|
||||
cloudtrail_multi_region_enabled,
|
||||
)
|
||||
assert report.resource_id == "No trails"
|
||||
assert report.resource_arn == "No trails"
|
||||
|
||||
check = cloudtrail_multi_region_enabled()
|
||||
result = check.execute()
|
||||
assert len(result) == len(current_audit_info.audited_regions)
|
||||
for report in result:
|
||||
assert report.status == "FAIL"
|
||||
assert search(
|
||||
"No CloudTrail trails enabled and logging were found",
|
||||
report.status_extended,
|
||||
)
|
||||
assert report.resource_id == "No trails"
|
||||
assert report.resource_arn == "No trails"
|
||||
|
||||
@mock_cloudtrail
|
||||
@mock_s3
|
||||
@@ -120,42 +145,42 @@ class Test_cloudtrail_multi_region_enabled:
|
||||
_ = cloudtrail_client_us_east_1.start_logging(Name=trail_name_us)
|
||||
_ = cloudtrail_client_us_east_1.get_trail_status(Name=trail_name_us)
|
||||
|
||||
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
|
||||
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
|
||||
Cloudtrail,
|
||||
)
|
||||
|
||||
current_audit_info.audited_partition = "aws"
|
||||
current_audit_info = self.set_mocked_audit_info()
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled.cloudtrail_client",
|
||||
new=Cloudtrail(current_audit_info),
|
||||
) as service_client:
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled import (
|
||||
cloudtrail_multi_region_enabled,
|
||||
)
|
||||
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
|
||||
new=current_audit_info,
|
||||
):
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled.cloudtrail_client",
|
||||
new=Cloudtrail(current_audit_info),
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled import (
|
||||
cloudtrail_multi_region_enabled,
|
||||
)
|
||||
|
||||
regions = []
|
||||
for region in service_client.regional_clients.keys():
|
||||
regions.append(region)
|
||||
|
||||
check = cloudtrail_multi_region_enabled()
|
||||
result = check.execute()
|
||||
assert len(result) == len(regions)
|
||||
for report in result:
|
||||
if report.resource_id == trail_name_us:
|
||||
assert report.status == "PASS"
|
||||
assert search(
|
||||
"is not multiregion and it is logging", report.status_extended
|
||||
)
|
||||
assert report.resource_id == trail_name_us
|
||||
assert report.resource_arn == trail_us["TrailARN"]
|
||||
else:
|
||||
assert report.status == "FAIL"
|
||||
assert search(
|
||||
"No CloudTrail trails enabled and logging were found",
|
||||
report.status_extended,
|
||||
)
|
||||
assert report.resource_id == "No trails"
|
||||
assert report.resource_arn == "No trails"
|
||||
check = cloudtrail_multi_region_enabled()
|
||||
result = check.execute()
|
||||
assert len(result) == len(current_audit_info.audited_regions)
|
||||
for report in result:
|
||||
if report.resource_id == trail_name_us:
|
||||
assert report.status == "PASS"
|
||||
assert search(
|
||||
"is not multiregion and it is logging",
|
||||
report.status_extended,
|
||||
)
|
||||
assert report.resource_id == trail_name_us
|
||||
assert report.resource_arn == trail_us["TrailARN"]
|
||||
else:
|
||||
assert report.status == "FAIL"
|
||||
assert search(
|
||||
"No CloudTrail trails enabled and logging were found",
|
||||
report.status_extended,
|
||||
)
|
||||
assert report.resource_id == "No trails"
|
||||
assert report.resource_arn == "No trails"
|
||||
|
||||
@@ -1,11 +1,36 @@
|
||||
from re import search
|
||||
from unittest import mock
|
||||
|
||||
from boto3 import client
|
||||
from boto3 import client, session
|
||||
from moto import mock_cloudtrail, mock_s3
|
||||
|
||||
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
|
||||
|
||||
AWS_ACCOUNT_NUMBER = 123456789012
|
||||
|
||||
|
||||
class Test_cloudtrail_s3_dataevents_read_enabled:
|
||||
def set_mocked_audit_info(self):
|
||||
audit_info = AWS_Audit_Info(
|
||||
original_session=None,
|
||||
audit_session=session.Session(
|
||||
profile_name=None,
|
||||
botocore_session=None,
|
||||
),
|
||||
audited_account=AWS_ACCOUNT_NUMBER,
|
||||
audited_user_id=None,
|
||||
audited_partition="aws",
|
||||
audited_identity_arn=None,
|
||||
profile=None,
|
||||
profile_region=None,
|
||||
credentials=None,
|
||||
assumed_role_info=None,
|
||||
audited_regions=["us-east-1"],
|
||||
organizations_metadata=None,
|
||||
audit_resources=None,
|
||||
)
|
||||
return audit_info
|
||||
|
||||
@mock_cloudtrail
|
||||
@mock_s3
|
||||
def test_trail_without_data_events(self):
|
||||
@@ -18,33 +43,37 @@ class Test_cloudtrail_s3_dataevents_read_enabled:
|
||||
Name=trail_name_us, S3BucketName=bucket_name_us, IsMultiRegionTrail=False
|
||||
)
|
||||
|
||||
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
|
||||
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
|
||||
Cloudtrail,
|
||||
)
|
||||
|
||||
current_audit_info.audited_partition = "aws"
|
||||
current_audit_info = self.set_mocked_audit_info()
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled.cloudtrail_client",
|
||||
new=Cloudtrail(current_audit_info),
|
||||
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
|
||||
new=current_audit_info,
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled import (
|
||||
cloudtrail_s3_dataevents_read_enabled,
|
||||
)
|
||||
|
||||
check = cloudtrail_s3_dataevents_read_enabled()
|
||||
result = check.execute()
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled.cloudtrail_client",
|
||||
new=Cloudtrail(current_audit_info),
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled import (
|
||||
cloudtrail_s3_dataevents_read_enabled,
|
||||
)
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert search(
|
||||
"No CloudTrail trails have a data event to record all S3 object-level API operations.",
|
||||
result[0].status_extended,
|
||||
)
|
||||
assert result[0].resource_id == "No trails"
|
||||
assert result[0].resource_arn == "No trails"
|
||||
check = cloudtrail_s3_dataevents_read_enabled()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert search(
|
||||
"No CloudTrail trails have a data event to record all S3 object-level API operations.",
|
||||
result[0].status_extended,
|
||||
)
|
||||
assert result[0].resource_id == "No trails"
|
||||
assert result[0].resource_arn == "No trails"
|
||||
|
||||
@mock_cloudtrail
|
||||
@mock_s3
|
||||
@@ -69,37 +98,42 @@ class Test_cloudtrail_s3_dataevents_read_enabled:
|
||||
}
|
||||
],
|
||||
)["EventSelectors"]
|
||||
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
|
||||
|
||||
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
|
||||
Cloudtrail,
|
||||
)
|
||||
|
||||
current_audit_info.audited_partition = "aws"
|
||||
current_audit_info = self.set_mocked_audit_info()
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled.cloudtrail_client",
|
||||
new=Cloudtrail(current_audit_info),
|
||||
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
|
||||
new=current_audit_info,
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled import (
|
||||
cloudtrail_s3_dataevents_read_enabled,
|
||||
)
|
||||
|
||||
check = cloudtrail_s3_dataevents_read_enabled()
|
||||
result = check.execute()
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled.cloudtrail_client",
|
||||
new=Cloudtrail(current_audit_info),
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled import (
|
||||
cloudtrail_s3_dataevents_read_enabled,
|
||||
)
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert search(
|
||||
"No CloudTrail trails have a data event to record all S3 object-level API operations.",
|
||||
result[0].status_extended,
|
||||
)
|
||||
assert result[0].resource_id == "No trails"
|
||||
assert result[0].resource_arn == "No trails"
|
||||
check = cloudtrail_s3_dataevents_read_enabled()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert search(
|
||||
"No CloudTrail trails have a data event to record all S3 object-level API operations.",
|
||||
result[0].status_extended,
|
||||
)
|
||||
assert result[0].resource_id == "No trails"
|
||||
assert result[0].resource_arn == "No trails"
|
||||
|
||||
@mock_cloudtrail
|
||||
@mock_s3
|
||||
def test_trail_with_s3_data_events(self):
|
||||
def test_trail_with_s3_classic_data_events(self):
|
||||
cloudtrail_client_us_east_1 = client("cloudtrail", region_name="us-east-1")
|
||||
s3_client_us_east_1 = client("s3", region_name="us-east-1")
|
||||
trail_name_us = "trail_test_us"
|
||||
@@ -120,30 +154,91 @@ class Test_cloudtrail_s3_dataevents_read_enabled:
|
||||
}
|
||||
],
|
||||
)["EventSelectors"]
|
||||
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
|
||||
|
||||
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
|
||||
Cloudtrail,
|
||||
)
|
||||
|
||||
current_audit_info.audited_partition = "aws"
|
||||
current_audit_info = self.set_mocked_audit_info()
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled.cloudtrail_client",
|
||||
new=Cloudtrail(current_audit_info),
|
||||
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
|
||||
new=current_audit_info,
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled import (
|
||||
cloudtrail_s3_dataevents_read_enabled,
|
||||
)
|
||||
|
||||
check = cloudtrail_s3_dataevents_read_enabled()
|
||||
result = check.execute()
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled.cloudtrail_client",
|
||||
new=Cloudtrail(current_audit_info),
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled import (
|
||||
cloudtrail_s3_dataevents_read_enabled,
|
||||
)
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert search(
|
||||
"have a data event to record all S3 object-level API operations.",
|
||||
result[0].status_extended,
|
||||
)
|
||||
assert result[0].resource_id == trail_name_us
|
||||
assert result[0].resource_arn == trail_us["TrailARN"]
|
||||
check = cloudtrail_s3_dataevents_read_enabled()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert search(
|
||||
"has a classic data event selector to record all S3 object-level API operations.",
|
||||
result[0].status_extended,
|
||||
)
|
||||
assert result[0].resource_id == trail_name_us
|
||||
assert result[0].resource_arn == trail_us["TrailARN"]
|
||||
|
||||
@mock_cloudtrail
|
||||
@mock_s3
|
||||
def test_trail_with_s3_advanced_data_events(self):
|
||||
cloudtrail_client_us_east_1 = client("cloudtrail", region_name="us-east-1")
|
||||
s3_client_us_east_1 = client("s3", region_name="us-east-1")
|
||||
trail_name_us = "trail_test_us"
|
||||
bucket_name_us = "bucket_test_us"
|
||||
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
|
||||
trail_us = cloudtrail_client_us_east_1.create_trail(
|
||||
Name=trail_name_us, S3BucketName=bucket_name_us, IsMultiRegionTrail=False
|
||||
)
|
||||
_ = cloudtrail_client_us_east_1.put_event_selectors(
|
||||
TrailName=trail_name_us,
|
||||
AdvancedEventSelectors=[
|
||||
{
|
||||
"Name": "test",
|
||||
"FieldSelectors": [
|
||||
{"Field": "eventCategory", "Equals": ["Data"]},
|
||||
{"Field": "resources.type", "Equals": ["AWS::S3::Object"]},
|
||||
],
|
||||
},
|
||||
],
|
||||
)["AdvancedEventSelectors"]
|
||||
|
||||
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
|
||||
Cloudtrail,
|
||||
)
|
||||
|
||||
current_audit_info = self.set_mocked_audit_info()
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
|
||||
new=current_audit_info,
|
||||
):
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled.cloudtrail_client",
|
||||
new=Cloudtrail(current_audit_info),
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled import (
|
||||
cloudtrail_s3_dataevents_read_enabled,
|
||||
)
|
||||
|
||||
check = cloudtrail_s3_dataevents_read_enabled()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert search(
|
||||
"has an advanced data event selector to record all S3 object-level API operations.",
|
||||
result[0].status_extended,
|
||||
)
|
||||
assert result[0].resource_id == trail_name_us
|
||||
assert result[0].resource_arn == trail_us["TrailARN"]
|
||||
|
||||
@@ -1,11 +1,36 @@
|
||||
from re import search
|
||||
from unittest import mock
|
||||
|
||||
from boto3 import client
|
||||
from boto3 import client, session
|
||||
from moto import mock_cloudtrail, mock_s3
|
||||
|
||||
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
|
||||
|
||||
AWS_ACCOUNT_NUMBER = 123456789012
|
||||
|
||||
|
||||
class Test_cloudtrail_s3_dataevents_write_enabled:
|
||||
def set_mocked_audit_info(self):
|
||||
audit_info = AWS_Audit_Info(
|
||||
original_session=None,
|
||||
audit_session=session.Session(
|
||||
profile_name=None,
|
||||
botocore_session=None,
|
||||
),
|
||||
audited_account=AWS_ACCOUNT_NUMBER,
|
||||
audited_user_id=None,
|
||||
audited_partition="aws",
|
||||
audited_identity_arn=None,
|
||||
profile=None,
|
||||
profile_region=None,
|
||||
credentials=None,
|
||||
assumed_role_info=None,
|
||||
audited_regions=["us-east-1"],
|
||||
organizations_metadata=None,
|
||||
audit_resources=None,
|
||||
)
|
||||
return audit_info
|
||||
|
||||
@mock_cloudtrail
|
||||
@mock_s3
|
||||
def test_trail_without_data_events(self):
|
||||
@@ -18,33 +43,37 @@ class Test_cloudtrail_s3_dataevents_write_enabled:
|
||||
Name=trail_name_us, S3BucketName=bucket_name_us, IsMultiRegionTrail=False
|
||||
)
|
||||
|
||||
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
|
||||
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
|
||||
Cloudtrail,
|
||||
)
|
||||
|
||||
current_audit_info.audited_partition = "aws"
|
||||
current_audit_info = self.set_mocked_audit_info()
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled.cloudtrail_client",
|
||||
new=Cloudtrail(current_audit_info),
|
||||
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
|
||||
new=current_audit_info,
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled import (
|
||||
cloudtrail_s3_dataevents_write_enabled,
|
||||
)
|
||||
|
||||
check = cloudtrail_s3_dataevents_write_enabled()
|
||||
result = check.execute()
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled.cloudtrail_client",
|
||||
new=Cloudtrail(current_audit_info),
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled import (
|
||||
cloudtrail_s3_dataevents_write_enabled,
|
||||
)
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert search(
|
||||
"No CloudTrail trails have a data event to record all S3 object-level API operations.",
|
||||
result[0].status_extended,
|
||||
)
|
||||
assert result[0].resource_id == "No trails"
|
||||
assert result[0].resource_arn == "No trails"
|
||||
check = cloudtrail_s3_dataevents_write_enabled()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert search(
|
||||
"No CloudTrail trails have a data event to record all S3 object-level API operations.",
|
||||
result[0].status_extended,
|
||||
)
|
||||
assert result[0].resource_id == "No trails"
|
||||
assert result[0].resource_arn == "No trails"
|
||||
|
||||
@mock_cloudtrail
|
||||
@mock_s3
|
||||
@@ -69,33 +98,37 @@ class Test_cloudtrail_s3_dataevents_write_enabled:
|
||||
}
|
||||
],
|
||||
)["EventSelectors"]
|
||||
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
|
||||
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
|
||||
Cloudtrail,
|
||||
)
|
||||
|
||||
current_audit_info.audited_partition = "aws"
|
||||
current_audit_info = self.set_mocked_audit_info()
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled.cloudtrail_client",
|
||||
new=Cloudtrail(current_audit_info),
|
||||
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
|
||||
new=current_audit_info,
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled import (
|
||||
cloudtrail_s3_dataevents_write_enabled,
|
||||
)
|
||||
|
||||
check = cloudtrail_s3_dataevents_write_enabled()
|
||||
result = check.execute()
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled.cloudtrail_client",
|
||||
new=Cloudtrail(current_audit_info),
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled import (
|
||||
cloudtrail_s3_dataevents_write_enabled,
|
||||
)
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert search(
|
||||
"No CloudTrail trails have a data event to record all S3 object-level API operations.",
|
||||
result[0].status_extended,
|
||||
)
|
||||
assert result[0].resource_id == "No trails"
|
||||
assert result[0].resource_arn == "No trails"
|
||||
check = cloudtrail_s3_dataevents_write_enabled()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert search(
|
||||
"No CloudTrail trails have a data event to record all S3 object-level API operations.",
|
||||
result[0].status_extended,
|
||||
)
|
||||
assert result[0].resource_id == "No trails"
|
||||
assert result[0].resource_arn == "No trails"
|
||||
|
||||
@mock_cloudtrail
|
||||
@mock_s3
|
||||
@@ -120,30 +153,90 @@ class Test_cloudtrail_s3_dataevents_write_enabled:
|
||||
}
|
||||
],
|
||||
)["EventSelectors"]
|
||||
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
|
||||
|
||||
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
|
||||
Cloudtrail,
|
||||
)
|
||||
|
||||
current_audit_info.audited_partition = "aws"
|
||||
current_audit_info = self.set_mocked_audit_info()
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled.cloudtrail_client",
|
||||
new=Cloudtrail(current_audit_info),
|
||||
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
|
||||
new=current_audit_info,
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled import (
|
||||
cloudtrail_s3_dataevents_write_enabled,
|
||||
)
|
||||
|
||||
check = cloudtrail_s3_dataevents_write_enabled()
|
||||
result = check.execute()
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled.cloudtrail_client",
|
||||
new=Cloudtrail(current_audit_info),
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled import (
|
||||
cloudtrail_s3_dataevents_write_enabled,
|
||||
)
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert search(
|
||||
"have a data event to record all S3 object-level API operations.",
|
||||
result[0].status_extended,
|
||||
)
|
||||
assert result[0].resource_id == trail_name_us
|
||||
assert result[0].resource_arn == trail_us["TrailARN"]
|
||||
check = cloudtrail_s3_dataevents_write_enabled()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert search(
|
||||
"has a classic data event selector to record all S3 object-level API operations.",
|
||||
result[0].status_extended,
|
||||
)
|
||||
assert result[0].resource_id == trail_name_us
|
||||
assert result[0].resource_arn == trail_us["TrailARN"]
|
||||
|
||||
@mock_cloudtrail
|
||||
@mock_s3
|
||||
def test_trail_with_s3_advanced_data_events(self):
|
||||
cloudtrail_client_us_east_1 = client("cloudtrail", region_name="us-east-1")
|
||||
s3_client_us_east_1 = client("s3", region_name="us-east-1")
|
||||
trail_name_us = "trail_test_us"
|
||||
bucket_name_us = "bucket_test_us"
|
||||
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
|
||||
trail_us = cloudtrail_client_us_east_1.create_trail(
|
||||
Name=trail_name_us, S3BucketName=bucket_name_us, IsMultiRegionTrail=False
|
||||
)
|
||||
_ = cloudtrail_client_us_east_1.put_event_selectors(
|
||||
TrailName=trail_name_us,
|
||||
AdvancedEventSelectors=[
|
||||
{
|
||||
"Name": "test",
|
||||
"FieldSelectors": [
|
||||
{"Field": "eventCategory", "Equals": ["Data"]},
|
||||
{"Field": "resources.type", "Equals": ["AWS::S3::Object"]},
|
||||
],
|
||||
},
|
||||
],
|
||||
)["AdvancedEventSelectors"]
|
||||
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
|
||||
Cloudtrail,
|
||||
)
|
||||
|
||||
current_audit_info = self.set_mocked_audit_info()
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
|
||||
new=current_audit_info,
|
||||
):
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled.cloudtrail_client",
|
||||
new=Cloudtrail(current_audit_info),
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled import (
|
||||
cloudtrail_s3_dataevents_read_enabled,
|
||||
)
|
||||
|
||||
check = cloudtrail_s3_dataevents_read_enabled()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert search(
|
||||
"has an advanced data event selector to record all S3 object-level API operations.",
|
||||
result[0].status_extended,
|
||||
)
|
||||
assert result[0].resource_id == trail_name_us
|
||||
assert result[0].resource_arn == trail_us["TrailARN"]
|
||||
|
||||
@@ -129,7 +129,7 @@ class Test_Cloudtrail_Service:
|
||||
)
|
||||
audit_info = self.set_mocked_audit_info()
|
||||
cloudtrail = Cloudtrail(audit_info)
|
||||
assert len(cloudtrail.trails) == 2
|
||||
assert len(cloudtrail.trails) == len(audit_info.audited_regions)
|
||||
for trail in cloudtrail.trails:
|
||||
if trail.name:
|
||||
if trail.name == trail_name_us:
|
||||
@@ -143,7 +143,7 @@ class Test_Cloudtrail_Service:
|
||||
|
||||
@mock_cloudtrail
|
||||
@mock_s3
|
||||
def test_get_event_selectors(self):
|
||||
def test_get_classic_event_selectors(self):
|
||||
cloudtrail_client_us_east_1 = client("cloudtrail", region_name="us-east-1")
|
||||
s3_client_us_east_1 = client("s3", region_name="us-east-1")
|
||||
trail_name_us = "trail_test_us"
|
||||
@@ -170,7 +170,7 @@ class Test_Cloudtrail_Service:
|
||||
)["EventSelectors"]
|
||||
audit_info = self.set_mocked_audit_info()
|
||||
cloudtrail = Cloudtrail(audit_info)
|
||||
assert len(cloudtrail.trails) == 2
|
||||
assert len(cloudtrail.trails) == len(audit_info.audited_regions)
|
||||
for trail in cloudtrail.trails:
|
||||
if trail.name:
|
||||
if trail.name == trail_name_us:
|
||||
@@ -181,4 +181,52 @@ class Test_Cloudtrail_Service:
|
||||
assert trail.log_file_validation_enabled
|
||||
assert not trail.latest_cloudwatch_delivery_time
|
||||
assert trail.s3_bucket == bucket_name_us
|
||||
assert trail.data_events == data_events_response
|
||||
assert (
|
||||
trail.data_events[0].event_selector == data_events_response[0]
|
||||
)
|
||||
assert not trail.data_events[0].is_advanced
|
||||
|
||||
@mock_cloudtrail
|
||||
@mock_s3
|
||||
def test_get_advanced_event_selectors(self):
|
||||
cloudtrail_client_us_east_1 = client("cloudtrail", region_name="us-east-1")
|
||||
s3_client_us_east_1 = client("s3", region_name="us-east-1")
|
||||
trail_name_us = "trail_test_us"
|
||||
bucket_name_us = "bucket_test_us"
|
||||
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
|
||||
cloudtrail_client_us_east_1.create_trail(
|
||||
Name=trail_name_us,
|
||||
S3BucketName=bucket_name_us,
|
||||
IsMultiRegionTrail=False,
|
||||
EnableLogFileValidation=True,
|
||||
)
|
||||
cloudtrail_client_us_east_1.start_logging(Name=trail_name_us)
|
||||
data_events_response = cloudtrail_client_us_east_1.put_event_selectors(
|
||||
TrailName=trail_name_us,
|
||||
AdvancedEventSelectors=[
|
||||
{
|
||||
"Name": "test",
|
||||
"FieldSelectors": [
|
||||
{"Field": "eventCategory", "Equals": ["Data"]},
|
||||
{"Field": "resources.type", "Equals": ["AWS::S3::Object"]},
|
||||
],
|
||||
},
|
||||
],
|
||||
)["AdvancedEventSelectors"]
|
||||
audit_info = self.set_mocked_audit_info()
|
||||
cloudtrail = Cloudtrail(audit_info)
|
||||
assert len(cloudtrail.trails) == len(audit_info.audited_regions)
|
||||
for trail in cloudtrail.trails:
|
||||
if trail.name:
|
||||
if trail.name == trail_name_us:
|
||||
assert not trail.is_multiregion
|
||||
assert trail.home_region == "us-east-1"
|
||||
assert trail.region == "us-east-1"
|
||||
assert trail.is_logging
|
||||
assert trail.log_file_validation_enabled
|
||||
assert not trail.latest_cloudwatch_delivery_time
|
||||
assert trail.s3_bucket == bucket_name_us
|
||||
assert (
|
||||
trail.data_events[0].event_selector == data_events_response[0]
|
||||
)
|
||||
assert trail.data_events[0].is_advanced
|
||||
|
||||
Reference in New Issue
Block a user