fix(cloudwatch): ignore new lines in filters (#2912)

This commit is contained in:
Sergio Garcia
2023-10-09 11:06:29 +02:00
committed by GitHub
parent dec0ee1001
commit 9212478148
30 changed files with 1356 additions and 15 deletions

View File

@@ -30,7 +30,7 @@ class cloudwatch_changes_to_network_acls_alarm_configured(Check):
# 2. Describe metric filters for previous log groups
for metric_filter in logs_client.metric_filters:
if metric_filter.log_group in log_groups:
if re.search(pattern, metric_filter.pattern):
if re.search(pattern, metric_filter.pattern, flags=re.DOTALL):
report.resource_id = metric_filter.log_group
report.resource_arn = metric_filter.arn
report.region = metric_filter.region

View File

@@ -30,7 +30,7 @@ class cloudwatch_changes_to_network_gateways_alarm_configured(Check):
# 2. Describe metric filters for previous log groups
for metric_filter in logs_client.metric_filters:
if metric_filter.log_group in log_groups:
if re.search(pattern, metric_filter.pattern):
if re.search(pattern, metric_filter.pattern, flags=re.DOTALL):
report.resource_id = metric_filter.log_group
report.resource_arn = metric_filter.arn
report.region = metric_filter.region

View File

@@ -30,7 +30,7 @@ class cloudwatch_changes_to_network_route_tables_alarm_configured(Check):
# 2. Describe metric filters for previous log groups
for metric_filter in logs_client.metric_filters:
if metric_filter.log_group in log_groups:
if re.search(pattern, metric_filter.pattern):
if re.search(pattern, metric_filter.pattern, flags=re.DOTALL):
report.resource_id = metric_filter.log_group
report.resource_arn = metric_filter.arn
report.region = metric_filter.region

View File

@@ -30,7 +30,7 @@ class cloudwatch_changes_to_vpcs_alarm_configured(Check):
# 2. Describe metric filters for previous log groups
for metric_filter in logs_client.metric_filters:
if metric_filter.log_group in log_groups:
if re.search(pattern, metric_filter.pattern):
if re.search(pattern, metric_filter.pattern, flags=re.DOTALL):
report.resource_id = metric_filter.log_group
report.resource_arn = metric_filter.arn
report.region = metric_filter.region

View File

@@ -32,7 +32,7 @@ class cloudwatch_log_metric_filter_and_alarm_for_aws_config_configuration_change
# 2. Describe metric filters for previous log groups
for metric_filter in logs_client.metric_filters:
if metric_filter.log_group in log_groups:
if re.search(pattern, metric_filter.pattern):
if re.search(pattern, metric_filter.pattern, flags=re.DOTALL):
report.resource_id = metric_filter.log_group
report.resource_arn = metric_filter.arn
report.region = metric_filter.region

View File

@@ -32,7 +32,7 @@ class cloudwatch_log_metric_filter_and_alarm_for_cloudtrail_configuration_change
# 2. Describe metric filters for previous log groups
for metric_filter in logs_client.metric_filters:
if metric_filter.log_group in log_groups:
if re.search(pattern, metric_filter.pattern):
if re.search(pattern, metric_filter.pattern, flags=re.DOTALL):
report.resource_id = metric_filter.log_group
report.resource_arn = metric_filter.arn
report.region = metric_filter.region

View File

@@ -30,7 +30,7 @@ class cloudwatch_log_metric_filter_authentication_failures(Check):
# 2. Describe metric filters for previous log groups
for metric_filter in logs_client.metric_filters:
if metric_filter.log_group in log_groups:
if re.search(pattern, metric_filter.pattern):
if re.search(pattern, metric_filter.pattern, flags=re.DOTALL):
report.resource_id = metric_filter.log_group
report.resource_arn = metric_filter.arn
report.region = metric_filter.region

View File

@@ -30,7 +30,7 @@ class cloudwatch_log_metric_filter_aws_organizations_changes(Check):
# 2. Describe metric filters for previous log groups
for metric_filter in logs_client.metric_filters:
if metric_filter.log_group in log_groups:
if re.search(pattern, metric_filter.pattern):
if re.search(pattern, metric_filter.pattern, flags=re.DOTALL):
report.resource_id = metric_filter.log_group
report.resource_arn = metric_filter.arn
report.region = metric_filter.region

View File

@@ -30,7 +30,7 @@ class cloudwatch_log_metric_filter_disable_or_scheduled_deletion_of_kms_cmk(Chec
# 2. Describe metric filters for previous log groups
for metric_filter in logs_client.metric_filters:
if metric_filter.log_group in log_groups:
if re.search(pattern, metric_filter.pattern):
if re.search(pattern, metric_filter.pattern, flags=re.DOTALL):
report.resource_id = metric_filter.log_group
report.resource_arn = metric_filter.arn
report.region = metric_filter.region

View File

@@ -30,7 +30,7 @@ class cloudwatch_log_metric_filter_for_s3_bucket_policy_changes(Check):
# 2. Describe metric filters for previous log groups
for metric_filter in logs_client.metric_filters:
if metric_filter.log_group in log_groups:
if re.search(pattern, metric_filter.pattern):
if re.search(pattern, metric_filter.pattern, flags=re.DOTALL):
report.resource_id = metric_filter.log_group
report.resource_arn = metric_filter.arn
report.region = metric_filter.region

View File

@@ -30,7 +30,7 @@ class cloudwatch_log_metric_filter_policy_changes(Check):
# 2. Describe metric filters for previous log groups
for metric_filter in logs_client.metric_filters:
if metric_filter.log_group in log_groups:
if re.search(pattern, metric_filter.pattern):
if re.search(pattern, metric_filter.pattern, flags=re.DOTALL):
report.resource_id = metric_filter.log_group
report.resource_arn = metric_filter.arn
report.region = metric_filter.region

View File

@@ -30,7 +30,7 @@ class cloudwatch_log_metric_filter_root_usage(Check):
# 2. Describe metric filters for previous log groups
for metric_filter in logs_client.metric_filters:
if metric_filter.log_group in log_groups:
if re.search(pattern, metric_filter.pattern):
if re.search(pattern, metric_filter.pattern, flags=re.DOTALL):
report.resource_id = metric_filter.log_group
report.resource_arn = metric_filter.arn
report.region = metric_filter.region

View File

@@ -30,7 +30,7 @@ class cloudwatch_log_metric_filter_security_group_changes(Check):
# 2. Describe metric filters for previous log groups
for metric_filter in logs_client.metric_filters:
if metric_filter.log_group in log_groups:
if re.search(pattern, metric_filter.pattern):
if re.search(pattern, metric_filter.pattern, flags=re.DOTALL):
report.resource_id = metric_filter.log_group
report.resource_arn = metric_filter.arn
report.region = metric_filter.region

View File

@@ -30,7 +30,7 @@ class cloudwatch_log_metric_filter_sign_in_without_mfa(Check):
# 2. Describe metric filters for previous log groups
for metric_filter in logs_client.metric_filters:
if metric_filter.log_group in log_groups:
if re.search(pattern, metric_filter.pattern):
if re.search(pattern, metric_filter.pattern, flags=re.DOTALL):
report.resource_id = metric_filter.log_group
report.resource_arn = metric_filter.arn
report.region = metric_filter.region

View File

@@ -30,7 +30,7 @@ class cloudwatch_log_metric_filter_unauthorized_api_calls(Check):
# 2. Describe metric filters for previous log groups
for metric_filter in logs_client.metric_filters:
if metric_filter.log_group in log_groups:
if re.search(pattern, metric_filter.pattern):
if re.search(pattern, metric_filter.pattern, flags=re.DOTALL):
report.resource_id = metric_filter.log_group
report.resource_arn = metric_filter.arn
report.region = metric_filter.region

View File

@@ -474,3 +474,92 @@ class Test_cloudwatch_changes_to_network_acls_alarm_configured:
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"
@mock_logs
@mock_cloudtrail
@mock_cloudwatch
@mock_s3
def test_cloudwatch_trail_with_log_group_with_metric_and_alarm_with_newlines(self):
cloudtrail_client = client("cloudtrail", region_name=AWS_REGION)
cloudwatch_client = client("cloudwatch", region_name=AWS_REGION)
logs_client = client("logs", region_name=AWS_REGION)
s3_client = client("s3", region_name=AWS_REGION)
s3_client.create_bucket(Bucket="test")
logs_client.create_log_group(logGroupName="/log-group/test")
cloudtrail_client.create_trail(
Name="test_trail",
S3BucketName="test",
CloudWatchLogsLogGroupArn=f"arn:aws:logs:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:log-group:/log-group/test:*",
)
logs_client.put_metric_filter(
logGroupName="/log-group/test",
filterName="test-filter",
filterPattern='{ ($.eventName = "CreateNetworkAcl") ||\n ($.eventName = "CreateNetworkAclEntry") ||\n ($.eventName = "DeleteNetworkAcl") ||\n ($.eventName = "DeleteNetworkAclEntry") ||\n ($.eventName = "ReplaceNetworkAclEntry") ||\n ($.eventName = "ReplaceNetworkAclAssociation") }',
metricTransformations=[
{
"metricName": "my-metric",
"metricNamespace": "my-namespace",
"metricValue": "$.value",
}
],
)
cloudwatch_client.put_metric_alarm(
AlarmName="test-alarm",
MetricName="my-metric",
Namespace="my-namespace",
Period=10,
EvaluationPeriods=5,
Statistic="Average",
Threshold=2,
ComparisonOperator="GreaterThanThreshold",
ActionsEnabled=True,
)
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
from prowler.providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
Logs,
)
current_audit_info = self.set_mocked_audit_info()
from prowler.providers.common.models import Audit_Metadata
current_audit_info.audit_metadata = Audit_Metadata(
services_scanned=0,
# We need to set this check to call __describe_log_groups__
expected_checks=["cloudwatch_log_group_no_secrets_in_logs"],
completed_checks=0,
audit_progress=0,
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_changes_to_network_acls_alarm_configured.cloudwatch_changes_to_network_acls_alarm_configured.logs_client",
new=Logs(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_changes_to_network_acls_alarm_configured.cloudwatch_changes_to_network_acls_alarm_configured.cloudwatch_client",
new=CloudWatch(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_changes_to_network_acls_alarm_configured.cloudwatch_changes_to_network_acls_alarm_configured.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudwatch.cloudwatch_changes_to_network_acls_alarm_configured.cloudwatch_changes_to_network_acls_alarm_configured import (
cloudwatch_changes_to_network_acls_alarm_configured,
)
check = cloudwatch_changes_to_network_acls_alarm_configured()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -474,3 +474,92 @@ class Test_cloudwatch_changes_to_network_gateways_alarm_configured:
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"
@mock_logs
@mock_cloudtrail
@mock_cloudwatch
@mock_s3
def test_cloudwatch_trail_with_log_group_with_metric_and_alarm_with_newlines(self):
cloudtrail_client = client("cloudtrail", region_name=AWS_REGION)
cloudwatch_client = client("cloudwatch", region_name=AWS_REGION)
logs_client = client("logs", region_name=AWS_REGION)
s3_client = client("s3", region_name=AWS_REGION)
s3_client.create_bucket(Bucket="test")
logs_client.create_log_group(logGroupName="/log-group/test")
cloudtrail_client.create_trail(
Name="test_trail",
S3BucketName="test",
CloudWatchLogsLogGroupArn=f"arn:aws:logs:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:log-group:/log-group/test:*",
)
logs_client.put_metric_filter(
logGroupName="/log-group/test",
filterName="test-filter",
filterPattern='{($.eventName = "CreateCustomerGateway") ||\n ($.eventName = "DeleteCustomerGateway") ||\n ($.eventName = "AttachInternetGateway") ||\n ($.eventName = "CreateInternetGateway") ||\n ($.eventName = "DeleteInternetGateway") ||\n ($.eventName = "DetachInternetGateway") }',
metricTransformations=[
{
"metricName": "my-metric",
"metricNamespace": "my-namespace",
"metricValue": "$.value",
}
],
)
cloudwatch_client.put_metric_alarm(
AlarmName="test-alarm",
MetricName="my-metric",
Namespace="my-namespace",
Period=10,
EvaluationPeriods=5,
Statistic="Average",
Threshold=2,
ComparisonOperator="GreaterThanThreshold",
ActionsEnabled=True,
)
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
from prowler.providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
Logs,
)
current_audit_info = self.set_mocked_audit_info()
from prowler.providers.common.models import Audit_Metadata
current_audit_info.audit_metadata = Audit_Metadata(
services_scanned=0,
# We need to set this check to call __describe_log_groups__
expected_checks=["cloudwatch_log_group_no_secrets_in_logs"],
completed_checks=0,
audit_progress=0,
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_changes_to_network_gateways_alarm_configured.cloudwatch_changes_to_network_gateways_alarm_configured.logs_client",
new=Logs(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_changes_to_network_gateways_alarm_configured.cloudwatch_changes_to_network_gateways_alarm_configured.cloudwatch_client",
new=CloudWatch(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_changes_to_network_gateways_alarm_configured.cloudwatch_changes_to_network_gateways_alarm_configured.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudwatch.cloudwatch_changes_to_network_gateways_alarm_configured.cloudwatch_changes_to_network_gateways_alarm_configured import (
cloudwatch_changes_to_network_gateways_alarm_configured,
)
check = cloudwatch_changes_to_network_gateways_alarm_configured()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -474,3 +474,92 @@ class Test_cloudwatch_changes_to_network_route_tables_alarm_configured:
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"
@mock_logs
@mock_cloudtrail
@mock_cloudwatch
@mock_s3
def test_cloudwatch_trail_with_log_group_with_metric_and_alarm_with_newlines(self):
cloudtrail_client = client("cloudtrail", region_name=AWS_REGION)
cloudwatch_client = client("cloudwatch", region_name=AWS_REGION)
logs_client = client("logs", region_name=AWS_REGION)
s3_client = client("s3", region_name=AWS_REGION)
s3_client.create_bucket(Bucket="test")
logs_client.create_log_group(logGroupName="/log-group/test")
cloudtrail_client.create_trail(
Name="test_trail",
S3BucketName="test",
CloudWatchLogsLogGroupArn=f"arn:aws:logs:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:log-group:/log-group/test:*",
)
logs_client.put_metric_filter(
logGroupName="/log-group/test",
filterName="test-filter",
filterPattern='{($.eventName = "CreateRoute") ||\n ($.eventName = "CreateRouteTable") ||\n ($.eventName = "ReplaceRoute") ||\n ($.eventName = "ReplaceRouteTableAssociation")||\n ($.eventName = "DeleteRouteTable") ||\n ($.eventName = "DeleteRoute") ||\n ($.eventName = "DisassociateRouteTable") }',
metricTransformations=[
{
"metricName": "my-metric",
"metricNamespace": "my-namespace",
"metricValue": "$.value",
}
],
)
cloudwatch_client.put_metric_alarm(
AlarmName="test-alarm",
MetricName="my-metric",
Namespace="my-namespace",
Period=10,
EvaluationPeriods=5,
Statistic="Average",
Threshold=2,
ComparisonOperator="GreaterThanThreshold",
ActionsEnabled=True,
)
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
from prowler.providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
Logs,
)
current_audit_info = self.set_mocked_audit_info()
from prowler.providers.common.models import Audit_Metadata
current_audit_info.audit_metadata = Audit_Metadata(
services_scanned=0,
# We need to set this check to call __describe_log_groups__
expected_checks=["cloudwatch_log_group_no_secrets_in_logs"],
completed_checks=0,
audit_progress=0,
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_changes_to_network_route_tables_alarm_configured.cloudwatch_changes_to_network_route_tables_alarm_configured.logs_client",
new=Logs(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_changes_to_network_route_tables_alarm_configured.cloudwatch_changes_to_network_route_tables_alarm_configured.cloudwatch_client",
new=CloudWatch(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_changes_to_network_route_tables_alarm_configured.cloudwatch_changes_to_network_route_tables_alarm_configured.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudwatch.cloudwatch_changes_to_network_route_tables_alarm_configured.cloudwatch_changes_to_network_route_tables_alarm_configured import (
cloudwatch_changes_to_network_route_tables_alarm_configured,
)
check = cloudwatch_changes_to_network_route_tables_alarm_configured()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -474,3 +474,92 @@ class Test_cloudwatch_changes_to_vpcs_alarm_configured:
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"
@mock_logs
@mock_cloudtrail
@mock_cloudwatch
@mock_s3
def test_cloudwatch_trail_with_log_group_with_metric_and_alarm_with_newlines(self):
cloudtrail_client = client("cloudtrail", region_name=AWS_REGION)
cloudwatch_client = client("cloudwatch", region_name=AWS_REGION)
logs_client = client("logs", region_name=AWS_REGION)
s3_client = client("s3", region_name=AWS_REGION)
s3_client.create_bucket(Bucket="test")
logs_client.create_log_group(logGroupName="/log-group/test")
cloudtrail_client.create_trail(
Name="test_trail",
S3BucketName="test",
CloudWatchLogsLogGroupArn=f"arn:aws:logs:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:log-group:/log-group/test:*",
)
logs_client.put_metric_filter(
logGroupName="/log-group/test",
filterName="test-filter",
filterPattern='{ ($.eventName = "CreateVpc") ||\n ($.eventName = "DeleteVpc") ||\n ($.eventName = "ModifyVpcAttribute") ||\n ($.eventName = "AcceptVpcPeeringConnection") ||\n ($.eventName = "CreateVpcPeeringConnection") ||\n ($.eventName = "DeleteVpcPeeringConnection") ||\n ($.eventName = "RejectVpcPeeringConnection") ||\n ($.eventName = "AttachClassicLinkVpc") ||\n ($.eventName = "DetachClassicLinkVpc") ||\n ($.eventName = "DisableVpcClassicLink") ||\n ($.eventName = "EnableVpcClassicLink") }',
metricTransformations=[
{
"metricName": "my-metric",
"metricNamespace": "my-namespace",
"metricValue": "$.value",
}
],
)
cloudwatch_client.put_metric_alarm(
AlarmName="test-alarm",
MetricName="my-metric",
Namespace="my-namespace",
Period=10,
EvaluationPeriods=5,
Statistic="Average",
Threshold=2,
ComparisonOperator="GreaterThanThreshold",
ActionsEnabled=True,
)
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
from prowler.providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
Logs,
)
current_audit_info = self.set_mocked_audit_info()
from prowler.providers.common.models import Audit_Metadata
current_audit_info.audit_metadata = Audit_Metadata(
services_scanned=0,
# We need to set this check to call __describe_log_groups__
expected_checks=["cloudwatch_log_group_no_secrets_in_logs"],
completed_checks=0,
audit_progress=0,
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_changes_to_vpcs_alarm_configured.cloudwatch_changes_to_vpcs_alarm_configured.logs_client",
new=Logs(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_changes_to_vpcs_alarm_configured.cloudwatch_changes_to_vpcs_alarm_configured.cloudwatch_client",
new=CloudWatch(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_changes_to_vpcs_alarm_configured.cloudwatch_changes_to_vpcs_alarm_configured.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudwatch.cloudwatch_changes_to_vpcs_alarm_configured.cloudwatch_changes_to_vpcs_alarm_configured import (
cloudwatch_changes_to_vpcs_alarm_configured,
)
check = cloudwatch_changes_to_vpcs_alarm_configured()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -486,3 +486,94 @@ class Test_cloudwatch_log_metric_filter_and_alarm_for_aws_config_configuration_c
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"
@mock_logs
@mock_cloudtrail
@mock_cloudwatch
@mock_s3
def test_cloudwatch_trail_with_log_group_with_metric_and_alarm_with_newlines(self):
cloudtrail_client = client("cloudtrail", region_name=AWS_REGION)
cloudwatch_client = client("cloudwatch", region_name=AWS_REGION)
logs_client = client("logs", region_name=AWS_REGION)
s3_client = client("s3", region_name=AWS_REGION)
s3_client.create_bucket(Bucket="test")
logs_client.create_log_group(logGroupName="/log-group/test")
cloudtrail_client.create_trail(
Name="test_trail",
S3BucketName="test",
CloudWatchLogsLogGroupArn=f"arn:aws:logs:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:log-group:/log-group/test:*",
)
logs_client.put_metric_filter(
logGroupName="/log-group/test",
filterName="test-filter",
filterPattern='{($.eventSource = "config.amazonaws.com") && (($.eventName="StopConfigurationRecorder")||\n($.eventName="DeleteDeliveryChannel")||\n ($.eventName="PutDeliveryChannel")||\n($.eventName="PutConfigurationRecorder"))}',
metricTransformations=[
{
"metricName": "my-metric",
"metricNamespace": "my-namespace",
"metricValue": "$.value",
}
],
)
cloudwatch_client.put_metric_alarm(
AlarmName="test-alarm",
MetricName="my-metric",
Namespace="my-namespace",
Period=10,
EvaluationPeriods=5,
Statistic="Average",
Threshold=2,
ComparisonOperator="GreaterThanThreshold",
ActionsEnabled=True,
)
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
from prowler.providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
Logs,
)
current_audit_info = self.set_mocked_audit_info()
from prowler.providers.common.models import Audit_Metadata
current_audit_info.audit_metadata = Audit_Metadata(
services_scanned=0,
# We need to set this check to call __describe_log_groups__
expected_checks=["cloudwatch_log_group_no_secrets_in_logs"],
completed_checks=0,
audit_progress=0,
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_and_alarm_for_aws_config_configuration_changes_enabled.cloudwatch_log_metric_filter_and_alarm_for_aws_config_configuration_changes_enabled.logs_client",
new=Logs(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_and_alarm_for_aws_config_configuration_changes_enabled.cloudwatch_log_metric_filter_and_alarm_for_aws_config_configuration_changes_enabled.cloudwatch_client",
new=CloudWatch(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_and_alarm_for_aws_config_configuration_changes_enabled.cloudwatch_log_metric_filter_and_alarm_for_aws_config_configuration_changes_enabled.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_and_alarm_for_aws_config_configuration_changes_enabled.cloudwatch_log_metric_filter_and_alarm_for_aws_config_configuration_changes_enabled import (
cloudwatch_log_metric_filter_and_alarm_for_aws_config_configuration_changes_enabled,
)
check = (
cloudwatch_log_metric_filter_and_alarm_for_aws_config_configuration_changes_enabled()
)
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -486,3 +486,94 @@ class Test_cloudwatch_log_metric_filter_and_alarm_for_cloudtrail_configuration_c
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"
@mock_logs
@mock_cloudtrail
@mock_cloudwatch
@mock_s3
def test_cloudwatch_trail_with_log_group_with_metric_and_alarm_with_newlines(self):
cloudtrail_client = client("cloudtrail", region_name=AWS_REGION)
cloudwatch_client = client("cloudwatch", region_name=AWS_REGION)
logs_client = client("logs", region_name=AWS_REGION)
s3_client = client("s3", region_name=AWS_REGION)
s3_client.create_bucket(Bucket="test")
logs_client.create_log_group(logGroupName="/log-group/test")
cloudtrail_client.create_trail(
Name="test_trail",
S3BucketName="test",
CloudWatchLogsLogGroupArn=f"arn:aws:logs:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:log-group:/log-group/test:*",
)
logs_client.put_metric_filter(
logGroupName="/log-group/test",
filterName="test-filter",
filterPattern='{($.eventName = "CreateTrail") ||\n ($.eventName = "UpdateTrail") ||\n ($.eventName = "DeleteTrail") ||\n ($.eventName = "StartLogging") ||\n ($.eventName = "StopLogging")}',
metricTransformations=[
{
"metricName": "my-metric",
"metricNamespace": "my-namespace",
"metricValue": "$.value",
}
],
)
cloudwatch_client.put_metric_alarm(
AlarmName="test-alarm",
MetricName="my-metric",
Namespace="my-namespace",
Period=10,
EvaluationPeriods=5,
Statistic="Average",
Threshold=2,
ComparisonOperator="GreaterThanThreshold",
ActionsEnabled=True,
)
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
from prowler.providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
Logs,
)
current_audit_info = self.set_mocked_audit_info()
from prowler.providers.common.models import Audit_Metadata
current_audit_info.audit_metadata = Audit_Metadata(
services_scanned=0,
# We need to set this check to call __describe_log_groups__
expected_checks=["cloudwatch_log_group_no_secrets_in_logs"],
completed_checks=0,
audit_progress=0,
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_and_alarm_for_cloudtrail_configuration_changes_enabled.cloudwatch_log_metric_filter_and_alarm_for_cloudtrail_configuration_changes_enabled.logs_client",
new=Logs(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_and_alarm_for_cloudtrail_configuration_changes_enabled.cloudwatch_log_metric_filter_and_alarm_for_cloudtrail_configuration_changes_enabled.cloudwatch_client",
new=CloudWatch(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_and_alarm_for_cloudtrail_configuration_changes_enabled.cloudwatch_log_metric_filter_and_alarm_for_cloudtrail_configuration_changes_enabled.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_and_alarm_for_cloudtrail_configuration_changes_enabled.cloudwatch_log_metric_filter_and_alarm_for_cloudtrail_configuration_changes_enabled import (
cloudwatch_log_metric_filter_and_alarm_for_cloudtrail_configuration_changes_enabled,
)
check = (
cloudwatch_log_metric_filter_and_alarm_for_cloudtrail_configuration_changes_enabled()
)
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -474,3 +474,92 @@ class Test_cloudwatch_log_metric_filter_authentication_failures:
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"
@mock_logs
@mock_cloudtrail
@mock_cloudwatch
@mock_s3
def test_cloudwatch_trail_with_log_group_with_metric_and_alarm_with_newlines(self):
cloudtrail_client = client("cloudtrail", region_name=AWS_REGION)
cloudwatch_client = client("cloudwatch", region_name=AWS_REGION)
logs_client = client("logs", region_name=AWS_REGION)
s3_client = client("s3", region_name=AWS_REGION)
s3_client.create_bucket(Bucket="test")
logs_client.create_log_group(logGroupName="/log-group/test")
cloudtrail_client.create_trail(
Name="test_trail",
S3BucketName="test",
CloudWatchLogsLogGroupArn=f"arn:aws:logs:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:log-group:/log-group/test:*",
)
logs_client.put_metric_filter(
logGroupName="/log-group/test",
filterName="test-filter",
filterPattern='{($.eventName = "ConsoleLogin") &&\n ($.errorMessage = "Failed authentication")}',
metricTransformations=[
{
"metricName": "my-metric",
"metricNamespace": "my-namespace",
"metricValue": "$.value",
}
],
)
cloudwatch_client.put_metric_alarm(
AlarmName="test-alarm",
MetricName="my-metric",
Namespace="my-namespace",
Period=10,
EvaluationPeriods=5,
Statistic="Average",
Threshold=2,
ComparisonOperator="GreaterThanThreshold",
ActionsEnabled=True,
)
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
from prowler.providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
Logs,
)
current_audit_info = self.set_mocked_audit_info()
from prowler.providers.common.models import Audit_Metadata
current_audit_info.audit_metadata = Audit_Metadata(
services_scanned=0,
# We need to set this check to call __describe_log_groups__
expected_checks=["cloudwatch_log_group_no_secrets_in_logs"],
completed_checks=0,
audit_progress=0,
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_authentication_failures.cloudwatch_log_metric_filter_authentication_failures.logs_client",
new=Logs(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_authentication_failures.cloudwatch_log_metric_filter_authentication_failures.cloudwatch_client",
new=CloudWatch(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_authentication_failures.cloudwatch_log_metric_filter_authentication_failures.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_authentication_failures.cloudwatch_log_metric_filter_authentication_failures import (
cloudwatch_log_metric_filter_authentication_failures,
)
check = cloudwatch_log_metric_filter_authentication_failures()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -474,3 +474,92 @@ class Test_cloudwatch_log_metric_filter_aws_organizations_changes:
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"
@mock_logs
@mock_cloudtrail
@mock_cloudwatch
@mock_s3
def test_cloudwatch_trail_with_log_group_with_metric_and_alarm_with_newlines(self):
cloudtrail_client = client("cloudtrail", region_name=AWS_REGION)
cloudwatch_client = client("cloudwatch", region_name=AWS_REGION)
logs_client = client("logs", region_name=AWS_REGION)
s3_client = client("s3", region_name=AWS_REGION)
s3_client.create_bucket(Bucket="test")
logs_client.create_log_group(logGroupName="/log-group/test")
cloudtrail_client.create_trail(
Name="test_trail",
S3BucketName="test",
CloudWatchLogsLogGroupArn=f"arn:aws:logs:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:log-group:/log-group/test:*",
)
logs_client.put_metric_filter(
logGroupName="/log-group/test",
filterName="test-filter",
filterPattern='{ ($.eventSource = "organizations.amazonaws.com") &&\n ($.eventName = "AcceptHandshake") ||\n ($.eventName = "AttachPolicy") ||\n ($.eventName = "CancelHandshake") ||\n ($.eventName = "CreateAccount") ||\n ($.eventName = "CreateOrganization") ||\n ($.eventName = "CreateOrganizationalUnit") ||\n ($.eventName = "CreatePolicy") ||\n ($.eventName = "DeclineHandshake") ||\n ($.eventName = "DeleteOrganization") ||\n ($.eventName = "DeleteOrganizationalUnit") ||\n ($.eventName = "DeletePolicy") ||\n ($.eventName = "EnableAllFeatures") ||\n ($.eventName = "EnablePolicyType") ||\n ($.eventName = "InviteAccountToOrganization") ||\n ($.eventName = "LeaveOrganization") ||\n ($.eventName = "DetachPolicy") ||\n ($.eventName = "DisablePolicyType") ||\n ($.eventName = "MoveAccount") ||\n ($.eventName = "RemoveAccountFromOrganization") ||\n ($.eventName = "UpdateOrganizationalUnit") || ($.eventName = "UpdatePolicy") }',
metricTransformations=[
{
"metricName": "my-metric",
"metricNamespace": "my-namespace",
"metricValue": "$.value",
}
],
)
cloudwatch_client.put_metric_alarm(
AlarmName="test-alarm",
MetricName="my-metric",
Namespace="my-namespace",
Period=10,
EvaluationPeriods=5,
Statistic="Average",
Threshold=2,
ComparisonOperator="GreaterThanThreshold",
ActionsEnabled=True,
)
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
from prowler.providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
Logs,
)
current_audit_info = self.set_mocked_audit_info()
from prowler.providers.common.models import Audit_Metadata
current_audit_info.audit_metadata = Audit_Metadata(
services_scanned=0,
# We need to set this check to call __describe_log_groups__
expected_checks=["cloudwatch_log_group_no_secrets_in_logs"],
completed_checks=0,
audit_progress=0,
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_aws_organizations_changes.cloudwatch_log_metric_filter_aws_organizations_changes.logs_client",
new=Logs(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_aws_organizations_changes.cloudwatch_log_metric_filter_aws_organizations_changes.cloudwatch_client",
new=CloudWatch(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_aws_organizations_changes.cloudwatch_log_metric_filter_aws_organizations_changes.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_aws_organizations_changes.cloudwatch_log_metric_filter_aws_organizations_changes import (
cloudwatch_log_metric_filter_aws_organizations_changes,
)
check = cloudwatch_log_metric_filter_aws_organizations_changes()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -486,3 +486,94 @@ class Test_cloudwatch_log_metric_filter_disable_or_scheduled_deletion_of_kms_cmk
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"
@mock_logs
@mock_cloudtrail
@mock_cloudwatch
@mock_s3
def test_cloudwatch_trail_with_log_group_with_metric_and_alarm_with_newlines(self):
cloudtrail_client = client("cloudtrail", region_name=AWS_REGION)
cloudwatch_client = client("cloudwatch", region_name=AWS_REGION)
logs_client = client("logs", region_name=AWS_REGION)
s3_client = client("s3", region_name=AWS_REGION)
s3_client.create_bucket(Bucket="test")
logs_client.create_log_group(logGroupName="/log-group/test")
cloudtrail_client.create_trail(
Name="test_trail",
S3BucketName="test",
CloudWatchLogsLogGroupArn=f"arn:aws:logs:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:log-group:/log-group/test:*",
)
logs_client.put_metric_filter(
logGroupName="/log-group/test",
filterName="test-filter",
filterPattern='{($.eventSource = "kms.amazonaws.com") &&\n(($.eventName="DisableKey")||\n($.eventName="ScheduleKeyDeletion")) }',
metricTransformations=[
{
"metricName": "my-metric",
"metricNamespace": "my-namespace",
"metricValue": "$.value",
}
],
)
cloudwatch_client.put_metric_alarm(
AlarmName="test-alarm",
MetricName="my-metric",
Namespace="my-namespace",
Period=10,
EvaluationPeriods=5,
Statistic="Average",
Threshold=2,
ComparisonOperator="GreaterThanThreshold",
ActionsEnabled=True,
)
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
from prowler.providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
Logs,
)
current_audit_info = self.set_mocked_audit_info()
from prowler.providers.common.models import Audit_Metadata
current_audit_info.audit_metadata = Audit_Metadata(
services_scanned=0,
# We need to set this check to call __describe_log_groups__
expected_checks=["cloudwatch_log_group_no_secrets_in_logs"],
completed_checks=0,
audit_progress=0,
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_disable_or_scheduled_deletion_of_kms_cmk.cloudwatch_log_metric_filter_disable_or_scheduled_deletion_of_kms_cmk.logs_client",
new=Logs(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_disable_or_scheduled_deletion_of_kms_cmk.cloudwatch_log_metric_filter_disable_or_scheduled_deletion_of_kms_cmk.cloudwatch_client",
new=CloudWatch(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_disable_or_scheduled_deletion_of_kms_cmk.cloudwatch_log_metric_filter_disable_or_scheduled_deletion_of_kms_cmk.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_disable_or_scheduled_deletion_of_kms_cmk.cloudwatch_log_metric_filter_disable_or_scheduled_deletion_of_kms_cmk import (
cloudwatch_log_metric_filter_disable_or_scheduled_deletion_of_kms_cmk,
)
check = (
cloudwatch_log_metric_filter_disable_or_scheduled_deletion_of_kms_cmk()
)
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -474,3 +474,92 @@ class Test_cloudwatch_log_metric_filter_for_s3_bucket_policy_changes:
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"
@mock_logs
@mock_cloudtrail
@mock_cloudwatch
@mock_s3
def test_cloudwatch_trail_with_log_group_with_metric_and_alarm_with_newlines(self):
cloudtrail_client = client("cloudtrail", region_name=AWS_REGION)
cloudwatch_client = client("cloudwatch", region_name=AWS_REGION)
logs_client = client("logs", region_name=AWS_REGION)
s3_client = client("s3", region_name=AWS_REGION)
s3_client.create_bucket(Bucket="test")
logs_client.create_log_group(logGroupName="/log-group/test")
cloudtrail_client.create_trail(
Name="test_trail",
S3BucketName="test",
CloudWatchLogsLogGroupArn=f"arn:aws:logs:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:log-group:/log-group/test:*",
)
logs_client.put_metric_filter(
logGroupName="/log-group/test",
filterName="test-filter",
filterPattern='{($.eventSource = "s3.amazonaws.com") &&\n (($.eventName = "PutBucketAcl") ||\n ($.eventName = "PutBucketPolicy") ||\n ($.eventName = "PutBucketCors") ||\n ($.eventName = "PutBucketLifecycle") ||\n ($.eventName = "PutBucketReplication") ||\n ($.eventName = "DeleteBucketPolicy") ||\n ($.eventName = "DeleteBucketCors") ||\n ($.eventName = "DeleteBucketLifecycle") ||\n ($.eventName = "DeleteBucketReplication")) }',
metricTransformations=[
{
"metricName": "my-metric",
"metricNamespace": "my-namespace",
"metricValue": "$.value",
}
],
)
cloudwatch_client.put_metric_alarm(
AlarmName="test-alarm",
MetricName="my-metric",
Namespace="my-namespace",
Period=10,
EvaluationPeriods=5,
Statistic="Average",
Threshold=2,
ComparisonOperator="GreaterThanThreshold",
ActionsEnabled=True,
)
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
from prowler.providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
Logs,
)
current_audit_info = self.set_mocked_audit_info()
from prowler.providers.common.models import Audit_Metadata
current_audit_info.audit_metadata = Audit_Metadata(
services_scanned=0,
# We need to set this check to call __describe_log_groups__
expected_checks=["cloudwatch_log_group_no_secrets_in_logs"],
completed_checks=0,
audit_progress=0,
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_for_s3_bucket_policy_changes.cloudwatch_log_metric_filter_for_s3_bucket_policy_changes.logs_client",
new=Logs(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_for_s3_bucket_policy_changes.cloudwatch_log_metric_filter_for_s3_bucket_policy_changes.cloudwatch_client",
new=CloudWatch(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_for_s3_bucket_policy_changes.cloudwatch_log_metric_filter_for_s3_bucket_policy_changes.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_for_s3_bucket_policy_changes.cloudwatch_log_metric_filter_for_s3_bucket_policy_changes import (
cloudwatch_log_metric_filter_for_s3_bucket_policy_changes,
)
check = cloudwatch_log_metric_filter_for_s3_bucket_policy_changes()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -474,3 +474,92 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"
@mock_logs
@mock_cloudtrail
@mock_cloudwatch
@mock_s3
def test_cloudwatch_trail_with_log_group_with_metric_and_alarm_with_newlines(self):
cloudtrail_client = client("cloudtrail", region_name=AWS_REGION)
cloudwatch_client = client("cloudwatch", region_name=AWS_REGION)
logs_client = client("logs", region_name=AWS_REGION)
s3_client = client("s3", region_name=AWS_REGION)
s3_client.create_bucket(Bucket="test")
logs_client.create_log_group(logGroupName="/log-group/test")
cloudtrail_client.create_trail(
Name="test_trail",
S3BucketName="test",
CloudWatchLogsLogGroupArn=f"arn:aws:logs:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:log-group:/log-group/test:*",
)
logs_client.put_metric_filter(
logGroupName="/log-group/test",
filterName="test-filter",
filterPattern='{($.eventName="DeleteGroupPolicy")||\n($.eventName="DeleteRolePolicy")||\n($.eventName="DeleteUserPolicy")||\n($.eventName="PutGroupPolicy")||\n($.eventName="PutRolePolicy")||\n($.eventName="PutUserPolicy")||\n($.eventName="CreatePolicy")||\n($.eventName="DeletePolicy")||\n($.eventName="CreatePolicyVersion")||\n($.eventName="DeletePolicyVersion")||\n($.eventName="AttachRolePolicy")||\n($.eventName="DetachRolePolicy")||\n($.eventName="AttachUserPolicy")||\n($.eventName="DetachUserPolicy")||\n($.eventName="AttachGroupPolicy")||\n($.eventName="DetachGroupPolicy")}',
metricTransformations=[
{
"metricName": "my-metric",
"metricNamespace": "my-namespace",
"metricValue": "$.value",
}
],
)
cloudwatch_client.put_metric_alarm(
AlarmName="test-alarm",
MetricName="my-metric",
Namespace="my-namespace",
Period=10,
EvaluationPeriods=5,
Statistic="Average",
Threshold=2,
ComparisonOperator="GreaterThanThreshold",
ActionsEnabled=True,
)
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
from prowler.providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
Logs,
)
current_audit_info = self.set_mocked_audit_info()
from prowler.providers.common.models import Audit_Metadata
current_audit_info.audit_metadata = Audit_Metadata(
services_scanned=0,
# We need to set this check to call __describe_log_groups__
expected_checks=["cloudwatch_log_group_no_secrets_in_logs"],
completed_checks=0,
audit_progress=0,
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_policy_changes.cloudwatch_log_metric_filter_policy_changes.logs_client",
new=Logs(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_policy_changes.cloudwatch_log_metric_filter_policy_changes.cloudwatch_client",
new=CloudWatch(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_policy_changes.cloudwatch_log_metric_filter_policy_changes.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_policy_changes.cloudwatch_log_metric_filter_policy_changes import (
cloudwatch_log_metric_filter_policy_changes,
)
check = cloudwatch_log_metric_filter_policy_changes()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -474,3 +474,92 @@ class Test_cloudwatch_log_metric_filter_root_usage:
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"
@mock_logs
@mock_cloudtrail
@mock_cloudwatch
@mock_s3
def test_cloudwatch_trail_with_log_group_with_metric_and_alarm_with_newlines(self):
cloudtrail_client = client("cloudtrail", region_name=AWS_REGION)
cloudwatch_client = client("cloudwatch", region_name=AWS_REGION)
logs_client = client("logs", region_name=AWS_REGION)
s3_client = client("s3", region_name=AWS_REGION)
s3_client.create_bucket(Bucket="test")
logs_client.create_log_group(logGroupName="/log-group/test")
cloudtrail_client.create_trail(
Name="test_trail",
S3BucketName="test",
CloudWatchLogsLogGroupArn=f"arn:aws:logs:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:log-group:/log-group/test:*",
)
logs_client.put_metric_filter(
logGroupName="/log-group/test",
filterName="test-filter",
filterPattern='{ $.userIdentity.type = "Root" &&\n $.userIdentity.invokedBy NOT EXISTS &&\n $.eventType != "AwsServiceEvent" }',
metricTransformations=[
{
"metricName": "my-metric",
"metricNamespace": "my-namespace",
"metricValue": "$.value",
}
],
)
cloudwatch_client.put_metric_alarm(
AlarmName="test-alarm",
MetricName="my-metric",
Namespace="my-namespace",
Period=10,
EvaluationPeriods=5,
Statistic="Average",
Threshold=2,
ComparisonOperator="GreaterThanThreshold",
ActionsEnabled=True,
)
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
from prowler.providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
Logs,
)
current_audit_info = self.set_mocked_audit_info()
from prowler.providers.common.models import Audit_Metadata
current_audit_info.audit_metadata = Audit_Metadata(
services_scanned=0,
# We need to set this check to call __describe_log_groups__
expected_checks=["cloudwatch_log_group_no_secrets_in_logs"],
completed_checks=0,
audit_progress=0,
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_root_usage.cloudwatch_log_metric_filter_root_usage.logs_client",
new=Logs(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_root_usage.cloudwatch_log_metric_filter_root_usage.cloudwatch_client",
new=CloudWatch(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_root_usage.cloudwatch_log_metric_filter_root_usage.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_root_usage.cloudwatch_log_metric_filter_root_usage import (
cloudwatch_log_metric_filter_root_usage,
)
check = cloudwatch_log_metric_filter_root_usage()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -474,3 +474,92 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"
@mock_logs
@mock_cloudtrail
@mock_cloudwatch
@mock_s3
def test_cloudwatch_trail_with_log_group_with_metric_and_alarm_with_newlines(self):
cloudtrail_client = client("cloudtrail", region_name=AWS_REGION)
cloudwatch_client = client("cloudwatch", region_name=AWS_REGION)
logs_client = client("logs", region_name=AWS_REGION)
s3_client = client("s3", region_name=AWS_REGION)
s3_client.create_bucket(Bucket="test")
logs_client.create_log_group(logGroupName="/log-group/test")
cloudtrail_client.create_trail(
Name="test_trail",
S3BucketName="test",
CloudWatchLogsLogGroupArn=f"arn:aws:logs:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:log-group:/log-group/test:*",
)
logs_client.put_metric_filter(
logGroupName="/log-group/test",
filterName="test-filter",
filterPattern='{($.eventName = "AuthorizeSecurityGroupIngress") ||\n ($.eventName = "AuthorizeSecurityGroupEgress") ||\n ($.eventName = "RevokeSecurityGroupIngress") ||\n ($.eventName = "RevokeSecurityGroupEgress") ||\n ($.eventName = "CreateSecurityGroup") ||\n ($.eventName = "DeleteSecurityGroup") }',
metricTransformations=[
{
"metricName": "my-metric",
"metricNamespace": "my-namespace",
"metricValue": "$.value",
}
],
)
cloudwatch_client.put_metric_alarm(
AlarmName="test-alarm",
MetricName="my-metric",
Namespace="my-namespace",
Period=10,
EvaluationPeriods=5,
Statistic="Average",
Threshold=2,
ComparisonOperator="GreaterThanThreshold",
ActionsEnabled=True,
)
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
from prowler.providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
Logs,
)
current_audit_info = self.set_mocked_audit_info()
from prowler.providers.common.models import Audit_Metadata
current_audit_info.audit_metadata = Audit_Metadata(
services_scanned=0,
# We need to set this check to call __describe_log_groups__
expected_checks=["cloudwatch_log_group_no_secrets_in_logs"],
completed_checks=0,
audit_progress=0,
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_security_group_changes.cloudwatch_log_metric_filter_security_group_changes.logs_client",
new=Logs(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_security_group_changes.cloudwatch_log_metric_filter_security_group_changes.cloudwatch_client",
new=CloudWatch(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_security_group_changes.cloudwatch_log_metric_filter_security_group_changes.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_security_group_changes.cloudwatch_log_metric_filter_security_group_changes import (
cloudwatch_log_metric_filter_security_group_changes,
)
check = cloudwatch_log_metric_filter_security_group_changes()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -474,3 +474,92 @@ class Test_cloudwatch_log_metric_filter_sign_in_without_mfa:
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"
@mock_logs
@mock_cloudtrail
@mock_cloudwatch
@mock_s3
def test_cloudwatch_trail_with_log_group_with_metric_and_alarm_with_newlines(self):
cloudtrail_client = client("cloudtrail", region_name=AWS_REGION)
cloudwatch_client = client("cloudwatch", region_name=AWS_REGION)
logs_client = client("logs", region_name=AWS_REGION)
s3_client = client("s3", region_name=AWS_REGION)
s3_client.create_bucket(Bucket="test")
logs_client.create_log_group(logGroupName="/log-group/test")
cloudtrail_client.create_trail(
Name="test_trail",
S3BucketName="test",
CloudWatchLogsLogGroupArn=f"arn:aws:logs:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:log-group:/log-group/test:*",
)
logs_client.put_metric_filter(
logGroupName="/log-group/test",
filterName="test-filter",
filterPattern="{ ($.eventName = ConsoleLogin) &&\n ($.additionalEventData.MFAUsed != Yes) }",
metricTransformations=[
{
"metricName": "my-metric",
"metricNamespace": "my-namespace",
"metricValue": "$.value",
}
],
)
cloudwatch_client.put_metric_alarm(
AlarmName="test-alarm",
MetricName="my-metric",
Namespace="my-namespace",
Period=10,
EvaluationPeriods=5,
Statistic="Average",
Threshold=2,
ComparisonOperator="GreaterThanThreshold",
ActionsEnabled=True,
)
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
from prowler.providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
Logs,
)
current_audit_info = self.set_mocked_audit_info()
from prowler.providers.common.models import Audit_Metadata
current_audit_info.audit_metadata = Audit_Metadata(
services_scanned=0,
# We need to set this check to call __describe_log_groups__
expected_checks=["cloudwatch_log_group_no_secrets_in_logs"],
completed_checks=0,
audit_progress=0,
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_sign_in_without_mfa.cloudwatch_log_metric_filter_sign_in_without_mfa.logs_client",
new=Logs(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_sign_in_without_mfa.cloudwatch_log_metric_filter_sign_in_without_mfa.cloudwatch_client",
new=CloudWatch(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_sign_in_without_mfa.cloudwatch_log_metric_filter_sign_in_without_mfa.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_sign_in_without_mfa.cloudwatch_log_metric_filter_sign_in_without_mfa import (
cloudwatch_log_metric_filter_sign_in_without_mfa,
)
check = cloudwatch_log_metric_filter_sign_in_without_mfa()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -474,3 +474,92 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"
@mock_logs
@mock_cloudtrail
@mock_cloudwatch
@mock_s3
def test_cloudwatch_trail_with_log_group_with_metric_and_alarm_with_newlines(self):
cloudtrail_client = client("cloudtrail", region_name=AWS_REGION)
cloudwatch_client = client("cloudwatch", region_name=AWS_REGION)
logs_client = client("logs", region_name=AWS_REGION)
s3_client = client("s3", region_name=AWS_REGION)
s3_client.create_bucket(Bucket="test")
logs_client.create_log_group(logGroupName="/log-group/test")
cloudtrail_client.create_trail(
Name="test_trail",
S3BucketName="test",
CloudWatchLogsLogGroupArn=f"arn:aws:logs:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:log-group:/log-group/test:*",
)
logs_client.put_metric_filter(
logGroupName="/log-group/test",
filterName="test-filter",
filterPattern='{ ($.errorCode = "*UnauthorizedOperation") ||\n ($.errorCode = "AccessDenied*") ||\n ($.sourceIPAddress!="delivery.logs.amazonaws.com") ||\n ($.eventName!="HeadBucket") }',
metricTransformations=[
{
"metricName": "my-metric",
"metricNamespace": "my-namespace",
"metricValue": "$.value",
}
],
)
cloudwatch_client.put_metric_alarm(
AlarmName="test-alarm",
MetricName="my-metric",
Namespace="my-namespace",
Period=10,
EvaluationPeriods=5,
Statistic="Average",
Threshold=2,
ComparisonOperator="GreaterThanThreshold",
ActionsEnabled=True,
)
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
from prowler.providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
Logs,
)
current_audit_info = self.set_mocked_audit_info()
from prowler.providers.common.models import Audit_Metadata
current_audit_info.audit_metadata = Audit_Metadata(
services_scanned=0,
# We need to set this check to call __describe_log_groups__
expected_checks=["cloudwatch_log_group_no_secrets_in_logs"],
completed_checks=0,
audit_progress=0,
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_unauthorized_api_calls.cloudwatch_log_metric_filter_unauthorized_api_calls.logs_client",
new=Logs(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_unauthorized_api_calls.cloudwatch_log_metric_filter_unauthorized_api_calls.cloudwatch_client",
new=CloudWatch(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_unauthorized_api_calls.cloudwatch_log_metric_filter_unauthorized_api_calls.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_unauthorized_api_calls.cloudwatch_log_metric_filter_unauthorized_api_calls import (
cloudwatch_log_metric_filter_unauthorized_api_calls,
)
check = cloudwatch_log_metric_filter_unauthorized_api_calls()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"